diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2018-11-01 15:29:06 +0800 |
---|---|---|
committer | haoping-ku <37325897+haoping-ku@users.noreply.github.com> | 2018-11-01 15:29:06 +0800 |
commit | 1467d8a82b32fb91250f7717a8be47858a38d85b (patch) | |
tree | f452fd42b6b3a22c301f5be89572479c084a5c69 | |
parent | ebfa4a6164dab7db29859538c1aa0e9659bd951a (diff) | |
download | dexon-consensus-1467d8a82b32fb91250f7717a8be47858a38d85b.tar dexon-consensus-1467d8a82b32fb91250f7717a8be47858a38d85b.tar.gz dexon-consensus-1467d8a82b32fb91250f7717a8be47858a38d85b.tar.bz2 dexon-consensus-1467d8a82b32fb91250f7717a8be47858a38d85b.tar.lz dexon-consensus-1467d8a82b32fb91250f7717a8be47858a38d85b.tar.xz dexon-consensus-1467d8a82b32fb91250f7717a8be47858a38d85b.tar.zst dexon-consensus-1467d8a82b32fb91250f7717a8be47858a38d85b.zip |
core: Total Ordering syncer (#277)
* Add simple processFinalizedBlock logic
* processBlock
* Bootstrap mode
* Sort before flush
* Add syncer test
* Clean more data for memory
* Add comments. Fix some stuffs.
-rw-r--r-- | core/total-ordering-syncer.go | 174 | ||||
-rw-r--r-- | core/total-ordering-syncer_test.go | 311 |
2 files changed, 485 insertions, 0 deletions
diff --git a/core/total-ordering-syncer.go b/core/total-ordering-syncer.go new file mode 100644 index 0000000..270e637 --- /dev/null +++ b/core/total-ordering-syncer.go @@ -0,0 +1,174 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package core + +import ( + "sort" + "sync" + + "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +type totalOrderingSyncer struct { + lock sync.RWMutex + + numChains uint32 + syncHeight map[uint32]uint64 + syncDeliverySetIdx int + pendingBlocks []*types.Block + inPendingBlocks map[common.Hash]struct{} + + bootstrapChain map[uint32]struct{} + + // Data to restore delivery set. + pendingDeliveryBlocks []*types.Block + deliverySet map[int][]*types.Block + mapToDeliverySet map[common.Hash]int +} + +func newTotalOrderingSyncer(numChains uint32) *totalOrderingSyncer { + return &totalOrderingSyncer{ + numChains: numChains, + syncHeight: make(map[uint32]uint64), + syncDeliverySetIdx: -1, + inPendingBlocks: make(map[common.Hash]struct{}), + bootstrapChain: make(map[uint32]struct{}), + deliverySet: make(map[int][]*types.Block), + mapToDeliverySet: make(map[common.Hash]int), + } +} + +func (tos *totalOrderingSyncer) synced() bool { + tos.lock.RLock() + defer tos.lock.RUnlock() + return tos.syncDeliverySetIdx != -1 +} + +func (tos *totalOrderingSyncer) processBlock( + block *types.Block) (delivered []*types.Block) { + if tos.synced() { + if tos.syncHeight[block.Position.ChainID] >= block.Position.Height { + return + } + delivered = append(delivered, block) + return + } + tos.lock.Lock() + defer tos.lock.Unlock() + tos.inPendingBlocks[block.Hash] = struct{}{} + tos.pendingBlocks = append(tos.pendingBlocks, block) + if block.Position.Height == 0 { + tos.bootstrapChain[block.Position.ChainID] = struct{}{} + } + if uint32(len(tos.bootstrapChain)) == tos.numChains { + // Bootstrap mode. + delivered = tos.pendingBlocks + tos.syncDeliverySetIdx = 0 + for i := uint32(0); i < tos.numChains; i++ { + tos.syncHeight[i] = uint64(0) + } + } else { + maxDeliverySetIdx := -1 + // TODO(jimmy-dexon): below for loop can be optimized. + PendingBlockLoop: + for i, block := range tos.pendingBlocks { + idx, exist := tos.mapToDeliverySet[block.Hash] + if !exist { + continue + } + deliverySet := tos.deliverySet[idx] + // Check if all the blocks in deliverySet are in the pendingBlocks. + for _, dBlock := range deliverySet { + if _, exist := tos.inPendingBlocks[dBlock.Hash]; !exist { + continue PendingBlockLoop + } + } + if idx > maxDeliverySetIdx { + maxDeliverySetIdx = idx + } + // Check if all of the chains have delivered. + for _, dBlock := range deliverySet { + if h, exist := tos.syncHeight[dBlock.Position.ChainID]; exist { + if dBlock.Position.Height < h { + continue + } + } + tos.syncHeight[dBlock.Position.ChainID] = dBlock.Position.Height + } + if uint32(len(tos.syncHeight)) != tos.numChains { + continue + } + // Core is fully synced, it can start delivering blocks from idx. + tos.syncDeliverySetIdx = maxDeliverySetIdx + delivered = make([]*types.Block, 0, i) + break + } + if tos.syncDeliverySetIdx == -1 { + return + } + // Generating delivering blocks. + for i := maxDeliverySetIdx; i < len(tos.deliverySet); i++ { + deliverySet := tos.deliverySet[i] + sort.Sort(types.ByHash(deliverySet)) + for _, block := range deliverySet { + if block.Position.Height > tos.syncHeight[block.Position.ChainID] { + tos.syncHeight[block.Position.ChainID] = block.Position.Height + } + delivered = append(delivered, block) + } + } + // Flush remaining blocks. + for _, block := range tos.pendingBlocks { + if _, exist := tos.mapToDeliverySet[block.Hash]; exist { + continue + } + if block.Position.Height > tos.syncHeight[block.Position.ChainID] { + tos.syncHeight[block.Position.ChainID] = block.Position.Height + } + delivered = append(delivered, block) + } + } + // Clean internal data model to save memory. + tos.pendingBlocks = nil + tos.inPendingBlocks = nil + tos.bootstrapChain = nil + tos.pendingDeliveryBlocks = nil + tos.deliverySet = nil + tos.mapToDeliverySet = nil + return +} + +// The finalized block should be passed by the order of consensus height. +func (tos *totalOrderingSyncer) processFinalizedBlock(block *types.Block) { + tos.lock.Lock() + defer tos.lock.Unlock() + if len(tos.pendingDeliveryBlocks) > 0 { + if block.Hash.Less( + tos.pendingDeliveryBlocks[len(tos.pendingDeliveryBlocks)-1].Hash) { + // pendingDeliveryBlocks forms a deliverySet. + idx := len(tos.deliverySet) + tos.deliverySet[idx] = tos.pendingDeliveryBlocks + for _, block := range tos.pendingDeliveryBlocks { + tos.mapToDeliverySet[block.Hash] = idx + } + tos.pendingDeliveryBlocks = []*types.Block{} + } + } + tos.pendingDeliveryBlocks = append(tos.pendingDeliveryBlocks, block) +} diff --git a/core/total-ordering-syncer_test.go b/core/total-ordering-syncer_test.go new file mode 100644 index 0000000..4fe0c89 --- /dev/null +++ b/core/total-ordering-syncer_test.go @@ -0,0 +1,311 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package core + +import ( + "testing" + "time" + + "github.com/stretchr/testify/suite" + + "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/blockdb" + "github.com/dexon-foundation/dexon-consensus-core/core/test" + "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +type TotalOrderingSyncerTestSuite struct { + suite.Suite +} + +func (s *TotalOrderingSyncerTestSuite) genDeliverySet(numChains uint32) ( + deliverySet [][]*types.Block, revealer *test.RandomDAGRevealer) { + phi := uint64(numChains) / 2 + + genesisTime := time.Now().UTC() + + genesisConfig := &totalOrderingConfig{ + roundBasedConfig: roundBasedConfig{ + roundInterval: 1000 * time.Second, + }, + k: 0, + phi: phi, + numChains: numChains, + } + genesisConfig.setRoundBeginTime(genesisTime) + to := newTotalOrdering(genesisConfig) + + gen := test.NewBlocksGenerator(&test.BlocksGeneratorConfig{ + NumChains: numChains, + MinBlockTimeInterval: 100 * time.Millisecond, + MaxBlockTimeInterval: 500 * time.Millisecond, + }, nil, hashBlock) + + db, err := blockdb.NewMemBackedBlockDB() + s.Require().NoError(err) + s.Require().NoError(gen.Generate( + 0, + genesisTime, + genesisTime.Add(20*time.Second), + db)) + iter, err := db.GetAll() + s.Require().NoError(err) + + revealer, err = test.NewRandomDAGRevealer(iter) + s.Require().NoError(err) + + revealer.Reset() + for { + // Reveal next block. + b, err := revealer.Next() + if err != nil { + if err == blockdb.ErrIterationFinished { + err = nil + break + } + } + s.Require().NoError(err) + // Perform total ordering. + blocks, _, err := to.processBlock(&b) + s.Require().NoError(err) + if len(blocks) > 0 { + deliverySet = append(deliverySet, blocks) + } + } + return +} + +func (s *TotalOrderingSyncerTestSuite) TestRandomSync() { + numChains := uint32(13) + skipSet := 2 + skipDAG := int(numChains) * skipSet + repeat := 100 + if testing.Short() { + repeat = 10 + } + + for ; repeat >= 0; repeat-- { + toc := newTotalOrderingSyncer(numChains) + deliverySet, revealer := s.genDeliverySet(numChains) + blockToDeliverySet := make(map[common.Hash]int) + deliverySetMap := make(map[int][]common.Hash) + offset := 0 + for i, delivery := range deliverySet { + if i > 0 { + // The hash of last block of previous delivery set is less than the hash + // of first block of current delivery set. The syncer cannot seperate + // these two delevery set so they need to be combined. This will not + // affect the final result because the output of syncer is also sorted + // and it will be the same as the output of total ordering. + if deliverySet[i-1][len(deliverySet[i-1])-1].Hash.Less( + delivery[0].Hash) { + offset++ + } + } + for _, block := range delivery { + blockToDeliverySet[block.Hash] = i - offset + deliverySetMap[i-offset] = append(deliverySetMap[i-offset], block.Hash) + } + } + + revealer.Reset() + for i := 0; i < skipDAG; i++ { + _, err := revealer.Next() + s.Require().NoError(err) + } + + for _, delivery := range deliverySet { + for _, block := range delivery { + toc.processFinalizedBlock(block) + } + } + + minDeliverySetIdx := -1 + deliverySetMap2 := make(map[int][]common.Hash) + + for { + b, err := revealer.Next() + if err != nil { + if err != blockdb.ErrIterationFinished { + s.Require().NoError(err) + } + err = nil + break + } + deliver := toc.processBlock(&b) + for _, block := range deliver { + idx, exist := blockToDeliverySet[block.Hash] + if !exist { + continue + } + if minDeliverySetIdx == -1 { + minDeliverySetIdx = idx + } + s.Require().True(idx >= minDeliverySetIdx) + deliverySetMap2[idx] = append(deliverySetMap2[idx], block.Hash) + } + } + + s.Require().NotEqual(-1, minDeliverySetIdx) + for i := minDeliverySetIdx; ; i++ { + if _, exist := deliverySetMap[i]; !exist { + break + } + for _, v := range deliverySetMap[i] { + s.Contains(deliverySetMap2[i], v) + } + s.Require().Len(deliverySetMap2[i], len(deliverySetMap[i])) + } + } +} + +// TestMissingMiddleDeliverySet tests the following case +// The number denotes the index of delivery set. +// X means that the block is not synced in lattice. +// O means that the block is synced but the index is not important. +// The assumption is that once the block is synced, all newer blocks +// on the same chain will be synced as well. +// ******************** +// O O O 5 6 +// 3 3 3 X 5 +// ------------ +// 0 1 2 3 4(ChainID) +// ******************** +// In this case, the block of delivery set 4 is not synced in lattice; +// therefore, the minimum index of delivery set should be 5 instead of 3. +// (Note: delivery set 6 is to make syncer identify delivery set 5) + +func (s *TotalOrderingSyncerTestSuite) TestMissingMiddleDeliverySet() { + numChains := uint32(5) + b00 := &types.Block{ + Hash: common.Hash{0x10}, + Position: types.Position{ + ChainID: uint32(0), + Height: uint64(3), + }, + } + b10 := &types.Block{ + Hash: common.Hash{0x20}, + Position: types.Position{ + ChainID: uint32(1), + Height: uint64(3), + }, + } + b20 := &types.Block{ + Hash: common.Hash{0x30}, + Position: types.Position{ + ChainID: uint32(2), + Height: uint64(3), + }, + } + b30 := &types.Block{ + Hash: common.Hash{0x21}, + Position: types.Position{ + ChainID: uint32(3), + Height: uint64(3), + }, + } + b31 := &types.Block{ + Hash: common.Hash{0x12}, + Position: types.Position{ + ChainID: uint32(3), + Height: uint64(4), + }, + } + b40 := &types.Block{ + Hash: common.Hash{0x22}, + Position: types.Position{ + ChainID: uint32(4), + Height: uint64(3), + }, + } + b41 := &types.Block{ + Hash: common.Hash{0x12}, + Position: types.Position{ + ChainID: uint32(4), + Height: uint64(4), + }, + } + blocks := []*types.Block{b00, b10, b20, b30, b31, b40, b41} + + // Test process sequence 1. + toc := newTotalOrderingSyncer(numChains) + + for _, block := range blocks { + toc.processFinalizedBlock(block) + } + + s.Require().Len(toc.processBlock(b00), 0) + s.Require().Len(toc.processBlock(b10), 0) + s.Require().Len(toc.processBlock(b20), 0) + s.Require().Len(toc.processBlock(b31), 0) + deliver := toc.processBlock(b40) + s.Require().Len(deliver, 2) + s.Equal(deliver[0], b31) + s.Equal(deliver[1], b40) + + // Test process sequence 2. + toc2 := newTotalOrderingSyncer(numChains) + + for _, block := range blocks { + toc2.processFinalizedBlock(block) + } + + s.Require().Len(toc2.processBlock(b31), 0) + s.Require().Len(toc2.processBlock(b40), 0) + s.Require().Len(toc2.processBlock(b20), 0) + s.Require().Len(toc2.processBlock(b10), 0) + deliver = toc2.processBlock(b00) + s.Require().Len(deliver, 2) + s.Equal(deliver[0], b31) + s.Equal(deliver[1], b40) + +} + +func (s *TotalOrderingSyncerTestSuite) TestBootstrap() { + numChains := uint32(13) + toc := newTotalOrderingSyncer(numChains) + deliverySet, revealer := s.genDeliverySet(numChains) + deliveredNum := 0 + for _, delivery := range deliverySet { + deliveredNum += len(delivery) + } + + actualDeliveredNum := 0 + revealer.Reset() + for { + b, err := revealer.Next() + if err != nil { + if err != blockdb.ErrIterationFinished { + s.Require().NoError(err) + } + err = nil + break + } + deliver := toc.processBlock(&b) + actualDeliveredNum += len(deliver) + } + + // The last few blocks revealer might not be in the output of total order. + // So the deliveredNum might be less than actualDeliveredNum. + s.True(actualDeliveredNum >= deliveredNum) +} + +func TestTotalOrderingSyncer(t *testing.T) { + suite.Run(t, new(TotalOrderingSyncerTestSuite)) +} |