From 2f1e71d9d298d1f6ade8d17a1db7a657b0223872 Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Thu, 20 Sep 2018 14:37:53 +0800 Subject: core: add blockpool (#121) core.blockPool is used to cache blocks arrived out-of-order. Our consensus should retry those blocks after their acking blocks added to lattice. --- core/blocklattice.go | 6 ++-- core/blocklattice_test.go | 26 +++++++-------- core/blockpool.go | 70 +++++++++++++++++++++++++++++++++++++++ core/blockpool_test.go | 83 +++++++++++++++++++++++++++++++++++++++++++++++ core/types/block.go | 27 +++++++++++---- 5 files changed, 190 insertions(+), 22 deletions(-) create mode 100644 core/blockpool.go create mode 100644 core/blockpool_test.go diff --git a/core/blocklattice.go b/core/blocklattice.go index 5e8d48c..4dc43a4 100644 --- a/core/blocklattice.go +++ b/core/blocklattice.go @@ -240,9 +240,9 @@ func (bl *blockLattice) areAllAcksInLattice(b *types.Block) bool { return true } -// processBlock processes block, it does sanity check, inserts block into -// lattice, handles strong acking and deletes blocks which will not be used. -func (bl *blockLattice) processBlock( +// addBlock processes block, it does sanity check, inserts block into +// lattice and deletes blocks which will not be used. +func (bl *blockLattice) addBlock( block *types.Block) (deliverable []*types.Block, err error) { var ( diff --git a/core/blocklattice_test.go b/core/blocklattice_test.go index 41bd5c3..e0c9fb0 100644 --- a/core/blocklattice_test.go +++ b/core/blocklattice_test.go @@ -82,7 +82,7 @@ func (s *BlockLatticeTest) genTestCase1() (bl *blockLattice) { // Add genesis blocks. for i := uint32(0); i < chainNum; i++ { b = s.prepareGenesisBlock(i) - delivered, err = bl.processBlock(b) + delivered, err = bl.addBlock(b) // Genesis blocks are safe to be added to DAG, they acks no one. req.Len(delivered, 1) req.Nil(err) @@ -101,7 +101,7 @@ func (s *BlockLatticeTest) genTestCase1() (bl *blockLattice) { Acks: common.NewSortedHashes(common.Hashes{h}), } s.hashBlock(b) - delivered, err = bl.processBlock(b) + delivered, err = bl.addBlock(b) req.Len(delivered, 1) req.Equal(delivered[0].Hash, b.Hash) req.Nil(err) @@ -122,7 +122,7 @@ func (s *BlockLatticeTest) genTestCase1() (bl *blockLattice) { }), } s.hashBlock(b) - delivered, err = bl.processBlock(b) + delivered, err = bl.addBlock(b) req.Len(delivered, 1) req.Equal(delivered[0].Hash, b.Hash) req.Nil(err) @@ -141,7 +141,7 @@ func (s *BlockLatticeTest) genTestCase1() (bl *blockLattice) { Acks: common.NewSortedHashes(common.Hashes{h}), } s.hashBlock(b) - delivered, err = bl.processBlock(b) + delivered, err = bl.addBlock(b) req.Len(delivered, 1) req.Equal(delivered[0].Hash, b.Hash) req.Nil(err) @@ -160,7 +160,7 @@ func (s *BlockLatticeTest) genTestCase1() (bl *blockLattice) { Acks: common.NewSortedHashes(common.Hashes{h}), } s.hashBlock(b) - delivered, err = bl.processBlock(b) + delivered, err = bl.addBlock(b) req.Len(delivered, 1) req.Equal(delivered[0].Hash, b.Hash) req.Nil(err) @@ -372,7 +372,7 @@ func (s *BlockLatticeTest) TestRandomIntensiveAcking() { // Generate genesis blocks. for i := uint32(0); i < chainNum; i++ { b = s.prepareGenesisBlock(i) - delivered, err = bl.processBlock(b) + delivered, err = bl.addBlock(b) req.Len(delivered, 1) req.Nil(err) } @@ -386,7 +386,7 @@ func (s *BlockLatticeTest) TestRandomIntensiveAcking() { } bl.prepareBlock(b) s.hashBlock(b) - delivered, err = bl.processBlock(b) + delivered, err = bl.addBlock(b) req.Nil(err) extracted = append(extracted, delivered...) } @@ -441,7 +441,7 @@ func (s *BlockLatticeTest) TestRandomlyGeneratedBlocks() { revealedHashes = append(revealedHashes, b.Hash) // Pass blocks to blocklattice. - delivered, err = bl.processBlock(&b) + delivered, err = bl.addBlock(&b) req.Nil(err) for _, b := range delivered { deliveredHashes = append(deliveredHashes, b.Hash) @@ -516,16 +516,16 @@ func (s *BlockLatticeTest) TestPrepareBlock() { time.Sleep(minInterval) b30 := s.prepareGenesisBlock(3) // Submit these blocks to blocklattice. - delivered, err = bl.processBlock(b00) + delivered, err = bl.addBlock(b00) req.Len(delivered, 1) req.Nil(err) - delivered, err = bl.processBlock(b10) + delivered, err = bl.addBlock(b10) req.Len(delivered, 1) req.Nil(err) - delivered, err = bl.processBlock(b20) + delivered, err = bl.addBlock(b20) req.Len(delivered, 1) req.Nil(err) - delivered, err = bl.processBlock(b30) + delivered, err = bl.addBlock(b30) req.Len(delivered, 1) req.Nil(err) // We should be able to collect all 4 genesis blocks by calling @@ -544,7 +544,7 @@ func (s *BlockLatticeTest) TestPrepareBlock() { req.Contains(b11.Acks, b30.Hash) req.Equal(b11.ParentHash, b10.Hash) req.Equal(b11.Position.Height, uint64(1)) - delivered, err = bl.processBlock(b11) + delivered, err = bl.addBlock(b11) req.Len(delivered, 1) req.Nil(err) // Propose/Process a block based on collected info. diff --git a/core/blockpool.go b/core/blockpool.go new file mode 100644 index 0000000..bfde881 --- /dev/null +++ b/core/blockpool.go @@ -0,0 +1,70 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// . + +package core + +import ( + "container/heap" + + "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +// blockPool is a slice of heap of blocks, indexed by chainID, +// and the heap is sorted based on heights of blocks. +type blockPool []types.ByHeight + +// newBlockPool constructs a blockPool. +func newBlockPool(chainNum uint32) (pool blockPool) { + pool = make(blockPool, chainNum) + for _, p := range pool { + heap.Init(&p) + } + return +} + +// addBlock adds a block into pending set and make sure these +// blocks are sorted by height. +func (p blockPool) addBlock(b *types.Block) { + heap.Push(&p[b.Position.ChainID], b) +} + +// purgeBlocks purge blocks of that chain with less-or-equal height. +// NOTE: we won't check the validity of 'chainID', the caller should +// be sure what he is expecting. +func (p blockPool) purgeBlocks(chainID uint32, height uint64) { + for { + if len(p[chainID]) == 0 || p[chainID][0].Position.Height > height { + break + } + heap.Pop(&p[chainID]) + } +} + +// tip get the blocks with lowest height of the chain if any. +func (p blockPool) tip(chainID uint32) *types.Block { + if len(p[chainID]) == 0 { + return nil + } + return p[chainID][0] +} + +// removeTip removes block with lowest height of the specified chain. +func (p blockPool) removeTip(chainID uint32) { + if len(p[chainID]) > 0 { + heap.Pop(&p[chainID]) + } +} diff --git a/core/blockpool_test.go b/core/blockpool_test.go new file mode 100644 index 0000000..c245c91 --- /dev/null +++ b/core/blockpool_test.go @@ -0,0 +1,83 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// . + +package core + +import ( + "testing" + + "github.com/dexon-foundation/dexon-consensus-core/core/types" + "github.com/stretchr/testify/suite" +) + +type BlockPoolTestSuite struct { + suite.Suite +} + +func (s *BlockPoolTestSuite) TestBasicUsage() { + // This test case try this flow: + // - add some blocks into pool. + // - get tips, check if expected. + // - get tips, should be identical to previous call. + // - remove tips, and get tips again, check if expected. + // - purge one chain, check if expected. + var ( + req = s.Require() + pool = newBlockPool(3) + ) + addBlockWithPosition := func(chainID uint32, height uint64) { + pool.addBlock(&types.Block{ + Position: types.Position{ + ChainID: chainID, + Height: height, + }}) + } + chkPos := func(b *types.Block, chainID uint32, height uint64) { + req.Equal(b.Position.ChainID, chainID) + req.Equal(b.Position.Height, height) + } + addBlockWithPosition(0, 0) + addBlockWithPosition(0, 1) + addBlockWithPosition(0, 2) + addBlockWithPosition(0, 3) + addBlockWithPosition(2, 0) + addBlockWithPosition(2, 1) + addBlockWithPosition(2, 2) + // Check each tip. + chkPos(pool.tip(0), 0, 0) + chkPos(pool.tip(2), 2, 0) + req.Nil(pool.tip(1)) + // Remove tips of chain#0, #1. + pool.removeTip(0) + pool.removeTip(1) + // Get tips of chain#0, #2 back to check. + chkPos(pool.tip(0), 0, 1) + chkPos(pool.tip(2), 2, 0) // Chain#2 is untouched. + // Purge with height lower than lowest height. + pool.purgeBlocks(0, 0) + chkPos(pool.tip(0), 0, 1) // Chain#0 is not affected. + // Purge with height in range. + pool.purgeBlocks(0, 2) + chkPos(pool.tip(0), 0, 3) // Height = 1, 2 are purged. + // Purge with height higher than highest height. + pool.purgeBlocks(0, 4) + req.Nil(pool.tip(0)) // Whole chain is purged. +} + +func TestBlockPool(t *testing.T) { + suite.Run(t, new(BlockPoolTestSuite)) +} diff --git a/core/types/block.go b/core/types/block.go index 949876f..ddd0abd 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -125,14 +125,29 @@ func (b ByHash) Swap(i int, j int) { // ByHeight is the helper type for sorting slice of blocks by height. type ByHeight []*Block -func (b ByHeight) Len() int { - return len(b) +// Len implements Len method in sort.Sort interface. +func (bs ByHeight) Len() int { + return len(bs) } -func (b ByHeight) Less(i int, j int) bool { - return b[i].Position.Height < b[j].Position.Height +// Less implements Less method in sort.Sort interface. +func (bs ByHeight) Less(i int, j int) bool { + return bs[i].Position.Height < bs[j].Position.Height } -func (b ByHeight) Swap(i int, j int) { - b[i], b[j] = b[j], b[i] +// Swap implements Swap method in sort.Sort interface. +func (bs ByHeight) Swap(i int, j int) { + bs[i], bs[j] = bs[j], bs[i] +} + +// Push implements Push method in heap interface. +func (bs *ByHeight) Push(x interface{}) { + *bs = append(*bs, x.(*Block)) +} + +// Pop implements Pop method in heap interface. +func (bs *ByHeight) Pop() (ret interface{}) { + n := len(*bs) + *bs, ret = (*bs)[0:n-1], (*bs)[n-1] + return } -- cgit v1.2.3