From f817662c0e520e645f55518390836cb3594ffb38 Mon Sep 17 00:00:00 2001
From: Sonic <sonic@dexon.org>
Date: Tue, 19 Mar 2019 20:12:58 +0800
Subject: dex: fix start bp node with empty datadir (#278)

also modify some debug log
---
 dex/blockproposer.go | 13 ++++++++-----
 dex/handler.go       | 40 +++++++++++++++++++++++-----------------
 2 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/dex/blockproposer.go b/dex/blockproposer.go
index bcc150747..5af08f6c4 100644
--- a/dex/blockproposer.go
+++ b/dex/blockproposer.go
@@ -87,7 +87,7 @@ func (b *blockProposer) Stop() {
 	defer b.mu.Unlock()
 
 	if atomic.LoadInt32(&b.running) == 1 {
-		b.dex.protocolManager.isBlockProposer = false
+		b.dex.protocolManager.receiveEnabled = false
 		close(b.stopCh)
 		b.wg.Wait()
 		atomic.StoreInt32(&b.proposing, 0)
@@ -127,7 +127,6 @@ func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) {
 
 	// Feed the current block we have in local blockchain.
 	cb := b.dex.blockchain.CurrentBlock()
-
 	if cb.NumberU64() > 0 {
 		var block coreTypes.Block
 		if err := rlp.DecodeBytes(cb.Header().DexconMeta, &block); err != nil {
@@ -153,6 +152,9 @@ func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) {
 	// Sync all blocks in compaction chain to core.
 	_, coreHeight := db.GetCompactionChainTipInfo()
 
+	// Stop receiving block proposer message when syncing.
+	b.dex.protocolManager.receiveEnabled = false
+
 Loop:
 	for {
 		currentBlock := b.dex.blockchain.CurrentBlock()
@@ -182,9 +184,6 @@ Loop:
 		}
 	}
 
-	// Enable isBlockProposer flag to start receiving msg.
-	b.dex.protocolManager.isBlockProposer = true
-
 	ch := make(chan core.ChainHeadEvent)
 	sub := b.dex.blockchain.SubscribeChainHeadEvent(ch)
 	defer sub.Unsubscribe()
@@ -208,6 +207,10 @@ ListenLoop:
 					log.Error("SyncBlocks fail", "err", err)
 					return nil, err
 				}
+				if !b.dex.protocolManager.receiveEnabled {
+					// Start receiving block proposer message.
+					b.dex.protocolManager.receiveEnabled = true
+				}
 				if synced {
 					log.Debug("Consensus core synced")
 					break ListenLoop
diff --git a/dex/handler.go b/dex/handler.go
index 6cbd62a8f..57315f90a 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -134,7 +134,8 @@ type ProtocolManager struct {
 	chainHeadSub event.Subscription
 
 	// channels for dexon consensus core
-	receiveCh chan interface{}
+	receiveCh      chan interface{}
+	receiveEnabled bool
 
 	srvr p2pServer
 
@@ -178,6 +179,7 @@ func NewProtocolManager(
 		recordsyncCh:     make(chan *recordsync),
 		quitSync:         make(chan struct{}),
 		receiveCh:        make(chan interface{}, 1024),
+		receiveEnabled:   isBlockProposer,
 		isBlockProposer:  isBlockProposer,
 		app:              app,
 		blockNumberGauge: metrics.GetOrRegisterGauge("dex/blocknumber", nil),
@@ -400,12 +402,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 	defer close(ch)
 
 	go func() {
-		n := 0
+		maxDelay := time.Minute
+		delay := 1 * time.Second
+		start := time.Now()
 		for {
 			select {
-			case <-time.After(time.Second):
-				p.Log().Debug("no msg more than 1s", "n", n)
-				n++
+			case <-time.After(delay):
+				delay *= 2
+				if delay > maxDelay {
+					delay = maxDelay
+				}
+				p.Log().Debug("no msg for a while", "t", time.Since(start))
 			case <-ch:
 				return
 			}
@@ -424,12 +431,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 	defer msg.Discard()
 
 	go func() {
-		n := 0
+		start := time.Now()
 		for {
 			select {
 			case <-time.After(100 * time.Millisecond):
-				p.Log().Debug("handle msg more than 100ms", "n", n, "code", msg.Code)
-				n++
+				p.Log().Debug("handle msg too long", "code", msg.Code, "t", time.Since(start))
 			case <-ch:
 				return
 			}
@@ -827,7 +833,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 	// Block proposer-only messages.
 
 	case msg.Code == CoreBlockMsg:
-		if !pm.isBlockProposer {
+		if !pm.receiveEnabled {
 			break
 		}
 		var blocks []*coreTypes.Block
@@ -839,7 +845,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 			pm.receiveCh <- block
 		}
 	case msg.Code == VoteMsg:
-		if !pm.isBlockProposer {
+		if !pm.receiveEnabled {
 			break
 		}
 		var votes []*coreTypes.Vote
@@ -853,7 +859,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 			pm.receiveCh <- vote
 		}
 	case msg.Code == AgreementMsg:
-		if !pm.isBlockProposer {
+		if !pm.receiveEnabled {
 			break
 		}
 		// DKG set is receiver
@@ -864,7 +870,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		p.MarkAgreement(rlpHash(agreement))
 		pm.receiveCh <- &agreement
 	case msg.Code == RandomnessMsg:
-		if !pm.isBlockProposer {
+		if !pm.receiveEnabled {
 			break
 		}
 		// Broadcast this to all peer
@@ -877,7 +883,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 			pm.receiveCh <- randomness
 		}
 	case msg.Code == DKGPrivateShareMsg:
-		if !pm.isBlockProposer {
+		if !pm.receiveEnabled {
 			break
 		}
 		// Do not relay this msg
@@ -888,7 +894,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		p.MarkDKGPrivateShares(rlpHash(ps))
 		pm.receiveCh <- &ps
 	case msg.Code == DKGPartialSignatureMsg:
-		if !pm.isBlockProposer {
+		if !pm.receiveEnabled {
 			break
 		}
 		// broadcast in DKG set
@@ -898,7 +904,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		}
 		pm.receiveCh <- &psig
 	case msg.Code == PullBlocksMsg:
-		if !pm.isBlockProposer {
+		if !pm.receiveEnabled {
 			break
 		}
 		var hashes coreCommon.Hashes
@@ -909,7 +915,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		log.Debug("Push blocks", "blocks", blocks)
 		return p.SendCoreBlocks(blocks)
 	case msg.Code == PullVotesMsg:
-		if !pm.isBlockProposer {
+		if !pm.receiveEnabled {
 			break
 		}
 		next, ok := pm.nextPullVote.Load(p.ID())
@@ -928,7 +934,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		log.Debug("Push votes", "votes", votes)
 		return p.SendVotes(votes)
 	case msg.Code == PullRandomnessMsg:
-		if !pm.isBlockProposer {
+		if !pm.receiveEnabled {
 			break
 		}
 		var hashes coreCommon.Hashes
-- 
cgit v1.2.3