diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2019-01-26 15:33:24 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-01-26 15:33:24 +0800 |
commit | 049c9820e803f0453442d858d2ef36219981bf4c (patch) | |
tree | d6a2e5ba1ed6f2bd6d9b00ce80ae1a0e775e223f | |
parent | 514eed02d017f8d8badd3e1fedb0c8b9dcffac38 (diff) | |
download | dexon-consensus-049c9820e803f0453442d858d2ef36219981bf4c.tar dexon-consensus-049c9820e803f0453442d858d2ef36219981bf4c.tar.gz dexon-consensus-049c9820e803f0453442d858d2ef36219981bf4c.tar.bz2 dexon-consensus-049c9820e803f0453442d858d2ef36219981bf4c.tar.lz dexon-consensus-049c9820e803f0453442d858d2ef36219981bf4c.tar.xz dexon-consensus-049c9820e803f0453442d858d2ef36219981bf4c.tar.zst dexon-consensus-049c9820e803f0453442d858d2ef36219981bf4c.zip |
core: Optimize message processing (#434)
* core: more strict with 'first' agreement result
* core: Fast filter randomness result and agreement result
* Optimize touchAgreementResult
* core: remove lock in checking first block randomness
* psig to go routine
* polish
* core: polish
-rw-r--r-- | core/agreement-mgr.go | 107 | ||||
-rw-r--r-- | core/compaction-chain.go | 55 | ||||
-rw-r--r-- | core/consensus.go | 89 |
3 files changed, 134 insertions, 117 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index e07b23f..bcf1013 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -35,6 +35,8 @@ var ( ErrPreviousRoundIsNotFinished = errors.New("previous round is not finished") ) +const maxResultCache = 100 + // genValidLeader generate a validLeader function for agreement modules. func genValidLeader( mgr *agreementMgr) func(*types.Block) (bool, error) { @@ -80,26 +82,26 @@ type baRoundSetting struct { type agreementMgr struct { // TODO(mission): unbound Consensus instance from this module. - con *Consensus - ID types.NodeID - app Application - gov Governance - network Network - logger common.Logger - cache *utils.NodeSetCache - signer *utils.Signer - lattice *Lattice - ctx context.Context - lastEndTime time.Time - initRound uint64 - configs []*agreementMgrConfig - baModules []*agreement - lastBAResult []types.Position - voteFilters []*utils.VoteFilter - waitGroup sync.WaitGroup - pendingVotes map[uint64][]*types.Vote - pendingBlocks map[uint64][]*types.Block - isRunning bool + con *Consensus + ID types.NodeID + app Application + gov Governance + network Network + logger common.Logger + cache *utils.NodeSetCache + signer *utils.Signer + lattice *Lattice + ctx context.Context + lastEndTime time.Time + initRound uint64 + configs []*agreementMgrConfig + baModules []*agreement + processedBAResult map[types.Position]struct{} + voteFilters []*utils.VoteFilter + waitGroup sync.WaitGroup + pendingVotes map[uint64][]*types.Vote + pendingBlocks map[uint64][]*types.Block + isRunning bool // This lock should be used when attempting to: // - add a new baModule. @@ -115,18 +117,19 @@ type agreementMgr struct { func newAgreementMgr(con *Consensus, initRound uint64, initRoundBeginTime time.Time) *agreementMgr { return &agreementMgr{ - con: con, - ID: con.ID, - app: con.app, - gov: con.gov, - network: con.network, - logger: con.logger, - cache: con.nodeSetCache, - signer: con.signer, - lattice: con.lattice, - ctx: con.ctx, - initRound: initRound, - lastEndTime: initRoundBeginTime, + con: con, + ID: con.ID, + app: con.app, + gov: con.gov, + network: con.network, + logger: con.logger, + cache: con.nodeSetCache, + signer: con.signer, + lattice: con.lattice, + ctx: con.ctx, + initRound: initRound, + lastEndTime: initRoundBeginTime, + processedBAResult: make(map[types.Position]struct{}, maxResultCache), } } @@ -204,10 +207,6 @@ func (mgr *agreementMgr) appendConfig( recv.agreementModule = agrModule mgr.baModules = append(mgr.baModules, agrModule) mgr.voteFilters = append(mgr.voteFilters, utils.NewVoteFilter()) - mgr.lastBAResult = append(mgr.lastBAResult, types.Position{ - Round: round, - ChainID: i, - }) if mgr.isRunning { mgr.waitGroup.Add(1) go func(idx uint32) { @@ -256,19 +255,27 @@ func (mgr *agreementMgr) processBlock(b *types.Block) error { return mgr.baModules[b.Position.ChainID].processBlock(b) } -func (mgr *agreementMgr) firstAgreementResult( - result *types.AgreementResult) (bool, error) { - mgr.lock.RLock() - defer mgr.lock.RUnlock() - if result.Position.ChainID >= uint32(len(mgr.lastBAResult)) { - mgr.logger.Error("Process unknown result for unknown chain to BA", - "position", &result.Position, - "baChain", len(mgr.lastBAResult), - "baRound", len(mgr.configs), - "initRound", mgr.initRound) - return false, utils.ErrInvalidChainID +func (mgr *agreementMgr) touchAgreementResult( + result *types.AgreementResult) (first bool) { + // DO NOT LOCK THIS FUNCTION!!!!!!!! YOU WILL REGRET IT!!!!! + if _, exist := mgr.processedBAResult[result.Position]; !exist { + first = true + if len(mgr.processedBAResult) > maxResultCache { + for k := range mgr.processedBAResult { + // Randomly drop one element. + delete(mgr.processedBAResult, k) + break + } + } + mgr.processedBAResult[result.Position] = struct{}{} } - return result.Position.Newer(&mgr.lastBAResult[result.Position.ChainID]), nil + return +} + +func (mgr *agreementMgr) untouchAgreementResult( + result *types.AgreementResult) { + // DO NOT LOCK THIS FUNCTION!!!!!!!! YOU WILL REGRET IT!!!!! + delete(mgr.processedBAResult, result.Position) } func (mgr *agreementMgr) processAgreementResult( @@ -283,10 +290,6 @@ func (mgr *agreementMgr) processAgreementResult( "initRound", mgr.initRound) return utils.ErrInvalidChainID } - // TODO(jimmy): lock in this function is not safe. - if result.Position.Newer(&mgr.lastBAResult[result.Position.ChainID]) { - mgr.lastBAResult[result.Position.ChainID] = result.Position - } agreement := mgr.baModules[result.Position.ChainID] aID := agreement.agreementID() if isStop(aID) { diff --git a/core/compaction-chain.go b/core/compaction-chain.go index b0bc236..d7c2f85 100644 --- a/core/compaction-chain.go +++ b/core/compaction-chain.go @@ -41,6 +41,7 @@ var ( ) const maxPendingPeriod = 3 * time.Second +const maxRandomnessCache = 100 type pendingRandomnessResult struct { receivedTime time.Time @@ -50,24 +51,26 @@ type pendingRandomnessResult struct { type finalizedBlockHeap = types.ByFinalizationHeight type compactionChain struct { - gov Governance - chainUnsynced uint32 - tsigVerifier *TSigVerifierCache - blocks map[common.Hash]*types.Block - blockRandomness map[common.Hash][]byte - pendingRandomness map[common.Hash]pendingRandomnessResult - pendingBlocks []*types.Block - lock sync.RWMutex - prevBlock *types.Block + gov Governance + chainUnsynced uint32 + tsigVerifier *TSigVerifierCache + blocks map[common.Hash]*types.Block + blockRandomness map[common.Hash][]byte + pendingRandomness map[common.Hash]pendingRandomnessResult + processedRandomnessResult map[types.Position]struct{} + pendingBlocks []*types.Block + lock sync.RWMutex + prevBlock *types.Block } func newCompactionChain(gov Governance) *compactionChain { return &compactionChain{ - gov: gov, - tsigVerifier: NewTSigVerifierCache(gov, 7), - blocks: make(map[common.Hash]*types.Block), - blockRandomness: make(map[common.Hash][]byte), - pendingRandomness: make(map[common.Hash]pendingRandomnessResult), + gov: gov, + tsigVerifier: NewTSigVerifierCache(gov, 7), + blocks: make(map[common.Hash]*types.Block), + blockRandomness: make(map[common.Hash][]byte), + pendingRandomness: make(map[common.Hash]pendingRandomnessResult), + processedRandomnessResult: make(map[types.Position]struct{}, maxRandomnessCache), } } @@ -207,17 +210,21 @@ func (cc *compactionChain) processFinalizedBlock(block *types.Block) error { return nil } -func (cc *compactionChain) firstBlockRandomnessResult( - rand *types.BlockRandomnessResult) bool { - cc.lock.RLock() - defer cc.lock.RUnlock() - if _, exist := cc.pendingRandomness[rand.BlockHash]; exist { - return false - } - if _, exist := cc.blockRandomness[rand.BlockHash]; exist { - return false +func (cc *compactionChain) touchBlockRandomnessResult( + rand *types.BlockRandomnessResult) (first bool) { + // DO NOT LOCK THIS FUNCTION!!!!!!!! YOU WILL REGRET IT!!!!! + if _, exist := cc.processedRandomnessResult[rand.Position]; !exist { + first = true + if len(cc.processedRandomnessResult) > maxRandomnessCache { + for k := range cc.processedRandomnessResult { + // Randomly drop one element. + delete(cc.processedRandomnessResult, k) + break + } + } + cc.processedRandomnessResult[rand.Position] = struct{}{} } - return true + return } func (cc *compactionChain) processBlockRandomnessResult( diff --git a/core/consensus.go b/core/consensus.go index 623ad2b..f465010 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -1021,17 +1021,17 @@ func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { // ProcessAgreementResult processes the randomness request. func (con *Consensus) ProcessAgreementResult( rand *types.AgreementResult) error { + if !con.baMgr.touchAgreementResult(rand) { + return nil + } + // Sanity Check. if err := VerifyAgreementResult(rand, con.nodeSetCache); err != nil { + con.baMgr.untouchAgreementResult(rand) return err } con.lattice.AddShallowBlock(rand.BlockHash, rand.Position) - first, err := con.baMgr.firstAgreementResult(rand) - if err != nil { - return err - } - // Syncing BA Module. if err := con.baMgr.processAgreementResult(rand); err != nil { return err @@ -1046,34 +1046,45 @@ func (con *Consensus) ProcessAgreementResult( return nil } - if first { - con.logger.Debug("Rebroadcast AgreementResult", - "result", rand) - con.network.BroadcastAgreementResult(rand) - } - dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round) - if err != nil { - return err - } - if _, exist := dkgSet[con.ID]; !exist { - return nil - } - psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash) - if err != nil { - return err - } - if err = con.signer.SignDKGPartialSignature(psig); err != nil { - return err - } - if err = con.cfgModule.processPartialSignature(psig); err != nil { - return err - } - con.logger.Debug("Calling Network.BroadcastDKGPartialSignature", - "proposer", psig.ProposerID, - "round", psig.Round, - "hash", psig.Hash.String()[:6]) - con.network.BroadcastDKGPartialSignature(psig) + con.logger.Debug("Rebroadcast AgreementResult", + "result", rand) + con.network.BroadcastAgreementResult(rand) + go func() { + dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round) + if err != nil { + con.logger.Error("Failed to get dkg set", + "round", rand.Position.Round, "error", err) + return + } + if _, exist := dkgSet[con.ID]; !exist { + return + } + psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash) + if err != nil { + con.logger.Error("Failed to prepare psig", + "round", rand.Position.Round, + "hash", rand.BlockHash.String()[:6], + "error", err) + return + } + if err = con.signer.SignDKGPartialSignature(psig); err != nil { + con.logger.Error("Failed to sign psig", + "hash", rand.BlockHash.String()[:6], + "error", err) + return + } + if err = con.cfgModule.processPartialSignature(psig); err != nil { + con.logger.Error("Failed process psig", + "hash", rand.BlockHash.String()[:6], + "error", err) + return + } + con.logger.Debug("Calling Network.BroadcastDKGPartialSignature", + "proposer", psig.ProposerID, + "round", psig.Round, + "hash", psig.Hash.String()[:6]) + con.network.BroadcastDKGPartialSignature(psig) tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash) if err != nil { if err != ErrTSigAlreadyRunning { @@ -1089,11 +1100,9 @@ func (con *Consensus) ProcessAgreementResult( Position: rand.Position, Randomness: tsig.Signature, } - if err := con.ProcessBlockRandomnessResult(result, true); err != nil { - con.logger.Error("Failed to process randomness result", - "error", err) - return - } + // ProcessBlockRandomnessResult is not thread-safe so we put the result in + // the message channnel to be processed in the main thread. + con.msgChan <- result }() return nil } @@ -1104,10 +1113,8 @@ func (con *Consensus) ProcessBlockRandomnessResult( if rand.Position.Round == 0 { return nil } - if needBroadcast { - if !con.ccModule.firstBlockRandomnessResult(rand) { - needBroadcast = false - } + if !con.ccModule.touchBlockRandomnessResult(rand) { + return nil } if err := con.ccModule.processBlockRandomnessResult(rand); err != nil { if err == ErrBlockNotRegistered { |