From 74c022a7679cb03a5ad027dc659e723638ff6a78 Mon Sep 17 00:00:00 2001
From: Sonic <sonic@dexon.org>
Date: Mon, 18 Mar 2019 08:43:39 +0800
Subject: p2p, dex: add debug log (#269)

---
 dex/blockproposer.go |  4 ++++
 dex/handler.go       | 34 ++++++++++++++++++++++++++++++++++
 dex/peer.go          | 43 +++++++++++++++++++++++++------------------
 3 files changed, 63 insertions(+), 18 deletions(-)

(limited to 'dex')

diff --git a/dex/blockproposer.go b/dex/blockproposer.go
index 58e6eafee..b51c9d10b 100644
--- a/dex/blockproposer.go
+++ b/dex/blockproposer.go
@@ -156,6 +156,7 @@ Loop:
 		blocks := blocksToSync(coreHeight, currentBlock.NumberU64())
 
 		if len(blocks) == 0 {
+			log.Debug("No new block to sync", "current", currentBlock.NumberU64())
 			break Loop
 		}
 		b.watchCat.Feed(blocks[len(blocks)-1].Position)
@@ -164,6 +165,7 @@ Loop:
 			"first", blocks[0].Finalization.Height,
 			"last", blocks[len(blocks)-1].Finalization.Height)
 		if _, err := consensusSync.SyncBlocks(blocks, false); err != nil {
+			log.Debug("SyncBlocks fail", "err", err)
 			return nil, err
 		}
 		coreHeight = blocks[len(blocks)-1].Finalization.Height
@@ -182,6 +184,8 @@ Loop:
 	sub := b.dex.blockchain.SubscribeChainHeadEvent(ch)
 	defer sub.Unsubscribe()
 
