diff options
-rw-r--r-- | core/agreement-state.go | 2 | ||||
-rw-r--r-- | core/agreement.go | 22 | ||||
-rw-r--r-- | core/consensus.go | 40 |
3 files changed, 40 insertions, 24 deletions
diff --git a/core/agreement-state.go b/core/agreement-state.go index 426b062..4997ddc 100644 --- a/core/agreement-state.go +++ b/core/agreement-state.go @@ -69,9 +69,9 @@ func newInitialState(a *agreementData) *initialState { func (s *initialState) state() agreementStateType { return stateInitial } func (s *initialState) clocks() int { return 0 } func (s *initialState) nextState() (agreementState, error) { + hash := s.a.recv.ProposeBlock() s.a.lock.Lock() defer s.a.lock.Unlock() - hash := s.a.recv.ProposeBlock() s.a.recv.ProposeVote(&types.Vote{ Type: types.VoteInit, BlockHash: hash, diff --git a/core/agreement.go b/core/agreement.go index f3b1003..72aefc6 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -325,6 +325,9 @@ func (a *agreement) processVote(vote *types.Vote) error { } // Check if the agreement requires fast-forwarding. + if len(a.fastForward) > 0 { + return nil + } if vote.Type == types.VotePreCom { if hash, ok := a.data.countVoteNoLock(vote.Period, vote.Type); ok && hash != skipBlockHash { @@ -362,7 +365,9 @@ func (a *agreement) processVote(vote *types.Vote) error { addPullBlocks(types.VoteInit) addPullBlocks(types.VotePreCom) addPullBlocks(types.VoteCom) - a.data.recv.PullBlocks(hashes) + if len(hashes) > 0 { + a.data.recv.PullBlocks(hashes) + } a.fastForward <- vote.Period + 1 return nil } @@ -394,14 +399,15 @@ func (a *agreement) done() <-chan struct{} { // processBlock is the entry point for processing Block. func (a *agreement) processBlock(block *types.Block) error { + a.lock.Lock() + defer a.lock.Unlock() a.data.blocksLock.Lock() defer a.data.blocksLock.Unlock() - aID := a.agreementID() - if block.Position != aID { + if block.Position != a.aID { // Agreement module has stopped. - if !isStop(aID) { - if aID.Newer(&block.Position) { + if !isStop(a.aID) { + if a.aID.Newer(&block.Position) { return nil } } @@ -421,13 +427,17 @@ func (a *agreement) processBlock(block *types.Block) error { return err } a.data.blocks[block.ProposerID] = block - a.addCandidateBlock(block) + a.addCandidateBlockNoLock(block) return nil } func (a *agreement) addCandidateBlock(block *types.Block) { a.lock.Lock() defer a.lock.Unlock() + a.addCandidateBlockNoLock(block) +} + +func (a *agreement) addCandidateBlockNoLock(block *types.Block) { a.candidateBlock[block.Hash] = block } diff --git a/core/consensus.go b/core/consensus.go index fd651a1..5256168 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -71,11 +71,11 @@ type consensusBAReceiver struct { } func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { - if err := recv.agreementModule.prepareVote(vote); err != nil { - recv.consensus.logger.Error("Failed to prepare vote", "error", err) - return - } go func() { + if err := recv.agreementModule.prepareVote(vote); err != nil { + recv.consensus.logger.Error("Failed to prepare vote", "error", err) + return + } if err := recv.agreementModule.processVote(vote); err != nil { recv.consensus.logger.Error("Failed to process vote", "error", err) return @@ -92,7 +92,6 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash { recv.consensus.logger.Error("unable to propose block") return nullBlockHash } - recv.consensus.baModules[recv.chainID].addCandidateBlock(block) if err := recv.consensus.preProcessBlock(block); err != nil { recv.consensus.logger.Error("Failed to pre-process block", "error", err) return common.Hash{} @@ -129,10 +128,15 @@ func (recv *consensusBAReceiver) ConfirmBlock( recv.consensus.baConfirmedBlock[hash] = ch }() recv.consensus.network.PullBlocks(common.Hashes{hash}) - block = <-ch - recv.consensus.logger.Info("Receive unknown block", - "hash", hash, - "chainID", recv.chainID) + go func() { + block = <-ch + recv.consensus.logger.Info("Receive unknown block", + "hash", hash, + "chainID", recv.chainID) + recv.agreementModule.addCandidateBlock(block) + recv.ConfirmBlock(block.Hash, votes) + }() + return } } recv.consensus.ccModule.registerBlock(block) @@ -668,14 +672,16 @@ MessageLoop: continue MessageLoop } } - con.lock.Lock() - defer con.lock.Unlock() - // In case of multiple delivered block. - if _, exist := con.baConfirmedBlock[val.Hash]; !exist { - continue MessageLoop - } - delete(con.baConfirmedBlock, val.Hash) - ch <- val + func() { + con.lock.Lock() + defer con.lock.Unlock() + // In case of multiple delivered block. + if _, exist := con.baConfirmedBlock[val.Hash]; !exist { + return + } + delete(con.baConfirmedBlock, val.Hash) + ch <- val + }() } else if val.IsFinalized() { // For sync mode. if err := con.processFinalizedBlock(val); err != nil { |