aboutsummaryrefslogtreecommitdiffstats
path: root/eth/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/handler.go')
-rw-r--r--eth/handler.go170
1 files changed, 99 insertions, 71 deletions
diff --git a/eth/handler.go b/eth/handler.go
index 3fc909672..7dc7de80e 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -17,6 +17,7 @@
package eth
import (
+ "errors"
"fmt"
"math"
"math/big"
@@ -42,6 +43,10 @@ const (
estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
)
+// errIncompatibleConfig is returned if the requested protocols and configs are
+// not compatible (low protocol version restrictions and high requirements).
+var errIncompatibleConfig = errors.New("incompatible configuration")
+
func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
}
@@ -49,17 +54,8 @@ func errResp(code errCode, format string, v ...interface{}) error {
type hashFetcherFn func(common.Hash) error
type blockFetcherFn func([]common.Hash) error
-// extProt is an interface which is passed around so we can expose GetHashes and GetBlock without exposing it to the rest of the protocol
-// extProt is passed around to peers which require to GetHashes and GetBlocks
-type extProt struct {
- getHashes hashFetcherFn
- getBlocks blockFetcherFn
-}
-
-func (ep extProt) GetHashes(hash common.Hash) error { return ep.getHashes(hash) }
-func (ep extProt) GetBlock(hashes []common.Hash) error { return ep.getBlocks(hashes) }
-
type ProtocolManager struct {
+ fastSync bool
txpool txPool
blockchain *core.BlockChain
chaindb ethdb.Database
@@ -87,9 +83,15 @@ type ProtocolManager struct {
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
-func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, blockchain *core.BlockChain, chaindb ethdb.Database) *ProtocolManager {
+func NewProtocolManager(fastSync bool, networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
+ // Figure out whether to allow fast sync or not
+ if fastSync && blockchain.CurrentBlock().NumberU64() > 0 {
+ glog.V(logger.Info).Infof("blockchain not empty, fast sync disabled")
+ fastSync = false
+ }
// Create the protocol manager with the base fields
manager := &ProtocolManager{
+ fastSync: fastSync,
eventMux: mux,
txpool: txpool,
blockchain: blockchain,
@@ -100,11 +102,15 @@ func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow po
quitSync: make(chan struct{}),
}
// Initiate a sub-protocol for every implemented version we can handle
- manager.SubProtocols = make([]p2p.Protocol, len(ProtocolVersions))
- for i := 0; i < len(manager.SubProtocols); i++ {
- version := ProtocolVersions[i]
-
- manager.SubProtocols[i] = p2p.Protocol{
+ manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
+ for i, version := range ProtocolVersions {
+ // Skip protocol version if incompatible with the mode of operation
+ if fastSync && version < eth63 {
+ continue
+ }
+ // Compatible; initialise the sub-protocol
+ version := version // Closure for the run
+ manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
Name: "eth",
Version: version,
Length: ProtocolLengths[i],
@@ -113,20 +119,25 @@ func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow po
manager.newPeerCh <- peer
return manager.handle(peer)
},
- }
+ })
+ }
+ if len(manager.SubProtocols) == 0 {
+ return nil, errIncompatibleConfig
}
// Construct the different synchronisation mechanisms
- manager.downloader = downloader.New(manager.eventMux, manager.blockchain.HasBlock, manager.blockchain.GetBlock, manager.blockchain.CurrentBlock, manager.blockchain.GetTd, manager.blockchain.InsertChain, manager.removePeer)
+ manager.downloader = downloader.New(chaindb, manager.eventMux, blockchain.HasHeader, blockchain.HasBlock, blockchain.GetHeader, blockchain.GetBlock,
+ blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.FastSyncCommitHead, blockchain.GetTd,
+ blockchain.InsertHeaderChain, blockchain.InsertChain, blockchain.InsertReceiptChain, blockchain.Rollback, manager.removePeer)
validator := func(block *types.Block, parent *types.Block) error {
return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false)
}
heighter := func() uint64 {
- return manager.blockchain.CurrentBlock().NumberU64()
+ return blockchain.CurrentBlock().NumberU64()
}
- manager.fetcher = fetcher.New(manager.blockchain.GetBlock, validator, manager.BroadcastBlock, heighter, manager.blockchain.InsertChain, manager.removePeer)
+ manager.fetcher = fetcher.New(blockchain.GetBlock, validator, manager.BroadcastBlock, heighter, blockchain.InsertChain, manager.removePeer)
- return manager
+ return manager, nil
}
func (pm *ProtocolManager) removePeer(id string) {
@@ -205,8 +216,8 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(),
- p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks,
- p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies); err != nil {
+ p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, p.RequestHeadersByHash,
+ p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil {
return err
}
// Propagate existing transactions. new transactions appearing
@@ -292,7 +303,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
break
}
// Deliver them all to the downloader for queuing
- err := pm.downloader.DeliverHashes61(p.id, hashes)
+ err := pm.downloader.DeliverHashes(p.id, hashes)
if err != nil {
glog.V(logger.Debug).Infoln(err)
}
@@ -338,7 +349,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
// Filter out any explicitly requested blocks, deliver the rest to the downloader
if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 {
- pm.downloader.DeliverBlocks61(p.id, blocks)
+ pm.downloader.DeliverBlocks(p.id, blocks)
}
// Block header query, collect the requested headers and reply
@@ -424,28 +435,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
- case p.version >= eth62 && msg.Code == BlockBodiesMsg:
- // A batch of block bodies arrived to one of our previous requests
- var request blockBodiesData
- if err := msg.Decode(&request); err != nil {
- return errResp(ErrDecode, "msg %v: %v", msg, err)
- }
- // Deliver them all to the downloader for queuing
- trasactions := make([][]*types.Transaction, len(request))
- uncles := make([][]*types.Header, len(request))
-
- for i, body := range request {
- trasactions[i] = body.Transactions
- uncles[i] = body.Uncles
- }
- // Filter out any explicitly requested bodies, deliver the rest to the downloader
- if trasactions, uncles := pm.fetcher.FilterBodies(trasactions, uncles, time.Now()); len(trasactions) > 0 || len(uncles) > 0 {
- err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)
- if err != nil {
- glog.V(logger.Debug).Infoln(err)
- }
- }
-
case p.version >= eth62 && msg.Code == GetBlockBodiesMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
@@ -473,6 +462,28 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendBlockBodiesRLP(bodies)
+ case p.version >= eth62 && msg.Code == BlockBodiesMsg:
+ // A batch of block bodies arrived to one of our previous requests
+ var request blockBodiesData
+ if err := msg.Decode(&request); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Deliver them all to the downloader for queuing
+ trasactions := make([][]*types.Transaction, len(request))
+ uncles := make([][]*types.Header, len(request))
+
+ for i, body := range request {
+ trasactions[i] = body.Transactions
+ uncles[i] = body.Uncles
+ }
+ // Filter out any explicitly requested bodies, deliver the rest to the downloader
+ if trasactions, uncles := pm.fetcher.FilterBodies(trasactions, uncles, time.Now()); len(trasactions) > 0 || len(uncles) > 0 {
+ err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)
+ if err != nil {
+ glog.V(logger.Debug).Infoln(err)
+ }
+ }
+
case p.version >= eth63 && msg.Code == GetNodeDataMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
@@ -500,6 +511,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendNodeData(data)
+ case p.version >= eth63 && msg.Code == NodeDataMsg:
+ // A batch of node state data arrived to one of our previous requests
+ var data [][]byte
+ if err := msg.Decode(&data); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Deliver all to the downloader
+ if err := pm.downloader.DeliverNodeData(p.id, data); err != nil {
+ glog.V(logger.Debug).Infof("failed to deliver node state data: %v", err)
+ }
+
case p.version >= eth63 && msg.Code == GetReceiptsMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
@@ -510,22 +532,42 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
var (
hash common.Hash
bytes int
- receipts []*types.Receipt
+ receipts []rlp.RawValue
)
- for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptsFetch {
- // Retrieve the hash of the next transaction receipt
+ for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch {
+ // Retrieve the hash of the next block
if err := msgStream.Decode(&hash); err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- // Retrieve the requested receipt, stopping if enough was found
- if receipt := core.GetReceipt(pm.chaindb, hash); receipt != nil {
- receipts = append(receipts, receipt)
- bytes += len(receipt.RlpEncode())
+ // Retrieve the requested block's receipts, skipping if unknown to us
+ results := core.GetBlockReceipts(pm.chaindb, hash)
+ if results == nil {
+ if header := pm.blockchain.GetHeader(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
+ continue
+ }
+ }
+ // If known, encode and queue for response packet
+ if encoded, err := rlp.EncodeToBytes(results); err != nil {
+ glog.V(logger.Error).Infof("failed to encode receipt: %v", err)
+ } else {
+ receipts = append(receipts, encoded)
+ bytes += len(encoded)
}
}
- return p.SendReceipts(receipts)
+ return p.SendReceiptsRLP(receipts)
+
+ case p.version >= eth63 && msg.Code == ReceiptsMsg:
+ // A batch of receipts arrived to one of our previous requests
+ var receipts [][]*types.Receipt
+ if err := msg.Decode(&receipts); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Deliver all to the downloader
+ if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil {
+ glog.V(logger.Debug).Infof("failed to deliver receipts: %v", err)
+ }
case msg.Code == NewBlockHashesMsg:
// Retrieve and deseralize the remote new block hashes notification
@@ -585,15 +627,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
request.Block.ReceivedAt = msg.ReceivedAt
- // Mark the block's arrival for whatever reason
- _, chainHead, _ := pm.blockchain.Status()
- jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
- BlockHash: request.Block.Hash().Hex(),
- BlockNumber: request.Block.Number(),
- ChainHeadHash: chainHead.Hex(),
- BlockPrevHash: request.Block.ParentHash().Hex(),
- RemoteId: p.ID().String(),
- })
// Mark the peer as owning the block and schedule it for import
p.MarkBlock(request.Block.Hash())
p.SetHead(request.Block.Hash())
@@ -603,7 +636,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Update the peers total difficulty if needed, schedule a download if gapped
if request.TD.Cmp(p.Td()) > 0 {
p.SetTd(request.TD)
- if request.TD.Cmp(new(big.Int).Add(pm.blockchain.Td(), request.Block.Difficulty())) > 0 {
+ td := pm.blockchain.GetTd(pm.blockchain.CurrentBlock().Hash())
+ if request.TD.Cmp(new(big.Int).Add(td, request.Block.Difficulty())) > 0 {
go pm.synchronise(p)
}
}
@@ -620,12 +654,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "transaction %d is nil", i)
}
p.MarkTransaction(tx.Hash())
-
- // Log it's arrival for later analysis
- jsonlogger.LogJson(&logger.EthTxReceived{
- TxHash: tx.Hash().Hex(),
- RemoteId: p.ID().String(),
- })
}
pm.txpool.AddTransactions(txs)