diff options
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- | eth/handler.go | 170 |
1 files changed, 99 insertions, 71 deletions
diff --git a/eth/handler.go b/eth/handler.go index 3fc909672..7dc7de80e 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -17,6 +17,7 @@ package eth import ( + "errors" "fmt" "math" "math/big" @@ -42,6 +43,10 @@ const ( estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header ) +// errIncompatibleConfig is returned if the requested protocols and configs are +// not compatible (low protocol version restrictions and high requirements). +var errIncompatibleConfig = errors.New("incompatible configuration") + func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -49,17 +54,8 @@ func errResp(code errCode, format string, v ...interface{}) error { type hashFetcherFn func(common.Hash) error type blockFetcherFn func([]common.Hash) error -// extProt is an interface which is passed around so we can expose GetHashes and GetBlock without exposing it to the rest of the protocol -// extProt is passed around to peers which require to GetHashes and GetBlocks -type extProt struct { - getHashes hashFetcherFn - getBlocks blockFetcherFn -} - -func (ep extProt) GetHashes(hash common.Hash) error { return ep.getHashes(hash) } -func (ep extProt) GetBlock(hashes []common.Hash) error { return ep.getBlocks(hashes) } - type ProtocolManager struct { + fastSync bool txpool txPool blockchain *core.BlockChain chaindb ethdb.Database @@ -87,9 +83,15 @@ type ProtocolManager struct { // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, blockchain *core.BlockChain, chaindb ethdb.Database) *ProtocolManager { +func NewProtocolManager(fastSync bool, networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) { + // Figure out whether to allow fast sync or not + if fastSync && blockchain.CurrentBlock().NumberU64() > 0 { + glog.V(logger.Info).Infof("blockchain not empty, fast sync disabled") + fastSync = false + } // Create the protocol manager with the base fields manager := &ProtocolManager{ + fastSync: fastSync, eventMux: mux, txpool: txpool, blockchain: blockchain, @@ -100,11 +102,15 @@ func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow po quitSync: make(chan struct{}), } // Initiate a sub-protocol for every implemented version we can handle - manager.SubProtocols = make([]p2p.Protocol, len(ProtocolVersions)) - for i := 0; i < len(manager.SubProtocols); i++ { - version := ProtocolVersions[i] - - manager.SubProtocols[i] = p2p.Protocol{ + manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions)) + for i, version := range ProtocolVersions { + // Skip protocol version if incompatible with the mode of operation + if fastSync && version < eth63 { + continue + } + // Compatible; initialise the sub-protocol + version := version // Closure for the run + manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{ Name: "eth", Version: version, Length: ProtocolLengths[i], @@ -113,20 +119,25 @@ func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow po manager.newPeerCh <- peer return manager.handle(peer) }, - } + }) + } + if len(manager.SubProtocols) == 0 { + return nil, errIncompatibleConfig } // Construct the different synchronisation mechanisms - manager.downloader = downloader.New(manager.eventMux, manager.blockchain.HasBlock, manager.blockchain.GetBlock, manager.blockchain.CurrentBlock, manager.blockchain.GetTd, manager.blockchain.InsertChain, manager.removePeer) + manager.downloader = downloader.New(chaindb, manager.eventMux, blockchain.HasHeader, blockchain.HasBlock, blockchain.GetHeader, blockchain.GetBlock, + blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.FastSyncCommitHead, blockchain.GetTd, + blockchain.InsertHeaderChain, blockchain.InsertChain, blockchain.InsertReceiptChain, blockchain.Rollback, manager.removePeer) validator := func(block *types.Block, parent *types.Block) error { return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false) } heighter := func() uint64 { - return manager.blockchain.CurrentBlock().NumberU64() + return blockchain.CurrentBlock().NumberU64() } - manager.fetcher = fetcher.New(manager.blockchain.GetBlock, validator, manager.BroadcastBlock, heighter, manager.blockchain.InsertChain, manager.removePeer) + manager.fetcher = fetcher.New(blockchain.GetBlock, validator, manager.BroadcastBlock, heighter, blockchain.InsertChain, manager.removePeer) - return manager + return manager, nil } func (pm *ProtocolManager) removePeer(id string) { @@ -205,8 +216,8 @@ func (pm *ProtocolManager) handle(p *peer) error { // Register the peer in the downloader. If the downloader considers it banned, we disconnect if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), - p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, - p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies); err != nil { + p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, p.RequestHeadersByHash, + p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil { return err } // Propagate existing transactions. new transactions appearing @@ -292,7 +303,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { break } // Deliver them all to the downloader for queuing - err := pm.downloader.DeliverHashes61(p.id, hashes) + err := pm.downloader.DeliverHashes(p.id, hashes) if err != nil { glog.V(logger.Debug).Infoln(err) } @@ -338,7 +349,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } // Filter out any explicitly requested blocks, deliver the rest to the downloader if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 { - pm.downloader.DeliverBlocks61(p.id, blocks) + pm.downloader.DeliverBlocks(p.id, blocks) } // Block header query, collect the requested headers and reply @@ -424,28 +435,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } - case p.version >= eth62 && msg.Code == BlockBodiesMsg: - // A batch of block bodies arrived to one of our previous requests - var request blockBodiesData - if err := msg.Decode(&request); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - // Deliver them all to the downloader for queuing - trasactions := make([][]*types.Transaction, len(request)) - uncles := make([][]*types.Header, len(request)) - - for i, body := range request { - trasactions[i] = body.Transactions - uncles[i] = body.Uncles - } - // Filter out any explicitly requested bodies, deliver the rest to the downloader - if trasactions, uncles := pm.fetcher.FilterBodies(trasactions, uncles, time.Now()); len(trasactions) > 0 || len(uncles) > 0 { - err := pm.downloader.DeliverBodies(p.id, trasactions, uncles) - if err != nil { - glog.V(logger.Debug).Infoln(err) - } - } - case p.version >= eth62 && msg.Code == GetBlockBodiesMsg: // Decode the retrieval message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) @@ -473,6 +462,28 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } return p.SendBlockBodiesRLP(bodies) + case p.version >= eth62 && msg.Code == BlockBodiesMsg: + // A batch of block bodies arrived to one of our previous requests + var request blockBodiesData + if err := msg.Decode(&request); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Deliver them all to the downloader for queuing + trasactions := make([][]*types.Transaction, len(request)) + uncles := make([][]*types.Header, len(request)) + + for i, body := range request { + trasactions[i] = body.Transactions + uncles[i] = body.Uncles + } + // Filter out any explicitly requested bodies, deliver the rest to the downloader + if trasactions, uncles := pm.fetcher.FilterBodies(trasactions, uncles, time.Now()); len(trasactions) > 0 || len(uncles) > 0 { + err := pm.downloader.DeliverBodies(p.id, trasactions, uncles) + if err != nil { + glog.V(logger.Debug).Infoln(err) + } + } + case p.version >= eth63 && msg.Code == GetNodeDataMsg: // Decode the retrieval message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) @@ -500,6 +511,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } return p.SendNodeData(data) + case p.version >= eth63 && msg.Code == NodeDataMsg: + // A batch of node state data arrived to one of our previous requests + var data [][]byte + if err := msg.Decode(&data); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Deliver all to the downloader + if err := pm.downloader.DeliverNodeData(p.id, data); err != nil { + glog.V(logger.Debug).Infof("failed to deliver node state data: %v", err) + } + case p.version >= eth63 && msg.Code == GetReceiptsMsg: // Decode the retrieval message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) @@ -510,22 +532,42 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { var ( hash common.Hash bytes int - receipts []*types.Receipt + receipts []rlp.RawValue ) - for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptsFetch { - // Retrieve the hash of the next transaction receipt + for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch { + // Retrieve the hash of the next block if err := msgStream.Decode(&hash); err == rlp.EOL { break } else if err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - // Retrieve the requested receipt, stopping if enough was found - if receipt := core.GetReceipt(pm.chaindb, hash); receipt != nil { - receipts = append(receipts, receipt) - bytes += len(receipt.RlpEncode()) + // Retrieve the requested block's receipts, skipping if unknown to us + results := core.GetBlockReceipts(pm.chaindb, hash) + if results == nil { + if header := pm.blockchain.GetHeader(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { + continue + } + } + // If known, encode and queue for response packet + if encoded, err := rlp.EncodeToBytes(results); err != nil { + glog.V(logger.Error).Infof("failed to encode receipt: %v", err) + } else { + receipts = append(receipts, encoded) + bytes += len(encoded) } } - return p.SendReceipts(receipts) + return p.SendReceiptsRLP(receipts) + + case p.version >= eth63 && msg.Code == ReceiptsMsg: + // A batch of receipts arrived to one of our previous requests + var receipts [][]*types.Receipt + if err := msg.Decode(&receipts); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Deliver all to the downloader + if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil { + glog.V(logger.Debug).Infof("failed to deliver receipts: %v", err) + } case msg.Code == NewBlockHashesMsg: // Retrieve and deseralize the remote new block hashes notification @@ -585,15 +627,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } request.Block.ReceivedAt = msg.ReceivedAt - // Mark the block's arrival for whatever reason - _, chainHead, _ := pm.blockchain.Status() - jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{ - BlockHash: request.Block.Hash().Hex(), - BlockNumber: request.Block.Number(), - ChainHeadHash: chainHead.Hex(), - BlockPrevHash: request.Block.ParentHash().Hex(), - RemoteId: p.ID().String(), - }) // Mark the peer as owning the block and schedule it for import p.MarkBlock(request.Block.Hash()) p.SetHead(request.Block.Hash()) @@ -603,7 +636,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Update the peers total difficulty if needed, schedule a download if gapped if request.TD.Cmp(p.Td()) > 0 { p.SetTd(request.TD) - if request.TD.Cmp(new(big.Int).Add(pm.blockchain.Td(), request.Block.Difficulty())) > 0 { + td := pm.blockchain.GetTd(pm.blockchain.CurrentBlock().Hash()) + if request.TD.Cmp(new(big.Int).Add(td, request.Block.Difficulty())) > 0 { go pm.synchronise(p) } } @@ -620,12 +654,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "transaction %d is nil", i) } p.MarkTransaction(tx.Hash()) - - // Log it's arrival for later analysis - jsonlogger.LogJson(&logger.EthTxReceived{ - TxHash: tx.Hash().Hex(), - RemoteId: p.ID().String(), - }) } pm.txpool.AddTransactions(txs) |