diff options
author | Sonic <sonic@dexon.org> | 2019-03-20 16:34:56 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-04-09 21:32:58 +0800 |
commit | 2a6418678807137dfe6e5fba2969863bd468b840 (patch) | |
tree | bea1a37f28080c1fcdf58a6f9eb9e1dbd01df16e | |
parent | 4364f00e2204debb48edf2940e1f89f46d7a9a18 (diff) | |
download | dexon-2a6418678807137dfe6e5fba2969863bd468b840.tar dexon-2a6418678807137dfe6e5fba2969863bd468b840.tar.gz dexon-2a6418678807137dfe6e5fba2969863bd468b840.tar.bz2 dexon-2a6418678807137dfe6e5fba2969863bd468b840.tar.lz dexon-2a6418678807137dfe6e5fba2969863bd468b840.tar.xz dexon-2a6418678807137dfe6e5fba2969863bd468b840.tar.zst dexon-2a6418678807137dfe6e5fba2969863bd468b840.zip |
dex: recieve bp msg when recovery, use atomic to protect the flag (#286)
-rw-r--r-- | dex/blockproposer.go | 10 | ||||
-rw-r--r-- | dex/handler.go | 26 |
2 files changed, 19 insertions, 17 deletions
diff --git a/dex/blockproposer.go b/dex/blockproposer.go index 63aaa5b51..24a277335 100644 --- a/dex/blockproposer.go +++ b/dex/blockproposer.go @@ -91,7 +91,7 @@ func (b *blockProposer) Stop() { defer b.mu.Unlock() if atomic.LoadInt32(&b.running) == 1 { - b.dex.protocolManager.receiveEnabled = false + atomic.StoreInt32(&b.dex.protocolManager.receiveEnabled, 0) close(b.stopCh) b.wg.Wait() atomic.StoreInt32(&b.proposing, 0) @@ -157,7 +157,7 @@ func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) { _, coreHeight := db.GetCompactionChainTipInfo() // Stop receiving block proposer message when syncing. - b.dex.protocolManager.receiveEnabled = false + atomic.StoreInt32(&b.dex.protocolManager.receiveEnabled, 0) Loop: for { @@ -211,10 +211,7 @@ ListenLoop: log.Error("SyncBlocks fail", "err", err) return nil, err } - if !b.dex.protocolManager.receiveEnabled { - // Start receiving block proposer message. - b.dex.protocolManager.receiveEnabled = true - } + atomic.CompareAndSwapInt32(&b.dex.protocolManager.receiveEnabled, 0, 1) if synced { log.Debug("Consensus core synced") break ListenLoop @@ -252,6 +249,7 @@ ListenLoop: log.Info("Sleeping until next starting time", "time", nextDMoment) time.Sleep(time.Duration(nextDMoment-time.Now().Unix()) * time.Second) + atomic.StoreInt32(&b.dex.protocolManager.receiveEnabled, 1) consensusSync.ForceSync(true) break ListenLoop } diff --git a/dex/handler.go b/dex/handler.go index ea5c31b36..3a5a81f50 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -135,7 +135,7 @@ type ProtocolManager struct { // channels for dexon consensus core receiveCh chan interface{} - receiveEnabled bool + receiveEnabled int32 srvr p2pServer @@ -179,12 +179,16 @@ func NewProtocolManager( recordsyncCh: make(chan *recordsync), quitSync: make(chan struct{}), receiveCh: make(chan interface{}, 1024), - receiveEnabled: isBlockProposer, + receiveEnabled: 0, isBlockProposer: isBlockProposer, app: app, blockNumberGauge: metrics.GetOrRegisterGauge("dex/blocknumber", nil), } + if isBlockProposer { + manager.receiveEnabled = 1 + } + // Figure out whether to allow fast sync or not if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 { log.Warn("Blockchain not empty, fast sync disabled") @@ -833,7 +837,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Block proposer-only messages. case msg.Code == CoreBlockMsg: - if !pm.receiveEnabled { + if atomic.LoadInt32(&pm.receiveEnabled) == 0 { break } var blocks []*coreTypes.Block @@ -845,7 +849,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.receiveCh <- block } case msg.Code == VoteMsg: - if !pm.receiveEnabled { + if atomic.LoadInt32(&pm.receiveEnabled) == 0 { break } var votes []*coreTypes.Vote @@ -859,7 +863,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.receiveCh <- vote } case msg.Code == AgreementMsg: - if !pm.receiveEnabled { + if atomic.LoadInt32(&pm.receiveEnabled) == 0 { break } // DKG set is receiver @@ -870,7 +874,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.MarkAgreement(rlpHash(agreement)) pm.receiveCh <- &agreement case msg.Code == RandomnessMsg: - if !pm.receiveEnabled { + if atomic.LoadInt32(&pm.receiveEnabled) == 0 { break } // Broadcast this to all peer @@ -883,7 +887,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.receiveCh <- randomness } case msg.Code == DKGPrivateShareMsg: - if !pm.receiveEnabled { + if atomic.LoadInt32(&pm.receiveEnabled) == 0 { break } // Do not relay this msg @@ -894,7 +898,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.MarkDKGPrivateShares(rlpHash(ps)) pm.receiveCh <- &ps case msg.Code == DKGPartialSignatureMsg: - if !pm.receiveEnabled { + if atomic.LoadInt32(&pm.receiveEnabled) == 0 { break } // broadcast in DKG set @@ -904,7 +908,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &psig case msg.Code == PullBlocksMsg: - if !pm.receiveEnabled { + if atomic.LoadInt32(&pm.receiveEnabled) == 0 { break } var hashes coreCommon.Hashes @@ -915,7 +919,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { log.Debug("Push blocks", "blocks", blocks) return p.SendCoreBlocks(blocks) case msg.Code == PullVotesMsg: - if !pm.receiveEnabled { + if atomic.LoadInt32(&pm.receiveEnabled) == 0 { break } next, ok := pm.nextPullVote.Load(p.ID()) @@ -934,7 +938,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { log.Debug("Push votes", "votes", votes) return p.SendVotes(votes) case msg.Code == PullRandomnessMsg: - if !pm.receiveEnabled { + if atomic.LoadInt32(&pm.receiveEnabled) == 0 { break } var hashes coreCommon.Hashes |