diff options
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- | eth/handler.go | 188 |
1 files changed, 120 insertions, 68 deletions
diff --git a/eth/handler.go b/eth/handler.go index 278a2bec2..59bbb480b 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -49,7 +49,7 @@ type ProtocolManager struct { fetcher *fetcher.Fetcher peers *peerSet - SubProtocol p2p.Protocol + SubProtocols []p2p.Protocol eventMux *event.TypeMux txSub event.Subscription @@ -68,8 +68,8 @@ type ProtocolManager struct { // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager) *ProtocolManager { - // Create the protocol manager and initialize peer handlers +func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager) *ProtocolManager { + // Create the protocol manager with the base fields manager := &ProtocolManager{ eventMux: mux, txpool: txpool, @@ -79,18 +79,24 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), } - manager.SubProtocol = p2p.Protocol{ - Name: "eth", - Version: uint(protocolVersion), - Length: ProtocolLength, - Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { - peer := manager.newPeer(protocolVersion, networkId, p, rw) - manager.newPeerCh <- peer - return manager.handle(peer) - }, + // Initiate a sub-protocol for every implemented version we can handle + manager.SubProtocols = make([]p2p.Protocol, len(ProtocolVersions)) + for i := 0; i < len(manager.SubProtocols); i++ { + version := ProtocolVersions[i] + + manager.SubProtocols[i] = p2p.Protocol{ + Name: "eth", + Version: version, + Length: ProtocolLengths[i], + Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := manager.newPeer(int(version), networkId, p, rw) + manager.newPeerCh <- peer + return manager.handle(peer) + }, + } } // Construct the different synchronisation mechanisms - manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer) + manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.CurrentBlock, manager.chainman.InsertChain, manager.removePeer) validator := func(block *types.Block, parent *types.Block) error { return core.ValidateHeader(pow, block.Header(), parent, true) @@ -152,31 +158,32 @@ func (pm *ProtocolManager) Stop() { } func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { - td, current, genesis := pm.chainman.Status() - - return newPeer(pv, nv, genesis, current, td, p, rw) + return newPeer(pv, nv, p, rw) } +// handle is the callback invoked to manage the life cycle of an eth peer. When +// this function terminates, the peer is disconnected. func (pm *ProtocolManager) handle(p *peer) error { - // Execute the Ethereum handshake. - if err := p.handleStatus(); err != nil { + glog.V(logger.Debug).Infof("%v: peer connected [%s]", p, p.Name()) + + // Execute the Ethereum handshake + td, head, genesis := pm.chainman.Status() + if err := p.Handshake(td, head, genesis); err != nil { + glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err) return err } - - // Register the peer locally. - glog.V(logger.Detail).Infoln("Adding peer", p.id) + // Register the peer locally + glog.V(logger.Detail).Infof("%v: adding peer", p) if err := pm.peers.Register(p); err != nil { - glog.V(logger.Error).Infoln("Addition failed:", err) + glog.V(logger.Error).Infof("%v: addition failed: %v", p, err) return err } defer pm.removePeer(p.id) - // Register the peer in the downloader. If the downloader - // considers it banned, we disconnect. - if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil { + // Register the peer in the downloader. If the downloader considers it banned, we disconnect + if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks); err != nil { return err } - // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. pm.syncTransactions(p) @@ -184,13 +191,17 @@ func (pm *ProtocolManager) handle(p *peer) error { // main loop. handle incoming messages. for { if err := pm.handleMsg(p); err != nil { + glog.V(logger.Debug).Infof("%v: message handling failed: %v", p, err) return err } } return nil } +// handleMsg is invoked whenever an inbound message is received from a remote +// peer. The remote connection is torn down upon returning any error. func (pm *ProtocolManager) handleMsg(p *peer) error { + // Read the next message from the remote peer, and ensure it's fully consumed msg, err := p.rw.ReadMsg() if err != nil { return err @@ -198,58 +209,69 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if msg.Size > ProtocolMaxMsgSize { return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) } - // make sure that the payload has been fully consumed defer msg.Discard() + // Handle the message depending on its contents switch msg.Code { case StatusMsg: + // Status messages should never arrive after the handshake return errResp(ErrExtraStatusMsg, "uncontrolled status message") - case TxMsg: - // TODO: rework using lazy RLP stream - var txs []*types.Transaction - if err := msg.Decode(&txs); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - for i, tx := range txs { - if tx == nil { - return errResp(ErrDecode, "transaction %d is nil", i) - } - jsonlogger.LogJson(&logger.EthTxReceived{ - TxHash: tx.Hash().Hex(), - RemoteId: p.ID().String(), - }) - } - pm.txpool.AddTransactions(txs) - case GetBlockHashesMsg: - var request getBlockHashesMsgData + // Retrieve the number of hashes to return and from which origin hash + var request getBlockHashesData if err := msg.Decode(&request); err != nil { - return errResp(ErrDecode, "->msg %v: %v", msg, err) + return errResp(ErrDecode, "%v: %v", msg, err) } - if request.Amount > uint64(downloader.MaxHashFetch) { request.Amount = uint64(downloader.MaxHashFetch) } - + // Retrieve the hashes from the block chain and return them hashes := pm.chainman.GetBlockHashesFromHash(request.Hash, request.Amount) + if len(hashes) == 0 { + glog.V(logger.Debug).Infof("invalid block hash %x", request.Hash.Bytes()[:4]) + } + return p.SendBlockHashes(hashes) - if glog.V(logger.Debug) { - if len(hashes) == 0 { - glog.Infof("invalid block hash %x", request.Hash.Bytes()[:4]) - } + case GetBlockHashesFromNumberMsg: + // Retrieve and decode the number of hashes to return and from which origin number + var request getBlockHashesFromNumberData + if err := msg.Decode(&request); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + if request.Amount > uint64(downloader.MaxHashFetch) { + request.Amount = uint64(downloader.MaxHashFetch) } + // Calculate the last block that should be retrieved, and short circuit if unavailable + last := pm.chainman.GetBlockByNumber(request.Number + request.Amount - 1) + if last == nil { + last = pm.chainman.CurrentBlock() + request.Amount = last.NumberU64() - request.Number + 1 + } + if last.NumberU64() < request.Number { + return p.SendBlockHashes(nil) + } + // Retrieve the hashes from the last block backwards, reverse and return + hashes := []common.Hash{last.Hash()} + hashes = append(hashes, pm.chainman.GetBlockHashesFromHash(last.Hash(), request.Amount-1)...) - // returns either requested hashes or nothing (i.e. not found) - return p.sendBlockHashes(hashes) + for i := 0; i < len(hashes)/2; i++ { + hashes[i], hashes[len(hashes)-1-i] = hashes[len(hashes)-1-i], hashes[i] + } + return p.SendBlockHashes(hashes) case BlockHashesMsg: + // A batch of hashes arrived to one of our previous requests msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + reqHashInPacketsMeter.Mark(1) var hashes []common.Hash if err := msgStream.Decode(&hashes); err != nil { break } + reqHashInTrafficMeter.Mark(int64(32 * len(hashes))) + + // Deliver them all to the downloader for queuing err := pm.downloader.DeliverHashes(p.id, hashes) if err != nil { glog.V(logger.Debug).Infoln(err) @@ -293,13 +315,14 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } list = list[:len(list)-2] + "]" - glog.Infof("Peer %s: no blocks found for requested hashes %s", p.id, list) + glog.Infof("%v: no blocks found for requested hashes %s", p, list) } - return p.sendBlocks(blocks) + return p.SendBlocks(blocks) case BlocksMsg: // Decode the arrived block message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + reqBlockInPacketsMeter.Mark(1) var blocks []*types.Block if err := msgStream.Decode(&blocks); err != nil { @@ -307,8 +330,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { blocks = nil } // Update the receive timestamp of each block - for i := 0; i < len(blocks); i++ { - blocks[i].ReceivedAt = msg.ReceivedAt + for _, block := range blocks { + reqBlockInTrafficMeter.Mark(block.Size().Int64()) + block.ReceivedAt = msg.ReceivedAt } // Filter out any explicitly requested blocks, deliver the rest to the downloader if blocks := pm.fetcher.Filter(blocks); len(blocks) > 0 { @@ -323,9 +347,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msgStream.Decode(&hashes); err != nil { break } + propHashInPacketsMeter.Mark(1) + propHashInTrafficMeter.Mark(int64(32 * len(hashes))) + // Mark the hashes as present at the remote node for _, hash := range hashes { - p.blockHashes.Add(hash) + p.MarkBlock(hash) p.SetHead(hash) } // Schedule all the unknown hashes for retrieval @@ -336,15 +363,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } for _, hash := range unknown { - pm.fetcher.Notify(p.id, hash, time.Now(), p.requestBlocks) + pm.fetcher.Notify(p.id, hash, time.Now(), p.RequestBlocks) } case NewBlockMsg: // Retrieve and decode the propagated block - var request newBlockMsgData + var request newBlockData if err := msg.Decode(&request); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) } + propBlockInPacketsMeter.Mark(1) + propBlockInTrafficMeter.Mark(request.Block.Size().Int64()) + if err := request.Block.ValidateFields(); err != nil { return errResp(ErrDecode, "block validation %v: %v", msg, err) } @@ -360,7 +390,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { RemoteId: p.ID().String(), }) // Mark the peer as owning the block and schedule it for import - p.blockHashes.Add(request.Block.Hash()) + p.MarkBlock(request.Block.Hash()) p.SetHead(request.Block.Hash()) pm.fetcher.Enqueue(p.id, request.Block) @@ -369,6 +399,29 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.SetTd(request.TD) go pm.synchronise(p) + case TxMsg: + // Transactions arrived, parse all of them and deliver to the pool + var txs []*types.Transaction + if err := msg.Decode(&txs); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + propTxnInPacketsMeter.Mark(1) + for i, tx := range txs { + // Validate and mark the remote transaction + if tx == nil { + return errResp(ErrDecode, "transaction %d is nil", i) + } + p.MarkTransaction(tx.Hash()) + + // Log it's arrival for later analysis + propTxnInTrafficMeter.Mark(tx.Size().Int64()) + jsonlogger.LogJson(&logger.EthTxReceived{ + TxHash: tx.Hash().Hex(), + RemoteId: p.ID().String(), + }) + } + pm.txpool.AddTransactions(txs) + default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } @@ -385,28 +438,27 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { if propagate { transfer := peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range transfer { - peer.sendNewBlock(block) + peer.SendNewBlock(block) } glog.V(logger.Detail).Infof("propagated block %x to %d peers in %v", hash[:4], len(transfer), time.Since(block.ReceivedAt)) } // Otherwise if the block is indeed in out own chain, announce it if pm.chainman.HasBlock(hash) { for _, peer := range peers { - peer.sendNewBlockHashes([]common.Hash{hash}) + peer.SendNewBlockHashes([]common.Hash{hash}) } glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt)) } } -// BroadcastTx will propagate the block to its connected peers. It will sort -// out which peers do not contain the block in their block set and will do a -// sqrt(peers) to determine the amount of peers we broadcast to. +// BroadcastTx will propagate a transaction to all peers which are not known to +// already have the given transaction. func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) { // Broadcast transaction to a batch of peers not knowing about it peers := pm.peers.PeersWithoutTx(hash) //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { - peer.sendTransaction(tx) + peer.SendTransactions(types.Transactions{tx}) } glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers") } |