diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2019-01-10 18:01:10 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-01-10 18:01:10 +0800 |
commit | fa25817354d5b7d40f5911004232392acfe7fe53 (patch) | |
tree | 6e9c68971d4e5fecbd126568ebdb7f1026935d93 /core | |
parent | 995e571375298923bf07d205237a664b03de3b57 (diff) | |
download | dexon-consensus-fa25817354d5b7d40f5911004232392acfe7fe53.tar dexon-consensus-fa25817354d5b7d40f5911004232392acfe7fe53.tar.gz dexon-consensus-fa25817354d5b7d40f5911004232392acfe7fe53.tar.bz2 dexon-consensus-fa25817354d5b7d40f5911004232392acfe7fe53.tar.lz dexon-consensus-fa25817354d5b7d40f5911004232392acfe7fe53.tar.xz dexon-consensus-fa25817354d5b7d40f5911004232392acfe7fe53.tar.zst dexon-consensus-fa25817354d5b7d40f5911004232392acfe7fe53.zip |
core: fix issues in consensus core (#415)
Diffstat (limited to 'core')
-rw-r--r-- | core/agreement-mgr.go | 11 | ||||
-rw-r--r-- | core/agreement.go | 10 | ||||
-rw-r--r-- | core/consensus.go | 27 |
3 files changed, 31 insertions, 17 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index a995cca..eb4abda 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -22,6 +22,7 @@ import ( "errors" "math" "sync" + "sync/atomic" "time" "github.com/dexon-foundation/dexon-consensus/common" @@ -180,7 +181,9 @@ func (mgr *agreementMgr) appendConfig( consensus: mgr.con, chainID: i, restartNotary: make(chan types.Position, 1), + roundValue: &atomic.Value{}, } + recv.roundValue.Store(uint64(0)) agrModule := newAgreement( mgr.con.ID, recv, @@ -399,7 +402,7 @@ Loop: <-setting.ticker.Tick() } // Run BA for this round. - recv.round = currentRound + recv.roundValue.Store(currentRound) recv.changeNotaryTime = roundEndTime recv.restartNotary <- types.Position{ChainID: math.MaxUint32} if err := mgr.baRoutineForOneRound(&setting); err != nil { @@ -439,11 +442,11 @@ Loop: } var nextHeight uint64 for { - nextHeight, err = mgr.lattice.NextHeight(recv.round, setting.chainID) + nextHeight, err = mgr.lattice.NextHeight(recv.round(), setting.chainID) if err != nil { mgr.logger.Debug("Error getting next height", "error", err, - "round", recv.round, + "round", recv.round(), "chainID", setting.chainID) err = nil nextHeight = oldPos.Height @@ -459,7 +462,7 @@ Loop: time.Sleep(100 * time.Millisecond) } nextPos := types.Position{ - Round: recv.round, + Round: recv.round(), ChainID: setting.chainID, Height: nextHeight, } diff --git a/core/agreement.go b/core/agreement.go index 11ed89a..f9cf546 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -281,16 +281,12 @@ func (a *agreement) leader() types.NodeID { // nextState is called at the specific clock time. func (a *agreement) nextState() (err error) { - if func() bool { - a.lock.RLock() - defer a.lock.RUnlock() - return a.hasOutput - }() { + a.lock.Lock() + defer a.lock.Unlock() + if a.hasOutput { a.state = newSleepState(a.data) return } - a.lock.Lock() - defer a.lock.Unlock() a.state, err = a.state.nextState() return } diff --git a/core/consensus.go b/core/consensus.go index 07fb4da..ad1efeb 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -22,6 +22,7 @@ import ( "encoding/hex" "fmt" "sync" + "sync/atomic" "time" "github.com/dexon-foundation/dexon-consensus/common" @@ -69,11 +70,15 @@ type consensusBAReceiver struct { agreementModule *agreement chainID uint32 changeNotaryTime time.Time - round uint64 + roundValue *atomic.Value isNotary bool restartNotary chan types.Position } +func (recv *consensusBAReceiver) round() uint64 { + return recv.roundValue.Load().(uint64) +} + func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { if !recv.isNotary { return @@ -99,7 +104,7 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash { if !recv.isNotary { return common.Hash{} } - block := recv.consensus.proposeBlock(recv.chainID, recv.round) + block := recv.consensus.proposeBlock(recv.chainID, recv.round()) if block == nil { recv.consensus.logger.Error("unable to propose block") return nullBlockHash @@ -125,7 +130,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( recv.consensus.logger.Info("Empty block is confirmed", "position", &aID) var err error - block, err = recv.consensus.proposeEmptyBlock(recv.round, recv.chainID) + block, err = recv.consensus.proposeEmptyBlock(recv.round(), recv.chainID) if err != nil { recv.consensus.logger.Error("Propose empty block failed", "error", err) return @@ -143,9 +148,19 @@ func (recv *consensusBAReceiver) ConfirmBlock( defer recv.consensus.lock.Unlock() recv.consensus.baConfirmedBlock[hash] = ch }() - recv.consensus.network.PullBlocks(common.Hashes{hash}) go func() { - block = <-ch + hashes := common.Hashes{hash} + PullBlockLoop: + for { + recv.consensus.logger.Debug("Calling Network.PullBlock for BA block", + "hash", hash) + recv.consensus.network.PullBlocks(hashes) + select { + case block = <-ch: + break PullBlockLoop + case <-time.After(1 * time.Second): + } + } recv.consensus.logger.Info("Receive unknown block", "hash", hash.String()[:6], "position", &block.Position, @@ -245,7 +260,7 @@ CleanChannelLoop: } newPos := block.Position if block.Timestamp.After(recv.changeNotaryTime) { - recv.round++ + recv.roundValue.Store(recv.round() + 1) newPos.Round++ } recv.restartNotary <- newPos |