diff options
-rw-r--r-- | dex/blockproposer.go | 13 | ||||
-rw-r--r-- | dex/handler.go | 40 |
2 files changed, 31 insertions, 22 deletions
diff --git a/dex/blockproposer.go b/dex/blockproposer.go index bcc150747..5af08f6c4 100644 --- a/dex/blockproposer.go +++ b/dex/blockproposer.go @@ -87,7 +87,7 @@ func (b *blockProposer) Stop() { defer b.mu.Unlock() if atomic.LoadInt32(&b.running) == 1 { - b.dex.protocolManager.isBlockProposer = false + b.dex.protocolManager.receiveEnabled = false close(b.stopCh) b.wg.Wait() atomic.StoreInt32(&b.proposing, 0) @@ -127,7 +127,6 @@ func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) { // Feed the current block we have in local blockchain. cb := b.dex.blockchain.CurrentBlock() - if cb.NumberU64() > 0 { var block coreTypes.Block if err := rlp.DecodeBytes(cb.Header().DexconMeta, &block); err != nil { @@ -153,6 +152,9 @@ func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) { // Sync all blocks in compaction chain to core. _, coreHeight := db.GetCompactionChainTipInfo() + // Stop receiving block proposer message when syncing. + b.dex.protocolManager.receiveEnabled = false + Loop: for { currentBlock := b.dex.blockchain.CurrentBlock() @@ -182,9 +184,6 @@ Loop: } } - // Enable isBlockProposer flag to start receiving msg. - b.dex.protocolManager.isBlockProposer = true - ch := make(chan core.ChainHeadEvent) sub := b.dex.blockchain.SubscribeChainHeadEvent(ch) defer sub.Unsubscribe() @@ -208,6 +207,10 @@ ListenLoop: log.Error("SyncBlocks fail", "err", err) return nil, err } + if !b.dex.protocolManager.receiveEnabled { + // Start receiving block proposer message. + b.dex.protocolManager.receiveEnabled = true + } if synced { log.Debug("Consensus core synced") break ListenLoop diff --git a/dex/handler.go b/dex/handler.go index 6cbd62a8f..57315f90a 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -134,7 +134,8 @@ type ProtocolManager struct { chainHeadSub event.Subscription // channels for dexon consensus core - receiveCh chan interface{} + receiveCh chan interface{} + receiveEnabled bool srvr p2pServer @@ -178,6 +179,7 @@ func NewProtocolManager( recordsyncCh: make(chan *recordsync), quitSync: make(chan struct{}), receiveCh: make(chan interface{}, 1024), + receiveEnabled: isBlockProposer, isBlockProposer: isBlockProposer, app: app, blockNumberGauge: metrics.GetOrRegisterGauge("dex/blocknumber", nil), @@ -400,12 +402,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { defer close(ch) go func() { - n := 0 + maxDelay := time.Minute + delay := 1 * time.Second + start := time.Now() for { select { - case <-time.After(time.Second): - p.Log().Debug("no msg more than 1s", "n", n) - n++ + case <-time.After(delay): + delay *= 2 + if delay > maxDelay { + delay = maxDelay + } + p.Log().Debug("no msg for a while", "t", time.Since(start)) case <-ch: return } @@ -424,12 +431,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { defer msg.Discard() go func() { - n := 0 + start := time.Now() for { select { case <-time.After(100 * time.Millisecond): - p.Log().Debug("handle msg more than 100ms", "n", n, "code", msg.Code) - n++ + p.Log().Debug("handle msg too long", "code", msg.Code, "t", time.Since(start)) case <-ch: return } @@ -827,7 +833,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Block proposer-only messages. case msg.Code == CoreBlockMsg: - if !pm.isBlockProposer { + if !pm.receiveEnabled { break } var blocks []*coreTypes.Block @@ -839,7 +845,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.receiveCh <- block } case msg.Code == VoteMsg: - if !pm.isBlockProposer { + if !pm.receiveEnabled { break } var votes []*coreTypes.Vote @@ -853,7 +859,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.receiveCh <- vote } case msg.Code == AgreementMsg: - if !pm.isBlockProposer { + if !pm.receiveEnabled { break } // DKG set is receiver @@ -864,7 +870,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.MarkAgreement(rlpHash(agreement)) pm.receiveCh <- &agreement case msg.Code == RandomnessMsg: - if !pm.isBlockProposer { + if !pm.receiveEnabled { break } // Broadcast this to all peer @@ -877,7 +883,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.receiveCh <- randomness } case msg.Code == DKGPrivateShareMsg: - if !pm.isBlockProposer { + if !pm.receiveEnabled { break } // Do not relay this msg @@ -888,7 +894,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.MarkDKGPrivateShares(rlpHash(ps)) pm.receiveCh <- &ps case msg.Code == DKGPartialSignatureMsg: - if !pm.isBlockProposer { + if !pm.receiveEnabled { break } // broadcast in DKG set @@ -898,7 +904,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &psig case msg.Code == PullBlocksMsg: - if !pm.isBlockProposer { + if !pm.receiveEnabled { break } var hashes coreCommon.Hashes @@ -909,7 +915,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { log.Debug("Push blocks", "blocks", blocks) return p.SendCoreBlocks(blocks) case msg.Code == PullVotesMsg: - if !pm.isBlockProposer { + if !pm.receiveEnabled { break } next, ok := pm.nextPullVote.Load(p.ID()) @@ -928,7 +934,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { log.Debug("Push votes", "votes", votes) return p.SendVotes(votes) case msg.Code == PullRandomnessMsg: - if !pm.isBlockProposer { + if !pm.receiveEnabled { break } var hashes coreCommon.Hashes |