aboutsummaryrefslogtreecommitdiffstats
path: root/eth/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/handler.go')
-rw-r--r--eth/handler.go263
1 files changed, 206 insertions, 57 deletions
diff --git a/eth/handler.go b/eth/handler.go
index 6c1895bbd..25ff0eef0 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -36,10 +36,8 @@ import (
"github.com/ethereum/go-ethereum/rlp"
)
-// This is the target maximum size of returned blocks for the
-// getBlocks message. The reply message may exceed it
-// if a single block is larger than the limit.
-const maxBlockRespSize = 2 * 1024 * 1024
+// This is the target maximum size of returned blocks, headers or node data.
+const softResponseLimit = 2 * 1024 * 1024
func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
@@ -59,12 +57,13 @@ func (ep extProt) GetHashes(hash common.Hash) error { return ep.getHashes(has
func (ep extProt) GetBlock(hashes []common.Hash) error { return ep.getBlocks(hashes) }
type ProtocolManager struct {
- protVer, netId int
- txpool txPool
- chainman *core.ChainManager
- downloader *downloader.Downloader
- fetcher *fetcher.Fetcher
- peers *peerSet
+ txpool txPool
+ chainman *core.ChainManager
+ chaindb common.Database
+
+ downloader *downloader.Downloader
+ fetcher *fetcher.Fetcher
+ peers *peerSet
SubProtocols []p2p.Protocol
@@ -85,17 +84,17 @@ type ProtocolManager struct {
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
-func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager) *ProtocolManager {
+func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager, chaindb common.Database) *ProtocolManager {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
eventMux: mux,
txpool: txpool,
chainman: chainman,
+ chaindb: chaindb,
peers: newPeerSet(),
newPeerCh: make(chan *peer, 1),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
- netId: networkId,
}
// Initiate a sub-protocol for every implemented version we can handle
manager.SubProtocols = make([]p2p.Protocol, len(ProtocolVersions))
@@ -190,6 +189,9 @@ func (pm *ProtocolManager) handle(p *peer) error {
glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err)
return err
}
+ if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
+ rw.Init(p.version)
+ }
// Register the peer locally
glog.V(logger.Detail).Infof("%v: adding peer", p)
if err := pm.peers.Register(p); err != nil {
@@ -230,12 +232,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
defer msg.Discard()
// Handle the message depending on its contents
- switch msg.Code {
- case StatusMsg:
+ switch {
+ case msg.Code == StatusMsg:
// Status messages should never arrive after the handshake
return errResp(ErrExtraStatusMsg, "uncontrolled status message")
- case GetBlockHashesMsg:
+ case p.version < eth62 && msg.Code == GetBlockHashesMsg:
// Retrieve the number of hashes to return and from which origin hash
var request getBlockHashesData
if err := msg.Decode(&request); err != nil {
@@ -251,7 +253,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendBlockHashes(hashes)
- case GetBlockHashesFromNumberMsg:
+ case p.version < eth62 && msg.Code == GetBlockHashesFromNumberMsg:
// Retrieve and decode the number of hashes to return and from which origin number
var request getBlockHashesFromNumberData
if err := msg.Decode(&request); err != nil {
@@ -278,12 +280,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendBlockHashes(hashes)
- case BlockHashesMsg:
+ case p.version < eth62 && msg.Code == BlockHashesMsg:
// A batch of hashes arrived to one of our previous requests
- msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
-
var hashes []common.Hash
- if err := msgStream.Decode(&hashes); err != nil {
+ if err := msg.Decode(&hashes); err != nil {
break
}
// Deliver them all to the downloader for queuing
@@ -292,7 +292,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
glog.V(logger.Debug).Infoln(err)
}
- case GetBlocksMsg:
+ case p.version < eth62 && msg.Code == GetBlocksMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
@@ -302,44 +302,28 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
var (
hash common.Hash
bytes common.StorageSize
- hashes []common.Hash
blocks []*types.Block
)
- for {
+ for len(blocks) < downloader.MaxBlockFetch && bytes < softResponseLimit {
+ //Retrieve the hash of the next block
err := msgStream.Decode(&hash)
if err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- hashes = append(hashes, hash)
-
// Retrieve the requested block, stopping if enough was found
if block := pm.chainman.GetBlock(hash); block != nil {
blocks = append(blocks, block)
bytes += block.Size()
- if len(blocks) >= downloader.MaxBlockFetch || bytes > maxBlockRespSize {
- break
- }
}
}
- if glog.V(logger.Detail) && len(blocks) == 0 && len(hashes) > 0 {
- list := "["
- for _, hash := range hashes {
- list += fmt.Sprintf("%x, ", hash[:4])
- }
- list = list[:len(list)-2] + "]"
-
- glog.Infof("%v: no blocks found for requested hashes %s", p, list)
- }
return p.SendBlocks(blocks)
- case BlocksMsg:
+ case p.version < eth62 && msg.Code == BlocksMsg:
// Decode the arrived block message
- msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
-
var blocks []*types.Block
- if err := msgStream.Decode(&blocks); err != nil {
+ if err := msg.Decode(&blocks); err != nil {
glog.V(logger.Detail).Infoln("Decode error", err)
blocks = nil
}
@@ -352,31 +336,196 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.downloader.DeliverBlocks(p.id, blocks)
}
- case NewBlockHashesMsg:
- // Retrieve and deseralize the remote new block hashes notification
+ // Block header query, collect the requested headers and reply
+ case p.version >= eth62 && msg.Code == GetBlockHeadersMsg:
+ // Decode the complex header query
+ var query getBlockHeadersData
+ if err := msg.Decode(&query); err != nil {
+ return errResp(ErrDecode, "%v: %v", msg, err)
+ }
+ // Gather blocks until the fetch or network limits is reached
+ var (
+ bytes common.StorageSize
+ headers []*types.Header
+ unknown bool
+ )
+ for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
+ // Retrieve the next block satisfying the query
+ var origin *types.Block
+ if query.Origin.Hash != (common.Hash{}) {
+ origin = pm.chainman.GetBlock(query.Origin.Hash)
+ } else {
+ origin = pm.chainman.GetBlockByNumber(query.Origin.Number)
+ }
+ if origin == nil {
+ break
+ }
+ headers = append(headers, origin.Header())
+ bytes += origin.Size()
+
+ // Advance to the next block of the query
+ switch {
+ case query.Origin.Hash != (common.Hash{}) && query.Reverse:
+ // Hash based traversal towards the genesis block
+ for i := 0; i < int(query.Skip)+1; i++ {
+ if block := pm.chainman.GetBlock(query.Origin.Hash); block != nil {
+ query.Origin.Hash = block.ParentHash()
+ } else {
+ unknown = true
+ break
+ }
+ }
+ case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
+ // Hash based traversal towards the leaf block
+ if block := pm.chainman.GetBlockByNumber(origin.NumberU64() + query.Skip + 1); block != nil {
+ if pm.chainman.GetBlockHashesFromHash(block.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
+ query.Origin.Hash = block.Hash()
+ } else {
+ unknown = true
+ }
+ } else {
+ unknown = true
+ }
+ case query.Reverse:
+ // Number based traversal towards the genesis block
+ if query.Origin.Number >= query.Skip+1 {
+ query.Origin.Number -= (query.Skip + 1)
+ } else {
+ unknown = true
+ }
+
+ case !query.Reverse:
+ // Number based traversal towards the leaf block
+ query.Origin.Number += (query.Skip + 1)
+ }
+ }
+ return p.SendBlockHeaders(headers)
+
+ case p.version >= eth62 && msg.Code == GetBlockBodiesMsg:
+ // Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+ // Gather blocks until the fetch or network limits is reached
+ var (
+ hash common.Hash
+ bytes common.StorageSize
+ bodies []*blockBody
+ )
+ for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
+ //Retrieve the hash of the next block
+ if err := msgStream.Decode(&hash); err == rlp.EOL {
+ break
+ } else if err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Retrieve the requested block, stopping if enough was found
+ if block := pm.chainman.GetBlock(hash); block != nil {
+ bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()})
+ bytes += block.Size()
+ }
+ }
+ return p.SendBlockBodies(bodies)
- var hashes []common.Hash
- if err := msgStream.Decode(&hashes); err != nil {
- break
+ case p.version >= eth63 && msg.Code == GetNodeDataMsg:
+ // Decode the retrieval message
+ msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+ // Gather state data until the fetch or network limits is reached
+ var (
+ hash common.Hash
+ bytes int
+ data [][]byte
+ )
+ for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch {
+ // Retrieve the hash of the next state entry
+ if err := msgStream.Decode(&hash); err == rlp.EOL {
+ break
+ } else if err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Retrieve the requested state entry, stopping if enough was found
+ if entry, err := pm.chaindb.Get(hash.Bytes()); err == nil {
+ data = append(data, entry)
+ bytes += len(entry)
+ }
+ }
+ return p.SendNodeData(data)
+
+ case p.version >= eth63 && msg.Code == GetReceiptsMsg:
+ // Decode the retrieval message
+ msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+ // Gather state data until the fetch or network limits is reached
+ var (
+ hash common.Hash
+ bytes int
+ receipts []*types.Receipt
+ )
+ for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptsFetch {
+ // Retrieve the hash of the next transaction receipt
+ if err := msgStream.Decode(&hash); err == rlp.EOL {
+ break
+ } else if err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Retrieve the requested receipt, stopping if enough was found
+ if receipt := core.GetReceipt(pm.chaindb, hash); receipt != nil {
+ receipts = append(receipts, receipt)
+ bytes += len(receipt.RlpEncode())
+ }
+ }
+ return p.SendReceipts(receipts)
+
+ case msg.Code == NewBlockHashesMsg:
+ // Retrieve and deseralize the remote new block hashes notification
+ type announce struct {
+ Hash common.Hash
+ Number uint64
+ }
+ var announces = []announce{}
+
+ if p.version < eth62 {
+ // We're running the old protocol, make block number unknown (0)
+ var hashes []common.Hash
+ if err := msg.Decode(&hashes); err != nil {
+ return errResp(ErrDecode, "%v: %v", msg, err)
+ }
+ for _, hash := range hashes {
+ announces = append(announces, announce{hash, 0})
+ }
+ } else {
+ // Otherwise extract both block hash and number
+ var request newBlockHashesData
+ if err := msg.Decode(&request); err != nil {
+ return errResp(ErrDecode, "%v: %v", msg, err)
+ }
+ for _, block := range request {
+ announces = append(announces, announce{block.Hash, block.Number})
+ }
}
// Mark the hashes as present at the remote node
- for _, hash := range hashes {
- p.MarkBlock(hash)
- p.SetHead(hash)
+ for _, block := range announces {
+ p.MarkBlock(block.Hash)
+ p.SetHead(block.Hash)
}
// Schedule all the unknown hashes for retrieval
- unknown := make([]common.Hash, 0, len(hashes))
- for _, hash := range hashes {
- if !pm.chainman.HasBlock(hash) {
- unknown = append(unknown, hash)
+ unknown := make([]announce, 0, len(announces))
+ for _, block := range announces {
+ if !pm.chainman.HasBlock(block.Hash) {
+ unknown = append(unknown, block)
}
}
- for _, hash := range unknown {
- pm.fetcher.Notify(p.id, hash, time.Now(), p.RequestBlocks)
+ for _, block := range unknown {
+ pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestBlocks)
}
- case NewBlockMsg:
+ case msg.Code == NewBlockMsg:
// Retrieve and decode the propagated block
var request newBlockData
if err := msg.Decode(&request); err != nil {
@@ -410,7 +559,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
- case TxMsg:
+ case msg.Code == TxMsg:
// Transactions arrived, parse all of them and deliver to the pool
var txs []*types.Transaction
if err := msg.Decode(&txs); err != nil {