+	log.Debug("Listen chain head event until synced")
+
 	// Listen chain head event until synced.
 ListenLoop:
 	for {
diff --git a/dex/handler.go b/dex/handler.go
index e887d54d9..6cbd62a8f 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -259,13 +259,17 @@ func (pm *ProtocolManager) removePeer(id string) {
 
 	// Unregister the peer from the downloader and Ethereum peer set
 	pm.downloader.UnregisterPeer(id)
+	log.Debug("after downloader unregister peer", "id", id)
 	if err := pm.peers.Unregister(id); err != nil {
 		log.Error("Peer removal failed", "peer", id, "err", err)
 	}
+	log.Debug("after unregister peer", "id", id)
 	// Hard disconnect at the networking layer
 	if peer != nil {
+		log.Debug("removePeer: peer disconnect")
 		peer.Peer.Disconnect(p2p.DiscUselessPeer)
 	}
+	log.Debug("peer removed", "id", id)
 }
 
 func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
@@ -392,16 +396,46 @@ func (pm *ProtocolManager) handle(p *peer) error {
 // handleMsg is invoked whenever an inbound message is received from a remote
 // peer. The remote connection is torn down upon returning any error.
 func (pm *ProtocolManager) handleMsg(p *peer) error {
+	ch := make(chan struct{})
+	defer close(ch)
+
+	go func() {
+		n := 0
+		for {
+			select {
+			case <-time.After(time.Second):
+				p.Log().Debug("no msg more than 1s", "n", n)
+				n++
+			case <-ch:
+				return
+			}
+		}
+	}()
+
 	// Read the next message from the remote peer, and ensure it's fully consumed
 	msg, err := p.rw.ReadMsg()
 	if err != nil {
 		return err
 	}
+	ch <- struct{}{}
 	if msg.Size > ProtocolMaxMsgSize {
 		return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
 	}
 	defer msg.Discard()
 
+	go func() {
+		n := 0
+		for {
+			select {
+			case <-time.After(100 * time.Millisecond):
+				p.Log().Debug("handle msg more than 100ms", "n", n, "code", msg.Code)
+				n++
+			case <-ch:
+				return
+			}
+		}
+	}()
+
 	// Handle the message depending on its contents
 	switch {
 	case msg.Code == StatusMsg:
diff --git a/dex/peer.go b/dex/peer.go
index e015ed9e5..f92fce130 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -380,13 +380,20 @@ func (p *peer) MarkDKGPrivateShares(hash common.Hash) {
 	p.knownDKGPrivateShares.Add(hash)
 }
 
+func (p *peer) logSend(err error, code uint64) error {
+	if err != nil {
+		p.Log().Error("Failed to send peer message", "code", code, "err", err)
+	}
+	return err
+}
+
 // SendTransactions sends transactions to the peer and includes the hashes
 // in its transaction hash set for future reference.
 func (p *peer) SendTransactions(txs types.Transactions) error {
 	for _, tx := range txs {
 		p.knownTxs.Add(tx.Hash())
 	}
-	return p2p.Send(p.rw, TxMsg, txs)
+	return p.logSend(p2p.Send(p.rw, TxMsg, txs), TxMsg)
 }
 
 // AsyncSendTransactions queues list of transactions propagation to a remote
@@ -408,7 +415,7 @@ func (p *peer) SendNodeRecords(records []*enr.Record) error {
 	for _, record := range records {
 		p.knownRecords.Add(rlpHash(record))
 	}
-	return p2p.Send(p.rw, RecordMsg, records)
+	return p.logSend(p2p.Send(p.rw, RecordMsg, records), RecordMsg)
 }
 
 // AsyncSendNodeRecord queues list of notary node records propagation to a
@@ -436,7 +443,7 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error
 		request[i].Hash = hashes[i]
 		request[i].Number = numbers[i]
 	}
-	return p2p.Send(p.rw, NewBlockHashesMsg, request)
+	return p.logSend(p2p.Send(p.rw, NewBlockHashesMsg, request), NewBlockHashesMsg)
 }
 
 // AsyncSendNewBlockHash queues the availability of a block for propagation to a
@@ -454,7 +461,7 @@ func (p *peer) AsyncSendNewBlockHash(block *types.Block) {
 // SendNewBlock propagates an entire block to a remote peer.
 func (p *peer) SendNewBlock(block *types.Block) error {
 	p.knownBlocks.Add(block.Hash())
-	return p2p.Send(p.rw, NewBlockMsg, block)
+	return p.logSend(p2p.Send(p.rw, NewBlockMsg, block), NewBlockMsg)
 }
 
 // AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
@@ -469,7 +476,7 @@ func (p *peer) AsyncSendNewBlock(block *types.Block) {
 }
 
 func (p *peer) SendCoreBlocks(blocks []*coreTypes.Block) error {
-	return p2p.Send(p.rw, CoreBlockMsg, blocks)
+	return p.logSend(p2p.Send(p.rw, CoreBlockMsg, blocks), CoreBlockMsg)
 }
 
 func (p *peer) AsyncSendCoreBlocks(blocks []*coreTypes.Block) {
@@ -481,7 +488,7 @@ func (p *peer) AsyncSendCoreBlocks(blocks []*coreTypes.Block) {
 }
 
 func (p *peer) SendVotes(votes []*coreTypes.Vote) error {
-	return p2p.Send(p.rw, VoteMsg, votes)
+	return p.logSend(p2p.Send(p.rw, VoteMsg, votes), VoteMsg)
 }
 
 func (p *peer) AsyncSendVotes(votes []*coreTypes.Vote) {
@@ -494,7 +501,7 @@ func (p *peer) AsyncSendVotes(votes []*coreTypes.Vote) {
 
 func (p *peer) SendAgreement(agreement *coreTypes.AgreementResult) error {
 	p.knownAgreements.Add(rlpHash(agreement))
-	return p2p.Send(p.rw, AgreementMsg, agreement)
+	return p.logSend(p2p.Send(p.rw, AgreementMsg, agreement), AgreementMsg)
 }
 
 func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) {
@@ -510,7 +517,7 @@ func (p *peer) SendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult)
 	for _, randomness := range randomnesses {
 		p.knownRandomnesses.Add(rlpHash(randomness))
 	}
-	return p2p.Send(p.rw, RandomnessMsg, randomnesses)
+	return p.logSend(p2p.Send(p.rw, RandomnessMsg, randomnesses), RandomnessMsg)
 }
 
 func (p *peer) AsyncSendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult) {
@@ -526,7 +533,7 @@ func (p *peer) AsyncSendRandomnesses(randomnesses []*coreTypes.BlockRandomnessRe
 
 func (p *peer) SendDKGPrivateShare(privateShare *dkgTypes.PrivateShare) error {
 	p.knownDKGPrivateShares.Add(rlpHash(privateShare))
-	return p2p.Send(p.rw, DKGPrivateShareMsg, privateShare)
+	return p.logSend(p2p.Send(p.rw, DKGPrivateShareMsg, privateShare), DKGPrivateShareMsg)
 }
 
 func (p *peer) AsyncSendDKGPrivateShare(privateShare *dkgTypes.PrivateShare) {
@@ -539,7 +546,7 @@ func (p *peer) AsyncSendDKGPrivateShare(privateShare *dkgTypes.PrivateShare) {
 }
 
 func (p *peer) SendDKGPartialSignature(psig *dkgTypes.PartialSignature) error {
-	return p2p.Send(p.rw, DKGPartialSignatureMsg, psig)
+	return p.logSend(p2p.Send(p.rw, DKGPartialSignatureMsg, psig), DKGPartialSignatureMsg)
 }
 
 func (p *peer) AsyncSendDKGPartialSignature(psig *dkgTypes.PartialSignature) {
@@ -551,7 +558,7 @@ func (p *peer) AsyncSendDKGPartialSignature(psig *dkgTypes.PartialSignature) {
 }
 
 func (p *peer) SendPullBlocks(hashes coreCommon.Hashes) error {
-	return p2p.Send(p.rw, PullBlocksMsg, hashes)
+	return p.logSend(p2p.Send(p.rw, PullBlocksMsg, hashes), PullBlocksMsg)
 }
 
 func (p *peer) AsyncSendPullBlocks(hashes coreCommon.Hashes) {
@@ -563,7 +570,7 @@ func (p *peer) AsyncSendPullBlocks(hashes coreCommon.Hashes) {
 }
 
 func (p *peer) SendPullVotes(pos coreTypes.Position) error {
-	return p2p.Send(p.rw, PullVotesMsg, pos)
+	return p.logSend(p2p.Send(p.rw, PullVotesMsg, pos), PullVotesMsg)
 }
 
 func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) {
@@ -575,7 +582,7 @@ func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) {
 }
 
 func (p *peer) SendPullRandomness(hashes coreCommon.Hashes) error {
-	return p2p.Send(p.rw, PullRandomnessMsg, hashes)
+	return p.logSend(p2p.Send(p.rw, PullRandomnessMsg, hashes), PullRandomnessMsg)
 }
 
 func (p *peer) AsyncSendPullRandomness(hashes coreCommon.Hashes) {
@@ -588,29 +595,29 @@ func (p *peer) AsyncSendPullRandomness(hashes coreCommon.Hashes) {
 
 // SendBlockHeaders sends a batch of block headers to the remote peer.
 func (p *peer) SendBlockHeaders(flag uint8, headers []*types.HeaderWithGovState) error {
-	return p2p.Send(p.rw, BlockHeadersMsg, headersData{Flag: flag, Headers: headers})
+	return p.logSend(p2p.Send(p.rw, BlockHeadersMsg, headersData{Flag: flag, Headers: headers}), BlockHeadersMsg)
 }
 
 // SendBlockBodiesRLP sends a batch of block contents to the remote peer from
 // an already RLP encoded format.
 func (p *peer) SendBlockBodiesRLP(flag uint8, bodies []rlp.RawValue) error {
-	return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesDataRLP{Flag: flag, Bodies: bodies})
+	return p.logSend(p2p.Send(p.rw, BlockBodiesMsg, blockBodiesDataRLP{Flag: flag, Bodies: bodies}), BlockBodiesMsg)
 }
 
 // SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the
 // hashes requested.
 func (p *peer) SendNodeData(data [][]byte) error {
-	return p2p.Send(p.rw, NodeDataMsg, data)
+	return p.logSend(p2p.Send(p.rw, NodeDataMsg, data), NodeDataMsg)
 }
 
 // SendReceiptsRLP sends a batch of transaction receipts, corresponding to the
 // ones requested from an already RLP encoded format.
 func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
-	return p2p.Send(p.rw, ReceiptsMsg, receipts)
+	return p.logSend(p2p.Send(p.rw, ReceiptsMsg, receipts), ReceiptsMsg)
 }
 
 func (p *peer) SendGovState(govState *types.GovState) error {
-	return p2p.Send(p.rw, GovStateMsg, govState)
+	return p.logSend(p2p.Send(p.rw, GovStateMsg, govState), GovStateMsg)
 }
 
 // RequestOneHeader is a wrapper around the header query functions to fetch a
-- 
cgit v1.2.3