diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2018-08-31 13:34:00 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-08-31 13:34:00 +0800 |
commit | 18c6a28ff021c9dc643091b5dc420b51183253cf (patch) | |
tree | 3017ffcfc03532289b1856f5f9a1a83012eb5d8e | |
parent | 123a7ee3bcf96c5bbef2ea16737d1a8e25f5ef30 (diff) | |
download | dexon-consensus-18c6a28ff021c9dc643091b5dc420b51183253cf.tar dexon-consensus-18c6a28ff021c9dc643091b5dc420b51183253cf.tar.gz dexon-consensus-18c6a28ff021c9dc643091b5dc420b51183253cf.tar.bz2 dexon-consensus-18c6a28ff021c9dc643091b5dc420b51183253cf.tar.lz dexon-consensus-18c6a28ff021c9dc643091b5dc420b51183253cf.tar.xz dexon-consensus-18c6a28ff021c9dc643091b5dc420b51183253cf.tar.zst dexon-consensus-18c6a28ff021c9dc643091b5dc420b51183253cf.zip |
Add methods to Application interface. (#86)
-rw-r--r-- | core/consensus.go | 1 | ||||
-rw-r--r-- | core/interfaces.go | 6 | ||||
-rw-r--r-- | core/nonblocking-application.go | 16 | ||||
-rw-r--r-- | core/nonblocking-application_test.go | 13 | ||||
-rw-r--r-- | core/test/app.go | 9 | ||||
-rw-r--r-- | simulation/app.go | 9 | ||||
-rw-r--r-- | simulation/validator.go | 94 |
7 files changed, 56 insertions, 92 deletions
diff --git a/core/consensus.go b/core/consensus.go index d6b5efd..4d1a386 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -245,6 +245,7 @@ func (con *Consensus) ProcessBlock(b *types.Block) (err error) { if err = con.rbModule.processBlock(b); err != nil { return err } + con.app.BlockConfirmed(b.Clone()) for _, b := range con.rbModule.extractBlocks() { // Notify application layer that some block is strongly acked. con.app.StronglyAcked(b.Hash) diff --git a/core/interfaces.go b/core/interfaces.go index 364f2da..4376742 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -30,6 +30,12 @@ type Application interface { // PreparePayload is called when consensus core is preparing a block. PreparePayloads(shardID, chainID, height uint64) [][]byte + // VerifyPayloads verifies if the payloads are valid. + VerifyPayloads(payloads [][]byte) bool + + // BlockConfirmed is called when a block is confirmed and added to lattice. + BlockConfirmed(block *types.Block) + // StronglyAcked is called when a block is strongly acked. StronglyAcked(blockHash common.Hash) diff --git a/core/nonblocking-application.go b/core/nonblocking-application.go index 72f63b9..fb25745 100644 --- a/core/nonblocking-application.go +++ b/core/nonblocking-application.go @@ -26,6 +26,10 @@ import ( "github.com/dexon-foundation/dexon-consensus-core/core/types" ) +type blockConfirmedEvent struct { + block *types.Block +} + type stronglyAckedEvent struct { blockHash common.Hash } @@ -90,6 +94,8 @@ func (app *nonBlockingApplication) run() { switch e := event.(type) { case stronglyAckedEvent: app.app.StronglyAcked(e.blockHash) + case blockConfirmedEvent: + app.app.BlockConfirmed(e.block) case totalOrderingDeliverEvent: app.app.TotalOrderingDeliver(e.blockHashes, e.early) case deliverBlockEvent: @@ -120,6 +126,16 @@ func (app *nonBlockingApplication) PreparePayloads( return app.app.PreparePayloads(shardID, chainID, height) } +// VerifyPayloads cannot be non-blocking. +func (app *nonBlockingApplication) VerifyPayloads(payloads [][]byte) bool { + return true +} + +// BlockConfirmed is called when a block is confirmed and added to lattice. +func (app *nonBlockingApplication) BlockConfirmed(block *types.Block) { + app.addEvent(blockConfirmedEvent{block}) +} + // StronglyAcked is called when a block is strongly acked. func (app *nonBlockingApplication) StronglyAcked(blockHash common.Hash) { app.addEvent(stronglyAckedEvent{blockHash}) diff --git a/core/nonblocking-application_test.go b/core/nonblocking-application_test.go index 14fb670..65c5700 100644 --- a/core/nonblocking-application_test.go +++ b/core/nonblocking-application_test.go @@ -29,6 +29,7 @@ import ( type slowApp struct { sleep time.Duration + blockConfirmed map[common.Hash]struct{} stronglyAcked map[common.Hash]struct{} totalOrderingDeliver map[common.Hash]struct{} deliverBlock map[common.Hash]struct{} @@ -38,6 +39,7 @@ type slowApp struct { func newSlowApp(sleep time.Duration) *slowApp { return &slowApp{ sleep: sleep, + blockConfirmed: make(map[common.Hash]struct{}), stronglyAcked: make(map[common.Hash]struct{}), totalOrderingDeliver: make(map[common.Hash]struct{}), deliverBlock: make(map[common.Hash]struct{}), @@ -49,6 +51,15 @@ func (app *slowApp) PreparePayloads(_, _, _ uint64) [][]byte { return [][]byte{} } +func (app *slowApp) VerifyPayloads(_ [][]byte) bool { + return true +} + +func (app *slowApp) BlockConfirmed(block *types.Block) { + time.Sleep(app.sleep) + app.blockConfirmed[block.Hash] = struct{}{} +} + func (app *slowApp) StronglyAcked(blockHash common.Hash) { time.Sleep(app.sleep) app.stronglyAcked[blockHash] = struct{}{} @@ -88,6 +99,7 @@ func (s *NonBlockingAppTestSuite) TestNonBlockingApplication() { // Start doing some 'heavy' job. for _, hash := range hashes { + nbapp.BlockConfirmed(&types.Block{Hash: hash}) nbapp.StronglyAcked(hash) nbapp.DeliverBlock(hash, time.Now().UTC()) nbapp.NotaryAckDeliver(&types.NotaryAck{Hash: hash}) @@ -99,6 +111,7 @@ func (s *NonBlockingAppTestSuite) TestNonBlockingApplication() { nbapp.wait() for _, hash := range hashes { + s.Contains(app.blockConfirmed, hash) s.Contains(app.stronglyAcked, hash) s.Contains(app.totalOrderingDeliver, hash) s.Contains(app.deliverBlock, hash) diff --git a/core/test/app.go b/core/test/app.go index e36a184..3ed65f7 100644 --- a/core/test/app.go +++ b/core/test/app.go @@ -107,6 +107,15 @@ func (app *App) PreparePayloads(shardID, chainID, height uint64) [][]byte { return [][]byte{} } +// VerifyPayloads implements Application. +func (app *App) VerifyPayloads(payloads [][]byte) bool { + return true +} + +// BlockConfirmed implements Application interface. +func (app *App) BlockConfirmed(block *types.Block) { +} + // StronglyAcked implements Application interface. func (app *App) StronglyAcked(blockHash common.Hash) { app.ackedLock.Lock() diff --git a/simulation/app.go b/simulation/app.go index 78e1d9a..f12290c 100644 --- a/simulation/app.go +++ b/simulation/app.go @@ -54,7 +54,8 @@ func newSimApp(id types.ValidatorID, Network PeerServerNetwork) *simApp { } } -func (a *simApp) addBlock(block *types.Block) { +// BlockConfirmed implements core.Application. +func (a *simApp) BlockConfirmed(block *types.Block) { a.blockByHashMutex.Lock() defer a.blockByHashMutex.Unlock() @@ -62,6 +63,11 @@ func (a *simApp) addBlock(block *types.Block) { a.blockByHash[block.Hash] = block } +// VerifyPayloads implements core.Application. +func (a *simApp) VerifyPayloads(payloads [][]byte) bool { + return true +} + // getAckedBlocks will return all unconfirmed blocks' hash with lower Height // than the block with ackHash. func (a *simApp) getAckedBlocks(ackHash common.Hash) (output common.Hashes) { @@ -101,7 +107,6 @@ func (a *simApp) StronglyAcked(blockHash common.Hash) { // TotalOrderingDeliver is called when blocks are delivered by the total // ordering algorithm. func (a *simApp) TotalOrderingDeliver(blockHashes common.Hashes, early bool) { - now := time.Now() blocks := make([]*types.Block, len(blockHashes)) a.blockByHashMutex.RLock() diff --git a/simulation/validator.go b/simulation/validator.go index a54c848..6d73c50 100644 --- a/simulation/validator.go +++ b/simulation/validator.go @@ -105,26 +105,15 @@ func (v *Validator) Run() { time.Duration(v.config.ProposeIntervalMean)*time.Millisecond), v.prvKey, v.sigToPub) - genesisBlock := &types.Block{ - ProposerID: v.ID, - ChainID: v.chainID, - } - err := v.consensus.PrepareGenesisBlock(genesisBlock, time.Now().UTC()) - if err != nil { - panic(err) - } - isStopped := make(chan struct{}, 2) + go v.consensus.Run() + isShutdown := make(chan struct{}) - v.app.addBlock(genesisBlock) - v.consensus.ProcessBlock(genesisBlock) - v.BroadcastGenesisBlock(genesisBlock) - go v.MsgServer(isStopped) go v.CheckServerInfo(isShutdown) - go v.BlockProposer(isStopped, isShutdown) // Blocks forever. - <-isStopped + <-isShutdown + v.consensus.Stop() if err := v.db.Close(); err != nil { fmt.Println(err) } @@ -151,78 +140,3 @@ func (v *Validator) CheckServerInfo(isShutdown chan struct{}) { time.Sleep(250 * time.Millisecond) } } - -// MsgServer listen to the network channel for message and handle it. -func (v *Validator) MsgServer( - isStopped chan struct{}) { - - for { - var msg interface{} - select { - case msg = <-v.msgChannel: - case <-isStopped: - return - } - - switch val := msg.(type) { - case *types.Block: - v.app.addBlock(val) - if err := v.consensus.ProcessBlock(val); err != nil { - fmt.Println(err) - } - types.RecycleBlock(val) - case *types.NotaryAck: - if err := v.consensus.ProcessNotaryAck(val); err != nil { - fmt.Println(err) - } - case *types.Vote: - if err := v.consensus.ProcessVote(val); err != nil { - fmt.Println(err) - } - } - } -} - -// BroadcastGenesisBlock broadcasts genesis block to all peers. -func (v *Validator) BroadcastGenesisBlock(genesisBlock *types.Block) { - // Wait until all peer joined the network. - for v.network.NumPeers() != v.config.Num { - time.Sleep(time.Second) - } - v.network.BroadcastBlock(genesisBlock) -} - -// BlockProposer propose blocks to be send to the DEXON network. -func (v *Validator) BlockProposer(isStopped, isShutdown chan struct{}) { - model := &NormalNetwork{ - Sigma: v.config.ProposeIntervalSigma, - Mean: v.config.ProposeIntervalMean, - } -ProposingBlockLoop: - for { - time.Sleep(model.Delay()) - - block := &types.Block{ - ProposerID: v.ID, - ChainID: v.chainID, - Hash: common.NewRandomHash(), - } - if err := v.consensus.PrepareBlock(block, time.Now().UTC()); err != nil { - panic(err) - } - v.app.addBlock(block) - if err := v.consensus.ProcessBlock(block); err != nil { - fmt.Println(err) - //panic(err) - } - v.network.BroadcastBlock(block) - select { - case <-isShutdown: - isStopped <- struct{}{} - isStopped <- struct{}{} - break ProposingBlockLoop - default: - break - } - } -} |