From 29f38589a29e434282a783433d9fbb565ce4231b Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Wed, 17 Oct 2018 19:06:40 +0800 Subject: core: Some sync functions (#220) --- core/compaction-chain.go | 61 ++++++++++++++++++++++++++++++++++++++----- core/compaction-chain_test.go | 5 +++- core/consensus.go | 24 ++++++++++++++--- core/lattice.go | 8 ++++-- core/lattice_test.go | 29 +++++++++++--------- core/types/block.go | 5 ++++ 6 files changed, 106 insertions(+), 26 deletions(-) (limited to 'core') diff --git a/core/compaction-chain.go b/core/compaction-chain.go index 3bf87f1..ea26562 100644 --- a/core/compaction-chain.go +++ b/core/compaction-chain.go @@ -22,6 +22,7 @@ import ( "sync" "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" "github.com/dexon-foundation/dexon-consensus-core/core/types" ) @@ -32,15 +33,18 @@ var ( ) type compactionChain struct { - blocks map[common.Hash]*types.Block - pendingBlocks []*types.Block - blocksLock sync.RWMutex - prevBlockLock sync.RWMutex - prevBlock *types.Block + gov Governance + blocks map[common.Hash]*types.Block + pendingBlocks []*types.Block + pendingFinalizedBlocks []*types.Block + blocksLock sync.RWMutex + prevBlockLock sync.RWMutex + prevBlock *types.Block } -func newCompactionChain() *compactionChain { +func newCompactionChain(gov Governance) *compactionChain { return &compactionChain{ + gov: gov, blocks: make(map[common.Hash]*types.Block), } } @@ -77,6 +81,51 @@ func (cc *compactionChain) processBlock(block *types.Block) error { return nil } +func (cc *compactionChain) processFinalizedBlock(block *types.Block) ( + []*types.Block, error) { + blocks := func() []*types.Block { + cc.blocksLock.Lock() + defer cc.blocksLock.Unlock() + blocks := cc.pendingFinalizedBlocks + cc.pendingFinalizedBlocks = []*types.Block{} + return blocks + }() + threshold := make(map[uint64]int) + gpks := make(map[uint64]*DKGGroupPublicKey) + toPending := []*types.Block{} + confirmed := []*types.Block{} + blocks = append(blocks, block) + for _, b := range blocks { + if !cc.gov.IsDKGFinal(b.Position.Round) { + toPending = append(toPending, b) + continue + } + round := b.Position.Round + if _, exist := gpks[round]; !exist { + threshold[round] = int(cc.gov.Configuration(round).DKGSetSize)/3 + 1 + var err error + gpks[round], err = NewDKGGroupPublicKey( + round, + cc.gov.DKGMasterPublicKeys(round), cc.gov.DKGComplaints(round), + threshold[round]) + if err != nil { + continue + } + } + gpk := gpks[round] + if ok := gpk.VerifySignature(b.Hash, crypto.Signature{ + Type: "bls", + Signature: b.Finalization.Randomness}); !ok { + continue + } + confirmed = append(confirmed, b) + } + cc.blocksLock.Lock() + defer cc.blocksLock.Unlock() + cc.pendingFinalizedBlocks = append(cc.pendingFinalizedBlocks, toPending...) + return confirmed, nil +} + func (cc *compactionChain) extractBlocks() []*types.Block { deliveringBlocks := make([]*types.Block, 0) cc.blocksLock.Lock() diff --git a/core/compaction-chain_test.go b/core/compaction-chain_test.go index 0da0924..6931674 100644 --- a/core/compaction-chain_test.go +++ b/core/compaction-chain_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/test" "github.com/dexon-foundation/dexon-consensus-core/core/types" "github.com/stretchr/testify/suite" ) @@ -34,7 +35,9 @@ func (s *CompactionChainTestSuite) SetupTest() { } func (s *CompactionChainTestSuite) newCompactionChain() *compactionChain { - return newCompactionChain() + gov, err := test.NewGovernance(4, 100*time.Millisecond) + s.Require().NoError(err) + return newCompactionChain(gov) } func (s *CompactionChainTestSuite) generateBlocks( diff --git a/core/consensus.go b/core/consensus.go index 186d769..f4e3295 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -269,7 +269,7 @@ func NewConsensus( con := &Consensus{ ID: ID, currentConfig: config, - ccModule: newCompactionChain(), + ccModule: newCompactionChain(gov), lattice: lattice, nbModule: nbModule, gov: gov, @@ -521,8 +521,15 @@ func (con *Consensus) processMsg(msgChan <-chan interface{}) { switch val := msg.(type) { case *types.Block: - if err := con.preProcessBlock(val); err != nil { - log.Println(err) + // For sync mode. + if val.IsFinalized() { + if err := con.processFinalizedBlock(val); err != nil { + log.Println(err) + } + } else { + if err := con.preProcessBlock(val); err != nil { + log.Println(err) + } } case *types.Vote: if err := con.ProcessVote(val); err != nil { @@ -683,7 +690,7 @@ func (con *Consensus) ProcessBlockRandomnessResult( // preProcessBlock performs Byzantine Agreement on the block. func (con *Consensus) preProcessBlock(b *types.Block) (err error) { - if err = con.lattice.SanityCheck(b); err != nil { + if err = con.lattice.SanityCheck(b, true); err != nil { return } if err = con.baModules[b.Position.ChainID].processBlock(b); err != nil { @@ -730,6 +737,15 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { return } +// processFinalizedBlock is the entry point for syncing blocks. +func (con *Consensus) processFinalizedBlock(block *types.Block) (err error) { + // TODO(jimmy-dexon): drop block that is already in compaction chain. + if err = con.lattice.SanityCheck(block, false); err != nil { + return + } + return +} + // PrepareBlock would setup header fields of block based on its ProposerID. func (con *Consensus) prepareBlock(b *types.Block, proposeTime time.Time) (err error) { diff --git a/core/lattice.go b/core/lattice.go index ea5286d..a0028a7 100644 --- a/core/lattice.go +++ b/core/lattice.go @@ -86,11 +86,12 @@ func (s *Lattice) PrepareBlock( return } -// SanityCheck check if a block is valid based on current lattice status. +// SanityCheck check if a block is valid. +// If checkRelation is true, it also checks with current lattice status. // // If some acking blocks don't exists, Lattice would help to cache this block // and retry when lattice updated in Lattice.ProcessBlock. -func (s *Lattice) SanityCheck(b *types.Block) (err error) { +func (s *Lattice) SanityCheck(b *types.Block, checkRelation bool) (err error) { // Verify block's signature. if err = s.authModule.VerifyBlock(b); err != nil { return @@ -110,6 +111,9 @@ func (s *Lattice) SanityCheck(b *types.Block) (err error) { err = ErrInvalidBlock return err } + if !checkRelation { + return + } s.lock.RLock() defer s.lock.RUnlock() if err = s.data.sanityCheck(b); err != nil { diff --git a/core/lattice_test.go b/core/lattice_test.go index f84182c..603424b 100644 --- a/core/lattice_test.go +++ b/core/lattice_test.go @@ -56,7 +56,7 @@ func (mgr *testLatticeMgr) processBlock(b *types.Block) (err error) { verified []*types.Block pendings = []*types.Block{b} ) - if err = mgr.lattice.SanityCheck(b); err != nil { + if err = mgr.lattice.SanityCheck(b, true); err != nil { if err == ErrAckingBlockNotExists { err = nil } @@ -98,15 +98,18 @@ func (s *LatticeTestSuite) newTestLatticeMgr( var req = s.Require() // Setup private key. prvKey, err := ecdsa.NewPrivateKey() - req.Nil(err) + req.NoError(err) // Setup blockdb. db, err := blockdb.NewMemBackedBlockDB() - req.Nil(err) + req.NoError(err) // Setup application. app := test.NewApp() + // Setup governance. + gov, err := test.NewGovernance(int(cfg.NotarySetSize), cfg.LambdaBA) + req.NoError(err) // Setup lattice. return &testLatticeMgr{ - ccModule: newCompactionChain(), + ccModule: newCompactionChain(gov), app: app, db: db, lattice: NewLattice( @@ -147,22 +150,22 @@ func (s *LatticeTestSuite) TestBasicUsage() { // or the consensus time would be wrong. b, err := master.prepareBlock(i) req.NotNil(b) - req.Nil(err) + req.NoError(err) // We've ignored the error for "acking blocks don't exist". req.Nil(master.processBlock(b)) } for i := 0; i < (blockNum - int(chainNum)); i++ { b, err := master.prepareBlock(uint32(rand.Intn(int(chainNum)))) req.NotNil(b) - req.Nil(err) + req.NoError(err) // We've ignored the error for "acking blocks don't exist". req.Nil(master.processBlock(b)) } // Now we have some blocks, replay them on different lattices. iter, err := master.db.GetAll() - req.Nil(err) + req.NoError(err) revealer, err := test.NewRandomRevealer(iter) - req.Nil(err) + req.NoError(err) for i := 0; i < otherLatticeNum; i++ { revealer.Reset() revealed := "" @@ -175,7 +178,7 @@ func (s *LatticeTestSuite) TestBasicUsage() { break } } - req.Nil(err) + req.NoError(err) req.Nil(other.processBlock(&b)) revealed += b.Hash.String() + "," revealSeq[revealed] = struct{}{} @@ -218,11 +221,11 @@ func (s *LatticeTestSuite) TestSanityCheck() { Timestamp: time.Now().UTC(), } req.NoError(auth.SignBlock(b)) - req.NoError(lattice.SanityCheck(b)) + req.NoError(lattice.SanityCheck(b, true)) // A block with incorrect signature should not pass sanity check. b.Signature, err = auth.prvKey.Sign(common.NewRandomHash()) req.NoError(err) - req.Equal(lattice.SanityCheck(b), ErrIncorrectSignature) + req.Equal(lattice.SanityCheck(b, false), ErrIncorrectSignature) // A block with un-sorted acks should not pass sanity check. b.Acks = common.NewSortedHashes(common.Hashes{ common.NewRandomHash(), @@ -233,10 +236,10 @@ func (s *LatticeTestSuite) TestSanityCheck() { }) b.Acks[0], b.Acks[1] = b.Acks[1], b.Acks[0] req.NoError(auth.SignBlock(b)) - req.Equal(lattice.SanityCheck(b), ErrAcksNotSorted) + req.Equal(lattice.SanityCheck(b, false), ErrAcksNotSorted) // A block with incorrect hash should not pass sanity check. b.Hash = common.NewRandomHash() - req.Equal(lattice.SanityCheck(b), ErrIncorrectHash) + req.Equal(lattice.SanityCheck(b, false), ErrIncorrectHash) } func TestLattice(t *testing.T) { diff --git a/core/types/block.go b/core/types/block.go index 710da9b..bf27934 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -118,6 +118,11 @@ func (b *Block) IsGenesis() bool { return b.Position.Height == 0 && b.ParentHash == common.Hash{} } +// IsFinalized checks if the finalization data is ready. +func (b *Block) IsFinalized() bool { + return b.Finalization.Height != 0 +} + // IsAcking checks if a block acking another by it's hash. func (b *Block) IsAcking(hash common.Hash) bool { idx := sort.Search(len(b.Acks), func(i int) bool { -- cgit v1.2.3