diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-06-27 01:42:27 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-07-01 00:00:00 +0800 |
commit | 2c8ed76e01161d9fe4e69064404cd888b4e327f3 (patch) | |
tree | 833493bed3bb120bfceda78c134005d3c46e97b0 /eth/handler.go | |
parent | 393d675690923207746ac800568faacae723f251 (diff) | |
download | go-tangerine-2c8ed76e01161d9fe4e69064404cd888b4e327f3.tar go-tangerine-2c8ed76e01161d9fe4e69064404cd888b4e327f3.tar.gz go-tangerine-2c8ed76e01161d9fe4e69064404cd888b4e327f3.tar.bz2 go-tangerine-2c8ed76e01161d9fe4e69064404cd888b4e327f3.tar.lz go-tangerine-2c8ed76e01161d9fe4e69064404cd888b4e327f3.tar.xz go-tangerine-2c8ed76e01161d9fe4e69064404cd888b4e327f3.tar.zst go-tangerine-2c8ed76e01161d9fe4e69064404cd888b4e327f3.zip |
eth: start cleaning up old protocol implementation, add metrics
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- | eth/handler.go | 54 |
1 files changed, 37 insertions, 17 deletions
diff --git a/eth/handler.go b/eth/handler.go index 44d297461..d25118337 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -163,26 +163,28 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter return newPeer(pv, nv, genesis, current, td, p, rw) } +// handle is the callback invoked to manage the life cycle of an eth peer. When +// this function terminates, the peer is disconnected. func (pm *ProtocolManager) handle(p *peer) error { - // Execute the Ethereum handshake. + glog.V(logger.Debug).Infof("%v: peer connected", p) + + // Execute the Ethereum handshake if err := p.handleStatus(); err != nil { + glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err) return err } - - // Register the peer locally. - glog.V(logger.Detail).Infoln("Adding peer", p.id) + // Register the peer locally + glog.V(logger.Detail).Infof("%v: adding peer", p) if err := pm.peers.Register(p); err != nil { - glog.V(logger.Error).Infoln("Addition failed:", err) + glog.V(logger.Error).Infof("%v: addition failed: %v", p, err) return err } defer pm.removePeer(p.id) - // Register the peer in the downloader. If the downloader - // considers it banned, we disconnect. + // Register the peer in the downloader. If the downloader considers it banned, we disconnect if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil { return err } - // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. pm.syncTransactions(p) @@ -190,13 +192,17 @@ func (pm *ProtocolManager) handle(p *peer) error { // main loop. handle incoming messages. for { if err := pm.handleMsg(p); err != nil { + glog.V(logger.Debug).Infof("%v: message handling failed: %v", p, err) return err } } return nil } +// handleMsg is invoked whenever an inbound message is received from a remote +// peer. The remote connection is torn down upon returning any error. func (pm *ProtocolManager) handleMsg(p *peer) error { + // Read the next message from the remote peer, and ensure it's fully consumed msg, err := p.rw.ReadMsg() if err != nil { return err @@ -204,23 +210,25 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if msg.Size > ProtocolMaxMsgSize { return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) } - // make sure that the payload has been fully consumed defer msg.Discard() + // Handle the message depending on its contents switch msg.Code { case StatusMsg: return errResp(ErrExtraStatusMsg, "uncontrolled status message") case TxMsg: - // TODO: rework using lazy RLP stream + // Transactions arrived, parse all of them and deliver to the pool var txs []*types.Transaction if err := msg.Decode(&txs); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } + propTxnInPacketsMeter.Mark(1) for i, tx := range txs { if tx == nil { return errResp(ErrDecode, "transaction %d is nil", i) } + propTxnInTrafficMeter.Mark(tx.Size().Int64()) jsonlogger.LogJson(&logger.EthTxReceived{ TxHash: tx.Hash().Hex(), RemoteId: p.ID().String(), @@ -250,12 +258,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return p.sendBlockHashes(hashes) case BlockHashesMsg: + // A batch of hashes arrived to one of our previous requests msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + reqHashInPacketsMeter.Mark(1) var hashes []common.Hash if err := msgStream.Decode(&hashes); err != nil { break } + reqHashInTrafficMeter.Mark(int64(32 * len(hashes))) + + // Deliver them all to the downloader for queuing err := pm.downloader.DeliverHashes(p.id, hashes) if err != nil { glog.V(logger.Debug).Infoln(err) @@ -299,13 +312,14 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } list = list[:len(list)-2] + "]" - glog.Infof("Peer %s: no blocks found for requested hashes %s", p.id, list) + glog.Infof("%v: no blocks found for requested hashes %s", p, list) } return p.sendBlocks(blocks) case BlocksMsg: // Decode the arrived block message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + reqBlockInPacketsMeter.Mark(1) var blocks []*types.Block if err := msgStream.Decode(&blocks); err != nil { @@ -313,8 +327,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { blocks = nil } // Update the receive timestamp of each block - for i := 0; i < len(blocks); i++ { - blocks[i].ReceivedAt = msg.ReceivedAt + for _, block := range blocks { + reqBlockInTrafficMeter.Mark(block.Size().Int64()) + block.ReceivedAt = msg.ReceivedAt } // Filter out any explicitly requested blocks, deliver the rest to the downloader if blocks := pm.fetcher.Filter(blocks); len(blocks) > 0 { @@ -329,6 +344,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msgStream.Decode(&hashes); err != nil { break } + propHashInPacketsMeter.Mark(1) + propHashInTrafficMeter.Mark(int64(32 * len(hashes))) + // Mark the hashes as present at the remote node for _, hash := range hashes { p.blockHashes.Add(hash) @@ -351,6 +369,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&request); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) } + propBlockInPacketsMeter.Mark(1) + propBlockInTrafficMeter.Mark(request.Block.Size().Int64()) + if err := request.Block.ValidateFields(); err != nil { return errResp(ErrDecode, "block validation %v: %v", msg, err) } @@ -404,15 +425,14 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { } } -// BroadcastTx will propagate the block to its connected peers. It will sort -// out which peers do not contain the block in their block set and will do a -// sqrt(peers) to determine the amount of peers we broadcast to. +// BroadcastTx will propagate a transaction to all peers which are not known to +// already have the given transaction. func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) { // Broadcast transaction to a batch of peers not knowing about it peers := pm.peers.PeersWithoutTx(hash) //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { - peer.sendTransaction(tx) + peer.sendTransactions(types.Transactions{tx}) } glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers") } |