diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2019-03-26 16:55:16 +0800 |
---|---|---|
committer | Jimmy Hu <jimmy.hu@dexon.org> | 2019-03-27 15:25:10 +0800 |
commit | 495b3737414685d609f7b41355928c699189d6ad (patch) | |
tree | 70e648db3dff486a56a64bb61b7ef53f46ab1372 | |
parent | 7783bc4ba52bfc534d5b4d91e78abb2ddad7d078 (diff) | |
download | dexon-consensus-495b3737414685d609f7b41355928c699189d6ad.tar dexon-consensus-495b3737414685d609f7b41355928c699189d6ad.tar.gz dexon-consensus-495b3737414685d609f7b41355928c699189d6ad.tar.bz2 dexon-consensus-495b3737414685d609f7b41355928c699189d6ad.tar.lz dexon-consensus-495b3737414685d609f7b41355928c699189d6ad.tar.xz dexon-consensus-495b3737414685d609f7b41355928c699189d6ad.tar.zst dexon-consensus-495b3737414685d609f7b41355928c699189d6ad.zip |
core: sign block hash for empty block (#517)
* core: sign block hash for empty block
* run force synced empty block at startup
-rw-r--r-- | core/agreement-mgr.go | 1 | ||||
-rw-r--r-- | core/blockchain.go | 30 | ||||
-rw-r--r-- | core/blockchain_test.go | 111 | ||||
-rw-r--r-- | core/consensus.go | 114 |
4 files changed, 241 insertions, 15 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index 9086e51..a88e74a 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -401,6 +401,7 @@ Loop: Height: math.MaxUint64, } mgr.voteFilter = utils.NewVoteFilter() + mgr.recv.emptyBlockHashMap = &sync.Map{} if err := mgr.baRoutineForOneRound(&setting); err != nil { mgr.logger.Error("BA routine failed", "error", err, diff --git a/core/blockchain.go b/core/blockchain.go index 283d22e..51747d8 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -223,6 +223,11 @@ func (bc *blockChain) extractBlocks() (ret []*types.Block) { defer bc.lock.Unlock() for len(bc.confirmedBlocks) > 0 { c := bc.confirmedBlocks[0] + if c.Position.Round >= DKGDelayRound && + len(c.Finalization.Randomness) == 0 && + !bc.setRandomnessFromPending(c) { + break + } c, bc.confirmedBlocks = bc.confirmedBlocks[0], bc.confirmedBlocks[1:] // TODO(mission): remove these duplicated field if we fully converted // to single chain. @@ -385,24 +390,29 @@ func (bc *blockChain) nextBlock() (uint64, time.Time) { return tip.Position.Height + 1, tip.Timestamp.Add(config.minBlockInterval) } -func (bc *blockChain) pendingBlocksWithoutRandomness() (hashes common.Hashes) { +func (bc *blockChain) pendingBlocksWithoutRandomness() []*types.Block { bc.lock.RLock() defer bc.lock.RUnlock() + blocks := make([]*types.Block, 0) for _, b := range bc.confirmedBlocks { - if b.Position.Round == 0 || len(b.Finalization.Randomness) > 0 { + if b.Position.Round < DKGDelayRound || + len(b.Finalization.Randomness) > 0 || + bc.setRandomnessFromPending(b) { continue } - hashes = append(hashes, b.Hash) + blocks = append(blocks, b) } for _, r := range bc.pendingBlocks { - if r.position.Round == 0 { + if r.position.Round < DKGDelayRound { continue } - if r.block != nil && len(r.block.Finalization.Randomness) == 0 { - hashes = append(hashes, r.block.Hash) + if r.block != nil && + len(r.block.Finalization.Randomness) == 0 && + !bc.setRandomnessFromPending(r.block) { + blocks = append(blocks, r.block) } } - return + return blocks } func (bc *blockChain) lastDeliveredBlock() *types.Block { @@ -637,9 +647,9 @@ func (bc *blockChain) processAgreementResult(result *types.AgreementResult) erro if !ok { return ErrIncorrectAgreementResult } - bc.lock.RLock() - defer bc.lock.RUnlock() - if !result.Position.Newer(bc.lastConfirmed.Position) { + bc.lock.Lock() + defer bc.lock.Unlock() + if !result.Position.Newer(bc.lastDelivered.Position) { return nil } bc.pendingRandomnesses[result.Position] = result diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 6a615c1..1e3f184 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -18,6 +18,7 @@ package core import ( + "fmt" "testing" "time" @@ -125,6 +126,15 @@ func (s *BlockChainTestSuite) newBlock(parent *types.Block, round uint64, return b } +func (s *BlockChainTestSuite) newRandomnessFromBlock( + b *types.Block) *types.AgreementResult { + return &types.AgreementResult{ + BlockHash: b.Hash, + Position: b.Position, + Randomness: common.GenerateRandomBytes(), + } +} + func (s *BlockChainTestSuite) newBlockChain(initB *types.Block, roundLength uint64) (bc *blockChain) { initRound := uint64(0) @@ -159,6 +169,64 @@ func (s *BlockChainTestSuite) newRoundOneInitBlock() *types.Block { return initBlock } +func (s *BlockChainTestSuite) baseConcurrentAceessTest(initBlock *types.Block, + blocks []*types.Block, rands []*types.AgreementResult) { + var ( + bc = s.newBlockChain(initBlock, uint64(len(blocks)+1)) + start = make(chan struct{}) + newNotif = make(chan struct{}, 1) + delivered []*types.Block + ) + add := func(v interface{}) { + <-start + switch val := v.(type) { + case *types.Block: + if err := bc.addBlock(val); err != nil { + // Never assertion in sub routine when testing. + panic(err) + } + case *types.AgreementResult: + if err := bc.processAgreementResult(val); err != nil { + // Never assertion in sub routine when testing. + panic(err) + } + default: + panic(fmt.Errorf("unknown type: %v", v)) + } + select { + case newNotif <- struct{}{}: + default: + } + } + for _, b := range blocks { + go add(b) + } + for _, r := range rands { + go add(r) + } + close(start) + for { + select { + case <-newNotif: + delivered = append(delivered, bc.extractBlocks()...) + case <-time.After(100 * time.Millisecond): + delivered = append(delivered, bc.extractBlocks()...) + } + if len(delivered) == len(blocks) { + break + } + } + // Check result. + b := delivered[0] + s.Require().Equal(b.Position.Height, uint64(1)) + s.Require().NotEmpty(b.Finalization.Randomness) + for _, bb := range delivered[1:] { + s.Require().Equal(b.Position.Height+1, bb.Position.Height) + s.Require().NotEmpty(b.Finalization.Randomness) + b = bb + } +} + func (s *BlockChainTestSuite) TestBasicUsage() { initBlock := s.newRoundOneInitBlock() bc := s.newBlockChain(initBlock, 10) @@ -185,12 +253,35 @@ func (s *BlockChainTestSuite) TestBasicUsage() { s.Require().NoError(bc.addBlock(b1)) s.Require().NoError(bc.addBlock(b0)) extracted := bc.extractBlocks() - s.Require().Len(extracted, 6) - s.Require().Equal(extracted[4].Hash, b4.Hash) + s.Require().Len(extracted, 4) + bc.pendingRandomnesses[b4.Position] = &types.AgreementResult{ + BlockHash: b4.Hash, + Randomness: common.GenerateRandomBytes(), + } + extracted = bc.extractBlocks() + s.Require().Len(extracted, 2) + s.Require().Equal(extracted[0].Hash, b4.Hash) extracted = bc.extractBlocks() s.Require().Len(extracted, 0) } +func (s *BlockChainTestSuite) TestConcurrentAccess() { + // Raise one go routine for each block and randomness. And let them try to + // add to blockChain at the same time. Make sure we can delivered them all. + var ( + retry = 10 + initBlock = s.newRoundOneInitBlock() + blocks = s.newBlocks(500, initBlock) + rands = []*types.AgreementResult{} + ) + for _, b := range blocks { + rands = append(rands, s.newRandomnessFromBlock(b)) + } + for i := 0; i < retry; i++ { + s.baseConcurrentAceessTest(initBlock, blocks, rands) + } +} + func (s *BlockChainTestSuite) TestSanityCheck() { bc := s.newBlockChain(nil, 4) // Empty block is not allowed. @@ -321,6 +412,22 @@ func (s *BlockChainTestSuite) TestNextBlockAndTipRound() { s.Require().Equal(bc.tipRound(), uint64(1)) } +func (s *BlockChainTestSuite) TestPendingBlocksWithoutRandomness() { + initBlock := s.newRoundOneInitBlock() + bc := s.newBlockChain(initBlock, 10) + b0, err := bc.addEmptyBlock(types.Position{Round: 1, Height: 1}) + s.Require().NoError(err) + b1, err := bc.addEmptyBlock(types.Position{Round: 1, Height: 2}) + s.Require().NoError(err) + b2, err := bc.addEmptyBlock(types.Position{Round: 1, Height: 3}) + s.Require().NoError(err) + s.Require().Equal(bc.pendingBlocksWithoutRandomness(), []*types.Block{ + b0, b1, b2}) + s.Require().NoError(bc.processAgreementResult(s.newRandomnessFromBlock(b0))) + s.Require().Equal(bc.pendingBlocksWithoutRandomness(), []*types.Block{ + b1, b2}) +} + func (s *BlockChainTestSuite) TestLastXBlock() { initBlock := s.newRoundOneInitBlock() bc := s.newBlockChain(initBlock, 10) diff --git a/core/consensus.go b/core/consensus.go index 5106f18..4210c09 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -65,6 +65,7 @@ type consensusBAReceiver struct { agreementModule *agreement changeNotaryHeightValue *atomic.Value roundValue *atomic.Value + emptyBlockHashMap *sync.Map isNotary bool restartNotary chan types.Position npks *typesDKG.NodePublicKeys @@ -79,6 +80,25 @@ func (recv *consensusBAReceiver) changeNotaryHeight() uint64 { return recv.changeNotaryHeightValue.Load().(uint64) } +func (recv *consensusBAReceiver) emptyBlockHash(pos types.Position) ( + common.Hash, error) { + hashVal, ok := recv.emptyBlockHashMap.Load(pos) + if ok { + return hashVal.(common.Hash), nil + } + emptyBlock, err := recv.consensus.bcModule.prepareBlock( + pos, time.Time{}, true) + if err != nil { + return common.Hash{}, err + } + hash, err := utils.HashBlock(emptyBlock) + if err != nil { + return common.Hash{}, err + } + recv.emptyBlockHashMap.Store(pos, hash) + return hash, nil +} + func (recv *consensusBAReceiver) VerifyPartialSignature(vote *types.Vote) bool { if recv.round() >= DKGDelayRound && vote.BlockHash != types.SkipBlockHash { if vote.Type == types.VoteCom || vote.Type == types.VoteFastCom { @@ -98,7 +118,15 @@ func (recv *consensusBAReceiver) VerifyPartialSignature(vote *types.Vote) bool { } blockHash := vote.BlockHash if blockHash == types.NullBlockHash { - blockHash = utils.HashPosition(vote.Position) + var err error + blockHash, err = recv.emptyBlockHash(vote.Position) + if err != nil { + recv.consensus.logger.Error( + "Failed to verify vote for empty block", + "position", vote.Position, + "error", err) + return false + } } return pubKey.VerifySignature( vote.BlockHash, crypto.Signature(vote.PartialSignature)) @@ -117,8 +145,15 @@ func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { return } if vote.BlockHash == types.NullBlockHash { - vote.PartialSignature = recv.psigSigner.sign( - utils.HashPosition(vote.Position)) + hash, err := recv.emptyBlockHash(vote.Position) + if err != nil { + recv.consensus.logger.Error( + "Failed to propose vote for empty block", + "position", vote.Position, + "error", err) + return + } + vote.PartialSignature = recv.psigSigner.sign(hash) } else { vote.PartialSignature = recv.psigSigner.sign(vote.BlockHash) } @@ -955,6 +990,8 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) { // Run starts running DEXON Consensus. func (con *Consensus) Run() { + // There may have emptys block in blockchain added by force sync. + blocksWithoutRandomness := con.bcModule.pendingBlocksWithoutRandomness() // Launch BA routines. con.baMgr.run() // Launch network handler. @@ -986,6 +1023,7 @@ func (con *Consensus) Run() { } con.logger.Trace("Finish dumping cached messages") } + con.generateBlockRandomness(blocksWithoutRandomness) // Sleep until dMoment come. time.Sleep(con.dMoment.Sub(time.Now().UTC())) // Take some time to bootstrap. @@ -998,6 +1036,76 @@ func (con *Consensus) Run() { } } +func (con *Consensus) generateBlockRandomness(blocks []*types.Block) { + con.logger.Debug("Start generating block randomness", "blocks", blocks) + isNotarySet := make(map[uint64]bool) + for _, block := range blocks { + if block.Position.Round < DKGDelayRound { + continue + } + doRun, exist := isNotarySet[block.Position.Round] + if !exist { + curNotarySet, err := con.nodeSetCache.GetNotarySet(block.Position.Round) + if err != nil { + con.logger.Error("Error getting notary set when generate block tsig", + "round", block.Position.Round, + "error", err) + continue + } + _, exist := curNotarySet[con.ID] + isNotarySet[block.Position.Round] = exist + doRun = exist + } + if !doRun { + continue + } + go func(block *types.Block) { + psig, err := con.cfgModule.preparePartialSignature( + block.Position.Round, block.Hash) + if err != nil { + con.logger.Error("Failed to prepare partial signature", + "block", block, + "error", err) + } else if err = con.signer.SignDKGPartialSignature(psig); err != nil { + con.logger.Error("Failed to sign DKG partial signature", + "block", block, + "error", err) + } else if err = con.cfgModule.processPartialSignature(psig); err != nil { + con.logger.Error("Failed to process partial signature", + "block", block, + "error", err) + } else { + con.logger.Debug("Calling Network.BroadcastDKGPartialSignature", + "proposer", psig.ProposerID, + "block", block) + con.network.BroadcastDKGPartialSignature(psig) + sig, err := con.cfgModule.runTSig(block.Position.Round, block.Hash) + if err != nil { + con.logger.Error("Failed to run Block Tsig", + "block", block, + "error", err) + return + } + result := &types.AgreementResult{ + BlockHash: block.Hash, + Position: block.Position, + Randomness: sig.Signature[:], + } + if err := con.bcModule.processAgreementResult(result); err != nil { + con.logger.Error("Failed to process BlockRandomness", + "result", result, + "error", err) + return + } + con.logger.Debug("Broadcast BlockRandomness", + "block", block, + "result", result) + con.network.BroadcastAgreementResult(result) + } + }(block) + } +} + // runDKG starts running DKG protocol. func (con *Consensus) runDKG(round, reset uint64, config *types.Config) { con.dkgReady.L.Lock() |