aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--dex/blockproposer.go13
-rw-r--r--dex/handler.go40
2 files changed, 31 insertions, 22 deletions
diff --git a/dex/blockproposer.go b/dex/blockproposer.go
index bcc150747..5af08f6c4 100644
--- a/dex/blockproposer.go
+++ b/dex/blockproposer.go
@@ -87,7 +87,7 @@ func (b *blockProposer) Stop() {
defer b.mu.Unlock()
if atomic.LoadInt32(&b.running) == 1 {
- b.dex.protocolManager.isBlockProposer = false
+ b.dex.protocolManager.receiveEnabled = false
close(b.stopCh)
b.wg.Wait()
atomic.StoreInt32(&b.proposing, 0)
@@ -127,7 +127,6 @@ func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) {
// Feed the current block we have in local blockchain.
cb := b.dex.blockchain.CurrentBlock()
-
if cb.NumberU64() > 0 {
var block coreTypes.Block
if err := rlp.DecodeBytes(cb.Header().DexconMeta, &block); err != nil {
@@ -153,6 +152,9 @@ func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) {
// Sync all blocks in compaction chain to core.
_, coreHeight := db.GetCompactionChainTipInfo()
+ // Stop receiving block proposer message when syncing.
+ b.dex.protocolManager.receiveEnabled = false
+
Loop:
for {
currentBlock := b.dex.blockchain.CurrentBlock()
@@ -182,9 +184,6 @@ Loop:
}
}
- // Enable isBlockProposer flag to start receiving msg.
- b.dex.protocolManager.isBlockProposer = true
-
ch := make(chan core.ChainHeadEvent)
sub := b.dex.blockchain.SubscribeChainHeadEvent(ch)
defer sub.Unsubscribe()
@@ -208,6 +207,10 @@ ListenLoop:
log.Error("SyncBlocks fail", "err", err)
return nil, err
}
+ if !b.dex.protocolManager.receiveEnabled {
+ // Start receiving block proposer message.
+ b.dex.protocolManager.receiveEnabled = true
+ }
if synced {
log.Debug("Consensus core synced")
break ListenLoop
diff --git a/dex/handler.go b/dex/handler.go
index 6cbd62a8f..57315f90a 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -134,7 +134,8 @@ type ProtocolManager struct {
chainHeadSub event.Subscription
// channels for dexon consensus core
- receiveCh chan interface{}
+ receiveCh chan interface{}
+ receiveEnabled bool
srvr p2pServer
@@ -178,6 +179,7 @@ func NewProtocolManager(
recordsyncCh: make(chan *recordsync),
quitSync: make(chan struct{}),
receiveCh: make(chan interface{}, 1024),
+ receiveEnabled: isBlockProposer,
isBlockProposer: isBlockProposer,
app: app,
blockNumberGauge: metrics.GetOrRegisterGauge("dex/blocknumber", nil),
@@ -400,12 +402,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
defer close(ch)
go func() {
- n := 0
+ maxDelay := time.Minute
+ delay := 1 * time.Second
+ start := time.Now()
for {
select {
- case <-time.After(time.Second):
- p.Log().Debug("no msg more than 1s", "n", n)
- n++
+ case <-time.After(delay):
+ delay *= 2
+ if delay > maxDelay {
+ delay = maxDelay
+ }
+ p.Log().Debug("no msg for a while", "t", time.Since(start))
case <-ch:
return
}
@@ -424,12 +431,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
defer msg.Discard()
go func() {
- n := 0
+ start := time.Now()
for {
select {
case <-time.After(100 * time.Millisecond):
- p.Log().Debug("handle msg more than 100ms", "n", n, "code", msg.Code)
- n++
+ p.Log().Debug("handle msg too long", "code", msg.Code, "t", time.Since(start))
case <-ch:
return
}
@@ -827,7 +833,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Block proposer-only messages.
case msg.Code == CoreBlockMsg:
- if !pm.isBlockProposer {
+ if !pm.receiveEnabled {
break
}
var blocks []*coreTypes.Block
@@ -839,7 +845,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.receiveCh <- block
}
case msg.Code == VoteMsg:
- if !pm.isBlockProposer {
+ if !pm.receiveEnabled {
break
}
var votes []*coreTypes.Vote
@@ -853,7 +859,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.receiveCh <- vote
}
case msg.Code == AgreementMsg:
- if !pm.isBlockProposer {
+ if !pm.receiveEnabled {
break
}
// DKG set is receiver
@@ -864,7 +870,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.MarkAgreement(rlpHash(agreement))
pm.receiveCh <- &agreement
case msg.Code == RandomnessMsg:
- if !pm.isBlockProposer {
+ if !pm.receiveEnabled {
break
}
// Broadcast this to all peer
@@ -877,7 +883,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.receiveCh <- randomness
}
case msg.Code == DKGPrivateShareMsg:
- if !pm.isBlockProposer {
+ if !pm.receiveEnabled {
break
}
// Do not relay this msg
@@ -888,7 +894,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.MarkDKGPrivateShares(rlpHash(ps))
pm.receiveCh <- &ps
case msg.Code == DKGPartialSignatureMsg:
- if !pm.isBlockProposer {
+ if !pm.receiveEnabled {
break
}
// broadcast in DKG set
@@ -898,7 +904,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &psig
case msg.Code == PullBlocksMsg:
- if !pm.isBlockProposer {
+ if !pm.receiveEnabled {
break
}
var hashes coreCommon.Hashes
@@ -909,7 +915,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
log.Debug("Push blocks", "blocks", blocks)
return p.SendCoreBlocks(blocks)
case msg.Code == PullVotesMsg:
- if !pm.isBlockProposer {
+ if !pm.receiveEnabled {
break
}
next, ok := pm.nextPullVote.Load(p.ID())
@@ -928,7 +934,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
log.Debug("Push votes", "votes", votes)
return p.SendVotes(votes)
case msg.Code == PullRandomnessMsg:
- if !pm.isBlockProposer {
+ if !pm.receiveEnabled {
break
}
var hashes coreCommon.Hashes