From 0382bbafbee5ffb820b9d31f7cfe8f6a48968a48 Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Tue, 2 Apr 2019 17:07:05 +0800 Subject: core: optimize message handle (#542) * core: optimize for handling agremenet result * core: disable clone vote * core: touch npks --- core/blockchain.go | 30 ++++++++++++---- core/blockchain_test.go | 37 +++++++++++++++----- core/configuration-chain.go | 11 ++++++ core/consensus.go | 85 ++++++++++++++++++++++++++++++++++----------- core/dkg-tsig-protocol.go | 2 +- core/utils/round-event.go | 2 +- 6 files changed, 129 insertions(+), 38 deletions(-) (limited to 'core') diff --git a/core/blockchain.go b/core/blockchain.go index 2d67d62..9fbb861 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -137,11 +137,14 @@ type blockChain struct { vGetter tsigVerifierGetter app Application logger common.Logger - pendingRandomnesses map[types.Position]*types.AgreementResult + pendingRandomnesses map[types.Position][]byte configs []blockChainConfig pendingBlocks pendingBlockRecords confirmedBlocks types.BlocksByPosition dMoment time.Time + + // Do not access this variable besides processAgreementResult. + lastPosition types.Position } func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block, @@ -157,7 +160,7 @@ func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block, logger: logger, dMoment: dMoment, pendingRandomnesses: make( - map[types.Position]*types.AgreementResult), + map[types.Position][]byte), } } @@ -629,10 +632,7 @@ func (bc *blockChain) confirmBlock(b *types.Block) { func (bc *blockChain) setRandomnessFromPending(b *types.Block) bool { if r, exist := bc.pendingRandomnesses[b.Position]; exist { - if !r.BlockHash.Equal(b.Hash) { - panic(fmt.Errorf("mismathed randomness: %s %s", b, r)) - } - b.Randomness = r.Randomness + b.Randomness = r delete(bc.pendingRandomnesses, b.Position) return true } @@ -643,6 +643,9 @@ func (bc *blockChain) processAgreementResult(result *types.AgreementResult) erro if result.Position.Round < DKGDelayRound { return nil } + if !result.Position.Newer(bc.lastPosition) { + return ErrSkipButNoError + } ok, err := bc.verifyRandomness( result.BlockHash, result.Position.Round, result.Randomness) if err != nil { @@ -656,6 +659,19 @@ func (bc *blockChain) processAgreementResult(result *types.AgreementResult) erro if !result.Position.Newer(bc.lastDelivered.Position) { return nil } - bc.pendingRandomnesses[result.Position] = result + bc.pendingRandomnesses[result.Position] = result.Randomness + bc.lastPosition = bc.lastDelivered.Position return nil } + +func (bc *blockChain) addBlockRandomness(pos types.Position, rand []byte) { + if pos.Round < DKGDelayRound { + return + } + bc.lock.Lock() + defer bc.lock.Unlock() + if !pos.Newer(bc.lastDelivered.Position) { + return + } + bc.pendingRandomnesses[pos] = rand +} diff --git a/core/blockchain_test.go b/core/blockchain_test.go index d64c2a1..ea604a2 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -19,6 +19,7 @@ package core import ( "fmt" + "math/rand" "testing" "time" @@ -175,13 +176,16 @@ func (s *BlockChainTestSuite) newRoundOneInitBlock() *types.Block { } func (s *BlockChainTestSuite) baseConcurrentAceessTest(initBlock *types.Block, - blocks []*types.Block, rands []*types.AgreementResult) { + blocks []*types.Block, results []*types.AgreementResult) { var ( bc = s.newBlockChain(initBlock, uint64(len(blocks)+1)) start = make(chan struct{}) newNotif = make(chan struct{}, 1) delivered []*types.Block ) + resultsCopy := make([]*types.AgreementResult, len(results)) + copy(resultsCopy, results) + type randomnessResult types.AgreementResult add := func(v interface{}) { <-start switch val := v.(type) { @@ -192,9 +196,13 @@ func (s *BlockChainTestSuite) baseConcurrentAceessTest(initBlock *types.Block, } case *types.AgreementResult: if err := bc.processAgreementResult(val); err != nil { - // Never assertion in sub routine when testing. - panic(err) + if err != ErrSkipButNoError { + // Never assertion in sub routine when testing. + panic(err) + } } + case *randomnessResult: + bc.addBlockRandomness(val.Position, val.Randomness) default: panic(fmt.Errorf("unknown type: %v", v)) } @@ -203,12 +211,26 @@ func (s *BlockChainTestSuite) baseConcurrentAceessTest(initBlock *types.Block, default: } } + rand.Shuffle(len(resultsCopy), func(i, j int) { + resultsCopy[i], resultsCopy[j] = resultsCopy[j], resultsCopy[i] + }) for _, b := range blocks { go add(b) } - for _, r := range rands { - go add(r) + for i, r := range resultsCopy { + if i >= len(resultsCopy)/2 { + break + } + go add((*randomnessResult)(r)) } + go func() { + for i, a := range resultsCopy { + if i < len(resultsCopy)/2 { + continue + } + add(a) + } + }() close(start) for { select { @@ -257,10 +279,7 @@ func (s *BlockChainTestSuite) TestBasicUsage() { s.Require().NoError(bc.addBlock(b0)) extracted := bc.extractBlocks() s.Require().Len(extracted, 4) - bc.pendingRandomnesses[b4.Position] = &types.AgreementResult{ - BlockHash: b4.Hash, - Randomness: common.GenerateRandomBytes(), - } + bc.pendingRandomnesses[b4.Position] = common.GenerateRandomBytes() extracted = bc.extractBlocks() s.Require().Len(extracted, 2) s.Require().Equal(extracted[0].Hash, b4.Hash) diff --git a/core/configuration-chain.go b/core/configuration-chain.go index c8aac38..fbd504d 100644 --- a/core/configuration-chain.go +++ b/core/configuration-chain.go @@ -586,8 +586,19 @@ func (cc *configurationChain) recoverDKGInfo( // Restore group public key. cc.logger.Debug("Calling Governance.DKGMasterPublicKeys for recoverDKGInfo", "round", round) + mpk := cc.gov.DKGMasterPublicKeys(round) cc.logger.Debug("Calling Governance.DKGComplaints for recoverDKGInfo", "round", round) + comps := cc.gov.DKGComplaints(round) + qualifies, _, err := typesDKG.CalcQualifyNodes(mpk, comps, threshold) + if err != nil { + return err + } + if len(qualifies) < + utils.GetDKGValidThreshold(utils.GetConfigWithPanic( + cc.gov, round, cc.logger)) { + return typesDKG.ErrNotReachThreshold + } npks, err := typesDKG.NewNodePublicKeys(round, cc.gov.DKGMasterPublicKeys(round), cc.gov.DKGComplaints(round), diff --git a/core/consensus.go b/core/consensus.go index 9702231..e657c64 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -286,17 +286,13 @@ func (recv *consensusBAReceiver) ConfirmBlock( IsEmptyBlock: isEmptyBlockConfirmed, Randomness: block.Randomness, } + recv.consensus.baMgr.touchAgreementResult(result) recv.consensus.logger.Debug("Broadcast AgreementResult", "result", result) recv.consensus.network.BroadcastAgreementResult(result) if block.IsEmpty() { - if err := - recv.consensus.bcModule.processAgreementResult( - result); err != nil { - recv.consensus.logger.Warn( - "Failed to process agreement result", - "result", result) - } + recv.consensus.bcModule.addBlockRandomness( + block.Position, block.Randomness) } if block.Position.Round >= DKGDelayRound { recv.consensus.logger.Debug( @@ -729,12 +725,13 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) { // modules see the up-to-date node set, we need to make sure this action // should be taken as the first one. con.roundEvent.Register(func(evts []utils.RoundEventParam) { - defer elapse("purge-node-set", evts[len(evts)-1])() + defer elapse("purge-cache", evts[len(evts)-1])() for _, e := range evts { if e.Reset == 0 { continue } con.nodeSetCache.Purge(e.Round + 1) + con.tsigVerifierCache.Purge(e.Round + 1) } }) // Register round event handler to abort previous running DKG if any. @@ -850,13 +847,64 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) { e := evts[len(evts)-1] defer elapse("touch-NodeSetCache", e)() con.event.RegisterHeight(e.NextTouchNodeSetCacheHeight(), func(uint64) { - if err := con.nodeSetCache.Touch(e.Round + 1); err != nil { - con.logger.Warn("Failed to update nodeSetCache", - "round", e.Round+1, - "error", err) + if e.Reset == 0 { + return } + go func() { + nextRound := e.Round + 1 + if err := con.nodeSetCache.Touch(nextRound); err != nil { + con.logger.Warn("Failed to update nodeSetCache", + "round", nextRound, + "error", err) + } + }() }) }) + con.roundEvent.Register(func(evts []utils.RoundEventParam) { + e := evts[len(evts)-1] + if e.Reset != 0 { + return + } + defer elapse("touch-DKGCache", e)() + go func() { + if _, err := + con.tsigVerifierCache.Update(e.Round); err != nil { + con.logger.Warn("Failed to update tsig cache", + "round", e.Round, + "error", err) + } + }() + go func() { + threshold := utils.GetDKGThreshold( + utils.GetConfigWithPanic(con.gov, e.Round, con.logger)) + // Restore group public key. + con.logger.Debug( + "Calling Governance.DKGMasterPublicKeys for recoverDKGInfo", + "round", e.Round) + con.logger.Debug( + "Calling Governance.DKGComplaints for recoverDKGInfo", + "round", e.Round) + _, qualifies, err := typesDKG.CalcQualifyNodes( + con.gov.DKGMasterPublicKeys(e.Round), + con.gov.DKGComplaints(e.Round), + threshold) + if err != nil { + con.logger.Warn("Failed to calculate dkg set", + "round", e.Round, + "error", err) + return + } + if _, exist := qualifies[con.ID]; !exist { + return + } + if _, _, err := + con.cfgModule.getDKGInfo(e.Round, true); err != nil { + con.logger.Warn("Failed to recover DKG info", + "round", e.Round, + "error", err) + } + }() + }) // checkCRS is a generator of checker to check if CRS for that round is // ready or not. checkCRS := func(round uint64) func() bool { @@ -1045,12 +1093,7 @@ func (con *Consensus) generateBlockRandomness(blocks []*types.Block) { Position: block.Position, Randomness: sig.Signature[:], } - if err := con.bcModule.processAgreementResult(result); err != nil { - con.logger.Error("Failed to process BlockRandomness", - "result", result, - "error", err) - return - } + con.bcModule.addBlockRandomness(block.Position, sig.Signature[:]) con.logger.Debug("Broadcast BlockRandomness", "block", block, "result", result) @@ -1275,8 +1318,7 @@ MessageLoop: // ProcessVote is the entry point to submit ont vote to a Consensus instance. func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { - v := vote.Clone() - err = con.baMgr.processVote(v) + err = con.baMgr.processVote(vote) return } @@ -1293,6 +1335,9 @@ func (con *Consensus) ProcessAgreementResult( } if err := con.bcModule.processAgreementResult(rand); err != nil { con.baMgr.untouchAgreementResult(rand) + if err == ErrSkipButNoError { + return nil + } return err } // Syncing BA Module. diff --git a/core/dkg-tsig-protocol.go b/core/dkg-tsig-protocol.go index 9a71bea..e4ae14c 100644 --- a/core/dkg-tsig-protocol.go +++ b/core/dkg-tsig-protocol.go @@ -555,7 +555,7 @@ func (tc *TSigVerifierCache) UpdateAndGet(round uint64) ( return v, ok, nil } -// Purge the cache and returns if success. +// Purge the cache. func (tc *TSigVerifierCache) Purge(round uint64) { tc.lock.Lock() defer tc.lock.Unlock() diff --git a/core/utils/round-event.go b/core/utils/round-event.go index 472c724..602d2da 100644 --- a/core/utils/round-event.go +++ b/core/utils/round-event.go @@ -79,7 +79,7 @@ func (e RoundEventParam) NextRoundHeight() uint64 { // NextTouchNodeSetCacheHeight returns the height to touch the node set cache. func (e RoundEventParam) NextTouchNodeSetCacheHeight() uint64 { - return e.BeginHeight + e.Config.RoundLength*9/10 + return e.BeginHeight + e.Config.RoundLength/2 } // NextDKGResetHeight returns the height to reset DKG for next period. -- cgit v1.2.3