aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2018-08-31 13:34:00 +0800
committerGitHub <noreply@github.com>2018-08-31 13:34:00 +0800
commit18c6a28ff021c9dc643091b5dc420b51183253cf (patch)
tree3017ffcfc03532289b1856f5f9a1a83012eb5d8e
parent123a7ee3bcf96c5bbef2ea16737d1a8e25f5ef30 (diff)
downloaddexon-consensus-18c6a28ff021c9dc643091b5dc420b51183253cf.tar
dexon-consensus-18c6a28ff021c9dc643091b5dc420b51183253cf.tar.gz
dexon-consensus-18c6a28ff021c9dc643091b5dc420b51183253cf.tar.bz2
dexon-consensus-18c6a28ff021c9dc643091b5dc420b51183253cf.tar.lz
dexon-consensus-18c6a28ff021c9dc643091b5dc420b51183253cf.tar.xz
dexon-consensus-18c6a28ff021c9dc643091b5dc420b51183253cf.tar.zst
dexon-consensus-18c6a28ff021c9dc643091b5dc420b51183253cf.zip
Add methods to Application interface. (#86)
-rw-r--r--core/consensus.go1
-rw-r--r--core/interfaces.go6
-rw-r--r--core/nonblocking-application.go16
-rw-r--r--core/nonblocking-application_test.go13
-rw-r--r--core/test/app.go9
-rw-r--r--simulation/app.go9
-rw-r--r--simulation/validator.go94
7 files changed, 56 insertions, 92 deletions
diff --git a/core/consensus.go b/core/consensus.go
index d6b5efd..4d1a386 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -245,6 +245,7 @@ func (con *Consensus) ProcessBlock(b *types.Block) (err error) {
if err = con.rbModule.processBlock(b); err != nil {
return err
}
+ con.app.BlockConfirmed(b.Clone())
for _, b := range con.rbModule.extractBlocks() {
// Notify application layer that some block is strongly acked.
con.app.StronglyAcked(b.Hash)
diff --git a/core/interfaces.go b/core/interfaces.go
index 364f2da..4376742 100644
--- a/core/interfaces.go
+++ b/core/interfaces.go
@@ -30,6 +30,12 @@ type Application interface {
// PreparePayload is called when consensus core is preparing a block.
PreparePayloads(shardID, chainID, height uint64) [][]byte
+ // VerifyPayloads verifies if the payloads are valid.
+ VerifyPayloads(payloads [][]byte) bool
+
+ // BlockConfirmed is called when a block is confirmed and added to lattice.
+ BlockConfirmed(block *types.Block)
+
// StronglyAcked is called when a block is strongly acked.
StronglyAcked(blockHash common.Hash)
diff --git a/core/nonblocking-application.go b/core/nonblocking-application.go
index 72f63b9..fb25745 100644
--- a/core/nonblocking-application.go
+++ b/core/nonblocking-application.go
@@ -26,6 +26,10 @@ import (
"github.com/dexon-foundation/dexon-consensus-core/core/types"
)
+type blockConfirmedEvent struct {
+ block *types.Block
+}
+
type stronglyAckedEvent struct {
blockHash common.Hash
}
@@ -90,6 +94,8 @@ func (app *nonBlockingApplication) run() {
switch e := event.(type) {
case stronglyAckedEvent:
app.app.StronglyAcked(e.blockHash)
+ case blockConfirmedEvent:
+ app.app.BlockConfirmed(e.block)
case totalOrderingDeliverEvent:
app.app.TotalOrderingDeliver(e.blockHashes, e.early)
case deliverBlockEvent:
@@ -120,6 +126,16 @@ func (app *nonBlockingApplication) PreparePayloads(
return app.app.PreparePayloads(shardID, chainID, height)
}
+// VerifyPayloads cannot be non-blocking.
+func (app *nonBlockingApplication) VerifyPayloads(payloads [][]byte) bool {
+ return true
+}
+
+// BlockConfirmed is called when a block is confirmed and added to lattice.
+func (app *nonBlockingApplication) BlockConfirmed(block *types.Block) {
+ app.addEvent(blockConfirmedEvent{block})
+}
+
// StronglyAcked is called when a block is strongly acked.
func (app *nonBlockingApplication) StronglyAcked(blockHash common.Hash) {
app.addEvent(stronglyAckedEvent{blockHash})
diff --git a/core/nonblocking-application_test.go b/core/nonblocking-application_test.go
index 14fb670..65c5700 100644
--- a/core/nonblocking-application_test.go
+++ b/core/nonblocking-application_test.go
@@ -29,6 +29,7 @@ import (
type slowApp struct {
sleep time.Duration
+ blockConfirmed map[common.Hash]struct{}
stronglyAcked map[common.Hash]struct{}
totalOrderingDeliver map[common.Hash]struct{}
deliverBlock map[common.Hash]struct{}
@@ -38,6 +39,7 @@ type slowApp struct {
func newSlowApp(sleep time.Duration) *slowApp {
return &slowApp{
sleep: sleep,
+ blockConfirmed: make(map[common.Hash]struct{}),
stronglyAcked: make(map[common.Hash]struct{}),
totalOrderingDeliver: make(map[common.Hash]struct{}),
deliverBlock: make(map[common.Hash]struct{}),
@@ -49,6 +51,15 @@ func (app *slowApp) PreparePayloads(_, _, _ uint64) [][]byte {
return [][]byte{}
}
+func (app *slowApp) VerifyPayloads(_ [][]byte) bool {
+ return true
+}
+
+func (app *slowApp) BlockConfirmed(block *types.Block) {
+ time.Sleep(app.sleep)
+ app.blockConfirmed[block.Hash] = struct{}{}
+}
+
func (app *slowApp) StronglyAcked(blockHash common.Hash) {
time.Sleep(app.sleep)
app.stronglyAcked[blockHash] = struct{}{}
@@ -88,6 +99,7 @@ func (s *NonBlockingAppTestSuite) TestNonBlockingApplication() {
// Start doing some 'heavy' job.
for _, hash := range hashes {
+ nbapp.BlockConfirmed(&types.Block{Hash: hash})
nbapp.StronglyAcked(hash)
nbapp.DeliverBlock(hash, time.Now().UTC())
nbapp.NotaryAckDeliver(&types.NotaryAck{Hash: hash})
@@ -99,6 +111,7 @@ func (s *NonBlockingAppTestSuite) TestNonBlockingApplication() {
nbapp.wait()
for _, hash := range hashes {
+ s.Contains(app.blockConfirmed, hash)
s.Contains(app.stronglyAcked, hash)
s.Contains(app.totalOrderingDeliver, hash)
s.Contains(app.deliverBlock, hash)
diff --git a/core/test/app.go b/core/test/app.go
index e36a184..3ed65f7 100644
--- a/core/test/app.go
+++ b/core/test/app.go
@@ -107,6 +107,15 @@ func (app *App) PreparePayloads(shardID, chainID, height uint64) [][]byte {
return [][]byte{}
}
+// VerifyPayloads implements Application.
+func (app *App) VerifyPayloads(payloads [][]byte) bool {
+ return true
+}
+
+// BlockConfirmed implements Application interface.
+func (app *App) BlockConfirmed(block *types.Block) {
+}
+
// StronglyAcked implements Application interface.
func (app *App) StronglyAcked(blockHash common.Hash) {
app.ackedLock.Lock()
diff --git a/simulation/app.go b/simulation/app.go
index 78e1d9a..f12290c 100644
--- a/simulation/app.go
+++ b/simulation/app.go
@@ -54,7 +54,8 @@ func newSimApp(id types.ValidatorID, Network PeerServerNetwork) *simApp {
}
}
-func (a *simApp) addBlock(block *types.Block) {
+// BlockConfirmed implements core.Application.
+func (a *simApp) BlockConfirmed(block *types.Block) {
a.blockByHashMutex.Lock()
defer a.blockByHashMutex.Unlock()
@@ -62,6 +63,11 @@ func (a *simApp) addBlock(block *types.Block) {
a.blockByHash[block.Hash] = block
}
+// VerifyPayloads implements core.Application.
+func (a *simApp) VerifyPayloads(payloads [][]byte) bool {
+ return true
+}
+
// getAckedBlocks will return all unconfirmed blocks' hash with lower Height
// than the block with ackHash.
func (a *simApp) getAckedBlocks(ackHash common.Hash) (output common.Hashes) {
@@ -101,7 +107,6 @@ func (a *simApp) StronglyAcked(blockHash common.Hash) {
// TotalOrderingDeliver is called when blocks are delivered by the total
// ordering algorithm.
func (a *simApp) TotalOrderingDeliver(blockHashes common.Hashes, early bool) {
-
now := time.Now()
blocks := make([]*types.Block, len(blockHashes))
a.blockByHashMutex.RLock()
diff --git a/simulation/validator.go b/simulation/validator.go
index a54c848..6d73c50 100644
--- a/simulation/validator.go
+++ b/simulation/validator.go
@@ -105,26 +105,15 @@ func (v *Validator) Run() {
time.Duration(v.config.ProposeIntervalMean)*time.Millisecond),
v.prvKey, v.sigToPub)
- genesisBlock := &types.Block{
- ProposerID: v.ID,
- ChainID: v.chainID,
- }
- err := v.consensus.PrepareGenesisBlock(genesisBlock, time.Now().UTC())
- if err != nil {
- panic(err)
- }
- isStopped := make(chan struct{}, 2)
+ go v.consensus.Run()
+
isShutdown := make(chan struct{})
- v.app.addBlock(genesisBlock)
- v.consensus.ProcessBlock(genesisBlock)
- v.BroadcastGenesisBlock(genesisBlock)
- go v.MsgServer(isStopped)
go v.CheckServerInfo(isShutdown)
- go v.BlockProposer(isStopped, isShutdown)
// Blocks forever.
- <-isStopped
+ <-isShutdown
+ v.consensus.Stop()
if err := v.db.Close(); err != nil {
fmt.Println(err)
}
@@ -151,78 +140,3 @@ func (v *Validator) CheckServerInfo(isShutdown chan struct{}) {
time.Sleep(250 * time.Millisecond)
}
}
-
-// MsgServer listen to the network channel for message and handle it.
-func (v *Validator) MsgServer(
- isStopped chan struct{}) {
-
- for {
- var msg interface{}
- select {
- case msg = <-v.msgChannel:
- case <-isStopped:
- return
- }
-
- switch val := msg.(type) {
- case *types.Block:
- v.app.addBlock(val)
- if err := v.consensus.ProcessBlock(val); err != nil {
- fmt.Println(err)
- }
- types.RecycleBlock(val)
- case *types.NotaryAck:
- if err := v.consensus.ProcessNotaryAck(val); err != nil {
- fmt.Println(err)
- }
- case *types.Vote:
- if err := v.consensus.ProcessVote(val); err != nil {
- fmt.Println(err)
- }
- }
- }
-}
-
-// BroadcastGenesisBlock broadcasts genesis block to all peers.
-func (v *Validator) BroadcastGenesisBlock(genesisBlock *types.Block) {
- // Wait until all peer joined the network.
- for v.network.NumPeers() != v.config.Num {
- time.Sleep(time.Second)
- }
- v.network.BroadcastBlock(genesisBlock)
-}
-
-// BlockProposer propose blocks to be send to the DEXON network.
-func (v *Validator) BlockProposer(isStopped, isShutdown chan struct{}) {
- model := &NormalNetwork{
- Sigma: v.config.ProposeIntervalSigma,
- Mean: v.config.ProposeIntervalMean,
- }
-ProposingBlockLoop:
- for {
- time.Sleep(model.Delay())
-
- block := &types.Block{
- ProposerID: v.ID,
- ChainID: v.chainID,
- Hash: common.NewRandomHash(),
- }
- if err := v.consensus.PrepareBlock(block, time.Now().UTC()); err != nil {
- panic(err)
- }
- v.app.addBlock(block)
- if err := v.consensus.ProcessBlock(block); err != nil {
- fmt.Println(err)
- //panic(err)
- }
- v.network.BroadcastBlock(block)
- select {
- case <-isShutdown:
- isStopped <- struct{}{}
- isStopped <- struct{}{}
- break ProposingBlockLoop
- default:
- break
- }
- }
-}