diff options
-rw-r--r-- | core/lattice.go | 65 | ||||
-rw-r--r-- | core/total-ordering-syncer_test.go | 14 | ||||
-rw-r--r-- | core/total-ordering.go | 26 | ||||
-rw-r--r-- | core/total-ordering_test.go | 79 |
4 files changed, 110 insertions, 74 deletions
diff --git a/core/lattice.go b/core/lattice.go index 2b1555d..b30306a 100644 --- a/core/lattice.go +++ b/core/lattice.go @@ -245,26 +245,27 @@ func (l *Lattice) ProcessBlock( toDelivered []*types.Block deliveredMode uint32 ) - l.lock.Lock() defer l.lock.Unlock() - if inLattice, err = l.addBlockToLattice(input); err != nil { return } - if len(inLattice) == 0 { return } - for _, b = range inLattice { - toDelivered, deliveredMode, err = l.toModule.processBlock(b) - if err != nil { + if err = l.toModule.addBlock(b); err != nil { // All errors from total ordering is serious, should panic. panic(err) } + } + for { + toDelivered, deliveredMode, err = l.toModule.extractBlocks() + if err != nil { + panic(err) + } if len(toDelivered) == 0 { - continue + break } hashes := make(common.Hashes, len(toDelivered)) for idx := range toDelivered { @@ -275,7 +276,7 @@ func (l *Lattice) ProcessBlock( } // Perform consensus timestamp module. if err = l.ctModule.processBlocks(toDelivered); err != nil { - return + break } delivered = append(delivered, toDelivered...) } @@ -317,32 +318,40 @@ func (l *Lattice) AppendConfig(round uint64, config *types.Config) (err error) { } // ProcessFinalizedBlock is used for syncing lattice data. -func (l *Lattice) ProcessFinalizedBlock(b *types.Block) ([]*types.Block, error) { +func (l *Lattice) ProcessFinalizedBlock( + b *types.Block) (delivered []*types.Block, err error) { + var ( + toDelivered []*types.Block + deliveredMode uint32 + ) l.lock.Lock() defer l.lock.Unlock() // Syncing state for core.latticeData module. - if err := l.data.addFinalizedBlock(b); err != nil { - return nil, err + if err = l.data.addFinalizedBlock(b); err != nil { + return } l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height) // Syncing state for core.totalOrdering module. - toDelivered, deliveredMode, err := l.toModule.processBlock(b) - if err != nil { - return nil, err - } - if len(toDelivered) == 0 { - return nil, nil - } - hashes := make(common.Hashes, len(toDelivered)) - for idx := range toDelivered { - hashes[idx] = toDelivered[idx].Hash - } - if l.debug != nil { - l.debug.TotalOrderingDelivered(hashes, deliveredMode) + if err = l.toModule.addBlock(b); err != nil { + return } - // Sync core.consensusTimestamp module. - if err = l.ctModule.processBlocks(toDelivered); err != nil { - return nil, err + for { + toDelivered, deliveredMode, err = l.toModule.extractBlocks() + if err != nil || len(toDelivered) == 0 { + break + } + hashes := make(common.Hashes, len(toDelivered)) + for idx := range toDelivered { + hashes[idx] = toDelivered[idx].Hash + } + if l.debug != nil { + l.debug.TotalOrderingDelivered(hashes, deliveredMode) + } + // Sync core.consensusTimestamp module. + if err = l.ctModule.processBlocks(toDelivered); err != nil { + break + } + delivered = append(delivered, toDelivered...) } - return toDelivered, nil + return } diff --git a/core/total-ordering-syncer_test.go b/core/total-ordering-syncer_test.go index 79a6830..4a4e8e0 100644 --- a/core/total-ordering-syncer_test.go +++ b/core/total-ordering-syncer_test.go @@ -76,10 +76,16 @@ func (s *TotalOrderingSyncerTestSuite) genDeliverySet(numChains uint32) ( } s.Require().NoError(err) // Perform total ordering. - blocks, _, err := to.processBlock(&b) - s.Require().NoError(err) - if len(blocks) > 0 { - deliverySet = append(deliverySet, blocks) + s.Require().NoError(to.addBlock(&b)) + for { + blocks, _, err := to.extractBlocks() + s.Require().NoError(err) + if len(blocks) == 0 { + break + } + if len(blocks) > 0 { + deliverySet = append(deliverySet, blocks) + } } } return diff --git a/core/total-ordering.go b/core/total-ordering.go index 744471a..de8dd0b 100644 --- a/core/total-ordering.go +++ b/core/total-ordering.go @@ -1159,14 +1159,11 @@ CheckNextCandidateLoop: } // flushBlocks flushes blocks. -func (to *totalOrdering) flushBlocks( - b *types.Block) (flushed []*types.Block, mode uint32, err error) { - +func (to *totalOrdering) flushBlocks() ( + flushed []*types.Block, mode uint32, err error) { mode = TotalOrderingModeFlush cfg := to.getCurrentConfig() - if cfg.isLastBlock(b) { - to.flushReadyChains[b.Position.ChainID] = struct{}{} - } + // Flush blocks until last blocks from all chains appeared. if len(to.flushReadyChains) < int(cfg.numChains) { return @@ -1281,9 +1278,8 @@ func (to *totalOrdering) getCurrentConfig() *totalOrderingConfig { return to.configs[cfgIdx] } -// processBlock is the entry point of totalOrdering. -func (to *totalOrdering) processBlock( - b *types.Block) ([]*types.Block, uint32, error) { +// addBlock adds a block to the working set of total ordering module. +func (to *totalOrdering) addBlock(b *types.Block) error { // NOTE: Block b is assumed to be in topologically sorted, i.e., all its // acking blocks are during or after total ordering stage. cfg := to.getCurrentConfig() @@ -1291,7 +1287,7 @@ func (to *totalOrdering) processBlock( to.buildBlockRelation(b) isOldest, err := to.updateVectors(b) if err != nil { - return nil, uint32(0), err + return err } // Mark the proposer of incoming block as dirty. if b.Position.ChainID < cfg.numChains { @@ -1310,8 +1306,16 @@ func (to *totalOrdering) processBlock( to.prepareCandidate(b) } } + if to.duringFlush && cfg.isLastBlock(b) { + to.flushReadyChains[b.Position.ChainID] = struct{}{} + } + return nil +} + +// extractBlocks check if there is any deliverable set. +func (to *totalOrdering) extractBlocks() ([]*types.Block, uint32, error) { if to.duringFlush { - return to.flushBlocks(b) + return to.flushBlocks() } return to.deliverBlocks() } diff --git a/core/total-ordering_test.go b/core/total-ordering_test.go index 846770b..24ad646 100644 --- a/core/total-ordering_test.go +++ b/core/total-ordering_test.go @@ -68,29 +68,35 @@ func (s *TotalOrderingTestSuite) performOneRun( s.Require().NoError(err) revealed += b.Hash.String() + "," // Perform total ordering. - blocks, mode, err := to.processBlock(&b) - s.Require().NoError(err) - for _, b := range blocks { - ordered += b.Hash.String() + "," - // Make sure the round ID is increasing, and no interleave. - s.Require().True(b.Position.Round >= curRound) - curRound = b.Position.Round - // Make sure all acking blocks are already delivered. - for _, ack := range b.Acks { - s.Require().Contains(revealedDAG, ack) - } - if mode == TotalOrderingModeFlush { - // For blocks delivered by flushing, the acking relations would - // exist in one deliver set, however, only later block would - // ack previous block, not backward. - revealedDAG[b.Hash] = struct{}{} + s.Require().NoError(to.addBlock(&b)) + for { + blocks, mode, err := to.extractBlocks() + s.Require().NoError(err) + if len(blocks) == 0 { + break } - } - // For blocks not delivered by flushing, the acking relations only exist - // between deliver sets. - if mode != TotalOrderingModeFlush { for _, b := range blocks { - revealedDAG[b.Hash] = struct{}{} + ordered += b.Hash.String() + "," + // Make sure the round ID is increasing, and no interleave. + s.Require().True(b.Position.Round >= curRound) + curRound = b.Position.Round + // Make sure all acking blocks are already delivered. + for _, ack := range b.Acks { + s.Require().Contains(revealedDAG, ack) + } + if mode == TotalOrderingModeFlush { + // For blocks delivered by flushing, the acking relations + // would exist in one deliver set, however, only later block + // would ack previous block, not backward. + revealedDAG[b.Hash] = struct{}{} + } + } + // For blocks not delivered by flushing, the acking relations only + // exist between deliver sets. + if mode != TotalOrderingModeFlush { + for _, b := range blocks { + revealedDAG[b.Hash] = struct{}{} + } } } } @@ -118,7 +124,9 @@ func (s *TotalOrderingTestSuite) checkRandomResult( } func (s *TotalOrderingTestSuite) checkNotDeliver(to *totalOrdering, b *types.Block) { - blocks, mode, err := to.processBlock(b) + err := to.addBlock(b) + s.NoError(err) + blocks, mode, err := to.extractBlocks() s.Empty(blocks) s.Equal(mode, TotalOrderingModeNormal) s.Nil(err) @@ -458,7 +466,8 @@ func (s *TotalOrderingTestSuite) TestEarlyDeliver() { s.Equal(candidate.ackedStatus[3].minHeight, b30.Position.Height) s.Equal(candidate.ackedStatus[3].count, uint64(2)) - blocks, mode, err := to.processBlock(b32) + s.Require().NoError(to.addBlock(b32)) + blocks, mode, err := to.extractBlocks() s.Require().Len(blocks, 1) s.Equal(mode, TotalOrderingModeEarly) s.Nil(err) @@ -720,7 +729,8 @@ func (s *TotalOrderingTestSuite) TestBasicCaseForK2() { s.Equal(candidate.ackedStatus[4].count, uint64(0)) // Check the first deliver. - blocks, mode, err := to.processBlock(b02) + s.NoError(to.addBlock(b02)) + blocks, mode, err := to.extractBlocks() s.Equal(mode, TotalOrderingModeEarly) s.Nil(err) s.checkHashSequence(blocks, common.Hashes{b00.Hash, b10.Hash}) @@ -761,7 +771,8 @@ func (s *TotalOrderingTestSuite) TestBasicCaseForK2() { s.checkNotDeliver(to, b13) // Check the second deliver. - blocks, mode, err = to.processBlock(b03) + s.NoError(to.addBlock(b03)) + blocks, mode, err = to.extractBlocks() s.Equal(mode, TotalOrderingModeEarly) s.Nil(err) s.checkHashSequence(blocks, common.Hashes{b11.Hash, b20.Hash}) @@ -813,7 +824,8 @@ func (s *TotalOrderingTestSuite) TestBasicCaseForK2() { // Make 'Acking Node Set' contains blocks from all chains, // this should trigger not-early deliver. - blocks, mode, err = to.processBlock(b23) + s.NoError(to.addBlock(b23)) + blocks, mode, err = to.extractBlocks() s.Equal(mode, TotalOrderingModeNormal) s.Nil(err) s.checkHashSequence(blocks, common.Hashes{b01.Hash, b30.Hash}) @@ -929,7 +941,8 @@ func (s *TotalOrderingTestSuite) TestBasicCaseForK0() { req.Equal(candidate.ackedStatus[3].count, uint64(2)) // This new block should trigger non-early deliver. - blocks, mode, err := to.processBlock(b40) + req.NoError(to.addBlock(b40)) + blocks, mode, err := to.extractBlocks() req.Equal(mode, TotalOrderingModeNormal) req.Nil(err) s.checkHashSequence(blocks, common.Hashes{b20.Hash}) @@ -1246,7 +1259,8 @@ func (s *TotalOrderingTestSuite) TestSync() { } } s.Require().NoError(err) - bs, _, err := to1.processBlock(&b) + s.Require().NoError(to1.addBlock(&b)) + bs, _, err := to1.extractBlocks() s.Require().Nil(err) if len(bs) > 0 { deliveredBlockSets1 = append(deliveredBlockSets1, bs) @@ -1263,7 +1277,8 @@ func (s *TotalOrderingTestSuite) TestSync() { deliveredBlockSets2 := [][]*types.Block{} for i := offset; i < len(deliveredBlockSets1); i++ { for _, b := range deliveredBlockSets1[i] { - bs, _, err := to2.processBlock(b) + req.NoError(to2.addBlock(b)) + bs, _, err := to2.extractBlocks() req.NoError(err) if len(bs) > 0 { deliveredBlockSets2 = append(deliveredBlockSets2, bs) @@ -1373,7 +1388,8 @@ func (s *TotalOrderingTestSuite) TestSyncWithConfigChange() { deliveredBlockSets1 := [][]*types.Block{} deliveredBlockModes := []uint32{} for _, b := range blocks { - bs, mode, err := to1.processBlock(b) + req.NoError(to1.addBlock(b)) + bs, mode, err := to1.extractBlocks() req.NoError(err) if len(bs) > 0 { deliveredBlockSets1 = append(deliveredBlockSets1, bs) @@ -1405,7 +1421,8 @@ func (s *TotalOrderingTestSuite) TestSyncWithConfigChange() { deliveredBlockSets2 := [][]*types.Block{} for i := offset; i < len(deliveredBlockSets1); i++ { for _, b := range deliveredBlockSets1[i] { - bs, _, err := to2.processBlock(b) + req.NoError(to2.addBlock(b)) + bs, _, err := to2.extractBlocks() req.NoError(err) if len(bs) > 0 { deliveredBlockSets2 = append(deliveredBlockSets2, bs) |