diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-07-03 00:55:18 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-08-24 22:57:28 +0800 |
commit | 42f44dda5468000b3b2c005ec485529bc5da3674 (patch) | |
tree | c93c9a2734adceac75e48f825076d48325826140 /eth/handler.go | |
parent | c51e153b5c5327f971e7b410e49e7babfc3f0b8e (diff) | |
download | go-tangerine-42f44dda5468000b3b2c005ec485529bc5da3674.tar go-tangerine-42f44dda5468000b3b2c005ec485529bc5da3674.tar.gz go-tangerine-42f44dda5468000b3b2c005ec485529bc5da3674.tar.bz2 go-tangerine-42f44dda5468000b3b2c005ec485529bc5da3674.tar.lz go-tangerine-42f44dda5468000b3b2c005ec485529bc5da3674.tar.xz go-tangerine-42f44dda5468000b3b2c005ec485529bc5da3674.tar.zst go-tangerine-42f44dda5468000b3b2c005ec485529bc5da3674.zip |
eth, eth/downloader: handle header requests, table driven proto tests
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- | eth/handler.go | 263 |
1 files changed, 206 insertions, 57 deletions
diff --git a/eth/handler.go b/eth/handler.go index 6c1895bbd..25ff0eef0 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -36,10 +36,8 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) -// This is the target maximum size of returned blocks for the -// getBlocks message. The reply message may exceed it -// if a single block is larger than the limit. -const maxBlockRespSize = 2 * 1024 * 1024 +// This is the target maximum size of returned blocks, headers or node data. +const softResponseLimit = 2 * 1024 * 1024 func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) @@ -59,12 +57,13 @@ func (ep extProt) GetHashes(hash common.Hash) error { return ep.getHashes(has func (ep extProt) GetBlock(hashes []common.Hash) error { return ep.getBlocks(hashes) } type ProtocolManager struct { - protVer, netId int - txpool txPool - chainman *core.ChainManager - downloader *downloader.Downloader - fetcher *fetcher.Fetcher - peers *peerSet + txpool txPool + chainman *core.ChainManager + chaindb common.Database + + downloader *downloader.Downloader + fetcher *fetcher.Fetcher + peers *peerSet SubProtocols []p2p.Protocol @@ -85,17 +84,17 @@ type ProtocolManager struct { // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager) *ProtocolManager { +func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager, chaindb common.Database) *ProtocolManager { // Create the protocol manager with the base fields manager := &ProtocolManager{ eventMux: mux, txpool: txpool, chainman: chainman, + chaindb: chaindb, peers: newPeerSet(), newPeerCh: make(chan *peer, 1), txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), - netId: networkId, } // Initiate a sub-protocol for every implemented version we can handle manager.SubProtocols = make([]p2p.Protocol, len(ProtocolVersions)) @@ -190,6 +189,9 @@ func (pm *ProtocolManager) handle(p *peer) error { glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err) return err } + if rw, ok := p.rw.(*meteredMsgReadWriter); ok { + rw.Init(p.version) + } // Register the peer locally glog.V(logger.Detail).Infof("%v: adding peer", p) if err := pm.peers.Register(p); err != nil { @@ -230,12 +232,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { defer msg.Discard() // Handle the message depending on its contents - switch msg.Code { - case StatusMsg: + switch { + case msg.Code == StatusMsg: // Status messages should never arrive after the handshake return errResp(ErrExtraStatusMsg, "uncontrolled status message") - case GetBlockHashesMsg: + case p.version < eth62 && msg.Code == GetBlockHashesMsg: // Retrieve the number of hashes to return and from which origin hash var request getBlockHashesData if err := msg.Decode(&request); err != nil { @@ -251,7 +253,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } return p.SendBlockHashes(hashes) - case GetBlockHashesFromNumberMsg: + case p.version < eth62 && msg.Code == GetBlockHashesFromNumberMsg: // Retrieve and decode the number of hashes to return and from which origin number var request getBlockHashesFromNumberData if err := msg.Decode(&request); err != nil { @@ -278,12 +280,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } return p.SendBlockHashes(hashes) - case BlockHashesMsg: + case p.version < eth62 && msg.Code == BlockHashesMsg: // A batch of hashes arrived to one of our previous requests - msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) - var hashes []common.Hash - if err := msgStream.Decode(&hashes); err != nil { + if err := msg.Decode(&hashes); err != nil { break } // Deliver them all to the downloader for queuing @@ -292,7 +292,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { glog.V(logger.Debug).Infoln(err) } - case GetBlocksMsg: + case p.version < eth62 && msg.Code == GetBlocksMsg: // Decode the retrieval message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) if _, err := msgStream.List(); err != nil { @@ -302,44 +302,28 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { var ( hash common.Hash bytes common.StorageSize - hashes []common.Hash blocks []*types.Block ) - for { + for len(blocks) < downloader.MaxBlockFetch && bytes < softResponseLimit { + //Retrieve the hash of the next block err := msgStream.Decode(&hash) if err == rlp.EOL { break } else if err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - hashes = append(hashes, hash) - // Retrieve the requested block, stopping if enough was found if block := pm.chainman.GetBlock(hash); block != nil { blocks = append(blocks, block) bytes += block.Size() - if len(blocks) >= downloader.MaxBlockFetch || bytes > maxBlockRespSize { - break - } } } - if glog.V(logger.Detail) && len(blocks) == 0 && len(hashes) > 0 { - list := "[" - for _, hash := range hashes { - list += fmt.Sprintf("%x, ", hash[:4]) - } - list = list[:len(list)-2] + "]" - - glog.Infof("%v: no blocks found for requested hashes %s", p, list) - } return p.SendBlocks(blocks) - case BlocksMsg: + case p.version < eth62 && msg.Code == BlocksMsg: // Decode the arrived block message - msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) - var blocks []*types.Block - if err := msgStream.Decode(&blocks); err != nil { + if err := msg.Decode(&blocks); err != nil { glog.V(logger.Detail).Infoln("Decode error", err) blocks = nil } @@ -352,31 +336,196 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.downloader.DeliverBlocks(p.id, blocks) } - case NewBlockHashesMsg: - // Retrieve and deseralize the remote new block hashes notification + // Block header query, collect the requested headers and reply + case p.version >= eth62 && msg.Code == GetBlockHeadersMsg: + // Decode the complex header query + var query getBlockHeadersData + if err := msg.Decode(&query); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + // Gather blocks until the fetch or network limits is reached + var ( + bytes common.StorageSize + headers []*types.Header + unknown bool + ) + for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch { + // Retrieve the next block satisfying the query + var origin *types.Block + if query.Origin.Hash != (common.Hash{}) { + origin = pm.chainman.GetBlock(query.Origin.Hash) + } else { + origin = pm.chainman.GetBlockByNumber(query.Origin.Number) + } + if origin == nil { + break + } + headers = append(headers, origin.Header()) + bytes += origin.Size() + + // Advance to the next block of the query + switch { + case query.Origin.Hash != (common.Hash{}) && query.Reverse: + // Hash based traversal towards the genesis block + for i := 0; i < int(query.Skip)+1; i++ { + if block := pm.chainman.GetBlock(query.Origin.Hash); block != nil { + query.Origin.Hash = block.ParentHash() + } else { + unknown = true + break + } + } + case query.Origin.Hash != (common.Hash{}) && !query.Reverse: + // Hash based traversal towards the leaf block + if block := pm.chainman.GetBlockByNumber(origin.NumberU64() + query.Skip + 1); block != nil { + if pm.chainman.GetBlockHashesFromHash(block.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash { + query.Origin.Hash = block.Hash() + } else { + unknown = true + } + } else { + unknown = true + } + case query.Reverse: + // Number based traversal towards the genesis block + if query.Origin.Number >= query.Skip+1 { + query.Origin.Number -= (query.Skip + 1) + } else { + unknown = true + } + + case !query.Reverse: + // Number based traversal towards the leaf block + query.Origin.Number += (query.Skip + 1) + } + } + return p.SendBlockHeaders(headers) + + case p.version >= eth62 && msg.Code == GetBlockBodiesMsg: + // Decode the retrieval message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + if _, err := msgStream.List(); err != nil { + return err + } + // Gather blocks until the fetch or network limits is reached + var ( + hash common.Hash + bytes common.StorageSize + bodies []*blockBody + ) + for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch { + //Retrieve the hash of the next block + if err := msgStream.Decode(&hash); err == rlp.EOL { + break + } else if err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Retrieve the requested block, stopping if enough was found + if block := pm.chainman.GetBlock(hash); block != nil { + bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()}) + bytes += block.Size() + } + } + return p.SendBlockBodies(bodies) - var hashes []common.Hash - if err := msgStream.Decode(&hashes); err != nil { - break + case p.version >= eth63 && msg.Code == GetNodeDataMsg: + // Decode the retrieval message + msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + if _, err := msgStream.List(); err != nil { + return err + } + // Gather state data until the fetch or network limits is reached + var ( + hash common.Hash + bytes int + data [][]byte + ) + for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch { + // Retrieve the hash of the next state entry + if err := msgStream.Decode(&hash); err == rlp.EOL { + break + } else if err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Retrieve the requested state entry, stopping if enough was found + if entry, err := pm.chaindb.Get(hash.Bytes()); err == nil { + data = append(data, entry) + bytes += len(entry) + } + } + return p.SendNodeData(data) + + case p.version >= eth63 && msg.Code == GetReceiptsMsg: + // Decode the retrieval message + msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + if _, err := msgStream.List(); err != nil { + return err + } + // Gather state data until the fetch or network limits is reached + var ( + hash common.Hash + bytes int + receipts []*types.Receipt + ) + for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptsFetch { + // Retrieve the hash of the next transaction receipt + if err := msgStream.Decode(&hash); err == rlp.EOL { + break + } else if err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Retrieve the requested receipt, stopping if enough was found + if receipt := core.GetReceipt(pm.chaindb, hash); receipt != nil { + receipts = append(receipts, receipt) + bytes += len(receipt.RlpEncode()) + } + } + return p.SendReceipts(receipts) + + case msg.Code == NewBlockHashesMsg: + // Retrieve and deseralize the remote new block hashes notification + type announce struct { + Hash common.Hash + Number uint64 + } + var announces = []announce{} + + if p.version < eth62 { + // We're running the old protocol, make block number unknown (0) + var hashes []common.Hash + if err := msg.Decode(&hashes); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + for _, hash := range hashes { + announces = append(announces, announce{hash, 0}) + } + } else { + // Otherwise extract both block hash and number + var request newBlockHashesData + if err := msg.Decode(&request); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + for _, block := range request { + announces = append(announces, announce{block.Hash, block.Number}) + } } // Mark the hashes as present at the remote node - for _, hash := range hashes { - p.MarkBlock(hash) - p.SetHead(hash) + for _, block := range announces { + p.MarkBlock(block.Hash) + p.SetHead(block.Hash) } // Schedule all the unknown hashes for retrieval - unknown := make([]common.Hash, 0, len(hashes)) - for _, hash := range hashes { - if !pm.chainman.HasBlock(hash) { - unknown = append(unknown, hash) + unknown := make([]announce, 0, len(announces)) + for _, block := range announces { + if !pm.chainman.HasBlock(block.Hash) { + unknown = append(unknown, block) } } - for _, hash := range unknown { - pm.fetcher.Notify(p.id, hash, time.Now(), p.RequestBlocks) + for _, block := range unknown { + pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestBlocks) } - case NewBlockMsg: + case msg.Code == NewBlockMsg: // Retrieve and decode the propagated block var request newBlockData if err := msg.Decode(&request); err != nil { @@ -410,7 +559,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } - case TxMsg: + case msg.Code == TxMsg: // Transactions arrived, parse all of them and deliver to the pool var txs []*types.Transaction if err := msg.Decode(&txs); err != nil { |