From 8303e9d054957195717f41804a456e2720b0c4bb Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Thu, 18 Oct 2018 14:48:05 +0800 Subject: core: sync compaction chain (#222) --- core/compaction-chain.go | 114 +++++++++++++++++++++++++++++++---------- core/compaction-chain_test.go | 107 ++++++++++++++++++++++++++++++++++++++ core/consensus.go | 20 +++++++- core/dkg-tsig-protocol.go | 85 ++++++++++++++++++++++++++++++ core/dkg-tsig-protocol_test.go | 61 ++++++++++++++++++++++ core/types/block.go | 31 +++++++++++ 6 files changed, 391 insertions(+), 27 deletions(-) (limited to 'core') diff --git a/core/compaction-chain.go b/core/compaction-chain.go index ac8c458..6511727 100644 --- a/core/compaction-chain.go +++ b/core/compaction-chain.go @@ -18,6 +18,7 @@ package core import ( + "container/heap" "fmt" "sync" @@ -34,20 +35,27 @@ var ( "not initialized") ) +type finalizedBlockHeap = types.ByFinalizationHeight + type compactionChain struct { gov Governance + tsigVerifier *TSigVerifierCache blocks map[common.Hash]*types.Block pendingBlocks []*types.Block - pendingFinalizedBlocks []*types.Block + pendingFinalizedBlocks *finalizedBlockHeap blocksLock sync.RWMutex prevBlockLock sync.RWMutex prevBlock *types.Block } func newCompactionChain(gov Governance) *compactionChain { + pendingFinalizedBlocks := &finalizedBlockHeap{} + heap.Init(pendingFinalizedBlocks) return &compactionChain{ - gov: gov, - blocks: make(map[common.Hash]*types.Block), + gov: gov, + tsigVerifier: NewTSigVerifierCache(gov, 7), + blocks: make(map[common.Hash]*types.Block), + pendingFinalizedBlocks: pendingFinalizedBlocks, } } @@ -88,49 +96,103 @@ func (cc *compactionChain) processBlock(block *types.Block) error { return nil } -func (cc *compactionChain) processFinalizedBlock(block *types.Block) ( - []*types.Block, error) { +func (cc *compactionChain) processFinalizedBlock(block *types.Block) { + if block.Finalization.Height <= cc.lastBlock().Finalization.Height { + return + } + + cc.blocksLock.Lock() + defer cc.blocksLock.Unlock() + heap.Push(cc.pendingFinalizedBlocks, block) + + return +} + +func (cc *compactionChain) extractFinalizedBlocks() []*types.Block { + prevBlock := cc.lastBlock() + blocks := func() []*types.Block { cc.blocksLock.Lock() defer cc.blocksLock.Unlock() - blocks := cc.pendingFinalizedBlocks - cc.pendingFinalizedBlocks = []*types.Block{} + blocks := []*types.Block{} + prevHeight := prevBlock.Finalization.Height + for cc.pendingFinalizedBlocks.Len() != 0 { + tip := (*cc.pendingFinalizedBlocks)[0] + // Pop blocks that are already confirmed. + if tip.Finalization.Height <= prevBlock.Finalization.Height { + heap.Pop(cc.pendingFinalizedBlocks) + continue + } + // Since we haven't verified the finalized block, + // it is possible to be forked. + if tip.Finalization.Height == prevHeight || + tip.Finalization.Height == prevHeight+1 { + prevHeight = tip.Finalization.Height + blocks = append(blocks, tip) + heap.Pop(cc.pendingFinalizedBlocks) + } else { + break + } + } return blocks }() - threshold := make(map[uint64]int) - gpks := make(map[uint64]*DKGGroupPublicKey) toPending := []*types.Block{} confirmed := []*types.Block{} - blocks = append(blocks, block) for _, b := range blocks { - if !cc.gov.IsDKGFinal(b.Position.Round) { - toPending = append(toPending, b) + if b.Hash == prevBlock.Hash && + b.Finalization.Height == prevBlock.Finalization.Height { continue } round := b.Position.Round - if _, exist := gpks[round]; !exist { - threshold[round] = int(cc.gov.Configuration(round).DKGSetSize)/3 + 1 - var err error - gpks[round], err = NewDKGGroupPublicKey( - round, - cc.gov.DKGMasterPublicKeys(round), cc.gov.DKGComplaints(round), - threshold[round]) - if err != nil { - continue - } + v, ok, err := cc.tsigVerifier.UpdateAndGet(round) + if err != nil { + continue + } + if !ok { + toPending = append(toPending, b) + continue } - gpk := gpks[round] - if ok := gpk.VerifySignature(b.Hash, crypto.Signature{ + if ok := v.VerifySignature(b.Hash, crypto.Signature{ Type: "bls", Signature: b.Finalization.Randomness}); !ok { continue } + // Fork resolution: choose block with smaller hash. + if prevBlock.Finalization.Height == + b.Finalization.Height { + //TODO(jimmy-dexon): remove this panic after test. + if true { + // workaround to `go vet` error + panic(fmt.Errorf( + "forked finalized block %s,%s", prevBlock.Hash, b.Hash)) + } + if b.Hash.Less(prevBlock.Hash) { + confirmed = confirmed[:len(confirmed)-1] + } else { + continue + } + } + if b.Finalization.Height-prevBlock.Finalization.Height > 1 { + toPending = append(toPending, b) + continue + } confirmed = append(confirmed, b) + prevBlock = b } + func() { + if len(confirmed) == 0 { + return + } + cc.prevBlockLock.Lock() + defer cc.prevBlockLock.Unlock() + cc.prevBlock = prevBlock + }() cc.blocksLock.Lock() defer cc.blocksLock.Unlock() - cc.pendingFinalizedBlocks = append(cc.pendingFinalizedBlocks, toPending...) - return confirmed, nil + for _, b := range toPending { + heap.Push(cc.pendingFinalizedBlocks, b) + } + return confirmed } func (cc *compactionChain) extractBlocks() []*types.Block { diff --git a/core/compaction-chain_test.go b/core/compaction-chain_test.go index bffe57a..f7aff8a 100644 --- a/core/compaction-chain_test.go +++ b/core/compaction-chain_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" "github.com/dexon-foundation/dexon-consensus-core/core/test" "github.com/dexon-foundation/dexon-consensus-core/core/types" "github.com/stretchr/testify/suite" @@ -198,6 +199,112 @@ func (s *CompactionChainTestSuite) TestExtractBlocksRound0() { } } +type mockTSigVerifier struct { + defaultRet bool + ret map[common.Hash]bool +} + +func newMockTSigVerifier(defaultRet bool) *mockTSigVerifier { + return &mockTSigVerifier{ + defaultRet: defaultRet, + ret: make(map[common.Hash]bool), + } +} + +func (m *mockTSigVerifier) VerifySignature( + hash common.Hash, _ crypto.Signature) bool { + if ret, exist := m.ret[hash]; exist { + return ret + } + return m.defaultRet +} + +func (s *CompactionChainTestSuite) TestSyncFinalizedBlock() { + cc := s.newCompactionChain() + mock := newMockTSigVerifier(true) + for i := 0; i < cc.tsigVerifier.cacheSize; i++ { + cc.tsigVerifier.verifier[uint64(i)] = mock + } + now := time.Now().UTC() + blocks := make([]*types.Block, 10) + for idx := range blocks { + blocks[idx] = &types.Block{ + Hash: common.NewRandomHash(), + Finalization: types.FinalizationResult{ + Timestamp: now, + Height: uint64(idx + 1), + }, + } + now = now.Add(100 * time.Millisecond) + } + cc.processFinalizedBlock(blocks[1]) + cc.processFinalizedBlock(blocks[3]) + s.Len(cc.extractFinalizedBlocks(), 0) + + cc.processFinalizedBlock(blocks[0]) + confirmed := cc.extractFinalizedBlocks() + s.Equal(blocks[1].Hash, cc.lastBlock().Hash) + s.Require().Len(confirmed, 2) + s.Equal(confirmed[0].Hash, blocks[0].Hash) + s.Equal(confirmed[1].Hash, blocks[1].Hash) + hash := common.NewRandomHash() + cc.processFinalizedBlock(&types.Block{ + Hash: hash, + Finalization: types.FinalizationResult{ + Height: uint64(3), + }, + }) + // Should not deliver block with error tsig + mock.ret[hash] = false + s.Len(cc.extractFinalizedBlocks(), 0) + // The error block should be discarded. + s.Len(cc.extractFinalizedBlocks(), 0) + + // Shuold not deliver block if dkg is not final + round99 := uint64(99) + s.Require().False(cc.gov.IsDKGFinal(round99)) + blocks[2].Position.Round = round99 + cc.processFinalizedBlock(blocks[2]) + s.Len(cc.extractFinalizedBlocks(), 0) + + // Deliver blocks. + blocks[3].Position.Round = round99 + cc.tsigVerifier.verifier[round99] = mock + confirmed = cc.extractFinalizedBlocks() + s.Equal(blocks[3].Hash, cc.lastBlock().Hash) + s.Require().Len(confirmed, 2) + s.Equal(confirmed[0].Hash, blocks[2].Hash) + s.Equal(confirmed[1].Hash, blocks[3].Hash) + + // Inserting a bad block. The later block should not be discarded. + cc.processFinalizedBlock(blocks[5]) + cc.processFinalizedBlock(&types.Block{ + Hash: hash, + Finalization: types.FinalizationResult{ + Height: uint64(5), + }, + }) + s.Len(cc.extractFinalizedBlocks(), 0) + // Good block is inserted, the later block should be delivered. + cc.processFinalizedBlock(blocks[4]) + confirmed = cc.extractFinalizedBlocks() + s.Equal(blocks[5].Hash, cc.lastBlock().Hash) + s.Require().Len(confirmed, 2) + s.Equal(confirmed[0].Hash, blocks[4].Hash) + s.Equal(confirmed[1].Hash, blocks[5].Hash) + + // Ignore finalized block if it already confirmed. + cc.processFinalizedBlock(blocks[6]) + s.Require().NoError(cc.processBlock(blocks[6])) + s.Equal(blocks[6].Hash, cc.lastBlock().Hash) + s.Require().Len(cc.extractFinalizedBlocks(), 0) + s.Require().NoError(cc.processBlock(blocks[7])) + cc.processFinalizedBlock(blocks[7]) + s.Len(*cc.pendingFinalizedBlocks, 0) + s.Equal(blocks[7].Hash, cc.lastBlock().Hash) + s.Require().Len(cc.extractFinalizedBlocks(), 0) +} + func TestCompactionChain(t *testing.T) { suite.Run(t, new(CompactionChainTestSuite)) } diff --git a/core/consensus.go b/core/consensus.go index 3601720..d3f8b5d 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -740,10 +740,28 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { // processFinalizedBlock is the entry point for syncing blocks. func (con *Consensus) processFinalizedBlock(block *types.Block) (err error) { - // TODO(jimmy-dexon): drop block that is already in compaction chain. if err = con.lattice.SanityCheck(block, false); err != nil { return } + con.ccModule.processFinalizedBlock(block) + for { + confirmed := con.ccModule.extractFinalizedBlocks() + if len(confirmed) == 0 { + break + } + if err = con.lattice.ctModule.processBlocks(confirmed); err != nil { + return + } + for _, b := range confirmed { + if err = con.db.Put(*b); err != nil { + if err != blockdb.ErrBlockExists { + return + } + err = nil + } + con.nbModule.BlockDelivered(b.Hash, b.Finalization) + } + } return } diff --git a/core/dkg-tsig-protocol.go b/core/dkg-tsig-protocol.go index 795f6d3..1b5f547 100644 --- a/core/dkg-tsig-protocol.go +++ b/core/dkg-tsig-protocol.go @@ -19,6 +19,7 @@ package core import ( "fmt" + "sync" "github.com/dexon-foundation/dexon-consensus-core/common" "github.com/dexon-foundation/dexon-consensus-core/core/crypto" @@ -46,6 +47,8 @@ var ( "incorrect partialSignature") ErrNotEnoughtPartialSignatures = fmt.Errorf( "not enough of partial signatures") + ErrRoundAlreadyPurged = fmt.Errorf( + "cache of round already been purged") ) type dkgReceiver interface { @@ -95,6 +98,20 @@ type DKGGroupPublicKey struct { threshold int } +// TSigVerifier is the interface verifying threshold signature. +type TSigVerifier interface { + VerifySignature(hash common.Hash, sig crypto.Signature) bool +} + +// TSigVerifierCache is the cache for TSigVerifier. +type TSigVerifierCache struct { + gov Governance + verifier map[uint64]TSigVerifier + minRound uint64 + cacheSize int + lock sync.RWMutex +} + type tsigProtocol struct { groupPublicKey *DKGGroupPublicKey hash common.Hash @@ -405,6 +422,74 @@ func (gpk *DKGGroupPublicKey) VerifySignature( return gpk.groupPublicKey.VerifySignature(hash, sig) } +// NewTSigVerifierCache creats a DKGGroupPublicKey instance. +func NewTSigVerifierCache(gov Governance, cacheSize int) *TSigVerifierCache { + return &TSigVerifierCache{ + gov: gov, + verifier: make(map[uint64]TSigVerifier), + cacheSize: cacheSize, + } +} + +// UpdateAndGet calls Update and then Get. +func (tc *TSigVerifierCache) UpdateAndGet(round uint64) ( + TSigVerifier, bool, error) { + ok, err := tc.Update(round) + if err != nil { + return nil, false, err + } + if !ok { + return nil, false, nil + } + v, ok := tc.Get(round) + return v, ok, nil +} + +// Update the cache and returns if success. +func (tc *TSigVerifierCache) Update(round uint64) (bool, error) { + tc.lock.Lock() + defer tc.lock.Unlock() + if round < tc.minRound { + return false, ErrRoundAlreadyPurged + } + if _, exist := tc.verifier[round]; exist { + return true, nil + } + if !tc.gov.IsDKGFinal(round) { + return false, nil + } + gpk, err := NewDKGGroupPublicKey(round, + tc.gov.DKGMasterPublicKeys(round), + tc.gov.DKGComplaints(round), + int(tc.gov.Configuration(round).DKGSetSize/3)+1) + if err != nil { + return false, err + } + if len(tc.verifier) == 0 { + tc.minRound = round + } + tc.verifier[round] = gpk + if len(tc.verifier) > tc.cacheSize { + delete(tc.verifier, tc.minRound) + } + for { + if _, exist := tc.verifier[tc.minRound]; !exist { + tc.minRound++ + } else { + break + } + } + return true, nil +} + +// Get the TSigVerifier of round and returns if it exists. +func (tc *TSigVerifierCache) Get(round uint64) (TSigVerifier, bool) { + tc.lock.RLock() + defer tc.lock.RUnlock() + verifier, exist := tc.verifier[round] + return verifier, exist +} + func newTSigProtocol( gpk *DKGGroupPublicKey, hash common.Hash) *tsigProtocol { diff --git a/core/dkg-tsig-protocol_test.go b/core/dkg-tsig-protocol_test.go index d03d811..e533a2f 100644 --- a/core/dkg-tsig-protocol_test.go +++ b/core/dkg-tsig-protocol_test.go @@ -635,6 +635,67 @@ func (s *DKGTSIGProtocolTestSuite) TestProposeFinalize() { Round: 1, }, final) } + +func (s *DKGTSIGProtocolTestSuite) TestTSigVerifierCache() { + k := 3 + n := 10 + gov, err := test.NewGovernance(n, 100) + s.Require().NoError(err) + for i := 0; i < 10; i++ { + round := uint64(i + 1) + receivers, protocols := s.newProtocols(k, n, round) + + for _, receiver := range receivers { + gov.AddDKGMasterPublicKey(round, receiver.mpk) + } + + for _, protocol := range protocols { + protocol.proposeFinalize() + } + + for _, recv := range receivers { + s.Require().Len(recv.final, 1) + gov.AddDKGFinalize(recv.final[0].Round, recv.final[0]) + } + s.Require().True(gov.IsDKGFinal(round)) + } + + cache := NewTSigVerifierCache(gov, 3) + for i := 0; i < 5; i++ { + round := uint64(i + 1) + ok, err := cache.Update(round) + s.Require().NoError(err) + s.True(ok) + } + s.Len(cache.verifier, 3) + + for i := 0; i < 2; i++ { + round := uint64(i + 1) + _, exist := cache.Get(round) + s.False(exist) + } + + for i := 3; i < 5; i++ { + round := uint64(i + 1) + _, exist := cache.Get(round) + s.True(exist) + } + + ok, err := cache.Update(uint64(1)) + s.Require().Equal(ErrRoundAlreadyPurged, err) + + cache = NewTSigVerifierCache(gov, 1) + ok, err = cache.Update(uint64(3)) + s.Require().NoError(err) + s.Require().True(ok) + s.Equal(uint64(3), cache.minRound) + + ok, err = cache.Update(uint64(5)) + s.Require().NoError(err) + s.Require().True(ok) + s.Equal(uint64(5), cache.minRound) +} + func TestDKGTSIGProtocol(t *testing.T) { suite.Run(t, new(DKGTSIGProtocolTestSuite)) } diff --git a/core/types/block.go b/core/types/block.go index bf27934..e384f95 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -175,3 +175,34 @@ func (bs *ByHeight) Pop() (ret interface{}) { *bs, ret = (*bs)[0:n-1], (*bs)[n-1] return } + +// ByFinalizationHeight is the helper type for sorting slice of blocks by +// finalization height. +type ByFinalizationHeight []*Block + +// Len implements Len method in sort.Sort interface. +func (bs ByFinalizationHeight) Len() int { + return len(bs) +} + +// Less implements Less method in sort.Sort interface. +func (bs ByFinalizationHeight) Less(i int, j int) bool { + return bs[i].Finalization.Height < bs[j].Finalization.Height +} + +// Swap implements Swap method in sort.Sort interface. +func (bs ByFinalizationHeight) Swap(i int, j int) { + bs[i], bs[j] = bs[j], bs[i] +} + +// Push implements Push method in heap interface. +func (bs *ByFinalizationHeight) Push(x interface{}) { + *bs = append(*bs, x.(*Block)) +} + +// Pop implements Pop method in heap interface. +func (bs *ByFinalizationHeight) Pop() (ret interface{}) { + n := len(*bs) + *bs, ret = (*bs)[0:n-1], (*bs)[n-1] + return +} -- cgit v1.2.3