aboutsummaryrefslogtreecommitdiffstats
path: root/core/syncer/consensus.go
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2019-02-15 14:18:59 +0800
committerJimmy Hu <jimmy.hu@dexon.org>2019-02-19 10:48:50 +0800
commit4dbdc22e355cf1f6f0c39af1b2f3737b7527bc0c (patch)
tree625b7d34aa700d072ffb8e68dc89ed3936b76d29 /core/syncer/consensus.go
parente4825619fb2499f5f534537c1a4d52d3e0bcacfe (diff)
downloaddexon-consensus-4dbdc22e355cf1f6f0c39af1b2f3737b7527bc0c.tar
dexon-consensus-4dbdc22e355cf1f6f0c39af1b2f3737b7527bc0c.tar.gz
dexon-consensus-4dbdc22e355cf1f6f0c39af1b2f3737b7527bc0c.tar.bz2
dexon-consensus-4dbdc22e355cf1f6f0c39af1b2f3737b7527bc0c.tar.lz
dexon-consensus-4dbdc22e355cf1f6f0c39af1b2f3737b7527bc0c.tar.xz
dexon-consensus-4dbdc22e355cf1f6f0c39af1b2f3737b7527bc0c.tar.zst
dexon-consensus-4dbdc22e355cf1f6f0c39af1b2f3737b7527bc0c.zip
big-bang: single chain (#446)
Diffstat (limited to 'core/syncer/consensus.go')
-rw-r--r--core/syncer/consensus.go572
1 files changed, 128 insertions, 444 deletions
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 75c1067..618d90e 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -63,19 +63,16 @@ type Consensus struct {
nodeSetCache *utils.NodeSetCache
tsigVerifier *core.TSigVerifierCache
- lattice *core.Lattice
- validatedChains map[uint32]struct{}
- finalizedBlockHashes common.Hashes
- latticeLastRound uint64
- randomnessResults map[common.Hash]*types.BlockRandomnessResult
- blocks []types.ByPosition
- agreements []*agreement
- configs []*types.Config
- roundBeginTimes []time.Time
- agreementRoundCut uint64
+ randomnessResults map[common.Hash]*types.BlockRandomnessResult
+ blocks types.BlocksByPosition
+ agreementModule *agreement
+ configs []*types.Config
+ roundBeginTimes []time.Time
+ agreementRoundCut uint64
// lock for accessing all fields.
lock sync.RWMutex
+ duringBuffering bool
moduleWaitGroup sync.WaitGroup
agreementWaitGroup sync.WaitGroup
pullChan chan common.Hash
@@ -100,16 +97,15 @@ func NewConsensus(
logger common.Logger) *Consensus {
con := &Consensus{
- dMoment: dMoment,
- app: app,
- gov: gov,
- db: db,
- network: network,
- nodeSetCache: utils.NewNodeSetCache(gov),
- tsigVerifier: core.NewTSigVerifierCache(gov, 7),
- prv: prv,
- logger: logger,
- validatedChains: make(map[uint32]struct{}),
+ dMoment: dMoment,
+ app: app,
+ gov: gov,
+ db: db,
+ network: network,
+ nodeSetCache: utils.NewNodeSetCache(gov),
+ tsigVerifier: core.NewTSigVerifierCache(gov, 7),
+ prv: prv,
+ logger: logger,
configs: []*types.Config{
utils.GetConfigWithPanic(gov, 0, logger),
},
@@ -119,294 +115,66 @@ func NewConsensus(
randomnessResults: make(map[common.Hash]*types.BlockRandomnessResult),
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
+ con.agreementModule = newAgreement(
+ con.receiveChan, con.pullChan, con.nodeSetCache, con.logger)
+ con.agreementWaitGroup.Add(1)
+ go func() {
+ defer con.agreementWaitGroup.Done()
+ con.agreementModule.run()
+ }()
return con
}
-func (con *Consensus) initConsensusObj(initBlock *types.Block) {
- func() {
- con.lock.Lock()
- defer con.lock.Unlock()
- con.latticeLastRound = initBlock.Position.Round
- debugApp, _ := con.app.(core.Debug)
- con.lattice = core.NewLattice(
- con.roundBeginTimes[con.latticeLastRound],
- con.latticeLastRound,
- con.configs[con.latticeLastRound],
- utils.NewSigner(con.prv),
- con.app,
- debugApp,
- con.db,
- con.logger,
- )
- }()
+func (con *Consensus) assureBuffering() {
+ if func() bool {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ return con.duringBuffering
+ }() {
+ return
+ }
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ if con.duringBuffering {
+ return
+ }
+ con.duringBuffering = true
con.startAgreement()
con.startNetwork()
con.startCRSMonitor()
}
-func (con *Consensus) checkIfValidated() (validated bool) {
- con.lock.RLock()
- defer con.lock.RUnlock()
- var (
- round = con.blocks[0][0].Position.Round
- numChains = con.configs[round].NumChains
- validatedChainCount uint32
- )
- // Make sure we validate some block in all chains.
- for chainID := range con.validatedChains {
- if chainID < numChains {
- validatedChainCount++
- }
- }
- validated = validatedChainCount == numChains
- con.logger.Debug("syncer chain-validation status",
- "validated-chain", validatedChainCount,
- "round", round,
- "valid", validated)
- return
-}
-
func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) {
con.lock.RLock()
defer con.lock.RUnlock()
- var (
- round = con.blocks[0][0].Position.Round
- numChains = con.configs[round].NumChains
- compactionTips = make([]*types.Block, numChains)
- overlapCount = uint32(0)
- )
defer func() {
con.logger.Debug("syncer synced status",
- "overlap-count", overlapCount,
- "num-chain", numChains,
"last-block", blocks[len(blocks)-1],
- "synced", synced)
+ "synced", synced,
+ )
}()
- // Find tips (newset blocks) of each chain in compaction chain.
- b := blocks[len(blocks)-1]
- for tipCount := uint32(0); tipCount < numChains; {
- if compactionTips[b.Position.ChainID] == nil {
- // Check chainID for config change.
- if b.Position.ChainID < numChains {
- compactionTips[b.Position.ChainID] = b
- tipCount++
- }
- }
- if (b.Finalization.ParentHash == common.Hash{}) {
- return
- }
- b1, err := con.db.GetBlock(b.Finalization.ParentHash)
- if err != nil {
- panic(err)
- }
- b = &b1
- }
- // Check if chain tips of compaction chain and current cached confirmed
- // blocks are overlapped on each chain, numChains is decided by the round
- // of last block we seen on compaction chain.
- for chainID, b := range compactionTips {
- if len(con.blocks[chainID]) > 0 {
- if !b.Position.Older(&con.blocks[chainID][0].Position) {
- overlapCount++
- }
- }
+ if len(con.blocks) == 0 || len(blocks) == 0 {
+ return
}
- synced = overlapCount == numChains
+ synced = !blocks[len(blocks)-1].Position.Older(con.blocks[0].Position)
return
}
-// ensureAgreementOverlapRound ensures the oldest blocks in each chain in
-// con.blocks are all in the same round, for avoiding config change while
-// syncing.
-func (con *Consensus) ensureAgreementOverlapRound() bool {
+func (con *Consensus) buildAllEmptyBlocks() {
con.lock.Lock()
defer con.lock.Unlock()
- defer func() {
- con.logger.Debug("ensureAgreementOverlapRound returned",
- "round", con.agreementRoundCut)
- }()
- if con.agreementRoundCut > 0 {
- return true
- }
// Clean empty blocks on tips of chains.
- for idx, bs := range con.blocks {
- for len(bs) > 0 && con.isEmptyBlock(bs[0]) {
- bs = bs[1:]
- }
- con.blocks[idx] = bs
+ for len(con.blocks) > 0 && con.isEmptyBlock(con.blocks[0]) {
+ con.blocks = con.blocks[1:]
}
// Build empty blocks.
- for _, bs := range con.blocks {
- for i := range bs {
- if con.isEmptyBlock(bs[i]) {
- if bs[i-1].Position.Height == bs[i].Position.Height-1 {
- con.buildEmptyBlock(bs[i], bs[i-1])
- }
+ for i, b := range con.blocks {
+ if con.isEmptyBlock(b) {
+ if con.blocks[i-1].Position.Height+1 == b.Position.Height {
+ con.buildEmptyBlock(b, con.blocks[i-1])
}
}
}
- var tipRoundMap map[uint64]uint32
- for {
- tipRoundMap = make(map[uint64]uint32)
- for _, bs := range con.blocks {
- if len(bs) > 0 {
- tipRoundMap[bs[0].Position.Round]++
- }
- }
- if len(tipRoundMap) <= 1 {
- break
- }
- // Make all tips in same round.
- var maxRound uint64
- for r := range tipRoundMap {
- if r > maxRound {
- maxRound = r
- }
- }
- for idx, bs := range con.blocks {
- for len(bs) > 0 && bs[0].Position.Round < maxRound {
- bs = bs[1:]
- }
- con.blocks[idx] = bs
- }
- }
- if len(tipRoundMap) == 1 {
- var r uint64
- for r = range tipRoundMap {
- break
- }
- con.logger.Debug("check agreement round cut",
- "tip-round", r,
- "configs", len(con.configs))
- if tipRoundMap[r] == con.configs[r].NumChains {
- con.agreementRoundCut = r
- return true
- }
- }
- return false
-}
-
-func (con *Consensus) findLatticeSyncBlock(
- blocks []*types.Block) (*types.Block, error) {
- lastBlock := blocks[len(blocks)-1]
- round := lastBlock.Position.Round
- isConfigChanged := func(prev, cur *types.Config) bool {
- return prev.K != cur.K ||
- prev.NumChains != cur.NumChains ||
- prev.PhiRatio != cur.PhiRatio
- }
- for {
- // Find round r which r-1, r, r+1 are all in same total ordering config.
- for {
- sameAsPrevRound := round == 0 || !isConfigChanged(
- con.configs[round-1], con.configs[round])
- sameAsNextRound := !isConfigChanged(
- con.configs[round], con.configs[round+1])
- if sameAsPrevRound && sameAsNextRound {
- break
- }
- if round == 0 {
- // Unable to find a safe round, wait for new rounds.
- return nil, nil
- }
- round--
- }
- // Find the newset block which round is "round".
- for lastBlock.Position.Round != round {
- if (lastBlock.Finalization.ParentHash == common.Hash{}) {
- return nil, ErrGenesisBlockReached
- }
- b, err := con.db.GetBlock(lastBlock.Finalization.ParentHash)
- if err != nil {
- return nil, err
- }
- lastBlock = &b
- }
- // Find the deliver set by hash for two times. Blocks in a deliver set
- // returned by total ordering is sorted by hash. If a block's parent
- // hash is greater than its hash means there is a cut between deliver
- // sets.
- var curBlock, prevBlock *types.Block
- var deliverSetFirstBlock, deliverSetLastBlock *types.Block
- curBlock = lastBlock
- for {
- if (curBlock.Finalization.ParentHash == common.Hash{}) {
- return nil, ErrGenesisBlockReached
- }
- b, err := con.db.GetBlock(curBlock.Finalization.ParentHash)
- if err != nil {
- return nil, err
- }
- prevBlock = &b
- if !prevBlock.Hash.Less(curBlock.Hash) {
- break
- }
- curBlock = prevBlock
- }
- deliverSetLastBlock = prevBlock
- curBlock = prevBlock
- for {
- if (curBlock.Finalization.ParentHash == common.Hash{}) {
- break
- }
- b, err := con.db.GetBlock(curBlock.Finalization.ParentHash)
- if err != nil {
- return nil, err
- }
- prevBlock = &b
- if !prevBlock.Hash.Less(curBlock.Hash) {
- break
- }
- curBlock = prevBlock
- }
- deliverSetFirstBlock = curBlock
- // Check if all blocks from deliverSetFirstBlock to deliverSetLastBlock
- // are in the same round.
- ok := true
- curBlock = deliverSetLastBlock
- for {
- if curBlock.Position.Round != round {
- ok = false
- break
- }
- b, err := con.db.GetBlock(curBlock.Finalization.ParentHash)
- if err != nil {
- return nil, err
- }
- curBlock = &b
- if curBlock.Hash == deliverSetFirstBlock.Hash {
- break
- }
- }
- if ok {
- return deliverSetFirstBlock, nil
- }
- if round == 0 {
- return nil, nil
- }
- round--
- }
-}
-
-func (con *Consensus) processFinalizedBlock(block *types.Block) error {
- if con.lattice == nil {
- return nil
- }
- delivered, err := con.lattice.ProcessFinalizedBlock(block)
- if err != nil {
- return err
- }
- con.lock.Lock()
- defer con.lock.Unlock()
- con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash)
- for idx, b := range delivered {
- if con.finalizedBlockHashes[idx] != b.Hash {
- return ErrMismatchBlockHashSequence
- }
- con.validatedChains[b.Position.ChainID] = struct{}{}
- }
- con.finalizedBlockHashes = con.finalizedBlockHashes[len(delivered):]
- return nil
}
// SyncBlocks syncs blocks from compaction chain, latest is true if the caller
@@ -420,7 +188,8 @@ func (con *Consensus) SyncBlocks(
con.logger.Debug("SyncBlocks returned",
"synced", synced,
"error", err,
- "last-block", con.syncedLastBlock)
+ "last-block", con.syncedLastBlock,
+ )
}()
if con.syncedLastBlock != nil {
synced, err = true, ErrAlreadySynced
@@ -442,7 +211,8 @@ func (con *Consensus) SyncBlocks(
if blocks[0].Finalization.Height != tipHeight+1 {
con.logger.Error("mismatched finalization height",
"now", blocks[0].Finalization.Height,
- "expected", tipHeight+1)
+ "expected", tipHeight+1,
+ )
err = ErrInvalidSyncingFinalizationHeight
return
}
@@ -454,7 +224,6 @@ func (con *Consensus) SyncBlocks(
)
con.setupConfigs(blocks)
for _, b := range blocks {
- // TODO(haoping) remove this if lattice puts blocks into db.
if err = con.db.PutBlock(*b); err != nil {
// A block might be put into db when confirmed by BA, but not
// finalized yet.
@@ -469,60 +238,15 @@ func (con *Consensus) SyncBlocks(
b.Hash, b.Finalization.Height); err != nil {
return
}
- if err = con.processFinalizedBlock(b); err != nil {
- return
- }
- }
- if latest && con.lattice == nil {
- // New Lattice and find the deliver set of total ordering when "latest"
- // is true for first time. Deliver set is found by block hashes.
- var syncBlock *types.Block
- syncBlock, err = con.findLatticeSyncBlock(blocks)
- if err != nil {
- if err == ErrGenesisBlockReached {
- con.logger.Debug("SyncBlocks skip error", "error", err)
- err = nil
- }
- return
- }
- if syncBlock != nil {
- con.logger.Debug("deliver set found", "block", syncBlock)
- // New lattice with the round of syncBlock.
- con.initConsensusObj(syncBlock)
- con.setupConfigs(blocks)
- // Process blocks from syncBlock to blocks' last block.
- b := blocks[len(blocks)-1]
- blocksCount :=
- b.Finalization.Height - syncBlock.Finalization.Height + 1
- blocksToProcess := make([]*types.Block, blocksCount)
- for {
- blocksToProcess[blocksCount-1] = b
- blocksCount--
- if b.Hash == syncBlock.Hash {
- break
- }
- var b1 types.Block
- b1, err = con.db.GetBlock(b.Finalization.ParentHash)
- if err != nil {
- return
- }
- b = &b1
- }
- for _, b := range blocksToProcess {
- if err = con.processFinalizedBlock(b); err != nil {
- return
- }
- }
- }
}
- if latest && con.ensureAgreementOverlapRound() {
+ if latest {
+ con.assureBuffering()
+ con.buildAllEmptyBlocks()
// Check if compaction and agreements' blocks are overlapped. The
// overlapping of compaction chain and BA's oldest blocks means the
// syncing is done.
- if con.checkIfValidated() && con.checkIfSynced(blocks) {
- if err = con.Stop(); err != nil {
- return
- }
+ if con.checkIfSynced(blocks) {
+ con.stopBuffering()
con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
context.Background(), con.network.ReceiveChan(),
func(msg interface{}) {
@@ -547,10 +271,6 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
}
// flush all blocks in con.blocks into core.Consensus, and build
// core.Consensus from syncer.
- confirmedBlocks := make([][]*types.Block, len(con.blocks))
- for i, bs := range con.blocks {
- confirmedBlocks[i] = []*types.Block(bs)
- }
randomnessResults := []*types.BlockRandomnessResult{}
for _, r := range con.randomnessResults {
randomnessResults = append(randomnessResults, r)
@@ -566,19 +286,31 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
con.db,
con.network,
con.prv,
- con.lattice,
- confirmedBlocks,
+ con.blocks,
randomnessResults,
con.dummyMsgBuffer,
con.logger)
return con.syncedConsensus, err
}
-// Stop the syncer.
+// stopBuffering stops the syncer buffering routines.
//
// This method is mainly for caller to stop the syncer before synced, the syncer
// would call this method automatically after being synced.
-func (con *Consensus) Stop() error {
+func (con *Consensus) stopBuffering() {
+ if func() bool {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ return !con.duringBuffering
+ }() {
+ return
+ }
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ if !con.duringBuffering {
+ return
+ }
+ con.duringBuffering = false
con.logger.Trace("syncer is about to stop")
// Stop network and CRS routines, wait until they are all stoped.
con.ctxCancel()
@@ -588,7 +320,7 @@ func (con *Consensus) Stop() error {
con.logger.Trace("stop syncer agreement modules")
con.stopAgreement()
con.logger.Trace("syncer stopped")
- return nil
+ return
}
// isEmptyBlock checks if a block is an empty block by both its hash and parent
@@ -607,41 +339,6 @@ func (con *Consensus) buildEmptyBlock(b *types.Block, parent *types.Block) {
b.Acks = common.NewSortedHashes(common.Hashes{parent.Hash})
}
-func (con *Consensus) setupConfigsUntilRound(round uint64) {
- curMaxNumChains := uint32(0)
- func() {
- con.lock.Lock()
- defer con.lock.Unlock()
- con.logger.Debug("syncer setupConfigs",
- "until-round", round,
- "length", len(con.configs),
- "lattice", con.latticeLastRound)
- for r := uint64(len(con.configs)); r <= round; r++ {
- cfg := utils.GetConfigWithPanic(con.gov, r, con.logger)
- con.configs = append(con.configs, cfg)
- con.roundBeginTimes = append(
- con.roundBeginTimes,
- con.roundBeginTimes[r-1].Add(con.configs[r-1].RoundInterval))
- if cfg.NumChains >= curMaxNumChains {
- curMaxNumChains = cfg.NumChains
- }
- }
- // Notify core.Lattice for new configs.
- if con.lattice != nil {
- for con.latticeLastRound+1 <= round {
- con.latticeLastRound++
- if err := con.lattice.AppendConfig(
- con.latticeLastRound,
- con.configs[con.latticeLastRound]); err != nil {
- panic(err)
- }
- }
- }
- }()
- con.resizeByNumChains(curMaxNumChains)
- con.logger.Trace("setupConfgis finished", "round", round)
-}
-
// setupConfigs is called by SyncBlocks with blocks from compaction chain. In
// the first time, setupConfigs setups from round 0.
func (con *Consensus) setupConfigs(blocks []*types.Block) {
@@ -661,25 +358,19 @@ func (con *Consensus) setupConfigs(blocks []*types.Block) {
con.setupConfigsUntilRound(maxRound + core.ConfigRoundShift - 1)
}
-// resizeByNumChains resizes fake lattice and agreement if numChains increases.
-// Notice the decreasing case is neglected.
-func (con *Consensus) resizeByNumChains(numChains uint32) {
+func (con *Consensus) setupConfigsUntilRound(round uint64) {
con.lock.Lock()
defer con.lock.Unlock()
- if numChains > uint32(len(con.blocks)) {
- for i := uint32(len(con.blocks)); i < numChains; i++ {
- // Resize the pool of blocks.
- con.blocks = append(con.blocks, types.ByPosition{})
- // Resize agreement modules.
- a := newAgreement(
- con.receiveChan, con.pullChan, con.nodeSetCache, con.logger)
- con.agreements = append(con.agreements, a)
- con.agreementWaitGroup.Add(1)
- go func() {
- defer con.agreementWaitGroup.Done()
- a.run()
- }()
- }
+ con.logger.Debug("syncer setupConfigs",
+ "until-round", round,
+ "length", len(con.configs),
+ )
+ for r := uint64(len(con.configs)); r <= round; r++ {
+ cfg := utils.GetConfigWithPanic(con.gov, r, con.logger)
+ con.configs = append(con.configs, cfg)
+ con.roundBeginTimes = append(
+ con.roundBeginTimes,
+ con.roundBeginTimes[r-1].Add(con.configs[r-1].RoundInterval))
}
}
@@ -693,17 +384,15 @@ func (con *Consensus) startAgreement() {
if !ok {
return
}
- chainID := b.Position.ChainID
func() {
con.lock.Lock()
defer con.lock.Unlock()
- // If round is cut in agreements, do not add blocks with
- // round less then cut round.
- if b.Position.Round < con.agreementRoundCut {
+ if len(con.blocks) > 0 &&
+ !b.Position.Newer(con.blocks[0].Position) {
return
}
- con.blocks[chainID] = append(con.blocks[chainID], b)
- sort.Sort(con.blocks[chainID])
+ con.blocks = append(con.blocks, b)
+ sort.Sort(con.blocks)
}()
case h, ok := <-con.pullChan:
if !ok {
@@ -721,18 +410,14 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) {
return
}
// We only have to cache randomness result after cutting round.
- if r.Position.Round < func() uint64 {
- con.lock.RLock()
- defer con.lock.RUnlock()
- return con.agreementRoundCut
- }() {
- return
- }
- if func() (exists bool) {
+ if func() bool {
con.lock.RLock()
defer con.lock.RUnlock()
- _, exists = con.randomnessResults[r.BlockHash]
- return
+ if len(con.blocks) > 0 && r.Position.Older(con.blocks[0].Position) {
+ return true
+ }
+ _, exists := con.randomnessResults[r.BlockHash]
+ return exists
}() {
return
}
@@ -740,8 +425,9 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) {
if err != nil {
con.logger.Error("Unable to get tsig verifier",
"hash", r.BlockHash.String()[:6],
- "position", &r.Position,
- "error", err)
+ "position", r.Position,
+ "error", err,
+ )
return
}
if !ok {
@@ -752,8 +438,9 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) {
Type: "bls",
Signature: r.Randomness}) {
con.logger.Info("Block randomness is not valid",
- "position", &r.Position,
- "hash", r.BlockHash.String()[:6])
+ "position", r.Position,
+ "hash", r.BlockHash.String()[:6],
+ )
return
}
con.lock.Lock()
@@ -785,18 +472,19 @@ func (con *Consensus) startNetwork() {
if func() bool {
con.lock.RLock()
defer con.lock.RUnlock()
- if pos.ChainID >= uint32(len(con.agreements)) {
+ if pos.ChainID > 0 {
// This error might be easily encountered when the
// "latest" parameter of SyncBlocks is turned on too
// early.
con.logger.Error(
"Unknown chainID message received (syncer)",
- "position", &pos)
+ "position", pos,
+ )
return false
}
return true
}() {
- con.agreements[pos.ChainID].inputChan <- val
+ con.agreementModule.inputChan <- val
}
case <-con.ctx.Done():
return
@@ -817,23 +505,25 @@ func (con *Consensus) startCRSMonitor() {
}
con.logger.Debug("CRS is ready", "round", round)
lastNotifiedRound = round
- con.lock.Lock()
- defer con.lock.Unlock()
- for idx, a := range con.agreements {
- loop:
- for {
- select {
- case <-con.ctx.Done():
- break loop
- case a.inputChan <- round:
- break loop
- case <-time.After(500 * time.Millisecond):
- con.logger.Debug(
- "agreement input channel is full when putting CRS",
- "chainID", idx,
- "round", round)
- }
+ for func() bool {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ if !con.duringBuffering {
+ return false
+ }
+ select {
+ case <-con.ctx.Done():
+ return false
+ case con.agreementModule.inputChan <- round:
+ return false
+ case <-time.After(500 * time.Millisecond):
+ con.logger.Debug(
+ "agreement input channel is full when putting CRS",
+ "round", round,
+ )
+ return true
}
+ }() {
}
}
con.moduleWaitGroup.Add(1)
@@ -860,16 +550,10 @@ func (con *Consensus) startCRSMonitor() {
}
func (con *Consensus) stopAgreement() {
- func() {
- con.lock.Lock()
- defer con.lock.Unlock()
- for _, a := range con.agreements {
- if a.inputChan != nil {
- close(a.inputChan)
- a.inputChan = nil
- }
- }
- }()
+ if con.agreementModule.inputChan != nil {
+ close(con.agreementModule.inputChan)
+ con.agreementModule.inputChan = nil
+ }
con.agreementWaitGroup.Wait()
close(con.receiveChan)
close(con.pullChan)