From d42b96124b8d1d43828b4c36b158408ff17aea69 Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Fri, 19 Oct 2018 13:31:43 +0800 Subject: core: Sync from BA and test. (#224) --- core/compaction-chain.go | 122 ++++++++++++++++++++++-------------- core/compaction-chain_test.go | 141 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 199 insertions(+), 64 deletions(-) (limited to 'core') diff --git a/core/compaction-chain.go b/core/compaction-chain.go index 6511727..5b13f7f 100644 --- a/core/compaction-chain.go +++ b/core/compaction-chain.go @@ -39,12 +39,12 @@ type finalizedBlockHeap = types.ByFinalizationHeight type compactionChain struct { gov Governance + chainUnsynced uint32 tsigVerifier *TSigVerifierCache blocks map[common.Hash]*types.Block pendingBlocks []*types.Block pendingFinalizedBlocks *finalizedBlockHeap - blocksLock sync.RWMutex - prevBlockLock sync.RWMutex + lock sync.RWMutex prevBlock *types.Block } @@ -60,23 +60,28 @@ func newCompactionChain(gov Governance) *compactionChain { } func (cc *compactionChain) init(initBlock *types.Block) { - cc.prevBlockLock.Lock() - defer cc.prevBlockLock.Unlock() + cc.lock.Lock() + defer cc.lock.Unlock() cc.prevBlock = initBlock + cc.pendingBlocks = []*types.Block{} + if initBlock.Finalization.Height == 0 { + cc.chainUnsynced = cc.gov.Configuration(uint64(0)).NumChains + cc.pendingBlocks = append(cc.pendingBlocks, initBlock) + } } func (cc *compactionChain) registerBlock(block *types.Block) { if cc.blockRegistered(block.Hash) { return } - cc.blocksLock.Lock() - defer cc.blocksLock.Unlock() + cc.lock.Lock() + defer cc.lock.Unlock() cc.blocks[block.Hash] = block } func (cc *compactionChain) blockRegistered(hash common.Hash) (exist bool) { - cc.blocksLock.RLock() - defer cc.blocksLock.RUnlock() + cc.lock.RLock() + defer cc.lock.RUnlock() _, exist = cc.blocks[hash] return } @@ -86,23 +91,64 @@ func (cc *compactionChain) processBlock(block *types.Block) error { if prevBlock == nil { return ErrNotInitiazlied } - block.Finalization.Height = prevBlock.Finalization.Height + 1 - cc.prevBlockLock.Lock() - defer cc.prevBlockLock.Unlock() - cc.prevBlock = block - cc.blocksLock.Lock() - defer cc.blocksLock.Unlock() + cc.lock.Lock() + defer cc.lock.Unlock() + if prevBlock.Finalization.Height == 0 && block.Position.Height == 0 { + cc.chainUnsynced-- + } cc.pendingBlocks = append(cc.pendingBlocks, block) return nil } +func (cc *compactionChain) extractBlocks() []*types.Block { + prevBlock := cc.lastBlock() + + // Check if we're synced. + if !func() bool { + cc.lock.RLock() + defer cc.lock.RUnlock() + if len(cc.pendingBlocks) == 0 { + return false + } + // Finalization.Height == 0 is syncing from bootstrap. + if prevBlock.Finalization.Height == 0 { + return cc.chainUnsynced == 0 + } + if prevBlock.Hash != cc.pendingBlocks[0].Hash { + return false + } + return true + }() { + return []*types.Block{} + } + deliveringBlocks := make([]*types.Block, 0) + cc.lock.Lock() + defer cc.lock.Unlock() + // cc.pendingBlocks[0] will not be popped and will equal to cc.prevBlock. + for len(cc.pendingBlocks) > 1 && + (len(cc.pendingBlocks[1].Finalization.Randomness) != 0 || + cc.pendingBlocks[1].Position.Round == 0) { + delete(cc.blocks, cc.pendingBlocks[0].Hash) + cc.pendingBlocks = cc.pendingBlocks[1:] + + block := cc.pendingBlocks[0] + block.Finalization.Height = prevBlock.Finalization.Height + 1 + deliveringBlocks = append(deliveringBlocks, block) + prevBlock = block + } + + cc.prevBlock = prevBlock + + return deliveringBlocks +} + func (cc *compactionChain) processFinalizedBlock(block *types.Block) { if block.Finalization.Height <= cc.lastBlock().Finalization.Height { return } - cc.blocksLock.Lock() - defer cc.blocksLock.Unlock() + cc.lock.Lock() + defer cc.lock.Unlock() heap.Push(cc.pendingFinalizedBlocks, block) return @@ -112,8 +158,8 @@ func (cc *compactionChain) extractFinalizedBlocks() []*types.Block { prevBlock := cc.lastBlock() blocks := func() []*types.Block { - cc.blocksLock.Lock() - defer cc.blocksLock.Unlock() + cc.lock.Lock() + defer cc.lock.Unlock() blocks := []*types.Block{} prevHeight := prevBlock.Finalization.Height for cc.pendingFinalizedBlocks.Len() != 0 { @@ -179,50 +225,32 @@ func (cc *compactionChain) extractFinalizedBlocks() []*types.Block { confirmed = append(confirmed, b) prevBlock = b } - func() { - if len(confirmed) == 0 { - return - } - cc.prevBlockLock.Lock() - defer cc.prevBlockLock.Unlock() - cc.prevBlock = prevBlock - }() - cc.blocksLock.Lock() - defer cc.blocksLock.Unlock() + cc.lock.Lock() + defer cc.lock.Unlock() + if len(confirmed) != 0 && cc.prevBlock.Finalization.Height == 0 { + // Pop the initBlock inserted when bootstrapping. + cc.pendingBlocks = cc.pendingBlocks[1:] + } + cc.prevBlock = prevBlock for _, b := range toPending { heap.Push(cc.pendingFinalizedBlocks, b) } return confirmed } -func (cc *compactionChain) extractBlocks() []*types.Block { - deliveringBlocks := make([]*types.Block, 0) - cc.blocksLock.Lock() - defer cc.blocksLock.Unlock() - for len(cc.pendingBlocks) != 0 && - (len(cc.pendingBlocks[0].Finalization.Randomness) != 0 || - cc.pendingBlocks[0].Position.Round == 0) { - var block *types.Block - block, cc.pendingBlocks = cc.pendingBlocks[0], cc.pendingBlocks[1:] - delete(cc.blocks, block.Hash) - deliveringBlocks = append(deliveringBlocks, block) - } - return deliveringBlocks -} - func (cc *compactionChain) processBlockRandomnessResult( rand *types.BlockRandomnessResult) error { if !cc.blockRegistered(rand.BlockHash) { return ErrBlockNotRegistered } - cc.blocksLock.Lock() - defer cc.blocksLock.Unlock() + cc.lock.Lock() + defer cc.lock.Unlock() cc.blocks[rand.BlockHash].Finalization.Randomness = rand.Randomness return nil } func (cc *compactionChain) lastBlock() *types.Block { - cc.prevBlockLock.RLock() - defer cc.prevBlockLock.RUnlock() + cc.lock.RLock() + defer cc.lock.RUnlock() return cc.prevBlock } diff --git a/core/compaction-chain_test.go b/core/compaction-chain_test.go index f7aff8a..e73482f 100644 --- a/core/compaction-chain_test.go +++ b/core/compaction-chain_test.go @@ -76,23 +76,22 @@ func (s *CompactionChainTestSuite) TestProcessBlock() { } now = now.Add(100 * time.Millisecond) } - prevBlock := &types.Block{} for _, block := range blocks { - s.Equal(cc.prevBlock, prevBlock) s.Require().NoError(cc.processBlock(block)) - s.Equal(prevBlock.Finalization.Height+1, block.Finalization.Height) - prevBlock = block } + s.Len(cc.pendingBlocks, len(blocks)+1) } func (s *CompactionChainTestSuite) TestExtractBlocks() { cc := s.newCompactionChain() + s.Require().Equal(uint32(4), cc.gov.Configuration(uint64(0)).NumChains) blocks := make([]*types.Block, 10) for idx := range blocks { blocks[idx] = &types.Block{ Hash: common.NewRandomHash(), Position: types.Position{ - Round: 1, + Round: 1, + ChainID: uint32(idx % 4), }, } s.Require().False(cc.blockRegistered(blocks[idx].Hash)) @@ -100,7 +99,7 @@ func (s *CompactionChainTestSuite) TestExtractBlocks() { s.Require().True(cc.blockRegistered(blocks[idx].Hash)) } // Randomness is ready for extract. - for i := 0; i < 3; i++ { + for i := 0; i < 4; i++ { s.Require().NoError(cc.processBlock(blocks[i])) h := common.NewRandomHash() s.Require().NoError(cc.processBlockRandomnessResult( @@ -110,17 +109,18 @@ func (s *CompactionChainTestSuite) TestExtractBlocks() { })) } delivered := cc.extractBlocks() - s.Require().Len(delivered, 3) + s.Require().Len(delivered, 4) + s.Require().Equal(uint32(0), cc.chainUnsynced) // Randomness is not yet ready for extract. - for i := 3; i < 6; i++ { + for i := 4; i < 6; i++ { s.Require().NoError(cc.processBlock(blocks[i])) } delivered = append(delivered, cc.extractBlocks()...) - s.Require().Len(delivered, 3) + s.Require().Len(delivered, 4) // Make some randomness ready. - for i := 3; i < 6; i++ { + for i := 4; i < 6; i++ { h := common.NewRandomHash() s.Require().NoError(cc.processBlockRandomnessResult( &types.BlockRandomnessResult{ @@ -161,12 +161,17 @@ func (s *CompactionChainTestSuite) TestExtractBlocks() { // The delivered order should be the same as processing order. for i, block := range delivered { + if i > 1 { + s.Equal(delivered[i-1].Finalization.Height+1, + delivered[i].Finalization.Height) + } s.Equal(block.Hash, blocks[i].Hash) } } func (s *CompactionChainTestSuite) TestExtractBlocksRound0() { cc := s.newCompactionChain() + s.Require().Equal(uint32(4), cc.gov.Configuration(uint64(0)).NumChains) blocks := make([]*types.Block, 10) for idx := range blocks { blocks[idx] = &types.Block{ @@ -180,14 +185,14 @@ func (s *CompactionChainTestSuite) TestExtractBlocksRound0() { s.Require().True(cc.blockRegistered(blocks[idx].Hash)) } // Round 0 should be able to be extracted without randomness. - for i := 0; i < 3; i++ { + for i := 0; i < 4; i++ { s.Require().NoError(cc.processBlock(blocks[i])) } delivered := cc.extractBlocks() - s.Require().Len(delivered, 3) + s.Require().Len(delivered, 4) // Round 0 should be able to be extracted without randomness. - for i := 3; i < 10; i++ { + for i := 4; i < 10; i++ { s.Require().NoError(cc.processBlock(blocks[i])) } delivered = append(delivered, cc.extractBlocks()...) @@ -294,15 +299,117 @@ func (s *CompactionChainTestSuite) TestSyncFinalizedBlock() { s.Equal(confirmed[1].Hash, blocks[5].Hash) // Ignore finalized block if it already confirmed. + cc.init(blocks[5]) cc.processFinalizedBlock(blocks[6]) + s.Require().NoError(cc.processBlock(blocks[5])) s.Require().NoError(cc.processBlock(blocks[6])) + confirmed = cc.extractBlocks() + s.Require().Len(confirmed, 1) + s.Equal(confirmed[0].Hash, blocks[6].Hash) s.Equal(blocks[6].Hash, cc.lastBlock().Hash) s.Require().Len(cc.extractFinalizedBlocks(), 0) - s.Require().NoError(cc.processBlock(blocks[7])) - cc.processFinalizedBlock(blocks[7]) - s.Len(*cc.pendingFinalizedBlocks, 0) - s.Equal(blocks[7].Hash, cc.lastBlock().Hash) + s.Require().Len(*cc.pendingFinalizedBlocks, 0) +} + +func (s *CompactionChainTestSuite) TestSync() { + cc := s.newCompactionChain() + mock := newMockTSigVerifier(true) + for i := 0; i < cc.tsigVerifier.cacheSize; i++ { + cc.tsigVerifier.verifier[uint64(i)] = mock + } + now := time.Now().UTC() + blocks := make([]*types.Block, 20) + for idx := range blocks { + blocks[idx] = &types.Block{ + Hash: common.NewRandomHash(), + Finalization: types.FinalizationResult{ + Timestamp: now, + Height: uint64(idx + 1), + }, + } + now = now.Add(100 * time.Millisecond) + if idx > 10 { + blocks[idx].Finalization.Height = 0 + } + } + cc.init(blocks[1]) + s.Require().NoError(cc.processBlock(blocks[11])) + s.Len(cc.extractBlocks(), 0) + for i := 2; i <= 10; i++ { + cc.processFinalizedBlock(blocks[i]) + } + s.Require().Len(cc.extractFinalizedBlocks(), 9) + // Syncing is almost done here. The finalized block matches the first block + // processed. + b11 := blocks[11].Clone() + b11.Finalization.Height = uint64(12) + cc.processFinalizedBlock(b11) + s.Require().Len(cc.extractFinalizedBlocks(), 1) + s.Len(cc.extractBlocks(), 0) + // Sync is done. + s.Require().NoError(cc.processBlock(blocks[12])) + confirmed := cc.extractBlocks() + s.Require().Len(confirmed, 1) + s.Equal(confirmed[0].Hash, blocks[12].Hash) + s.Equal(uint64(13), blocks[12].Finalization.Height) + for i := 13; i < 20; i++ { + s.Require().NoError(cc.processBlock(blocks[i])) + } + confirmed = cc.extractBlocks() + s.Require().Len(confirmed, 7) + offset := 13 + for i, b := range confirmed { + s.Equal(blocks[offset+i].Hash, b.Hash) + s.Equal(uint64(offset+i+1), b.Finalization.Height) + } +} + +func (s *CompactionChainTestSuite) TestBootstrapSync() { + cc := s.newCompactionChain() + numChains := cc.gov.Configuration(uint64(0)).NumChains + s.Require().Equal(uint32(4), numChains) + mock := newMockTSigVerifier(true) + for i := 0; i < cc.tsigVerifier.cacheSize; i++ { + cc.tsigVerifier.verifier[uint64(i)] = mock + } + now := time.Now().UTC() + blocks := make([]*types.Block, 20) + for idx := range blocks { + blocks[idx] = &types.Block{ + Hash: common.NewRandomHash(), + Position: types.Position{ + Height: uint64(idx) / uint64(numChains), + }, + Finalization: types.FinalizationResult{ + Timestamp: now, + Height: uint64(idx + 1), + }, + } + now = now.Add(100 * time.Millisecond) + if idx > 2 { + blocks[idx].Finalization.Height = 0 + } + } + cc.init(&types.Block{}) + b2 := blocks[2].Clone() + b2.Finalization.Height = 0 + s.Require().NoError(cc.processBlock(b2)) + s.Require().NoError(cc.processBlock(blocks[3])) + s.Len(cc.extractBlocks(), 0) + cc.processFinalizedBlock(blocks[2]) + cc.processFinalizedBlock(blocks[1]) s.Require().Len(cc.extractFinalizedBlocks(), 0) + s.Require().Len(cc.extractBlocks(), 0) + cc.processFinalizedBlock(blocks[0]) + confirmed := cc.extractFinalizedBlocks() + s.Require().Len(confirmed, 3) + s.Equal(confirmed[0].Hash, blocks[0].Hash) + s.Equal(confirmed[1].Hash, blocks[1].Hash) + s.Equal(confirmed[2].Hash, blocks[2].Hash) + confirmed = cc.extractBlocks() + s.Require().Len(confirmed, 1) + s.Equal(confirmed[0].Hash, blocks[3].Hash) + s.Equal(uint64(4), blocks[3].Finalization.Height) } func TestCompactionChain(t *testing.T) { -- cgit v1.2.3