path: root/eth/handler.go
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- eth/handler.go | 188
1 files changed, 120 insertions, 68 deletions
diff --git a/eth/handler.go b/eth/handler.go
index 278a2bec2..59bbb480b 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -49,7 +49,7 @@ type ProtocolManager struct {
fetcher *fetcher.Fetcher
peers *peerSet
- SubProtocol p2p.Protocol
+ SubProtocols []p2p.Protocol
eventMux *event.TypeMux
txSub event.Subscription
@@ -68,8 +68,8 @@ type ProtocolManager struct {
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// of operating on the ethereum network.
-func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager) *ProtocolManager {
- // Create the protocol manager and initialize peer handlers
+func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager) *ProtocolManager {
+ // Create the protocol manager with the base fields
manager := &ProtocolManager{
eventMux: mux,
txpool: txpool,
@@ -79,18 +79,24 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
- manager.SubProtocol = p2p.Protocol{
- Name: "eth",
- Version: uint(protocolVersion),
- Length: ProtocolLength,
- Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
- peer := manager.newPeer(protocolVersion, networkId, p, rw)
- manager.newPeerCh <- peer
- return manager.handle(peer)
- },
+ // Initiate a sub-protocol for every implemented version we can handle
+ manager.SubProtocols = make([]p2p.Protocol, len(ProtocolVersions))
+ for i := 0; i < len(manager.SubProtocols); i++ {
+ version := ProtocolVersions[i]
+
+ manager.SubProtocols[i] = p2p.Protocol{
+ Name: "eth",
+ Version: version,
+ Length: ProtocolLengths[i],
+ Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ peer := manager.newPeer(int(version), networkId, p, rw)
+ manager.newPeerCh <- peer
+ return manager.handle(peer)
+ },
+ }
}
// Construct the different synchronisation mechanisms
- manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer)
+ manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.CurrentBlock, manager.chainman.InsertChain, manager.removePeer)
validator := func(block *types.Block, parent *types.Block) error {
return core.ValidateHeader(pow, block.Header(), parent, true)
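The loop in the hunk above builds one p2p.Protocol entry per supported eth version, with each Run closure capturing its own copy of version. Below is a minimal standalone sketch of that per-version closure pattern; the version and length tables are hypothetical stand-ins for ProtocolVersions/ProtocolLengths, and the subProtocol type is a placeholder, not the real p2p.Protocol.

package main

import "fmt"

// Hypothetical stand-ins for the real ProtocolVersions/ProtocolLengths tables.
var (
	protocolVersions = []uint{61, 60}
	protocolLengths  = []uint64{9, 8}
)

type subProtocol struct {
	Name    string
	Version uint
	Length  uint64
	Run     func() error
}

func main() {
	subProtocols := make([]subProtocol, len(protocolVersions))
	for i := 0; i < len(subProtocols); i++ {
		// Copy into a loop-local variable so each Run closure captures its
		// own version rather than the shared loop variable.
		version := protocolVersions[i]

		subProtocols[i] = subProtocol{
			Name:    "eth",
			Version: version,
			Length:  protocolLengths[i],
			Run: func() error {
				fmt.Printf("running eth/%d\n", version)
				return nil
			},
		}
	}
	for _, p := range subProtocols {
		p.Run()
	}
}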
@@ -152,31 +158,32 @@ func (pm *ProtocolManager) Stop() {
}
func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
- td, current, genesis := pm.chainman.Status()
-
- return newPeer(pv, nv, genesis, current, td, p, rw)
+ return newPeer(pv, nv, p, rw)
}
+// handle is the callback invoked to manage the life cycle of an eth peer. When
+// this function terminates, the peer is disconnected.
func (pm *ProtocolManager) handle(p *peer) error {
- // Execute the Ethereum handshake.
- if err := p.handleStatus(); err != nil {
+ glog.V(logger.Debug).Infof("%v: peer connected [%s]", p, p.Name())
+
+ // Execute the Ethereum handshake
+ td, head, genesis := pm.chainman.Status()
+ if err := p.Handshake(td, head, genesis); err != nil {
+ glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err)
return err
}
-
- // Register the peer locally.
- glog.V(logger.Detail).Infoln("Adding peer", p.id)
+ // Register the peer locally
+ glog.V(logger.Detail).Infof("%v: adding peer", p)
if err := pm.peers.Register(p); err != nil {
- glog.V(logger.Error).Infoln("Addition failed:", err)
+ glog.V(logger.Error).Infof("%v: addition failed: %v", p, err)
return err
}
defer pm.removePeer(p.id)
- // Register the peer in the downloader. If the downloader
- // considers it banned, we disconnect.
- if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil {
+ // Register the peer in the downloader. If the downloader considers it banned, we disconnect
+ if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks); err != nil {
return err
}
-
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)
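handle() follows a register / defer-unregister / message-loop life cycle: handshake first, then peer-set and downloader registration, with removePeer deferred so cleanup runs regardless of which later step fails. A rough self-contained sketch of that shape follows; the callbacks are stand-ins, not the real peerSet or downloader APIs.

package main

import (
	"errors"
	"fmt"
)

// runPeer mirrors the life cycle in handle() above: register the peer, make
// sure removal always runs, then pump messages until one returns an error.
func runPeer(id string, register func(string) error, unregister func(string), handleMsg func() error) error {
	if err := register(id); err != nil {
		return err
	}
	defer unregister(id)

	for {
		if err := handleMsg(); err != nil {
			return err
		}
	}
}

func main() {
	msgs := 3
	err := runPeer("peer-1",
		func(id string) error { fmt.Println("registered", id); return nil },
		func(id string) { fmt.Println("removed", id) },
		func() error {
			if msgs == 0 {
				return errors.New("read error")
			}
			msgs--
			return nil
		},
	)
	fmt.Println("peer loop ended:", err)
}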
@@ -184,13 +191,17 @@ func (pm *ProtocolManager) handle(p *peer) error {
// main loop. handle incoming messages.
for {
if err := pm.handleMsg(p); err != nil {
+ glog.V(logger.Debug).Infof("%v: message handling failed: %v", p, err)
return err
}
}
return nil
}
+// handleMsg is invoked whenever an inbound message is received from a remote
+// peer. The remote connection is torn down upon returning any error.
func (pm *ProtocolManager) handleMsg(p *peer) error {
+ // Read the next message from the remote peer, and ensure it's fully consumed
msg, err := p.rw.ReadMsg()
if err != nil {
return err
@@ -198,58 +209,69 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if msg.Size > ProtocolMaxMsgSize {
return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
}
- // make sure that the payload has been fully consumed
defer msg.Discard()
+ // Handle the message depending on its contents
switch msg.Code {
case StatusMsg:
+ // Status messages should never arrive after the handshake
return errResp(ErrExtraStatusMsg, "uncontrolled status message")
- case TxMsg:
- // TODO: rework using lazy RLP stream
- var txs []*types.Transaction
- if err := msg.Decode(&txs); err != nil {
- return errResp(ErrDecode, "msg %v: %v", msg, err)
- }
- for i, tx := range txs {
- if tx == nil {
- return errResp(ErrDecode, "transaction %d is nil", i)
- }
- jsonlogger.LogJson(&logger.EthTxReceived{
- TxHash: tx.Hash().Hex(),
- RemoteId: p.ID().String(),
- })
- }
- pm.txpool.AddTransactions(txs)
-
case GetBlockHashesMsg:
- var request getBlockHashesMsgData
+ // Retrieve the number of hashes to return and from which origin hash
+ var request getBlockHashesData
if err := msg.Decode(&request); err != nil {
- return errResp(ErrDecode, "->msg %v: %v", msg, err)
+ return errResp(ErrDecode, "%v: %v", msg, err)
}
-
if request.Amount > uint64(downloader.MaxHashFetch) {
request.Amount = uint64(downloader.MaxHashFetch)
}
-
+ // Retrieve the hashes from the block chain and return them
hashes := pm.chainman.GetBlockHashesFromHash(request.Hash, request.Amount)
+ if len(hashes) == 0 {
+ glog.V(logger.Debug).Infof("invalid block hash %x", request.Hash.Bytes()[:4])
+ }
+ return p.SendBlockHashes(hashes)
- if glog.V(logger.Debug) {
- if len(hashes) == 0 {
- glog.Infof("invalid block hash %x", request.Hash.Bytes()[:4])
- }
+ case GetBlockHashesFromNumberMsg:
+ // Retrieve and decode the number of hashes to return and from which origin number
+ var request getBlockHashesFromNumberData
+ if err := msg.Decode(&request); err != nil {
+ return errResp(ErrDecode, "%v: %v", msg, err)
+ }
+ if request.Amount > uint64(downloader.MaxHashFetch) {
+ request.Amount = uint64(downloader.MaxHashFetch)
}
+ // Calculate the last block that should be retrieved, and short circuit if unavailable
+ last := pm.chainman.GetBlockByNumber(request.Number + request.Amount - 1)
+ if last == nil {
+ last = pm.chainman.CurrentBlock()
+ request.Amount = last.NumberU64() - request.Number + 1
+ }
+ if last.NumberU64() < request.Number {
+ return p.SendBlockHashes(nil)
+ }
+ // Retrieve the hashes from the last block backwards, reverse and return
+ hashes := []common.Hash{last.Hash()}
+ hashes = append(hashes, pm.chainman.GetBlockHashesFromHash(last.Hash(), request.Amount-1)...)
- // returns either requested hashes or nothing (i.e. not found)
- return p.sendBlockHashes(hashes)
+ for i := 0; i < len(hashes)/2; i++ {
+ hashes[i], hashes[len(hashes)-1-i] = hashes[len(hashes)-1-i], hashes[i]
+ }
+ return p.SendBlockHashes(hashes)
case BlockHashesMsg:
+ // A batch of hashes arrived in response to one of our previous requests
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ reqHashInPacketsMeter.Mark(1)
var hashes []common.Hash
if err := msgStream.Decode(&hashes); err != nil {
break
}
+ reqHashInTrafficMeter.Mark(int64(32 * len(hashes)))
+
+ // Deliver them all to the downloader for queuing
err := pm.downloader.DeliverHashes(p.id, hashes)
if err != nil {
glog.V(logger.Debug).Infoln(err)
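GetBlockHashesFromHash walks the chain backwards from the last block, so the GetBlockHashesFromNumberMsg handler flips the slice before replying. A tiny standalone sketch of that in-place reversal; the hash type here is just a placeholder for common.Hash.

package main

import "fmt"

// hash stands in for common.Hash; only the reversal idiom matters here.
type hash string

// reverse flips the slice in place, as the swap loop above does, so hashes
// collected newest-first go back to the peer oldest-first.
func reverse(hashes []hash) {
	for i := 0; i < len(hashes)/2; i++ {
		hashes[i], hashes[len(hashes)-1-i] = hashes[len(hashes)-1-i], hashes[i]
	}
}

func main() {
	h := []hash{"block-7", "block-6", "block-5"}
	reverse(h)
	fmt.Println(h) // [block-5 block-6 block-7]
}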
@@ -293,13 +315,14 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
list = list[:len(list)-2] + "]"
- glog.Infof("Peer %s: no blocks found for requested hashes %s", p.id, list)
+ glog.Infof("%v: no blocks found for requested hashes %s", p, list)
}
- return p.sendBlocks(blocks)
+ return p.SendBlocks(blocks)
case BlocksMsg:
// Decode the arrived block message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ reqBlockInPacketsMeter.Mark(1)
var blocks []*types.Block
if err := msgStream.Decode(&blocks); err != nil {
@@ -307,8 +330,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
blocks = nil
}
// Update the receive timestamp of each block
- for i := 0; i < len(blocks); i++ {
- blocks[i].ReceivedAt = msg.ReceivedAt
+ for _, block := range blocks {
+ reqBlockInTrafficMeter.Mark(block.Size().Int64())
+ block.ReceivedAt = msg.ReceivedAt
}
// Filter out any explicitly requested blocks, deliver the rest to the downloader
if blocks := pm.fetcher.Filter(blocks); len(blocks) > 0 {
@@ -323,9 +347,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msgStream.Decode(&hashes); err != nil {
break
}
+ propHashInPacketsMeter.Mark(1)
+ propHashInTrafficMeter.Mark(int64(32 * len(hashes)))
+
// Mark the hashes as present at the remote node
for _, hash := range hashes {
- p.blockHashes.Add(hash)
+ p.MarkBlock(hash)
p.SetHead(hash)
}
// Schedule all the unknown hashes for retrieval
@@ -336,15 +363,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
for _, hash := range unknown {
- pm.fetcher.Notify(p.id, hash, time.Now(), p.requestBlocks)
+ pm.fetcher.Notify(p.id, hash, time.Now(), p.RequestBlocks)
}
case NewBlockMsg:
// Retrieve and decode the propagated block
- var request newBlockMsgData
+ var request newBlockData
if err := msg.Decode(&request); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
+ propBlockInPacketsMeter.Mark(1)
+ propBlockInTrafficMeter.Mark(request.Block.Size().Int64())
+
if err := request.Block.ValidateFields(); err != nil {
return errResp(ErrDecode, "block validation %v: %v", msg, err)
}
@@ -360,7 +390,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
RemoteId: p.ID().String(),
})
// Mark the peer as owning the block and schedule it for import
- p.blockHashes.Add(request.Block.Hash())
+ p.MarkBlock(request.Block.Hash())
p.SetHead(request.Block.Hash())
pm.fetcher.Enqueue(p.id, request.Block)
@@ -369,6 +399,29 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.SetTd(request.TD)
go pm.synchronise(p)
+ case TxMsg:
+ // Transactions arrived, parse all of them and deliver to the pool
+ var txs []*types.Transaction
+ if err := msg.Decode(&txs); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ propTxnInPacketsMeter.Mark(1)
+ for i, tx := range txs {
+ // Validate and mark the remote transaction
+ if tx == nil {
+ return errResp(ErrDecode, "transaction %d is nil", i)
+ }
+ p.MarkTransaction(tx.Hash())
+
+ // Log its arrival for later analysis
+ propTxnInTrafficMeter.Mark(tx.Size().Int64())
+ jsonlogger.LogJson(&logger.EthTxReceived{
+ TxHash: tx.Hash().Hex(),
+ RemoteId: p.ID().String(),
+ })
+ }
+ pm.txpool.AddTransactions(txs)
+
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
@@ -385,28 +438,27 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
if propagate {
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range transfer {
- peer.sendNewBlock(block)
+ peer.SendNewBlock(block)
}
glog.V(logger.Detail).Infof("propagated block %x to %d peers in %v", hash[:4], len(transfer), time.Since(block.ReceivedAt))
}
// Otherwise if the block is indeed in our own chain, announce it
if pm.chainman.HasBlock(hash) {
for _, peer := range peers {
- peer.sendNewBlockHashes([]common.Hash{hash})
+ peer.SendNewBlockHashes([]common.Hash{hash})
}
glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt))
}
}
-// BroadcastTx will propagate the block to its connected peers. It will sort
-// out which peers do not contain the block in their block set and will do a
-// sqrt(peers) to determine the amount of peers we broadcast to.
+// BroadcastTx will propagate a transaction to all peers which are not known to
+// already have the given transaction.
func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
// Broadcast transaction to a batch of peers not knowing about it
peers := pm.peers.PeersWithoutTx(hash)
//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range peers {
- peer.sendTransaction(tx)
+ peer.SendTransactions(types.Transactions{tx})
}
glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers")
}
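BroadcastBlock splits propagation in two: the full block goes to roughly sqrt(N) of the connected peers, while every peer gets only the hash announcement and can fetch the body on demand. A small sketch of that square-root selection, assuming the ordering of the peer list does not matter.

package main

import (
	"fmt"
	"math"
)

// propagationSet picks the first sqrt(N) peers for full-block propagation,
// matching the transfer slice in BroadcastBlock above.
func propagationSet(peers []string) []string {
	return peers[:int(math.Sqrt(float64(len(peers))))]
}

func main() {
	peers := []string{"p1", "p2", "p3", "p4", "p5", "p6", "p7", "p8", "p9"}
	fmt.Println(propagationSet(peers)) // [p1 p2 p3]
}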