about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author    Mission Liao <mission.liao@dexon.org>    2019-01-04 12:08:55 +0800
committer haoping-ku <haoping.ku@dexon.org>    2019-01-04 12:08:55 +0800
commit 47cb69d390aedf53f44cae35629b3568a166e3fb (patch)
tree   699aa6f972a78e069456458b8a159797d8595f9a
parent c5bfa29c2ad5d51147d0a5c1b725369b75da3cd8 (diff)
downloaddexon-consensus-47cb69d390aedf53f44cae35629b3568a166e3fb.tar
dexon-consensus-47cb69d390aedf53f44cae35629b3568a166e3fb.tar.gz
dexon-consensus-47cb69d390aedf53f44cae35629b3568a166e3fb.tar.bz2
dexon-consensus-47cb69d390aedf53f44cae35629b3568a166e3fb.tar.lz
dexon-consensus-47cb69d390aedf53f44cae35629b3568a166e3fb.tar.xz
dexon-consensus-47cb69d390aedf53f44cae35629b3568a166e3fb.tar.zst
dexon-consensus-47cb69d390aedf53f44cae35629b3568a166e3fb.zip
core: check if deliverable for each added block in total ordering (#395)
* Check if deliverable multiple times for each added block * Fix format
-rw-r--r--  core/lattice.go                     65
-rw-r--r--  core/total-ordering-syncer_test.go  14
-rw-r--r--  core/total-ordering.go              26
-rw-r--r--  core/total-ordering_test.go         79
4 files changed, 110 insertions(+), 74 deletions(-)
diff --git a/core/lattice.go b/core/lattice.go
index 2b1555d..b30306a 100644
--- a/core/lattice.go
+++ b/core/lattice.go
@@ -245,26 +245,27 @@ func (l *Lattice) ProcessBlock(
toDelivered []*types.Block
deliveredMode uint32
)
-
l.lock.Lock()
defer l.lock.Unlock()
-
if inLattice, err = l.addBlockToLattice(input); err != nil {
return
}
-
if len(inLattice) == 0 {
return
}
-
for _, b = range inLattice {
- toDelivered, deliveredMode, err = l.toModule.processBlock(b)
- if err != nil {
+ if err = l.toModule.addBlock(b); err != nil {
// All errors from total ordering is serious, should panic.
panic(err)
}
+ }
+ for {
+ toDelivered, deliveredMode, err = l.toModule.extractBlocks()
+ if err != nil {
+ panic(err)
+ }
if len(toDelivered) == 0 {
- continue
+ break
}
hashes := make(common.Hashes, len(toDelivered))
for idx := range toDelivered {
@@ -275,7 +276,7 @@ func (l *Lattice) ProcessBlock(
}
// Perform consensus timestamp module.
if err = l.ctModule.processBlocks(toDelivered); err != nil {
- return
+ break
}
delivered = append(delivered, toDelivered...)
}
@@ -317,32 +318,40 @@ func (l *Lattice) AppendConfig(round uint64, config *types.Config) (err error) {
}
// ProcessFinalizedBlock is used for syncing lattice data.
-func (l *Lattice) ProcessFinalizedBlock(b *types.Block) ([]*types.Block, error) {
+func (l *Lattice) ProcessFinalizedBlock(
+ b *types.Block) (delivered []*types.Block, err error) {
+ var (
+ toDelivered []*types.Block
+ deliveredMode uint32
+ )
l.lock.Lock()
defer l.lock.Unlock()
// Syncing state for core.latticeData module.
- if err := l.data.addFinalizedBlock(b); err != nil {
- return nil, err
+ if err = l.data.addFinalizedBlock(b); err != nil {
+ return
}
l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height)
// Syncing state for core.totalOrdering module.
- toDelivered, deliveredMode, err := l.toModule.processBlock(b)
- if err != nil {
- return nil, err
- }
- if len(toDelivered) == 0 {
- return nil, nil
- }
- hashes := make(common.Hashes, len(toDelivered))
- for idx := range toDelivered {
- hashes[idx] = toDelivered[idx].Hash
- }
- if l.debug != nil {
- l.debug.TotalOrderingDelivered(hashes, deliveredMode)
+ if err = l.toModule.addBlock(b); err != nil {
+ return
}
- // Sync core.consensusTimestamp module.
- if err = l.ctModule.processBlocks(toDelivered); err != nil {
- return nil, err
+ for {
+ toDelivered, deliveredMode, err = l.toModule.extractBlocks()
+ if err != nil || len(toDelivered) == 0 {
+ break
+ }
+ hashes := make(common.Hashes, len(toDelivered))
+ for idx := range toDelivered {
+ hashes[idx] = toDelivered[idx].Hash
+ }
+ if l.debug != nil {
+ l.debug.TotalOrderingDelivered(hashes, deliveredMode)
+ }
+ // Sync core.consensusTimestamp module.
+ if err = l.ctModule.processBlocks(toDelivered); err != nil {
+ break
+ }
+ delivered = append(delivered, toDelivered...)
}
- return toDelivered, nil
+ return
}
diff --git a/core/total-ordering-syncer_test.go b/core/total-ordering-syncer_test.go
index 79a6830..4a4e8e0 100644
--- a/core/total-ordering-syncer_test.go
+++ b/core/total-ordering-syncer_test.go
@@ -76,10 +76,16 @@ func (s *TotalOrderingSyncerTestSuite) genDeliverySet(numChains uint32) (
}
s.Require().NoError(err)
// Perform total ordering.
- blocks, _, err := to.processBlock(&b)
- s.Require().NoError(err)
- if len(blocks) > 0 {
- deliverySet = append(deliverySet, blocks)
+ s.Require().NoError(to.addBlock(&b))
+ for {
+ blocks, _, err := to.extractBlocks()
+ s.Require().NoError(err)
+ if len(blocks) == 0 {
+ break
+ }
+ if len(blocks) > 0 {
+ deliverySet = append(deliverySet, blocks)
+ }
}
}
return
diff --git a/core/total-ordering.go b/core/total-ordering.go
index 744471a..de8dd0b 100644
--- a/core/total-ordering.go
+++ b/core/total-ordering.go
@@ -1159,14 +1159,11 @@ CheckNextCandidateLoop:
}
// flushBlocks flushes blocks.
-func (to *totalOrdering) flushBlocks(
- b *types.Block) (flushed []*types.Block, mode uint32, err error) {
-
+func (to *totalOrdering) flushBlocks() (
+ flushed []*types.Block, mode uint32, err error) {
mode = TotalOrderingModeFlush
cfg := to.getCurrentConfig()
- if cfg.isLastBlock(b) {
- to.flushReadyChains[b.Position.ChainID] = struct{}{}
- }
+
// Flush blocks until last blocks from all chains appeared.
if len(to.flushReadyChains) < int(cfg.numChains) {
return
@@ -1281,9 +1278,8 @@ func (to *totalOrdering) getCurrentConfig() *totalOrderingConfig {
return to.configs[cfgIdx]
}
-// processBlock is the entry point of totalOrdering.
-func (to *totalOrdering) processBlock(
- b *types.Block) ([]*types.Block, uint32, error) {
+// addBlock adds a block to the working set of total ordering module.
+func (to *totalOrdering) addBlock(b *types.Block) error {
// NOTE: Block b is assumed to be in topologically sorted, i.e., all its
// acking blocks are during or after total ordering stage.
cfg := to.getCurrentConfig()
@@ -1291,7 +1287,7 @@ func (to *totalOrdering) processBlock(
to.buildBlockRelation(b)
isOldest, err := to.updateVectors(b)
if err != nil {
- return nil, uint32(0), err
+ return err
}
// Mark the proposer of incoming block as dirty.
if b.Position.ChainID < cfg.numChains {
@@ -1310,8 +1306,16 @@ func (to *totalOrdering) processBlock(
to.prepareCandidate(b)
}
}
+ if to.duringFlush && cfg.isLastBlock(b) {
+ to.flushReadyChains[b.Position.ChainID] = struct{}{}
+ }
+ return nil
+}
+
+// extractBlocks check if there is any deliverable set.
+func (to *totalOrdering) extractBlocks() ([]*types.Block, uint32, error) {
if to.duringFlush {
- return to.flushBlocks(b)
+ return to.flushBlocks()
}
return to.deliverBlocks()
}
diff --git a/core/total-ordering_test.go b/core/total-ordering_test.go
index 846770b..24ad646 100644
--- a/core/total-ordering_test.go
+++ b/core/total-ordering_test.go
@@ -68,29 +68,35 @@ func (s *TotalOrderingTestSuite) performOneRun(
s.Require().NoError(err)
revealed += b.Hash.String() + ","
// Perform total ordering.
- blocks, mode, err := to.processBlock(&b)
- s.Require().NoError(err)
- for _, b := range blocks {
- ordered += b.Hash.String() + ","
- // Make sure the round ID is increasing, and no interleave.
- s.Require().True(b.Position.Round >= curRound)
- curRound = b.Position.Round
- // Make sure all acking blocks are already delivered.
- for _, ack := range b.Acks {
- s.Require().Contains(revealedDAG, ack)
- }
- if mode == TotalOrderingModeFlush {
- // For blocks delivered by flushing, the acking relations would
- // exist in one deliver set, however, only later block would
- // ack previous block, not backward.
- revealedDAG[b.Hash] = struct{}{}
+ s.Require().NoError(to.addBlock(&b))
+ for {
+ blocks, mode, err := to.extractBlocks()
+ s.Require().NoError(err)
+ if len(blocks) == 0 {
+ break
}
- }
- // For blocks not delivered by flushing, the acking relations only exist
- // between deliver sets.
- if mode != TotalOrderingModeFlush {
for _, b := range blocks {
- revealedDAG[b.Hash] = struct{}{}
+ ordered += b.Hash.String() + ","
+ // Make sure the round ID is increasing, and no interleave.
+ s.Require().True(b.Position.Round >= curRound)
+ curRound = b.Position.Round
+ // Make sure all acking blocks are already delivered.
+ for _, ack := range b.Acks {
+ s.Require().Contains(revealedDAG, ack)
+ }
+ if mode == TotalOrderingModeFlush {
+ // For blocks delivered by flushing, the acking relations
+ // would exist in one deliver set, however, only later block
+ // would ack previous block, not backward.
+ revealedDAG[b.Hash] = struct{}{}
+ }
+ }
+ // For blocks not delivered by flushing, the acking relations only
+ // exist between deliver sets.
+ if mode != TotalOrderingModeFlush {
+ for _, b := range blocks {
+ revealedDAG[b.Hash] = struct{}{}
+ }
}
}
}
@@ -118,7 +124,9 @@ func (s *TotalOrderingTestSuite) checkRandomResult(
}
func (s *TotalOrderingTestSuite) checkNotDeliver(to *totalOrdering, b *types.Block) {
- blocks, mode, err := to.processBlock(b)
+ err := to.addBlock(b)
+ s.NoError(err)
+ blocks, mode, err := to.extractBlocks()
s.Empty(blocks)
s.Equal(mode, TotalOrderingModeNormal)
s.Nil(err)
@@ -458,7 +466,8 @@ func (s *TotalOrderingTestSuite) TestEarlyDeliver() {
s.Equal(candidate.ackedStatus[3].minHeight, b30.Position.Height)
s.Equal(candidate.ackedStatus[3].count, uint64(2))
- blocks, mode, err := to.processBlock(b32)
+ s.Require().NoError(to.addBlock(b32))
+ blocks, mode, err := to.extractBlocks()
s.Require().Len(blocks, 1)
s.Equal(mode, TotalOrderingModeEarly)
s.Nil(err)
@@ -720,7 +729,8 @@ func (s *TotalOrderingTestSuite) TestBasicCaseForK2() {
s.Equal(candidate.ackedStatus[4].count, uint64(0))
// Check the first deliver.
- blocks, mode, err := to.processBlock(b02)
+ s.NoError(to.addBlock(b02))
+ blocks, mode, err := to.extractBlocks()
s.Equal(mode, TotalOrderingModeEarly)
s.Nil(err)
s.checkHashSequence(blocks, common.Hashes{b00.Hash, b10.Hash})
@@ -761,7 +771,8 @@ func (s *TotalOrderingTestSuite) TestBasicCaseForK2() {
s.checkNotDeliver(to, b13)
// Check the second deliver.
- blocks, mode, err = to.processBlock(b03)
+ s.NoError(to.addBlock(b03))
+ blocks, mode, err = to.extractBlocks()
s.Equal(mode, TotalOrderingModeEarly)
s.Nil(err)
s.checkHashSequence(blocks, common.Hashes{b11.Hash, b20.Hash})
@@ -813,7 +824,8 @@ func (s *TotalOrderingTestSuite) TestBasicCaseForK2() {
// Make 'Acking Node Set' contains blocks from all chains,
// this should trigger not-early deliver.
- blocks, mode, err = to.processBlock(b23)
+ s.NoError(to.addBlock(b23))
+ blocks, mode, err = to.extractBlocks()
s.Equal(mode, TotalOrderingModeNormal)
s.Nil(err)
s.checkHashSequence(blocks, common.Hashes{b01.Hash, b30.Hash})
@@ -929,7 +941,8 @@ func (s *TotalOrderingTestSuite) TestBasicCaseForK0() {
req.Equal(candidate.ackedStatus[3].count, uint64(2))
// This new block should trigger non-early deliver.
- blocks, mode, err := to.processBlock(b40)
+ req.NoError(to.addBlock(b40))
+ blocks, mode, err := to.extractBlocks()
req.Equal(mode, TotalOrderingModeNormal)
req.Nil(err)
s.checkHashSequence(blocks, common.Hashes{b20.Hash})
@@ -1246,7 +1259,8 @@ func (s *TotalOrderingTestSuite) TestSync() {
}
}
s.Require().NoError(err)
- bs, _, err := to1.processBlock(&b)
+ s.Require().NoError(to1.addBlock(&b))
+ bs, _, err := to1.extractBlocks()
s.Require().Nil(err)
if len(bs) > 0 {
deliveredBlockSets1 = append(deliveredBlockSets1, bs)
@@ -1263,7 +1277,8 @@ func (s *TotalOrderingTestSuite) TestSync() {
deliveredBlockSets2 := [][]*types.Block{}
for i := offset; i < len(deliveredBlockSets1); i++ {
for _, b := range deliveredBlockSets1[i] {
- bs, _, err := to2.processBlock(b)
+ req.NoError(to2.addBlock(b))
+ bs, _, err := to2.extractBlocks()
req.NoError(err)
if len(bs) > 0 {
deliveredBlockSets2 = append(deliveredBlockSets2, bs)
@@ -1373,7 +1388,8 @@ func (s *TotalOrderingTestSuite) TestSyncWithConfigChange() {
deliveredBlockSets1 := [][]*types.Block{}
deliveredBlockModes := []uint32{}
for _, b := range blocks {
- bs, mode, err := to1.processBlock(b)
+ req.NoError(to1.addBlock(b))
+ bs, mode, err := to1.extractBlocks()
req.NoError(err)
if len(bs) > 0 {
deliveredBlockSets1 = append(deliveredBlockSets1, bs)
@@ -1405,7 +1421,8 @@ func (s *TotalOrderingTestSuite) TestSyncWithConfigChange() {
deliveredBlockSets2 := [][]*types.Block{}
for i := offset; i < len(deliveredBlockSets1); i++ {
for _, b := range deliveredBlockSets1[i] {
- bs, _, err := to2.processBlock(b)
+ req.NoError(to2.addBlock(b))
+ bs, _, err := to2.extractBlocks()
req.NoError(err)
if len(bs) > 0 {
deliveredBlockSets2 = append(deliveredBlockSets2, bs)