From c51e153b5c5327f971e7b410e49e7babfc3f0b8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 2 Jul 2015 14:13:46 +0300 Subject: eth, metrics, p2p: prepare metrics and net packets to eth/62 --- eth/downloader/downloader.go | 3 +- eth/handler.go | 15 +------- eth/metrics.go | 84 ++++++++++++++++++++++++++++++++++++++++++++ eth/peer.go | 50 +++++++++++++++----------- eth/protocol.go | 25 +++++++++---- 5 files changed, 136 insertions(+), 41 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index e3e22a784..23d2e045e 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -39,6 +39,7 @@ import ( const ( eth60 = 60 // Constant to check for old protocol support eth61 = 61 // Constant to check for new protocol support + eth62 = 62 // Constant to check for experimental protocol support ) var ( @@ -329,7 +330,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e if err = d.fetchBlocks60(); err != nil { return err } - case eth61: + case eth61, eth62: // New eth/61, use forward, concurrent hash and block retrieval algorithm number, err := d.findAncestor(p) if err != nil { diff --git a/eth/handler.go b/eth/handler.go index 5d233dd96..6c1895bbd 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -176,7 +176,7 @@ func (pm *ProtocolManager) Stop() { } func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { - return newPeer(pv, nv, p, rw) + return newPeer(pv, nv, p, newMeteredMsgWriter(rw)) } // handle is the callback invoked to manage the life cycle of an eth peer. When @@ -281,14 +281,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { case BlockHashesMsg: // A batch of hashes arrived to one of our previous requests msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) - reqHashInPacketsMeter.Mark(1) var hashes []common.Hash if err := msgStream.Decode(&hashes); err != nil { break } - reqHashInTrafficMeter.Mark(int64(32 * len(hashes))) - // Deliver them all to the downloader for queuing err := pm.downloader.DeliverHashes(p.id, hashes) if err != nil { @@ -340,7 +337,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { case BlocksMsg: // Decode the arrived block message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) - reqBlockInPacketsMeter.Mark(1) var blocks []*types.Block if err := msgStream.Decode(&blocks); err != nil { @@ -349,7 +345,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } // Update the receive timestamp of each block for _, block := range blocks { - reqBlockInTrafficMeter.Mark(block.Size().Int64()) block.ReceivedAt = msg.ReceivedAt } // Filter out any explicitly requested blocks, deliver the rest to the downloader @@ -365,9 +360,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msgStream.Decode(&hashes); err != nil { break } - propHashInPacketsMeter.Mark(1) - propHashInTrafficMeter.Mark(int64(32 * len(hashes))) - // Mark the hashes as present at the remote node for _, hash := range hashes { p.MarkBlock(hash) @@ -390,9 +382,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&request); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) } - propBlockInPacketsMeter.Mark(1) - propBlockInTrafficMeter.Mark(request.Block.Size().Int64()) - if err := request.Block.ValidateFields(); err != nil { return errResp(ErrDecode, "block validation %v: %v", msg, err) } @@ -427,7 +416,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&txs); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - propTxnInPacketsMeter.Mark(1) for i, tx := range txs { // Validate and mark the remote transaction if tx == nil { @@ -436,7 +424,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.MarkTransaction(tx.Hash()) // Log it's arrival for later analysis - propTxnInTrafficMeter.Mark(tx.Size().Int64()) jsonlogger.LogJson(&logger.EthTxReceived{ TxHash: tx.Hash().Hex(), RemoteId: p.ID().String(), diff --git a/eth/metrics.go b/eth/metrics.go index 625b90b67..13745dc43 100644 --- a/eth/metrics.go +++ b/eth/metrics.go @@ -18,6 +18,7 @@ package eth import ( "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/p2p" ) var ( @@ -41,4 +42,87 @@ var ( reqBlockInTrafficMeter = metrics.NewMeter("eth/req/blocks/in/traffic") reqBlockOutPacketsMeter = metrics.NewMeter("eth/req/blocks/out/packets") reqBlockOutTrafficMeter = metrics.NewMeter("eth/req/blocks/out/traffic") + reqHeaderInPacketsMeter = metrics.NewMeter("eth/req/header/in/packets") + reqHeaderInTrafficMeter = metrics.NewMeter("eth/req/header/in/traffic") + reqHeaderOutPacketsMeter = metrics.NewMeter("eth/req/header/out/packets") + reqHeaderOutTrafficMeter = metrics.NewMeter("eth/req/header/out/traffic") + reqStateInPacketsMeter = metrics.NewMeter("eth/req/state/in/packets") + reqStateInTrafficMeter = metrics.NewMeter("eth/req/state/in/traffic") + reqStateOutPacketsMeter = metrics.NewMeter("eth/req/state/out/packets") + reqStateOutTrafficMeter = metrics.NewMeter("eth/req/state/out/traffic") + miscInPacketsMeter = metrics.NewMeter("eth/misc/in/packets") + miscInTrafficMeter = metrics.NewMeter("eth/misc/in/traffic") + miscOutPacketsMeter = metrics.NewMeter("eth/misc/out/packets") + miscOutTrafficMeter = metrics.NewMeter("eth/misc/out/traffic") ) + +// meteredMsgReadWriter is a wrapper around a p2p.MsgReadWriter, capable of +// accumulating the above defined metrics based on the data stream contents. +type meteredMsgReadWriter struct { + p2p.MsgReadWriter +} + +// newMeteredMsgWriter wraps a p2p MsgReadWriter with metering support. If the +// metrics system is disabled, this fucntion returns the original object. +func newMeteredMsgWriter(rw p2p.MsgReadWriter) p2p.MsgReadWriter { + if !metrics.Enabled { + return rw + } + return &meteredMsgReadWriter{rw} +} + +func (rw *meteredMsgReadWriter) ReadMsg() (p2p.Msg, error) { + // Read the message and short circuit in case of an error + msg, err := rw.MsgReadWriter.ReadMsg() + if err != nil { + return msg, err + } + // Account for the data traffic + packets, traffic := miscInPacketsMeter, miscInTrafficMeter + switch msg.Code { + case BlockHashesMsg: + packets, traffic = reqHashInPacketsMeter, reqHashInTrafficMeter + case BlocksMsg: + packets, traffic = reqBlockInPacketsMeter, reqBlockInTrafficMeter + case BlockHeadersMsg: + packets, traffic = reqHeaderInPacketsMeter, reqHeaderInTrafficMeter + case NodeDataMsg: + packets, traffic = reqStateInPacketsMeter, reqStateInTrafficMeter + case NewBlockHashesMsg: + packets, traffic = propHashInPacketsMeter, propHashInTrafficMeter + case NewBlockMsg: + packets, traffic = propBlockInPacketsMeter, propBlockInTrafficMeter + case TxMsg: + packets, traffic = propTxnInPacketsMeter, propTxnInTrafficMeter + } + packets.Mark(1) + traffic.Mark(int64(msg.Size)) + + return msg, err +} + +func (rw *meteredMsgReadWriter) WriteMsg(msg p2p.Msg) error { + // Account for the data traffic + packets, traffic := miscOutPacketsMeter, miscOutTrafficMeter + switch msg.Code { + case BlockHashesMsg: + packets, traffic = reqHashOutPacketsMeter, reqHashOutTrafficMeter + case BlocksMsg: + packets, traffic = reqBlockOutPacketsMeter, reqBlockOutTrafficMeter + case BlockHeadersMsg: + packets, traffic = reqHeaderOutPacketsMeter, reqHeaderOutTrafficMeter + case NodeDataMsg: + packets, traffic = reqStateOutPacketsMeter, reqStateOutTrafficMeter + case NewBlockHashesMsg: + packets, traffic = propHashOutPacketsMeter, propHashOutTrafficMeter + case NewBlockMsg: + packets, traffic = propBlockOutPacketsMeter, propBlockOutTrafficMeter + case TxMsg: + packets, traffic = propTxnOutPacketsMeter, propTxnOutTrafficMeter + } + packets.Mark(1) + traffic.Mark(int64(msg.Size)) + + // Send the packet to the p2p layer + return rw.MsgReadWriter.WriteMsg(msg) +} diff --git a/eth/peer.go b/eth/peer.go index ade1f37ea..c17cdfca7 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -129,9 +129,7 @@ func (p *peer) MarkTransaction(hash common.Hash) { // SendTransactions sends transactions to the peer and includes the hashes // in its transaction hash set for future reference. func (p *peer) SendTransactions(txs types.Transactions) error { - propTxnOutPacketsMeter.Mark(1) for _, tx := range txs { - propTxnOutTrafficMeter.Mark(tx.Size().Int64()) p.knownTxs.Add(tx.Hash()) } return p2p.Send(p.rw, TxMsg, txs) @@ -139,27 +137,17 @@ func (p *peer) SendTransactions(txs types.Transactions) error { // SendBlockHashes sends a batch of known hashes to the remote peer. func (p *peer) SendBlockHashes(hashes []common.Hash) error { - reqHashOutPacketsMeter.Mark(1) - reqHashOutTrafficMeter.Mark(int64(32 * len(hashes))) - return p2p.Send(p.rw, BlockHashesMsg, hashes) } // SendBlocks sends a batch of blocks to the remote peer. func (p *peer) SendBlocks(blocks []*types.Block) error { - reqBlockOutPacketsMeter.Mark(1) - for _, block := range blocks { - reqBlockOutTrafficMeter.Mark(block.Size().Int64()) - } return p2p.Send(p.rw, BlocksMsg, blocks) } // SendNewBlockHashes announces the availability of a number of blocks through // a hash notification. func (p *peer) SendNewBlockHashes(hashes []common.Hash) error { - propHashOutPacketsMeter.Mark(1) - propHashOutTrafficMeter.Mark(int64(32 * len(hashes))) - for _, hash := range hashes { p.knownBlocks.Add(hash) } @@ -168,33 +156,55 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash) error { // SendNewBlock propagates an entire block to a remote peer. func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error { - propBlockOutPacketsMeter.Mark(1) - propBlockOutTrafficMeter.Mark(block.Size().Int64()) - p.knownBlocks.Add(block.Hash()) return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) } +// SendBlockHeaders sends a batch of block headers to the remote peer. +func (p *peer) SendBlockHeaders(headers []*types.Header) error { + return p2p.Send(p.rw, BlockHeadersMsg, headers) +} + +// SendNodeData sends a batch of arbitrary internal data, corresponding to the +// hashes requested. +func (p *peer) SendNodeData(data [][]byte) error { + return p2p.Send(p.rw, NodeDataMsg, data) +} + // RequestHashes fetches a batch of hashes from a peer, starting at from, going // towards the genesis block. func (p *peer) RequestHashes(from common.Hash) error { - glog.V(logger.Debug).Infof("Peer [%s] fetching hashes (%d) from %x...\n", p.id, downloader.MaxHashFetch, from[:4]) + glog.V(logger.Debug).Infof("%v fetching hashes (%d) from %x...\n", p, downloader.MaxHashFetch, from[:4]) return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesData{from, uint64(downloader.MaxHashFetch)}) } -// RequestHashesFromNumber fetches a batch of hashes from a peer, starting at the -// requested block number, going upwards towards the genesis block. +// RequestHashesFromNumber fetches a batch of hashes from a peer, starting at +// the requested block number, going upwards towards the genesis block. func (p *peer) RequestHashesFromNumber(from uint64, count int) error { - glog.V(logger.Debug).Infof("Peer [%s] fetching hashes (%d) from #%d...\n", p.id, count, from) + glog.V(logger.Debug).Infof("%v fetching hashes (%d) from #%d...\n", p, count, from) return p2p.Send(p.rw, GetBlockHashesFromNumberMsg, getBlockHashesFromNumberData{from, uint64(count)}) } // RequestBlocks fetches a batch of blocks corresponding to the specified hashes. func (p *peer) RequestBlocks(hashes []common.Hash) error { - glog.V(logger.Debug).Infof("[%s] fetching %v blocks\n", p.id, len(hashes)) + glog.V(logger.Debug).Infof("%v fetching %v blocks\n", p, len(hashes)) return p2p.Send(p.rw, GetBlocksMsg, hashes) } +// RequestHeaders fetches a batch of blocks' headers corresponding to the +// specified hashes. +func (p *peer) RequestHeaders(hashes []common.Hash) error { + glog.V(logger.Debug).Infof("%v fetching %v headers\n", p, len(hashes)) + return p2p.Send(p.rw, GetBlockHeadersMsg, hashes) +} + +// RequestNodeData fetches a batch of arbitrary data from a node's known state +// data, corresponding to the specified hashes. +func (p *peer) RequestNodeData(hashes []common.Hash) error { + glog.V(logger.Debug).Infof("%v fetching %v state data\n", p, len(hashes)) + return p2p.Send(p.rw, GetNodeDataMsg, hashes) +} + // Handshake executes the eth protocol handshake, negotiating version number, // network IDs, difficulties, head and genesis blocks. func (p *peer) Handshake(td *big.Int, head common.Hash, genesis common.Hash) error { diff --git a/eth/protocol.go b/eth/protocol.go index b8c9b50d0..fcc5f21e2 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -24,10 +24,10 @@ import ( ) // Supported versions of the eth protocol (first is primary). -var ProtocolVersions = []uint{61, 60} +var ProtocolVersions = []uint{62, 61, 60} // Number of implemented message corresponding to different protocol versions. -var ProtocolLengths = []uint64{9, 8} +var ProtocolLengths = []uint64{13, 9, 8} const ( NetworkId = 1 @@ -36,6 +36,7 @@ const ( // eth protocol message codes const ( + // Protocol messages belonging to eth/60 StatusMsg = iota NewBlockHashesMsg TxMsg @@ -44,7 +45,15 @@ const ( GetBlocksMsg BlocksMsg NewBlockMsg + + // Protocol messages belonging to eth/61 GetBlockHashesFromNumberMsg + + // Protocol messages belonging to eth/62 + GetBlockHeadersMsg + BlockHeadersMsg + GetNodeDataMsg + NodeDataMsg ) type errCode int @@ -102,15 +111,14 @@ type statusData struct { GenesisBlock common.Hash } -// getBlockHashesData is the network packet for the hash based block retrieval -// message. +// getBlockHashesData is the network packet for the hash based hash retrieval. type getBlockHashesData struct { Hash common.Hash Amount uint64 } -// getBlockHashesFromNumberData is the network packet for the number based block -// retrieval message. +// getBlockHashesFromNumberData is the network packet for the number based hash +// retrieval. type getBlockHashesFromNumberData struct { Number uint64 Amount uint64 @@ -121,3 +129,8 @@ type newBlockData struct { Block *types.Block TD *big.Int } + +// nodeDataData is the network response packet for a node data retrieval. +type nodeDataData []struct { + Value []byte +} -- cgit v1.2.3