From 97abfdabfd41b44f3273a869eca75cea34b0fdc8 Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Mon, 8 Oct 2018 15:30:53 +0800 Subject: core: Add block randomness in compaction-chain module (#179) --- core/compaction-chain.go | 74 ++++++++++++++++++++++++++---------- core/compaction-chain_test.go | 87 +++++++++++++++++++++++++++++++++++++++---- core/consensus.go | 16 ++++++-- core/consensus_test.go | 3 ++ core/lattice_test.go | 2 +- simulation/app.go | 3 ++ 6 files changed, 153 insertions(+), 32 deletions(-) diff --git a/core/compaction-chain.go b/core/compaction-chain.go index c6d423d..3037768 100644 --- a/core/compaction-chain.go +++ b/core/compaction-chain.go @@ -20,42 +20,49 @@ package core import ( + "fmt" "sync" - "time" "github.com/dexon-foundation/dexon-consensus-core/common" - "github.com/dexon-foundation/dexon-consensus-core/core/blockdb" "github.com/dexon-foundation/dexon-consensus-core/core/types" ) -type pendingAck struct { - receivedTime time.Time -} +// Errors for compaction chain module. +var ( + ErrBlockNotRegistered = fmt.Errorf( + "block not registered") +) type compactionChain struct { - db blockdb.Reader - pendingAckLock sync.RWMutex - pendingAck map[common.Hash]*pendingAck - prevBlockLock sync.RWMutex - prevBlock *types.Block - witnessAcksLock sync.RWMutex + blocks map[common.Hash]*types.Block + pendingBlocks []*types.Block + blocksLock sync.RWMutex + prevBlockLock sync.RWMutex + prevBlock *types.Block } -func newCompactionChain( - db blockdb.Reader, -) *compactionChain { +func newCompactionChain() *compactionChain { return &compactionChain{ - db: db, - pendingAck: make(map[common.Hash]*pendingAck), + blocks: make(map[common.Hash]*types.Block), } } -func (cc *compactionChain) sanityCheck(witnessBlock *types.Block) error { - return nil +func (cc *compactionChain) registerBlock(block *types.Block) { + if cc.blockRegistered(block.Hash) { + return + } + cc.blocksLock.Lock() + defer cc.blocksLock.Unlock() + cc.blocks[block.Hash] = block +} + +func (cc *compactionChain) blockRegistered(hash common.Hash) (exist bool) { + cc.blocksLock.RLock() + defer cc.blocksLock.RUnlock() + _, exist = cc.blocks[hash] + return } -// TODO(jimmy-dexon): processBlock can be extraced to -// another struct. func (cc *compactionChain) processBlock(block *types.Block) error { prevBlock := cc.lastBlock() if prevBlock != nil { @@ -64,6 +71,33 @@ func (cc *compactionChain) processBlock(block *types.Block) error { cc.prevBlockLock.Lock() defer cc.prevBlockLock.Unlock() cc.prevBlock = block + cc.blocksLock.Lock() + defer cc.blocksLock.Unlock() + cc.pendingBlocks = append(cc.pendingBlocks, block) + return nil +} + +func (cc *compactionChain) extractBlocks() []*types.Block { + deliveringBlocks := make([]*types.Block, 0) + cc.blocksLock.Lock() + defer cc.blocksLock.Unlock() + for len(cc.pendingBlocks) != 0 && len(cc.pendingBlocks[0].Randomness) != 0 { + var block *types.Block + block, cc.pendingBlocks = cc.pendingBlocks[0], cc.pendingBlocks[1:] + delete(cc.blocks, block.Hash) + deliveringBlocks = append(deliveringBlocks, block) + } + return deliveringBlocks +} + +func (cc *compactionChain) processBlockRandomnessResult( + rand *types.BlockRandomnessResult) error { + if !cc.blockRegistered(rand.BlockHash) { + return ErrBlockNotRegistered + } + cc.blocksLock.Lock() + defer cc.blocksLock.Unlock() + cc.blocks[rand.BlockHash].Randomness = rand.Randomness return nil } diff --git a/core/compaction-chain_test.go b/core/compaction-chain_test.go index 6f62aa2..5ceb5c2 100644 --- a/core/compaction-chain_test.go +++ b/core/compaction-chain_test.go @@ -22,24 +22,19 @@ import ( "time" "github.com/dexon-foundation/dexon-consensus-core/common" - "github.com/dexon-foundation/dexon-consensus-core/core/blockdb" "github.com/dexon-foundation/dexon-consensus-core/core/types" "github.com/stretchr/testify/suite" ) type CompactionChainTestSuite struct { suite.Suite - db blockdb.BlockDatabase } func (s *CompactionChainTestSuite) SetupTest() { - var err error - s.db, err = blockdb.NewMemBackedBlockDB() - s.Require().Nil(err) } func (s *CompactionChainTestSuite) newCompactionChain() *compactionChain { - return newCompactionChain(s.db) + return newCompactionChain() } func (s *CompactionChainTestSuite) generateBlocks( @@ -74,12 +69,88 @@ func (s *CompactionChainTestSuite) TestProcessBlock() { var prevBlock *types.Block for _, block := range blocks { s.Equal(cc.prevBlock, prevBlock) - err := cc.processBlock(block) - s.Require().Nil(err) + s.Require().NoError(cc.processBlock(block)) prevBlock = block } } +func (s *CompactionChainTestSuite) TestExtractBlocks() { + cc := s.newCompactionChain() + blocks := make([]*types.Block, 10) + for idx := range blocks { + blocks[idx] = &types.Block{ + Hash: common.NewRandomHash(), + } + s.Require().False(cc.blockRegistered(blocks[idx].Hash)) + cc.registerBlock(blocks[idx]) + s.Require().True(cc.blockRegistered(blocks[idx].Hash)) + } + // Randomness is ready for extract. + for i := 0; i < 3; i++ { + s.Require().NoError(cc.processBlock(blocks[i])) + h := common.NewRandomHash() + s.Require().NoError(cc.processBlockRandomnessResult( + &types.BlockRandomnessResult{ + BlockHash: blocks[i].Hash, + Randomness: h[:], + })) + } + delivered := cc.extractBlocks() + s.Require().Len(delivered, 3) + + // Randomness is not yet ready for extract. + for i := 3; i < 6; i++ { + s.Require().NoError(cc.processBlock(blocks[i])) + } + delivered = append(delivered, cc.extractBlocks()...) + s.Require().Len(delivered, 3) + + // Make some randomness ready. + for i := 3; i < 6; i++ { + h := common.NewRandomHash() + s.Require().NoError(cc.processBlockRandomnessResult( + &types.BlockRandomnessResult{ + BlockHash: blocks[i].Hash, + Randomness: h[:], + })) + } + delivered = append(delivered, cc.extractBlocks()...) + s.Require().Len(delivered, 6) + + // Later block's randomness is ready. + for i := 6; i < 10; i++ { + s.Require().NoError(cc.processBlock(blocks[i])) + if i < 8 { + continue + } + h := common.NewRandomHash() + s.Require().NoError(cc.processBlockRandomnessResult( + &types.BlockRandomnessResult{ + BlockHash: blocks[i].Hash, + Randomness: h[:], + })) + } + delivered = append(delivered, cc.extractBlocks()...) + s.Require().Len(delivered, 6) + + // Prior block's randomness is ready. + for i := 6; i < 8; i++ { + h := common.NewRandomHash() + s.Require().NoError(cc.processBlockRandomnessResult( + &types.BlockRandomnessResult{ + BlockHash: blocks[i].Hash, + Randomness: h[:], + })) + } + delivered = append(delivered, cc.extractBlocks()...) + s.Require().Len(delivered, 10) + + // The delivered order should be the same as processing order. + for i, block := range delivered { + s.Equal(block.Hash, blocks[i].Hash) + } +} + func TestCompactionChain(t *testing.T) { suite.Run(t, new(CompactionChainTestSuite)) } diff --git a/core/consensus.go b/core/consensus.go index ae30619..e0f8ef2 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -97,6 +97,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( log.Println(ErrUnknownBlockConfirmed, hash) return } + recv.consensus.ccModule.registerBlock(block) voteList := make([]types.Vote, 0, len(votes)) for _, vote := range votes { voteList = append(voteList, *vote) @@ -263,7 +264,7 @@ func NewConsensus( con := &Consensus{ ID: ID, currentConfig: config, - ccModule: newCompactionChain(db), + ccModule: newCompactionChain(), lattice: lattice, nbModule: nbModule, gov: gov, @@ -541,6 +542,9 @@ func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { // ProcessAgreementResult processes the randomness request. func (con *Consensus) ProcessAgreementResult( rand *types.AgreementResult) error { + if !con.ccModule.blockRegistered(rand.BlockHash) { + return nil + } if con.round != rand.Round { return nil } @@ -610,6 +614,9 @@ func (con *Consensus) ProcessAgreementResult( // ProcessBlockRandomnessResult processes the randomness result. func (con *Consensus) ProcessBlockRandomnessResult( rand *types.BlockRandomnessResult) error { + if !con.ccModule.blockRegistered(rand.BlockHash) { + return nil + } // TODO(jimmy-dexon): reuse the GPK. round := rand.Round gpk, err := NewDKGGroupPublicKey(round, @@ -623,7 +630,7 @@ func (con *Consensus) ProcessBlockRandomnessResult( rand.BlockHash, crypto.Signature{Signature: rand.Randomness}) { return ErrIncorrectBlockRandomnessResult } - return nil + return con.ccModule.processBlockRandomnessResult(rand) } // preProcessBlock performs Byzantine Agreement on the block. @@ -655,10 +662,13 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { if err = con.ccModule.processBlock(b); err != nil { return } + go con.event.NotifyTime(b.ConsensusTimestamp) + } + deliveredBlocks = con.ccModule.extractBlocks() + for _, b := range deliveredBlocks { if err = con.db.Put(*b); err != nil { return } - go con.event.NotifyTime(b.ConsensusTimestamp) con.nbModule.BlockDelivered(*b) // TODO(mission): Find a way to safely recycle the block. // We should deliver block directly to diff --git a/core/consensus_test.go b/core/consensus_test.go index 6827352..4c0f41c 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -194,6 +194,9 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { // to all core.Consensus objects. broadcast := func(b *types.Block) { for _, obj := range objs { + h := common.NewRandomHash() + b.Randomness = h[:] + obj.con.ccModule.registerBlock(b) req.Nil(obj.con.processBlock(b)) } } diff --git a/core/lattice_test.go b/core/lattice_test.go index 1270a3c..11caa11 100644 --- a/core/lattice_test.go +++ b/core/lattice_test.go @@ -109,7 +109,7 @@ func (s *LatticeTestSuite) newTestLatticeMgr( app := test.NewApp() // Setup lattice. return &testLatticeMgr{ - ccModule: newCompactionChain(db), + ccModule: newCompactionChain(), app: app, db: db, lattice: NewLattice( diff --git a/simulation/app.go b/simulation/app.go index a46f3a6..e88f956 100644 --- a/simulation/app.go +++ b/simulation/app.go @@ -126,6 +126,9 @@ func (a *simApp) TotalOrderingDelivered(blockHashes common.Hashes, early bool) { // BlockDelivered is called when a block in compaction chain is delivered. func (a *simApp) BlockDelivered(block types.Block) { + if len(block.Randomness) == 0 { + panic(fmt.Errorf("Block %s randomness is empty", block.Hash)) + } func() { a.blockByHashMutex.Lock() defer a.blockByHashMutex.Unlock() -- cgit v1.2.3