aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/agreement-mgr.go11
-rw-r--r--core/agreement.go10
-rw-r--r--core/consensus.go27
-rw-r--r--integration_test/consensus_test.go11
4 files changed, 40 insertions, 19 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go
index a995cca..eb4abda 100644
--- a/core/agreement-mgr.go
+++ b/core/agreement-mgr.go
@@ -22,6 +22,7 @@ import (
"errors"
"math"
"sync"
+ "sync/atomic"
"time"
"github.com/dexon-foundation/dexon-consensus/common"
@@ -180,7 +181,9 @@ func (mgr *agreementMgr) appendConfig(
consensus: mgr.con,
chainID: i,
restartNotary: make(chan types.Position, 1),
+ roundValue: &atomic.Value{},
}
+ recv.roundValue.Store(uint64(0))
agrModule := newAgreement(
mgr.con.ID,
recv,
@@ -399,7 +402,7 @@ Loop:
<-setting.ticker.Tick()
}
// Run BA for this round.
- recv.round = currentRound
+ recv.roundValue.Store(currentRound)
recv.changeNotaryTime = roundEndTime
recv.restartNotary <- types.Position{ChainID: math.MaxUint32}
if err := mgr.baRoutineForOneRound(&setting); err != nil {
@@ -439,11 +442,11 @@ Loop:
}
var nextHeight uint64
for {
- nextHeight, err = mgr.lattice.NextHeight(recv.round, setting.chainID)
+ nextHeight, err = mgr.lattice.NextHeight(recv.round(), setting.chainID)
if err != nil {
mgr.logger.Debug("Error getting next height",
"error", err,
- "round", recv.round,
+ "round", recv.round(),
"chainID", setting.chainID)
err = nil
nextHeight = oldPos.Height
@@ -459,7 +462,7 @@ Loop:
time.Sleep(100 * time.Millisecond)
}
nextPos := types.Position{
- Round: recv.round,
+ Round: recv.round(),
ChainID: setting.chainID,
Height: nextHeight,
}
diff --git a/core/agreement.go b/core/agreement.go
index 11ed89a..f9cf546 100644
--- a/core/agreement.go
+++ b/core/agreement.go
@@ -281,16 +281,12 @@ func (a *agreement) leader() types.NodeID {
// nextState is called at the specific clock time.
func (a *agreement) nextState() (err error) {
- if func() bool {
- a.lock.RLock()
- defer a.lock.RUnlock()
- return a.hasOutput
- }() {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ if a.hasOutput {
a.state = newSleepState(a.data)
return
}
- a.lock.Lock()
- defer a.lock.Unlock()
a.state, err = a.state.nextState()
return
}
diff --git a/core/consensus.go b/core/consensus.go
index 07fb4da..ad1efeb 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -22,6 +22,7 @@ import (
"encoding/hex"
"fmt"
"sync"
+ "sync/atomic"
"time"
"github.com/dexon-foundation/dexon-consensus/common"
@@ -69,11 +70,15 @@ type consensusBAReceiver struct {
agreementModule *agreement
chainID uint32
changeNotaryTime time.Time
- round uint64
+ roundValue *atomic.Value
isNotary bool
restartNotary chan types.Position
}
+func (recv *consensusBAReceiver) round() uint64 {
+ return recv.roundValue.Load().(uint64)
+}
+
func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) {
if !recv.isNotary {
return
@@ -99,7 +104,7 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash {
if !recv.isNotary {
return common.Hash{}
}
- block := recv.consensus.proposeBlock(recv.chainID, recv.round)
+ block := recv.consensus.proposeBlock(recv.chainID, recv.round())
if block == nil {
recv.consensus.logger.Error("unable to propose block")
return nullBlockHash
@@ -125,7 +130,7 @@ func (recv *consensusBAReceiver) ConfirmBlock(
recv.consensus.logger.Info("Empty block is confirmed",
"position", &aID)
var err error
- block, err = recv.consensus.proposeEmptyBlock(recv.round, recv.chainID)
+ block, err = recv.consensus.proposeEmptyBlock(recv.round(), recv.chainID)
if err != nil {
recv.consensus.logger.Error("Propose empty block failed", "error", err)
return
@@ -143,9 +148,19 @@ func (recv *consensusBAReceiver) ConfirmBlock(
defer recv.consensus.lock.Unlock()
recv.consensus.baConfirmedBlock[hash] = ch
}()
- recv.consensus.network.PullBlocks(common.Hashes{hash})
go func() {
- block = <-ch
+ hashes := common.Hashes{hash}
+ PullBlockLoop:
+ for {
+ recv.consensus.logger.Debug("Calling Network.PullBlock for BA block",
+ "hash", hash)
+ recv.consensus.network.PullBlocks(hashes)
+ select {
+ case block = <-ch:
+ break PullBlockLoop
+ case <-time.After(1 * time.Second):
+ }
+ }
recv.consensus.logger.Info("Receive unknown block",
"hash", hash.String()[:6],
"position", &block.Position,
@@ -245,7 +260,7 @@ CleanChannelLoop:
}
newPos := block.Position
if block.Timestamp.After(recv.changeNotaryTime) {
- recv.round++
+ recv.roundValue.Store(recv.round() + 1)
newPos.Round++
}
recv.restartNotary <- newPos
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index fa62bda..99de7c9 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -220,6 +220,7 @@ func (s *ConsensusTestSuite) TestSimple() {
nodes := s.setupNodes(dMoment, prvKeys, seedGov)
for _, n := range nodes {
go n.con.Run()
+ defer n.con.Stop()
}
Loop:
for {
@@ -236,7 +237,6 @@ Loop:
break
}
s.verifyNodes(nodes)
- // TODO(haoping) stop consensus.
}
func (s *ConsensusTestSuite) TestNumChainsChange() {
@@ -300,6 +300,7 @@ func (s *ConsensusTestSuite) TestNumChainsChange() {
// Run test.
for _, n := range nodes {
go n.con.Run()
+ defer n.con.Stop()
}
Loop:
for {
@@ -359,6 +360,9 @@ func (s *ConsensusTestSuite) TestSync() {
for _, n := range nodes {
if n.ID != syncNode.ID {
go n.con.Run()
+ if n.ID != stoppedNode.ID {
+ defer n.con.Stop()
+ }
}
}
// Clean syncNode's network receive channel, or it might exceed the limit
@@ -415,6 +419,10 @@ ReachAlive:
if syncedCon != nil {
syncNode.con = syncedCon
go syncNode.con.Run()
+ go func() {
+ <-runnerCtx.Done()
+ syncNode.con.Stop()
+ }()
break SyncLoop
}
if err != nil {
@@ -465,7 +473,6 @@ ReachAlive:
req.NoError(err)
case <-runnerCtx.Done():
// This test passed.
- // TODO(haoping) stop consensus.
}
}