diff options
author | Mission Liao <mission.liao@dexon.org> | 2018-09-25 14:10:16 +0800 |
---|---|---|
committer | Wei-Ning Huang <aitjcize@gmail.com> | 2018-09-25 14:10:16 +0800 |
commit | 6c8d26d2e797e8420fc3de4b15e4c556f968aba0 (patch) | |
tree | 22beecc01da7a9ce5cac36135a89010d6d4ed4f2 | |
parent | ca935bdbac190766f29fb73433a82ee5806bc8f9 (diff) | |
download | dexon-consensus-6c8d26d2e797e8420fc3de4b15e4c556f968aba0.tar dexon-consensus-6c8d26d2e797e8420fc3de4b15e4c556f968aba0.tar.gz dexon-consensus-6c8d26d2e797e8420fc3de4b15e4c556f968aba0.tar.bz2 dexon-consensus-6c8d26d2e797e8420fc3de4b15e4c556f968aba0.tar.lz dexon-consensus-6c8d26d2e797e8420fc3de4b15e4c556f968aba0.tar.xz dexon-consensus-6c8d26d2e797e8420fc3de4b15e4c556f968aba0.tar.zst dexon-consensus-6c8d26d2e797e8420fc3de4b15e4c556f968aba0.zip |
core: add debug (#133)
* Split interface
* Rename nonblocking-application to nonblocking
Parts needs nonblocking gets more.
* Implement core.nonBlocking based on interface split
* Fix: the witness parent hash could be parent on compaction chain.
* Rename Application.DeliverBlock to BlockDeliver
To sync with naming of other methods.
* Change methods' fingerprint
- BlockConfirmed provides block hash only.
- BlockDeliver provde a whole block.
-rw-r--r-- | core/compaction-chain.go | 23 | ||||
-rw-r--r-- | core/consensus.go | 28 | ||||
-rw-r--r-- | core/consensus_test.go | 48 | ||||
-rw-r--r-- | core/interfaces.go | 26 | ||||
-rw-r--r-- | core/nonblocking.go (renamed from core/nonblocking-application.go) | 124 | ||||
-rw-r--r-- | core/nonblocking_test.go (renamed from core/nonblocking-application_test.go) | 41 | ||||
-rw-r--r-- | core/shard.go | 13 | ||||
-rw-r--r-- | core/shard_test.go | 3 | ||||
-rw-r--r-- | core/test/app.go | 12 | ||||
-rw-r--r-- | core/test/app_test.go | 21 | ||||
-rw-r--r-- | core/test/stopper_test.go | 4 | ||||
-rw-r--r-- | integration_test/node.go | 3 | ||||
-rw-r--r-- | simulation/app.go | 83 |
13 files changed, 190 insertions, 239 deletions
diff --git a/core/compaction-chain.go b/core/compaction-chain.go index f5c5548..4405bbc 100644 --- a/core/compaction-chain.go +++ b/core/compaction-chain.go @@ -107,29 +107,6 @@ func (cc *compactionChain) processBlock(block *types.Block) error { return nil } -func (cc *compactionChain) processWitnessResult( - block *types.Block, result types.WitnessResult) error { - block.Witness.Data = result.Data - - // block is a genesis block, no need to update witness parent hash. - if block.IsGenesis() { - return nil - } - - prevBlock, err := cc.db.Get(block.ParentHash) - if err != nil { - return err - } - - hash, err := hashWitness(&prevBlock) - if err != nil { - return err - } - - block.Witness.ParentHash = hash - return nil -} - func (cc *compactionChain) prepareWitnessAck( block *types.Block, prvKey crypto.PrivateKey) ( witnessAck *types.WitnessAck, err error) { diff --git a/core/consensus.go b/core/consensus.go index dc5bbba..1758507 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -172,7 +172,7 @@ func (recv *consensusDKGReceiver) ProposeDKGAntiNackComplaint( // Consensus implements DEXON Consensus algorithm. type Consensus struct { ID types.NodeID - app Application + nbModule *nonBlocking gov Governance config *types.Config baModules []*agreement @@ -239,13 +239,15 @@ func NewConsensus( len(gov.GetNotarySet())/3, sigToPub) + // Check if the application implement Debug interface. + debug, _ := app.(Debug) con := &Consensus{ ID: ID, rbModule: rb, toModule: to, ctModule: newConsensusTimestamp(), ccModule: newCompactionChain(db, sigToPub), - app: newNonBlockingApplication(app), + nbModule: newNonBlocking(app, debug), gov: gov, config: config, db: db, @@ -530,7 +532,7 @@ func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { // processWitnessData process witness acks. func (con *Consensus) processWitnessData() { - ch := con.app.BlockProcessedChan() + ch := con.nbModule.BlockProcessedChan() for { select { @@ -541,14 +543,10 @@ func (con *Consensus) processWitnessData() { if err != nil { panic(err) } - - if err = con.ccModule.processWitnessResult(&block, result); err != nil { - panic(err) - } + block.Witness.Data = result.Data if err := con.db.Update(block); err != nil { panic(err) } - // TODO(w): move the acking interval into governance. if block.Witness.Height%5 != 0 { continue @@ -562,7 +560,7 @@ func (con *Consensus) processWitnessData() { if err != nil { panic(err) } - con.app.WitnessAckDeliver(witnessAck) + con.nbModule.WitnessAckDeliver(witnessAck) } } } @@ -625,10 +623,10 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { if err = con.rbModule.processBlock(b); err != nil { return err } - con.app.BlockConfirmed(block) + con.nbModule.BlockConfirmed(block.Hash) for _, b := range con.rbModule.extractBlocks() { // Notify application layer that some block is strongly acked. - con.app.StronglyAcked(b.Hash) + con.nbModule.StronglyAcked(b.Hash) // Perform total ordering. deliveredBlocks, earlyDelivered, err = con.toModule.processBlock(b) if err != nil { @@ -647,7 +645,7 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { for idx := range deliveredBlocks { hashes[idx] = deliveredBlocks[idx].Hash } - con.app.TotalOrderingDeliver(hashes, earlyDelivered) + con.nbModule.TotalOrderingDeliver(hashes, earlyDelivered) // Perform timestamp generation. err = con.ctModule.processBlocks(deliveredBlocks) if err != nil { @@ -660,10 +658,10 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { if err = con.db.Update(*b); err != nil { return } - con.app.DeliverBlock(b.Hash, b.Witness.Timestamp) + con.nbModule.BlockDeliver(*b) // TODO(mission): Find a way to safely recycle the block. // We should deliver block directly to - // nonBlockingApplication and let them recycle the + // nonBlocking and let them recycle the // block. } } @@ -690,7 +688,7 @@ func (con *Consensus) prepareBlock(b *types.Block, con.rbModule.prepareBlock(b) b.Timestamp = proposeTime - b.Payload = con.app.PreparePayload(b.Position) + b.Payload = con.nbModule.PreparePayload(b.Position) b.Hash, err = hashBlock(b) if err != nil { return diff --git a/core/consensus_test.go b/core/consensus_test.go index c5ef452..bd95a00 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -81,7 +81,7 @@ func (s *ConsensusTestSuite) prepareGenesisBlock( } func (s *ConsensusTestSuite) prepareConsensus( - gov *test.Governance, nID types.NodeID) (*Application, *Consensus) { + gov *test.Governance, nID types.NodeID) (*test.App, *Consensus) { app := test.NewApp() db, err := blockdb.NewMemBackedBlockDB() @@ -89,7 +89,7 @@ func (s *ConsensusTestSuite) prepareConsensus( prv, exist := gov.GetPrivateKey(nID) s.Require().Nil(exist) con := NewConsensus(app, gov, db, &network{}, prv, eth.SigToPub) - return &con.app, con + return app, con } func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { @@ -118,13 +118,13 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { // Setup core.Consensus and test.App. objs := map[types.NodeID]*struct { - app *Application + app *test.App con *Consensus }{} for _, nID := range nodes { app, con := s.prepareConsensus(gov, nID) objs[nID] = &struct { - app *Application + app *test.App con *Consensus }{app, con} } @@ -309,14 +309,8 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { req.Equal(t, app.Delivered[b11.Hash].ConsensusTime) } for _, obj := range objs { - app := *obj.app - if nbapp, ok := app.(*nonBlockingApplication); ok { - nbapp.wait() - app = nbapp.app - } - testApp, ok := app.(*test.App) - s.Require().True(ok) - verify(testApp) + obj.con.nbModule.wait() + verify(obj.app) } } @@ -338,23 +332,16 @@ func (s *ConsensusTestSuite) TestPrepareBlock() { nodes = append(nodes, nID) } // Setup core.Consensus and test.App. - objs := map[types.NodeID]*struct { - app *Application - con *Consensus - }{} + cons := map[types.NodeID]*Consensus{} for _, nID := range nodes { - app, con := s.prepareConsensus(gov, nID) - objs[nID] = &struct { - app *Application - con *Consensus - }{app, con} + _, con := s.prepareConsensus(gov, nID) + cons[nID] = con } - b00 := s.prepareGenesisBlock(nodes[0], 0, objs[nodes[0]].con) - b10 := s.prepareGenesisBlock(nodes[1], 1, objs[nodes[1]].con) - b20 := s.prepareGenesisBlock(nodes[2], 2, objs[nodes[2]].con) - b30 := s.prepareGenesisBlock(nodes[3], 3, objs[nodes[3]].con) - for _, obj := range objs { - con := obj.con + b00 := s.prepareGenesisBlock(nodes[0], 0, cons[nodes[0]]) + b10 := s.prepareGenesisBlock(nodes[1], 1, cons[nodes[1]]) + b20 := s.prepareGenesisBlock(nodes[2], 2, cons[nodes[2]]) + b30 := s.prepareGenesisBlock(nodes[3], 3, cons[nodes[3]]) + for _, con := range cons { req.Nil(con.processBlock(b00)) req.Nil(con.processBlock(b10)) req.Nil(con.processBlock(b20)) @@ -365,20 +352,19 @@ func (s *ConsensusTestSuite) TestPrepareBlock() { } // Sleep to make sure 'now' is slower than b10's timestamp. time.Sleep(100 * time.Millisecond) - req.Nil(objs[nodes[1]].con.prepareBlock(b11, time.Now().UTC())) + req.Nil(cons[nodes[1]].prepareBlock(b11, time.Now().UTC())) // Make sure we would assign 'now' to the timestamp belongs to // the proposer. req.True( b11.Timestamp.Sub( b10.Timestamp) > 100*time.Millisecond) - for _, obj := range objs { - con := obj.con + for _, con := range cons { req.Nil(con.processBlock(b11)) } b12 := &types.Block{ ProposerID: nodes[1], } - req.Nil(objs[nodes[1]].con.prepareBlock(b12, time.Now().UTC())) + req.Nil(cons[nodes[1]].prepareBlock(b12, time.Now().UTC())) req.Len(b12.Acks, 1) req.Contains(b12.Acks, b11.Hash) } diff --git a/core/interfaces.go b/core/interfaces.go index 5e1002c..03caa63 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -33,8 +33,22 @@ type Application interface { // VerifyPayloads verifies if the payloads are valid. VerifyPayloads(payloads []byte) bool + // BlockDeliver is called when a block is add to the compaction chain. + BlockDeliver(block types.Block) + + // BlockProcessedChan returns a channel to receive the block hashes that have + // finished processing by the application. + BlockProcessedChan() <-chan types.WitnessResult + + // WitnessAckDeliver is called when a witness ack is created. + WitnessAckDeliver(witnessAck *types.WitnessAck) +} + +// Debug describes the application interface that requires +// more detailed consensus execution. +type Debug interface { // BlockConfirmed is called when a block is confirmed and added to lattice. - BlockConfirmed(block *types.Block) + BlockConfirmed(blockHash common.Hash) // StronglyAcked is called when a block is strongly acked. StronglyAcked(blockHash common.Hash) @@ -42,16 +56,6 @@ type Application interface { // TotalOrderingDeliver is called when the total ordering algorithm deliver // a set of block. TotalOrderingDeliver(blockHashes common.Hashes, early bool) - - // DeliverBlock is called when a block is add to the compaction chain. - DeliverBlock(blockHash common.Hash, timestamp time.Time) - - // BlockProcessedChan returns a channel to receive the block hashes that have - // finished processing by the application. - BlockProcessedChan() <-chan types.WitnessResult - - // WitnessAckDeliver is called when a witness ack is created. - WitnessAckDeliver(witnessAck *types.WitnessAck) } // Network describs the network interface that interacts with DEXON consensus diff --git a/core/nonblocking-application.go b/core/nonblocking.go index 98f92fc..5d7311e 100644 --- a/core/nonblocking-application.go +++ b/core/nonblocking.go @@ -20,14 +20,13 @@ package core import ( "fmt" "sync" - "time" "github.com/dexon-foundation/dexon-consensus-core/common" "github.com/dexon-foundation/dexon-consensus-core/core/types" ) type blockConfirmedEvent struct { - block *types.Block + blockHash common.Hash } type stronglyAckedEvent struct { @@ -39,128 +38,137 @@ type totalOrderingDeliverEvent struct { early bool } -type deliverBlockEvent struct { - blockHash common.Hash - timestamp time.Time +type blockDeliverEvent struct { + block *types.Block } type witnessAckEvent struct { witnessAck *types.WitnessAck } -// nonBlockingApplication implements Application and is a decorator for -// Application that makes the methods to be non-blocking. -type nonBlockingApplication struct { +// nonBlocking implements these interfaces and is a decorator for +// them that makes the methods to be non-blocking. +// - Application +// - Debug +// - It also provides nonblockig for blockdb update. +type nonBlocking struct { app Application + debug Debug eventChan chan interface{} events []interface{} eventsChange *sync.Cond running sync.WaitGroup } -func newNonBlockingApplication(app Application) *nonBlockingApplication { - nonBlockingApp := &nonBlockingApplication{ +func newNonBlocking(app Application, debug Debug) *nonBlocking { + nonBlockingModule := &nonBlocking{ app: app, + debug: debug, eventChan: make(chan interface{}, 6), events: make([]interface{}, 0, 100), eventsChange: sync.NewCond(&sync.Mutex{}), } - go nonBlockingApp.run() - return nonBlockingApp + go nonBlockingModule.run() + return nonBlockingModule } -func (app *nonBlockingApplication) addEvent(event interface{}) { - app.eventsChange.L.Lock() - defer app.eventsChange.L.Unlock() - app.events = append(app.events, event) - app.eventsChange.Broadcast() +func (nb *nonBlocking) addEvent(event interface{}) { + nb.eventsChange.L.Lock() + defer nb.eventsChange.L.Unlock() + nb.events = append(nb.events, event) + nb.eventsChange.Broadcast() } -func (app *nonBlockingApplication) run() { +func (nb *nonBlocking) run() { // This go routine consume the first event from events and call the - // corresponding method of app. + // corresponding methods of Application/Debug/blockdb. for { var event interface{} func() { - app.eventsChange.L.Lock() - defer app.eventsChange.L.Unlock() - for len(app.events) == 0 { - app.eventsChange.Wait() + nb.eventsChange.L.Lock() + defer nb.eventsChange.L.Unlock() + for len(nb.events) == 0 { + nb.eventsChange.Wait() } - event = app.events[0] - app.events = app.events[1:] - app.running.Add(1) + event = nb.events[0] + nb.events = nb.events[1:] + nb.running.Add(1) }() switch e := event.(type) { case stronglyAckedEvent: - app.app.StronglyAcked(e.blockHash) + nb.debug.StronglyAcked(e.blockHash) case blockConfirmedEvent: - app.app.BlockConfirmed(e.block) + nb.debug.BlockConfirmed(e.blockHash) case totalOrderingDeliverEvent: - app.app.TotalOrderingDeliver(e.blockHashes, e.early) - case deliverBlockEvent: - app.app.DeliverBlock(e.blockHash, e.timestamp) + nb.debug.TotalOrderingDeliver(e.blockHashes, e.early) + case blockDeliverEvent: + nb.app.BlockDeliver(*e.block) case witnessAckEvent: - app.app.WitnessAckDeliver(e.witnessAck) + nb.app.WitnessAckDeliver(e.witnessAck) default: fmt.Printf("Unknown event %v.", e) } - app.running.Done() - app.eventsChange.Broadcast() + nb.running.Done() + nb.eventsChange.Broadcast() } } // wait will wait for all event in events finishes. -func (app *nonBlockingApplication) wait() { - app.eventsChange.L.Lock() - defer app.eventsChange.L.Unlock() - for len(app.events) > 0 { - app.eventsChange.Wait() +func (nb *nonBlocking) wait() { + nb.eventsChange.L.Lock() + defer nb.eventsChange.L.Unlock() + for len(nb.events) > 0 { + nb.eventsChange.Wait() } - app.running.Wait() + nb.running.Wait() } // PreparePayload cannot be non-blocking. -func (app *nonBlockingApplication) PreparePayload( +func (nb *nonBlocking) PreparePayload( position types.Position) []byte { - return app.app.PreparePayload(position) + return nb.app.PreparePayload(position) } // VerifyPayloads cannot be non-blocking. -func (app *nonBlockingApplication) VerifyPayloads(payloads []byte) bool { - return app.app.VerifyPayloads(payloads) +func (nb *nonBlocking) VerifyPayloads(payloads []byte) bool { + return nb.app.VerifyPayloads(payloads) } // BlockConfirmed is called when a block is confirmed and added to lattice. -func (app *nonBlockingApplication) BlockConfirmed(block *types.Block) { - app.addEvent(blockConfirmedEvent{block}) +func (nb *nonBlocking) BlockConfirmed(blockHash common.Hash) { + if nb.debug != nil { + nb.addEvent(blockConfirmedEvent{blockHash}) + } } // StronglyAcked is called when a block is strongly acked. -func (app *nonBlockingApplication) StronglyAcked(blockHash common.Hash) { - app.addEvent(stronglyAckedEvent{blockHash}) +func (nb *nonBlocking) StronglyAcked(blockHash common.Hash) { + if nb.debug != nil { + nb.addEvent(stronglyAckedEvent{blockHash}) + } } // TotalOrderingDeliver is called when the total ordering algorithm deliver // a set of block. -func (app *nonBlockingApplication) TotalOrderingDeliver( +func (nb *nonBlocking) TotalOrderingDeliver( blockHashes common.Hashes, early bool) { - app.addEvent(totalOrderingDeliverEvent{blockHashes, early}) + if nb.debug != nil { + nb.addEvent(totalOrderingDeliverEvent{blockHashes, early}) + } } -// DeliverBlock is called when a block is add to the compaction chain. -func (app *nonBlockingApplication) DeliverBlock( - blockHash common.Hash, timestamp time.Time) { - app.addEvent(deliverBlockEvent{blockHash, timestamp}) +// BlockDeliver is called when a block is add to the compaction chain. +func (nb *nonBlocking) BlockDeliver(block types.Block) { + nb.addEvent(blockDeliverEvent{&block}) } // BlockProcessedChan returns a channel to receive the block hashes that have // finished processing by the application. -func (app *nonBlockingApplication) BlockProcessedChan() <-chan types.WitnessResult { - return app.app.BlockProcessedChan() +func (nb *nonBlocking) BlockProcessedChan() <-chan types.WitnessResult { + return nb.app.BlockProcessedChan() } // WitnessAckDeliver is called when a witness ack is created. -func (app *nonBlockingApplication) WitnessAckDeliver(witnessAck *types.WitnessAck) { - app.addEvent(witnessAckEvent{witnessAck}) +func (nb *nonBlocking) WitnessAckDeliver(witnessAck *types.WitnessAck) { + nb.addEvent(witnessAckEvent{witnessAck}) } diff --git a/core/nonblocking-application_test.go b/core/nonblocking_test.go index 52757d3..f7dc9bc 100644 --- a/core/nonblocking-application_test.go +++ b/core/nonblocking_test.go @@ -32,7 +32,7 @@ type slowApp struct { blockConfirmed map[common.Hash]struct{} stronglyAcked map[common.Hash]struct{} totalOrderingDeliver map[common.Hash]struct{} - deliverBlock map[common.Hash]struct{} + blockDeliver map[common.Hash]struct{} witnessAck map[common.Hash]struct{} witnessResultChan chan types.WitnessResult } @@ -43,7 +43,7 @@ func newSlowApp(sleep time.Duration) *slowApp { blockConfirmed: make(map[common.Hash]struct{}), stronglyAcked: make(map[common.Hash]struct{}), totalOrderingDeliver: make(map[common.Hash]struct{}), - deliverBlock: make(map[common.Hash]struct{}), + blockDeliver: make(map[common.Hash]struct{}), witnessAck: make(map[common.Hash]struct{}), witnessResultChan: make(chan types.WitnessResult), } @@ -57,9 +57,9 @@ func (app *slowApp) VerifyPayloads(_ []byte) bool { return true } -func (app *slowApp) BlockConfirmed(block *types.Block) { +func (app *slowApp) BlockConfirmed(blockHash common.Hash) { time.Sleep(app.sleep) - app.blockConfirmed[block.Hash] = struct{}{} + app.blockConfirmed[blockHash] = struct{}{} } func (app *slowApp) StronglyAcked(blockHash common.Hash) { @@ -74,9 +74,9 @@ func (app *slowApp) TotalOrderingDeliver(blockHashes common.Hashes, early bool) } } -func (app *slowApp) DeliverBlock(blockHash common.Hash, timestamp time.Time) { +func (app *slowApp) BlockDeliver(block types.Block) { time.Sleep(app.sleep) - app.deliverBlock[blockHash] = struct{}{} + app.blockDeliver[block.Hash] = struct{}{} } func (app *slowApp) BlockProcessedChan() <-chan types.WitnessResult { @@ -88,14 +88,14 @@ func (app *slowApp) WitnessAckDeliver(witnessAck *types.WitnessAck) { app.witnessAck[witnessAck.Hash] = struct{}{} } -type NonBlockingAppTestSuite struct { +type NonBlockingTestSuite struct { suite.Suite } -func (s *NonBlockingAppTestSuite) TestNonBlockingApplication() { +func (s *NonBlockingTestSuite) TestNonBlocking() { sleep := 50 * time.Millisecond app := newSlowApp(sleep) - nbapp := newNonBlockingApplication(app) + nbModule := newNonBlocking(app, app) hashes := make(common.Hashes, 10) for idx := range hashes { hashes[idx] = common.NewRandomHash() @@ -105,26 +105,29 @@ func (s *NonBlockingAppTestSuite) TestNonBlockingApplication() { // Start doing some 'heavy' job. for _, hash := range hashes { - nbapp.BlockConfirmed(&types.Block{Hash: hash}) - nbapp.StronglyAcked(hash) - nbapp.DeliverBlock(hash, time.Now().UTC()) - nbapp.WitnessAckDeliver(&types.WitnessAck{Hash: hash}) + nbModule.BlockConfirmed(hash) + nbModule.StronglyAcked(hash) + nbModule.BlockDeliver(types.Block{ + Hash: hash, + Witness: types.Witness{Timestamp: time.Now().UTC()}, + }) + nbModule.WitnessAckDeliver(&types.WitnessAck{Hash: hash}) } - nbapp.TotalOrderingDeliver(hashes, true) + nbModule.TotalOrderingDeliver(hashes, true) - // nonBlockingApplication should be non-blocking. + // nonBlocking should be non-blocking. s.True(shouldFinish.After(time.Now().UTC())) - nbapp.wait() + nbModule.wait() for _, hash := range hashes { s.Contains(app.blockConfirmed, hash) s.Contains(app.stronglyAcked, hash) s.Contains(app.totalOrderingDeliver, hash) - s.Contains(app.deliverBlock, hash) + s.Contains(app.blockDeliver, hash) s.Contains(app.witnessAck, hash) } } -func TestNonBlockingApplication(t *testing.T) { - suite.Run(t, new(NonBlockingAppTestSuite)) +func TestNonBlocking(t *testing.T) { + suite.Run(t, new(NonBlockingTestSuite)) } diff --git a/core/shard.go b/core/shard.go index b6a17e1..13c73d5 100644 --- a/core/shard.go +++ b/core/shard.go @@ -36,6 +36,7 @@ type Shard struct { sigToPub SigToPubFn chainNum uint32 app Application + debug Debug db blockdb.BlockDatabase pool blockPool lattice *blockLattice @@ -50,6 +51,7 @@ func NewShard( prvKey crypto.PrivateKey, sigToPub SigToPubFn, app Application, + debug Debug, db blockdb.BlockDatabase) (s *Shard) { s = &Shard{ @@ -59,6 +61,7 @@ func NewShard( sigToPub: sigToPub, chainNum: cfg.NumChains, app: app, + debug: debug, db: db, pool: newBlockPool(cfg.NumChains), lattice: newBlockLattice(ID, cfg.NumChains), @@ -155,8 +158,10 @@ func (s *Shard) ProcessBlock( } // TODO(mission): remove this hack, BA related stuffs should not // be done here. - s.app.StronglyAcked(input.Hash) - s.app.BlockConfirmed(input) + if s.debug != nil { + s.debug.StronglyAcked(input.Hash) + s.debug.BlockConfirmed(input.Hash) + } // Purge blocks in pool with the same chainID and lower height. s.pool.purgeBlocks(input.Position.ChainID, input.Position.Height) // Replay tips in pool to check their validity. @@ -186,7 +191,9 @@ func (s *Shard) ProcessBlock( for idx := range toDelivered { hashes[idx] = toDelivered[idx].Hash } - s.app.TotalOrderingDeliver(hashes, earlyDelivered) + if s.debug != nil { + s.debug.TotalOrderingDeliver(hashes, earlyDelivered) + } // Perform timestamp generation. if err = s.ctModule.processBlocks(toDelivered); err != nil { return diff --git a/core/shard_test.go b/core/shard_test.go index db341a3..b332f30 100644 --- a/core/shard_test.go +++ b/core/shard_test.go @@ -77,7 +77,7 @@ func (mgr *testShardMgr) processBlock(b *types.Block) (err error) { if err = mgr.db.Update(*b); err != nil { return } - mgr.app.DeliverBlock(b.Hash, b.Witness.Timestamp) + mgr.app.BlockDeliver(*b) } // Update pending blocks for verified block (pass sanity check). pendings = append(pendings, verified...) @@ -110,6 +110,7 @@ func (s *ShardTestSuite) newTestShardMgr(cfg *types.Config) *testShardMgr { prvKey, eth.SigToPub, app, + app, db)} } diff --git a/core/test/app.go b/core/test/app.go index 617bc38..7a0ad97 100644 --- a/core/test/app.go +++ b/core/test/app.go @@ -115,7 +115,7 @@ func (app *App) VerifyPayloads(payloads []byte) bool { } // BlockConfirmed implements Application interface. -func (app *App) BlockConfirmed(block *types.Block) { +func (app *App) BlockConfirmed(_ common.Hash) { } // StronglyAcked implements Application interface. @@ -145,16 +145,16 @@ func (app *App) TotalOrderingDeliver(blockHashes common.Hashes, early bool) { } } -// DeliverBlock implements Application interface. -func (app *App) DeliverBlock(blockHash common.Hash, timestamp time.Time) { +// BlockDeliver implements Application interface. +func (app *App) BlockDeliver(block types.Block) { app.deliveredLock.Lock() defer app.deliveredLock.Unlock() - app.Delivered[blockHash] = &AppDeliveredRecord{ - ConsensusTime: timestamp, + app.Delivered[block.Hash] = &AppDeliveredRecord{ + ConsensusTime: block.Witness.Timestamp, When: time.Now().UTC(), } - app.DeliverSequence = append(app.DeliverSequence, blockHash) + app.DeliverSequence = append(app.DeliverSequence, block.Hash) } // BlockProcessedChan returns a channel to receive the block hashes that have diff --git a/core/test/app_test.go b/core/test/app_test.go index 0003852..649ccbe 100644 --- a/core/test/app_test.go +++ b/core/test/app_test.go @@ -73,9 +73,18 @@ func (s *AppTestSuite) setupAppByTotalOrderDeliver( func (s *AppTestSuite) deliverBlockWithTimeFromSequenceLength( app *App, hash common.Hash) { - app.DeliverBlock( - hash, - time.Time{}.Add(time.Duration(len(app.DeliverSequence))*time.Second)) + s.deliverBlock(app, hash, time.Time{}.Add( + time.Duration(len(app.DeliverSequence))*time.Second)) +} + +func (s *AppTestSuite) deliverBlock( + app *App, hash common.Hash, timestamp time.Time) { + + app.BlockDeliver(types.Block{ + Hash: hash, + Witness: types.Witness{ + Timestamp: timestamp, + }}) } func (s *AppTestSuite) TestCompare() { @@ -105,7 +114,7 @@ func (s *AppTestSuite) TestCompare() { wrongTime := time.Time{}.Add( time.Duration(len(app3.DeliverSequence)) * time.Second) wrongTime = wrongTime.Add(1 * time.Second) - app3.DeliverBlock(s.to3.BlockHashes[0], wrongTime) + s.deliverBlock(app3, s.to3.BlockHashes[0], wrongTime) req.Equal(ErrMismatchConsensusTime, app1.Compare(app3)) req.Equal(ErrMismatchConsensusTime, app3.Compare(app1)) // An App without any delivered blocks. @@ -124,7 +133,7 @@ func (s *AppTestSuite) TestVerify() { s.setupAppByTotalOrderDeliver(app1, s.to3) req.Nil(app1.Verify()) // A delivered block without strongly ack - app1.DeliverBlock(common.NewRandomHash(), time.Time{}) + s.deliverBlock(app1, common.NewRandomHash(), time.Time{}) req.Equal(ErrDeliveredBlockNotAcked, app1.Verify()) // The consensus time is out of order. app2 := NewApp() @@ -133,7 +142,7 @@ func (s *AppTestSuite) TestVerify() { app2.StronglyAcked(h) } app2.TotalOrderingDeliver(s.to2.BlockHashes, s.to2.Early) - app2.DeliverBlock(s.to2.BlockHashes[0], time.Time{}) + s.deliverBlock(app2, s.to2.BlockHashes[0], time.Time{}) req.Equal(ErrConsensusTimestampOutOfOrder, app2.Verify()) // A delivered block is not found in total ordering delivers. app3 := NewApp() diff --git a/core/test/stopper_test.go b/core/test/stopper_test.go index 262e178..cb52032 100644 --- a/core/test/stopper_test.go +++ b/core/test/stopper_test.go @@ -61,7 +61,9 @@ func (s *StopperTestSuite) TestStopByConfirmedBlocks() { } app.TotalOrderingDeliver(hashes, false) for _, h := range hashes { - app.DeliverBlock(h, time.Time{}) + app.BlockDeliver(types.Block{ + Hash: h, + Witness: types.Witness{Timestamp: time.Time{}}}) } } } diff --git a/integration_test/node.go b/integration_test/node.go index 3193d99..2280b18 100644 --- a/integration_test/node.go +++ b/integration_test/node.go @@ -118,6 +118,7 @@ func NewNode( privateKey, eth.SigToPub, app, + app, db), } } @@ -203,7 +204,7 @@ func (n *Node) processBlock(b *types.Block) (err error) { if err = n.db.Update(*b); err != nil { return } - n.app.DeliverBlock(b.Hash, b.Witness.Timestamp) + n.app.BlockDeliver(*b) } // Update pending blocks for verified block (pass sanity check). pendings = append(pendings, verified...) diff --git a/simulation/app.go b/simulation/app.go index d00cf19..36567bf 100644 --- a/simulation/app.go +++ b/simulation/app.go @@ -57,12 +57,7 @@ func newSimApp(id types.NodeID, netModule *network) *simApp { } // BlockConfirmed implements core.Application. -func (a *simApp) BlockConfirmed(block *types.Block) { - a.blockByHashMutex.Lock() - defer a.blockByHashMutex.Unlock() - - // TODO(jimmy-dexon) : Remove block in this hash if it's no longer needed. - a.blockByHash[block.Hash] = block +func (a *simApp) BlockConfirmed(_ common.Hash) { } // VerifyPayloads implements core.Application. @@ -109,78 +104,38 @@ func (a *simApp) StronglyAcked(blockHash common.Hash) { // TotalOrderingDeliver is called when blocks are delivered by the total // ordering algorithm. func (a *simApp) TotalOrderingDeliver(blockHashes common.Hashes, early bool) { - now := time.Now() - blocks := make([]*types.Block, len(blockHashes)) - a.blockByHashMutex.RLock() - defer a.blockByHashMutex.RUnlock() - for idx := range blockHashes { - blocks[idx] = a.blockByHash[blockHashes[idx]] - } - a.Outputs = blocks - a.Early = early - fmt.Println("OUTPUT", a.NodeID, a.Early, a.Outputs) - - confirmLatency := []time.Duration{} - - payload := []timestampMessage{} - for _, block := range blocks { - if block.ProposerID == a.NodeID { - confirmLatency = append(confirmLatency, - now.Sub(block.Timestamp)) - } - for _, hash := range block.Acks { - for _, blockHash := range a.getAckedBlocks(hash) { - payload = append(payload, timestampMessage{ - BlockHash: blockHash, - Event: timestampAck, - Timestamp: now, - }) - delete(a.blockSeen, block.Hash) - } - } - } - if len(payload) > 0 { - jsonPayload, err := json.Marshal(payload) - if err != nil { - fmt.Println(err) - } else { - msg := &message{ - Type: blockTimestamp, - Payload: jsonPayload, - } - a.netModule.report(msg) - } - } - + fmt.Println("OUTPUT", a.NodeID, early, blockHashes) blockList := &BlockList{ - ID: a.DeliverID, - BlockHash: blockHashes, - ConfirmLatency: confirmLatency, + ID: a.DeliverID, + BlockHash: blockHashes, } a.netModule.report(blockList) a.DeliverID++ - for _, block := range blocks { - a.blockSeen[block.Hash] = now - a.unconfirmedBlocks[block.ProposerID] = append( - a.unconfirmedBlocks[block.ProposerID], block.Hash) - } } -// DeliverBlock is called when a block in compaction chain is delivered. -func (a *simApp) DeliverBlock(blockHash common.Hash, timestamp time.Time) { - seenTime, exist := a.blockSeen[blockHash] +// BlockDeliver is called when a block in compaction chain is delivered. +func (a *simApp) BlockDeliver(block types.Block) { + func() { + a.blockByHashMutex.Lock() + defer a.blockByHashMutex.Unlock() + + // TODO(jimmy-dexon) : Remove block in this hash if it's no longer needed. + a.blockByHash[block.Hash] = &block + }() + + seenTime, exist := a.blockSeen[block.Hash] if !exist { return } now := time.Now() payload := []timestampMessage{ { - BlockHash: blockHash, + BlockHash: block.Hash, Event: blockSeen, Timestamp: seenTime, }, { - BlockHash: blockHash, + BlockHash: block.Hash, Event: timestampConfirm, Timestamp: now, }, @@ -198,8 +153,8 @@ func (a *simApp) DeliverBlock(blockHash common.Hash, timestamp time.Time) { go func() { a.witnessResultChan <- types.WitnessResult{ - BlockHash: blockHash, - Data: []byte(fmt.Sprintf("Block %s", blockHash)), + BlockHash: block.Hash, + Data: []byte(fmt.Sprintf("Block %s", block.Hash)), } }() } |