aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/agreement-state.go2
-rw-r--r--core/agreement.go22
-rw-r--r--core/consensus.go40
3 files changed, 40 insertions, 24 deletions
diff --git a/core/agreement-state.go b/core/agreement-state.go
index 426b062..4997ddc 100644
--- a/core/agreement-state.go
+++ b/core/agreement-state.go
@@ -69,9 +69,9 @@ func newInitialState(a *agreementData) *initialState {
func (s *initialState) state() agreementStateType { return stateInitial }
func (s *initialState) clocks() int { return 0 }
func (s *initialState) nextState() (agreementState, error) {
+ hash := s.a.recv.ProposeBlock()
s.a.lock.Lock()
defer s.a.lock.Unlock()
- hash := s.a.recv.ProposeBlock()
s.a.recv.ProposeVote(&types.Vote{
Type: types.VoteInit,
BlockHash: hash,
diff --git a/core/agreement.go b/core/agreement.go
index f3b1003..72aefc6 100644
--- a/core/agreement.go
+++ b/core/agreement.go
@@ -325,6 +325,9 @@ func (a *agreement) processVote(vote *types.Vote) error {
}
// Check if the agreement requires fast-forwarding.
+ if len(a.fastForward) > 0 {
+ return nil
+ }
if vote.Type == types.VotePreCom {
if hash, ok := a.data.countVoteNoLock(vote.Period, vote.Type); ok &&
hash != skipBlockHash {
@@ -362,7 +365,9 @@ func (a *agreement) processVote(vote *types.Vote) error {
addPullBlocks(types.VoteInit)
addPullBlocks(types.VotePreCom)
addPullBlocks(types.VoteCom)
- a.data.recv.PullBlocks(hashes)
+ if len(hashes) > 0 {
+ a.data.recv.PullBlocks(hashes)
+ }
a.fastForward <- vote.Period + 1
return nil
}
@@ -394,14 +399,15 @@ func (a *agreement) done() <-chan struct{} {
// processBlock is the entry point for processing Block.
func (a *agreement) processBlock(block *types.Block) error {
+ a.lock.Lock()
+ defer a.lock.Unlock()
a.data.blocksLock.Lock()
defer a.data.blocksLock.Unlock()
- aID := a.agreementID()
- if block.Position != aID {
+ if block.Position != a.aID {
// Agreement module has stopped.
- if !isStop(aID) {
- if aID.Newer(&block.Position) {
+ if !isStop(a.aID) {
+ if a.aID.Newer(&block.Position) {
return nil
}
}
@@ -421,13 +427,17 @@ func (a *agreement) processBlock(block *types.Block) error {
return err
}
a.data.blocks[block.ProposerID] = block
- a.addCandidateBlock(block)
+ a.addCandidateBlockNoLock(block)
return nil
}
func (a *agreement) addCandidateBlock(block *types.Block) {
a.lock.Lock()
defer a.lock.Unlock()
+ a.addCandidateBlockNoLock(block)
+}
+
+func (a *agreement) addCandidateBlockNoLock(block *types.Block) {
a.candidateBlock[block.Hash] = block
}
diff --git a/core/consensus.go b/core/consensus.go
index fd651a1..5256168 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -71,11 +71,11 @@ type consensusBAReceiver struct {
}
func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) {
- if err := recv.agreementModule.prepareVote(vote); err != nil {
- recv.consensus.logger.Error("Failed to prepare vote", "error", err)
- return
- }
go func() {
+ if err := recv.agreementModule.prepareVote(vote); err != nil {
+ recv.consensus.logger.Error("Failed to prepare vote", "error", err)
+ return
+ }
if err := recv.agreementModule.processVote(vote); err != nil {
recv.consensus.logger.Error("Failed to process vote", "error", err)
return
@@ -92,7 +92,6 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash {
recv.consensus.logger.Error("unable to propose block")
return nullBlockHash
}
- recv.consensus.baModules[recv.chainID].addCandidateBlock(block)
if err := recv.consensus.preProcessBlock(block); err != nil {
recv.consensus.logger.Error("Failed to pre-process block", "error", err)
return common.Hash{}
@@ -129,10 +128,15 @@ func (recv *consensusBAReceiver) ConfirmBlock(
recv.consensus.baConfirmedBlock[hash] = ch
}()
recv.consensus.network.PullBlocks(common.Hashes{hash})
- block = <-ch
- recv.consensus.logger.Info("Receive unknown block",
- "hash", hash,
- "chainID", recv.chainID)
+ go func() {
+ block = <-ch
+ recv.consensus.logger.Info("Receive unknown block",
+ "hash", hash,
+ "chainID", recv.chainID)
+ recv.agreementModule.addCandidateBlock(block)
+ recv.ConfirmBlock(block.Hash, votes)
+ }()
+ return
}
}
recv.consensus.ccModule.registerBlock(block)
@@ -668,14 +672,16 @@ MessageLoop:
continue MessageLoop
}
}
- con.lock.Lock()
- defer con.lock.Unlock()
- // In case of multiple delivered block.
- if _, exist := con.baConfirmedBlock[val.Hash]; !exist {
- continue MessageLoop
- }
- delete(con.baConfirmedBlock, val.Hash)
- ch <- val
+ func() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ // In case of multiple delivered block.
+ if _, exist := con.baConfirmedBlock[val.Hash]; !exist {
+ return
+ }
+ delete(con.baConfirmedBlock, val.Hash)
+ ch <- val
+ }()
} else if val.IsFinalized() {
// For sync mode.
if err := con.processFinalizedBlock(val); err != nil {