diff options
-rw-r--r-- | core/agreement-mgr.go | 11 | ||||
-rw-r--r-- | core/agreement.go | 10 | ||||
-rw-r--r-- | core/consensus.go | 27 | ||||
-rw-r--r-- | integration_test/consensus_test.go | 11 |
4 files changed, 40 insertions, 19 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index a995cca..eb4abda 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -22,6 +22,7 @@ import ( "errors" "math" "sync" + "sync/atomic" "time" "github.com/dexon-foundation/dexon-consensus/common" @@ -180,7 +181,9 @@ func (mgr *agreementMgr) appendConfig( consensus: mgr.con, chainID: i, restartNotary: make(chan types.Position, 1), + roundValue: &atomic.Value{}, } + recv.roundValue.Store(uint64(0)) agrModule := newAgreement( mgr.con.ID, recv, @@ -399,7 +402,7 @@ Loop: <-setting.ticker.Tick() } // Run BA for this round. - recv.round = currentRound + recv.roundValue.Store(currentRound) recv.changeNotaryTime = roundEndTime recv.restartNotary <- types.Position{ChainID: math.MaxUint32} if err := mgr.baRoutineForOneRound(&setting); err != nil { @@ -439,11 +442,11 @@ Loop: } var nextHeight uint64 for { - nextHeight, err = mgr.lattice.NextHeight(recv.round, setting.chainID) + nextHeight, err = mgr.lattice.NextHeight(recv.round(), setting.chainID) if err != nil { mgr.logger.Debug("Error getting next height", "error", err, - "round", recv.round, + "round", recv.round(), "chainID", setting.chainID) err = nil nextHeight = oldPos.Height @@ -459,7 +462,7 @@ Loop: time.Sleep(100 * time.Millisecond) } nextPos := types.Position{ - Round: recv.round, + Round: recv.round(), ChainID: setting.chainID, Height: nextHeight, } diff --git a/core/agreement.go b/core/agreement.go index 11ed89a..f9cf546 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -281,16 +281,12 @@ func (a *agreement) leader() types.NodeID { // nextState is called at the specific clock time. func (a *agreement) nextState() (err error) { - if func() bool { - a.lock.RLock() - defer a.lock.RUnlock() - return a.hasOutput - }() { + a.lock.Lock() + defer a.lock.Unlock() + if a.hasOutput { a.state = newSleepState(a.data) return } - a.lock.Lock() - defer a.lock.Unlock() a.state, err = a.state.nextState() return } diff --git a/core/consensus.go b/core/consensus.go index 07fb4da..ad1efeb 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -22,6 +22,7 @@ import ( "encoding/hex" "fmt" "sync" + "sync/atomic" "time" "github.com/dexon-foundation/dexon-consensus/common" @@ -69,11 +70,15 @@ type consensusBAReceiver struct { agreementModule *agreement chainID uint32 changeNotaryTime time.Time - round uint64 + roundValue *atomic.Value isNotary bool restartNotary chan types.Position } +func (recv *consensusBAReceiver) round() uint64 { + return recv.roundValue.Load().(uint64) +} + func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { if !recv.isNotary { return @@ -99,7 +104,7 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash { if !recv.isNotary { return common.Hash{} } - block := recv.consensus.proposeBlock(recv.chainID, recv.round) + block := recv.consensus.proposeBlock(recv.chainID, recv.round()) if block == nil { recv.consensus.logger.Error("unable to propose block") return nullBlockHash @@ -125,7 +130,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( recv.consensus.logger.Info("Empty block is confirmed", "position", &aID) var err error - block, err = recv.consensus.proposeEmptyBlock(recv.round, recv.chainID) + block, err = recv.consensus.proposeEmptyBlock(recv.round(), recv.chainID) if err != nil { recv.consensus.logger.Error("Propose empty block failed", "error", err) return @@ -143,9 +148,19 @@ func (recv *consensusBAReceiver) ConfirmBlock( defer recv.consensus.lock.Unlock() recv.consensus.baConfirmedBlock[hash] = ch }() - recv.consensus.network.PullBlocks(common.Hashes{hash}) go func() { - block = <-ch + hashes := common.Hashes{hash} + PullBlockLoop: + for { + recv.consensus.logger.Debug("Calling Network.PullBlock for BA block", + "hash", hash) + recv.consensus.network.PullBlocks(hashes) + select { + case block = <-ch: + break PullBlockLoop + case <-time.After(1 * time.Second): + } + } recv.consensus.logger.Info("Receive unknown block", "hash", hash.String()[:6], "position", &block.Position, @@ -245,7 +260,7 @@ CleanChannelLoop: } newPos := block.Position if block.Timestamp.After(recv.changeNotaryTime) { - recv.round++ + recv.roundValue.Store(recv.round() + 1) newPos.Round++ } recv.restartNotary <- newPos diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go index fa62bda..99de7c9 100644 --- a/integration_test/consensus_test.go +++ b/integration_test/consensus_test.go @@ -220,6 +220,7 @@ func (s *ConsensusTestSuite) TestSimple() { nodes := s.setupNodes(dMoment, prvKeys, seedGov) for _, n := range nodes { go n.con.Run() + defer n.con.Stop() } Loop: for { @@ -236,7 +237,6 @@ Loop: break } s.verifyNodes(nodes) - // TODO(haoping) stop consensus. } func (s *ConsensusTestSuite) TestNumChainsChange() { @@ -300,6 +300,7 @@ func (s *ConsensusTestSuite) TestNumChainsChange() { // Run test. for _, n := range nodes { go n.con.Run() + defer n.con.Stop() } Loop: for { @@ -359,6 +360,9 @@ func (s *ConsensusTestSuite) TestSync() { for _, n := range nodes { if n.ID != syncNode.ID { go n.con.Run() + if n.ID != stoppedNode.ID { + defer n.con.Stop() + } } } // Clean syncNode's network receive channel, or it might exceed the limit @@ -415,6 +419,10 @@ ReachAlive: if syncedCon != nil { syncNode.con = syncedCon go syncNode.con.Run() + go func() { + <-runnerCtx.Done() + syncNode.con.Stop() + }() break SyncLoop } if err != nil { @@ -465,7 +473,6 @@ ReachAlive: req.NoError(err) case <-runnerCtx.Done(): // This test passed. - // TODO(haoping) stop consensus. } } |