diff options
Diffstat (limited to 'eth')
-rw-r--r-- | eth/downloader/downloader.go | 568 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 123 | ||||
-rw-r--r-- | eth/downloader/metrics.go | 5 | ||||
-rw-r--r-- | eth/downloader/peer.go | 107 | ||||
-rw-r--r-- | eth/downloader/queue.go | 271 | ||||
-rw-r--r-- | eth/downloader/types.go | 137 | ||||
-rw-r--r-- | eth/handler.go | 25 | ||||
-rw-r--r-- | eth/peer.go | 2 |
8 files changed, 779 insertions, 459 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 24ba3da17..96177ae8a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -19,7 +19,6 @@ package downloader import ( "errors" - "fmt" "math" "math/big" "strings" @@ -29,9 +28,11 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "github.com/rcrowley/go-metrics" ) var ( @@ -39,8 +40,8 @@ var ( MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request - MaxStateFetch = 384 // Amount of node state values to allow fetching per request MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request + MaxStateFetch = 384 // Amount of node state values to allow fetching per request hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth @@ -49,10 +50,13 @@ var ( bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth - receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a block body request is considered expired + receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired + stateSoftTTL = 2 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth + stateHardTTL = 3 * stateSoftTTL // [eth/63] Maximum time allowance before a node data request is considered expired maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) + maxQueuedStates = 256 * 1024 // [eth/63] Maximum number of state requests to queue (DOS protection) maxResultsProcess = 256 // Number of download results to import at once into the chain headerCheckFrequency = 64 // Verification frequency of the downloaded headers during fast sync @@ -84,98 +88,6 @@ var ( errNoSyncActive = errors.New("no sync active") ) -// headerCheckFn is a callback type for verifying a header's presence in the local chain. -type headerCheckFn func(common.Hash) bool - -// blockCheckFn is a callback type for verifying a block's presence in the local chain. -type blockCheckFn func(common.Hash) bool - -// headerRetrievalFn is a callback type for retrieving a header from the local chain. -type headerRetrievalFn func(common.Hash) *types.Header - -// blockRetrievalFn is a callback type for retrieving a block from the local chain. -type blockRetrievalFn func(common.Hash) *types.Block - -// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain. -type headHeaderRetrievalFn func() *types.Header - -// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain. -type headBlockRetrievalFn func() *types.Block - -// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain. -type headFastBlockRetrievalFn func() *types.Block - -// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block. -type tdRetrievalFn func(common.Hash) *big.Int - -// headerChainInsertFn is a callback type to insert a batch of headers into the local chain. -type headerChainInsertFn func([]*types.Header, bool) (int, error) - -// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain. -type blockChainInsertFn func(types.Blocks) (int, error) - -// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain. -type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error) - -// peerDropFn is a callback type for dropping a peer detected as malicious. -type peerDropFn func(id string) - -// dataPack is a data message returned by a peer for some query. -type dataPack interface { - PeerId() string - Empty() bool - Stats() string -} - -// hashPack is a batch of block hashes returned by a peer (eth/61). -type hashPack struct { - peerId string - hashes []common.Hash -} - -// blockPack is a batch of blocks returned by a peer (eth/61). -type blockPack struct { - peerId string - blocks []*types.Block -} - -// headerPack is a batch of block headers returned by a peer. -type headerPack struct { - peerId string - headers []*types.Header -} - -// bodyPack is a batch of block bodies returned by a peer. -type bodyPack struct { - peerId string - transactions [][]*types.Transaction - uncles [][]*types.Header -} - -// PeerId retrieves the origin peer who sent this block body packet. -func (p *bodyPack) PeerId() string { return p.peerId } - -// Empty returns whether the no block bodies were delivered. -func (p *bodyPack) Empty() bool { return len(p.transactions) == 0 || len(p.uncles) == 0 } - -// Stats creates a textual stats report for logging purposes. -func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) } - -// receiptPack is a batch of receipts returned by a peer. -type receiptPack struct { - peerId string - receipts [][]*types.Receipt -} - -// PeerId retrieves the origin peer who sent this receipt packet. -func (p *receiptPack) PeerId() string { return p.peerId } - -// Empty returns whether the no receipts were delivered. -func (p *receiptPack) Empty() bool { return len(p.receipts) == 0 } - -// Stats creates a textual stats report for logging purposes. -func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) } - type Downloader struct { mode SyncMode // Synchronisation mode defining the strategies used mux *event.TypeMux // Event multiplexer to announce sync operation events @@ -186,23 +98,26 @@ type Downloader struct { interrupt int32 // Atomic boolean to signal termination // Statistics - syncStatsOrigin uint64 // Origin block number where syncing started at - syncStatsHeight uint64 // Highest block number known when syncing started - syncStatsLock sync.RWMutex // Lock protecting the sync stats fields + syncStatsChainOrigin uint64 // Origin block number where syncing started at + syncStatsChainHeight uint64 // Highest block number known when syncing started + syncStatsStateTotal uint64 // Total number of node state entries known so far + syncStatsStateDone uint64 // Number of state trie entries already pulled + syncStatsLock sync.RWMutex // Lock protecting the sync stats fields // Callbacks - hasHeader headerCheckFn // Checks if a header is present in the chain - hasBlock blockCheckFn // Checks if a block is present in the chain - getHeader headerRetrievalFn // Retrieves a header from the chain - getBlock blockRetrievalFn // Retrieves a block from the chain - headHeader headHeaderRetrievalFn // Retrieves the head header from the chain - headBlock headBlockRetrievalFn // Retrieves the head block from the chain - headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain - getTd tdRetrievalFn // Retrieves the TD of a block from the chain - insertHeaders headerChainInsertFn // Injects a batch of headers into the chain - insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain - insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain - dropPeer peerDropFn // Drops a peer for misbehaving + hasHeader headerCheckFn // Checks if a header is present in the chain + hasBlock blockCheckFn // Checks if a block is present in the chain + getHeader headerRetrievalFn // Retrieves a header from the chain + getBlock blockRetrievalFn // Retrieves a block from the chain + headHeader headHeaderRetrievalFn // Retrieves the head header from the chain + headBlock headBlockRetrievalFn // Retrieves the head block from the chain + headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain + commitHeadBlock headBlockCommitterFn // Commits a manually assembled block as the chain head + getTd tdRetrievalFn // Retrieves the TD of a block from the chain + insertHeaders headerChainInsertFn // Injects a batch of headers into the chain + insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain + insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain + dropPeer peerDropFn // Drops a peer for misbehaving // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing @@ -212,14 +127,16 @@ type Downloader struct { // Channels newPeerCh chan *peer - hashCh chan hashPack // [eth/61] Channel receiving inbound hashes - blockCh chan blockPack // [eth/61] Channel receiving inbound blocks - headerCh chan headerPack // [eth/62] Channel receiving inbound block headers - bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies - receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts - blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks - bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks - receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks + hashCh chan dataPack // [eth/61] Channel receiving inbound hashes + blockCh chan dataPack // [eth/61] Channel receiving inbound blocks + headerCh chan dataPack // [eth/62] Channel receiving inbound block headers + bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies + receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts + stateCh chan dataPack // [eth/63] Channel receiving inbound node state data + blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks + bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks + receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks + stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers @@ -232,36 +149,40 @@ type Downloader struct { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mode SyncMode, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn, getBlock blockRetrievalFn, - headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn, getTd tdRetrievalFn, - insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader { +func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn, + getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn, + commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, + insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader { return &Downloader{ - mode: mode, - mux: mux, - queue: newQueue(), - peers: newPeerSet(), - hasHeader: hasHeader, - hasBlock: hasBlock, - getHeader: getHeader, - getBlock: getBlock, - headHeader: headHeader, - headBlock: headBlock, - headFastBlock: headFastBlock, - getTd: getTd, - insertHeaders: insertHeaders, - insertBlocks: insertBlocks, - insertReceipts: insertReceipts, - dropPeer: dropPeer, - newPeerCh: make(chan *peer, 1), - hashCh: make(chan hashPack, 1), - blockCh: make(chan blockPack, 1), - headerCh: make(chan headerPack, 1), - bodyCh: make(chan dataPack, 1), - receiptCh: make(chan dataPack, 1), - blockWakeCh: make(chan bool, 1), - bodyWakeCh: make(chan bool, 1), - receiptWakeCh: make(chan bool, 1), + mode: mode, + mux: mux, + queue: newQueue(stateDb), + peers: newPeerSet(), + hasHeader: hasHeader, + hasBlock: hasBlock, + getHeader: getHeader, + getBlock: getBlock, + headHeader: headHeader, + headBlock: headBlock, + headFastBlock: headFastBlock, + commitHeadBlock: commitHeadBlock, + getTd: getTd, + insertHeaders: insertHeaders, + insertBlocks: insertBlocks, + insertReceipts: insertReceipts, + dropPeer: dropPeer, + newPeerCh: make(chan *peer, 1), + hashCh: make(chan dataPack, 1), + blockCh: make(chan dataPack, 1), + headerCh: make(chan dataPack, 1), + bodyCh: make(chan dataPack, 1), + receiptCh: make(chan dataPack, 1), + stateCh: make(chan dataPack, 1), + blockWakeCh: make(chan bool, 1), + bodyWakeCh: make(chan bool, 1), + receiptWakeCh: make(chan bool, 1), + stateWakeCh: make(chan bool, 1), } } @@ -272,7 +193,7 @@ func (d *Downloader) Boundaries() (uint64, uint64) { d.syncStatsLock.RLock() defer d.syncStatsLock.RUnlock() - return d.syncStatsOrigin, d.syncStatsHeight + return d.syncStatsChainOrigin, d.syncStatsChainHeight } // Synchronising returns whether the downloader is currently retrieving blocks. @@ -284,10 +205,11 @@ func (d *Downloader) Synchronising() bool { // used for fetching hashes and blocks from. func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading - getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getReceipts receiptFetcherFn) error { + getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, + getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error { glog.V(logger.Detail).Infoln("Registering peer", id) - if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts)); err != nil { + if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil { glog.V(logger.Error).Infoln("Register failed:", err) return err } @@ -357,12 +279,18 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error d.queue.Reset() d.peers.Reset() - for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh} { + for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { select { case <-ch: default: } } + // Reset and ephemeral sync statistics + d.syncStatsLock.Lock() + d.syncStatsStateTotal = 0 + d.syncStatsStateDone = 0 + d.syncStatsLock.Unlock() + // Create cancel channel for aborting mid-flight d.cancelLock.Lock() d.cancelCh = make(chan struct{}) @@ -414,17 +342,17 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e return err } d.syncStatsLock.Lock() - if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin { - d.syncStatsOrigin = origin + if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin { + d.syncStatsChainOrigin = origin } - d.syncStatsHeight = latest + d.syncStatsChainHeight = latest d.syncStatsLock.Unlock() // Initiate the sync using a concurrent hash and block retrieval algorithm if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - d.queue.Prepare(origin+1, 1) + d.queue.Prepare(origin+1, d.mode, 0) errc := make(chan error, 2) go func() { errc <- d.fetchHashes61(p, td, origin+1) }() @@ -449,26 +377,27 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e return err } d.syncStatsLock.Lock() - if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin { - d.syncStatsOrigin = origin + if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin { + d.syncStatsChainOrigin = origin } - d.syncStatsHeight = latest + d.syncStatsChainHeight = latest d.syncStatsLock.Unlock() // Initiate the sync using a concurrent header and content retrieval algorithm - parts := 1 - if d.mode == FastSync { - parts = 2 // receipts are fetched too + pivot := uint64(0) + if latest > uint64(minFullBlocks) { + pivot = latest - uint64(minFullBlocks) } - d.queue.Prepare(origin+1, parts) + d.queue.Prepare(origin+1, d.mode, pivot) if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - errc := make(chan error, 3) + errc := make(chan error, 4) go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved + go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync - go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal sync + go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync // If any fetcher fails, cancel the others var fail error @@ -538,14 +467,14 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) { case <-d.hashCh: // Out of bounds hashes received, ignore them - case blockPack := <-d.blockCh: + case packet := <-d.blockCh: // Discard anything not from the origin peer - if blockPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", blockPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", packet.PeerId()) break } // Make sure the peer actually gave something valid - blocks := blockPack.blocks + blocks := packet.(*blockPack).blocks if len(blocks) != 1 { glog.V(logger.Debug).Infof("%v: invalid number of head blocks: %d != 1", p, len(blocks)) return 0, errBadPeer @@ -584,14 +513,14 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) { case <-d.cancelCh: return 0, errCancelHashFetch - case hashPack := <-d.hashCh: + case packet := <-d.hashCh: // Discard anything not from the origin peer - if hashPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId()) break } // Make sure the peer actually gave something valid - hashes := hashPack.hashes + hashes := packet.(*hashPack).hashes if len(hashes) == 0 { glog.V(logger.Debug).Infof("%v: empty head hash set", p) return 0, errEmptyHashSet @@ -639,14 +568,14 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) { case <-d.cancelCh: return 0, errCancelHashFetch - case hashPack := <-d.hashCh: + case packet := <-d.hashCh: // Discard anything not from the origin peer - if hashPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId()) break } // Make sure the peer actually gave something valid - hashes := hashPack.hashes + hashes := packet.(*hashPack).hashes if len(hashes) != 1 { glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes)) return 0, errBadPeer @@ -716,17 +645,17 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { case <-d.bodyCh: // Out of bounds eth/62 block bodies received, ignore them - case hashPack := <-d.hashCh: + case packet := <-d.hashCh: // Make sure the active peer is giving us the hashes - if hashPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId()) break } hashReqTimer.UpdateSince(request) timeout.Stop() // If no more hashes are inbound, notify the block fetcher and return - if len(hashPack.hashes) == 0 { + if packet.Items() == 0 { glog.V(logger.Debug).Infof("%v: no available hashes", p) select { @@ -751,12 +680,13 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { return nil } gotHashes = true + hashes := packet.(*hashPack).hashes // Otherwise insert all the new hashes, aborting in case of junk - glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashPack.hashes), from) + glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashes), from) - inserts := d.queue.Schedule61(hashPack.hashes, true) - if len(inserts) != len(hashPack.hashes) { + inserts := d.queue.Schedule61(hashes, true) + if len(inserts) != len(hashes) { glog.V(logger.Debug).Infof("%v: stale hashes", p) return errBadPeer } @@ -776,7 +706,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { return nil } // Queue not yet full, fetch the next batch - from += uint64(len(hashPack.hashes)) + from += uint64(len(hashes)) getHashes(from) case <-timeout.C: @@ -813,16 +743,17 @@ func (d *Downloader) fetchBlocks61(from uint64) error { case <-d.bodyCh: // Out of bounds eth/62 block bodies received, ignore them - case blockPack := <-d.blockCh: + case packet := <-d.blockCh: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. - if peer := d.peers.Peer(blockPack.peerId); peer != nil { + if peer := d.peers.Peer(packet.PeerId()); peer != nil { // Deliver the received chunk of blocks, and demote in case of errors - err := d.queue.Deliver61(blockPack.peerId, blockPack.blocks) + blocks := packet.(*blockPack).blocks + err := d.queue.DeliverBlocks(peer.id, blocks) switch err { case nil: // If no blocks were delivered, demote the peer (need the delivery above) - if len(blockPack.blocks) == 0 { + if len(blocks) == 0 { peer.Demote() peer.SetBlocksIdle() glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) @@ -831,7 +762,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // All was successful, promote the peer and potentially start processing peer.Promote() peer.SetBlocksIdle() - glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) + glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks)) go d.process() case errInvalidChain: @@ -891,7 +822,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { return errNoPeers } // Check for block request timeouts and demote the responsible peers - for _, pid := range d.queue.Expire61(blockHardTTL) { + for _, pid := range d.queue.ExpireBlocks(blockHardTTL) { if peer := d.peers.Peer(pid); peer != nil { peer.Demote() glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) @@ -907,7 +838,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { } // Send a download request to all idle peers, until throttled throttled := false - idles, total := d.peers.BlockIdlePeers(61) + idles, total := d.peers.BlockIdlePeers() for _, peer := range idles { // Short circuit if throttling activated @@ -918,7 +849,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // Reserve a chunk of hashes for a peer. A nil can mean either that // no more hashes are available, or that the peer is known not to // have them. - request := d.queue.Reserve61(peer, peer.BlockCapacity()) + request := d.queue.ReserveBlocks(peer, peer.BlockCapacity()) if request == nil { continue } @@ -928,7 +859,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // Fetch the chunk and make sure any errors return the hashes to the queue if err := peer.Fetch61(request); err != nil { glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer) - d.queue.Cancel61(request) + d.queue.CancelBlocks(request) } } // Make sure that we have peers available for fetching. If all peers have been tried @@ -954,14 +885,14 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) { case <-d.cancelCh: return 0, errCancelBlockFetch - case headerPack := <-d.headerCh: + case packet := <-d.headerCh: // Discard anything not from the origin peer - if headerPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId()) break } // Make sure the peer actually gave something valid - headers := headerPack.headers + headers := packet.(*headerPack).headers if len(headers) != 1 { glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers)) return 0, errBadPeer @@ -1014,14 +945,14 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { case <-d.cancelCh: return 0, errCancelHashFetch - case headerPack := <-d.headerCh: + case packet := <-d.headerCh: // Discard anything not from the origin peer - if headerPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId()) break } // Make sure the peer actually gave something valid - headers := headerPack.headers + headers := packet.(*headerPack).headers if len(headers) == 0 { glog.V(logger.Debug).Infof("%v: empty head header set", p) return 0, errEmptyHeaderSet @@ -1069,14 +1000,14 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { case <-d.cancelCh: return 0, errCancelHashFetch - case headerPack := <-d.headerCh: + case packer := <-d.headerCh: // Discard anything not from the origin peer - if headerPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId) + if packer.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packer.PeerId()) break } // Make sure the peer actually gave something valid - headers := headerPack.headers + headers := packer.(*headerPack).headers if len(headers) != 1 { glog.V(logger.Debug).Infof("%v: invalid search header set (%d)", p, len(headers)) return 0, errBadPeer @@ -1150,20 +1081,20 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { case <-d.blockCh: // Out of bounds eth/61 blocks received, ignore them - case headerPack := <-d.headerCh: + case packet := <-d.headerCh: // Make sure the active peer is giving us the headers - if headerPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", headerPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", packet.PeerId()) break } headerReqTimer.UpdateSince(request) timeout.Stop() // If no more headers are inbound, notify the content fetchers and return - if len(headerPack.headers) == 0 { + if packet.Items() == 0 { glog.V(logger.Debug).Infof("%v: no available headers", p) - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { select { case ch <- false: case <-d.cancelCh: @@ -1187,26 +1118,27 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { return nil } gotHeaders = true + headers := packet.(*headerPack).headers // Otherwise insert all the new headers, aborting in case of junk - glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headerPack.headers), from) + glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from) if d.mode == FastSync || d.mode == LightSync { - if n, err := d.insertHeaders(headerPack.headers, false); err != nil { - glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headerPack.headers[n].Number, headerPack.headers[n].Hash().Bytes()[:4], err) + if n, err := d.insertHeaders(headers, false); err != nil { + glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err) return errInvalidChain } } if d.mode == FullSync || d.mode == FastSync { - inserts := d.queue.Schedule(headerPack.headers, from, d.mode == FastSync) - if len(inserts) != len(headerPack.headers) { + inserts := d.queue.Schedule(headers, from) + if len(inserts) != len(headers) { glog.V(logger.Debug).Infof("%v: stale headers", p) return errBadPeer } } // Notify the content fetchers of new headers, but stop if queue is full cont := d.queue.PendingBlocks() < maxQueuedHeaders || d.queue.PendingReceipts() < maxQueuedHeaders - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { if cont { // We still have headers to fetch, send continuation wake signal (potential) select { @@ -1223,7 +1155,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { } } // Queue not yet full, fetch the next batch - from += uint64(len(headerPack.headers)) + from += uint64(len(headers)) getHeaders(from) case <-timeout.C: @@ -1233,7 +1165,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { d.dropPeer(p.id) // Finish the sync gracefully instead of dumping the gathered data though - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { select { case ch <- false: case <-d.cancelCh: @@ -1251,19 +1183,19 @@ func (d *Downloader) fetchBodies(from uint64) error { glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from) var ( - deliver = func(packet interface{}) error { + deliver = func(packet dataPack) error { pack := packet.(*bodyPack) - return d.queue.DeliverBlocks(pack.peerId, pack.transactions, pack.uncles) + return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles) } - expire = func() []string { return d.queue.ExpireBlocks(bodyHardTTL) } + expire = func() []string { return d.queue.ExpireBodies(bodyHardTTL) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) } capacity = func(p *peer) int { return p.BlockCapacity() } - getIdles = func() ([]*peer, int) { return d.peers.BlockIdlePeers(62) } + getIdles = func() ([]*peer, int) { return d.peers.BodyIdlePeers() } setIdle = func(p *peer) { p.SetBlocksIdle() } ) - err := d.fetchParts(from, errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, - d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBlocks, d.bodyFetchHook, - fetch, d.queue.CancelBlocks, capacity, getIdles, setIdle, "Body") + err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, + d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBodies, d.bodyFetchHook, + fetch, d.queue.CancelBodies, capacity, getIdles, setIdle, "Body") glog.V(logger.Debug).Infof("Block body download terminated: %v", err) return err @@ -1276,7 +1208,7 @@ func (d *Downloader) fetchReceipts(from uint64) error { glog.V(logger.Debug).Infof("Downloading receipts from #%d", from) var ( - deliver = func(packet interface{}) error { + deliver = func(packet dataPack) error { pack := packet.(*receiptPack) return d.queue.DeliverReceipts(pack.peerId, pack.receipts) } @@ -1285,7 +1217,7 @@ func (d *Downloader) fetchReceipts(from uint64) error { capacity = func(p *peer) int { return p.ReceiptCapacity() } setIdle = func(p *peer) { p.SetReceiptsIdle() } ) - err := d.fetchParts(from, errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, + err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, d.queue.PendingReceipts, d.queue.ThrottleReceipts, d.queue.ReserveReceipts, d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt") @@ -1293,10 +1225,46 @@ func (d *Downloader) fetchReceipts(from uint64) error { return err } +// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any +// available peers, reserving a chunk of nodes for each, waiting for delivery and +// also periodically checking for timeouts. +func (d *Downloader) fetchNodeData() error { + glog.V(logger.Debug).Infof("Downloading node state data") + + var ( + deliver = func(packet dataPack) error { + start := time.Now() + done, found, err := d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states) + + d.syncStatsLock.Lock() + totalDone, totalKnown := d.syncStatsStateDone+uint64(done), d.syncStatsStateTotal+uint64(found) + d.syncStatsStateDone, d.syncStatsStateTotal = totalDone, totalKnown + d.syncStatsLock.Unlock() + + glog.V(logger.Info).Infof("imported %d [%d / %d] state entries in %v.", done, totalDone, totalKnown, time.Since(start)) + return err + } + expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) } + throttle = func() bool { return false } + reserve = func(p *peer, count int) (*fetchRequest, bool, error) { + return d.queue.ReserveNodeData(p, count), false, nil + } + fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) } + capacity = func(p *peer) int { return p.NodeDataCapacity() } + setIdle = func(p *peer) { p.SetNodeDataIdle() } + ) + err := d.fetchParts(errCancelReceiptFetch, d.stateCh, deliver, d.stateWakeCh, expire, + d.queue.PendingNodeData, throttle, reserve, nil, fetch, d.queue.CancelNodeData, + capacity, d.peers.ReceiptIdlePeers, setIdle, "State") + + glog.V(logger.Debug).Infof("Node state data download terminated: %v", err) + return err +} + // fetchParts iteratively downloads scheduled block parts, taking any available // peers, reserving a chunk of fetch requests for each, waiting for delivery and // also periodically checking for timeouts. -func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan dataPack, deliver func(packet interface{}) error, wakeCh chan bool, +func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool, expire func() []string, pending func() int, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, idle func() ([]*peer, int), setIdle func(*peer), kind string) error { @@ -1327,7 +1295,7 @@ func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan da switch err := deliver(packet); err { case nil: // If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!) - if packet.Empty() { + if packet.Items() == 0 { peer.Demote() setIdle(peer) glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind)) @@ -1441,7 +1409,11 @@ func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan da continue } if glog.V(logger.Detail) { - glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number) + if len(request.Headers) > 0 { + glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number) + } else { + glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind)) + } } // Fetch the chunk and make sure any errors return the hashes to the queue if fetchHook != nil { @@ -1528,7 +1500,9 @@ func (d *Downloader) process() { blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) case d.mode == FastSync: blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) - receipts = append(receipts, result.Receipts) + if result.Header.Number.Uint64() <= d.queue.fastSyncPivot { + receipts = append(receipts, result.Receipts) + } case d.mode == LightSync: headers = append(headers, result.Header) } @@ -1539,12 +1513,16 @@ func (d *Downloader) process() { index int ) switch { - case d.mode == FullSync: - index, err = d.insertBlocks(blocks) - case d.mode == FastSync: - index, err = d.insertReceipts(blocks, receipts) - case d.mode == LightSync: + case len(headers) > 0: index, err = d.insertHeaders(headers, true) + + case len(receipts) > 0: + index, err = d.insertReceipts(blocks, receipts) + if err == nil && blocks[len(blocks)-1].NumberU64() == d.queue.fastSyncPivot { + err = d.commitHeadBlock(blocks[len(blocks)-1].Hash()) + } + default: + index, err = d.insertBlocks(blocks) } if err != nil { glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err) @@ -1557,125 +1535,47 @@ func (d *Downloader) process() { } } -// DeliverHashes61 injects a new batch of hashes received from a remote node into +// DeliverHashes injects a new batch of hashes received from a remote node into // the download schedule. This is usually invoked through the BlockHashesMsg by // the protocol handler. -func (d *Downloader) DeliverHashes61(id string, hashes []common.Hash) (err error) { - // Update the delivery metrics for both good and failed deliveries - hashInMeter.Mark(int64(len(hashes))) - defer func() { - if err != nil { - hashDropMeter.Mark(int64(len(hashes))) - } - }() - // Make sure the downloader is active - if atomic.LoadInt32(&d.synchronising) == 0 { - return errNoSyncActive - } - // Deliver or abort if the sync is canceled while queuing - d.cancelLock.RLock() - cancel := d.cancelCh - d.cancelLock.RUnlock() - - select { - case d.hashCh <- hashPack{id, hashes}: - return nil - - case <-cancel: - return errNoSyncActive - } +func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) (err error) { + return d.deliver(id, d.hashCh, &hashPack{id, hashes}, hashInMeter, hashDropMeter) } -// DeliverBlocks61 injects a new batch of blocks received from a remote node. +// DeliverBlocks injects a new batch of blocks received from a remote node. // This is usually invoked through the BlocksMsg by the protocol handler. -func (d *Downloader) DeliverBlocks61(id string, blocks []*types.Block) (err error) { - // Update the delivery metrics for both good and failed deliveries - blockInMeter.Mark(int64(len(blocks))) - defer func() { - if err != nil { - blockDropMeter.Mark(int64(len(blocks))) - } - }() - // Make sure the downloader is active - if atomic.LoadInt32(&d.synchronising) == 0 { - return errNoSyncActive - } - // Deliver or abort if the sync is canceled while queuing - d.cancelLock.RLock() - cancel := d.cancelCh - d.cancelLock.RUnlock() - - select { - case d.blockCh <- blockPack{id, blocks}: - return nil - - case <-cancel: - return errNoSyncActive - } +func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) (err error) { + return d.deliver(id, d.blockCh, &blockPack{id, blocks}, blockInMeter, blockDropMeter) } // DeliverHeaders injects a new batch of blck headers received from a remote // node into the download schedule. func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) { - // Update the delivery metrics for both good and failed deliveries - headerInMeter.Mark(int64(len(headers))) - defer func() { - if err != nil { - headerDropMeter.Mark(int64(len(headers))) - } - }() - // Make sure the downloader is active - if atomic.LoadInt32(&d.synchronising) == 0 { - return errNoSyncActive - } - // Deliver or abort if the sync is canceled while queuing - d.cancelLock.RLock() - cancel := d.cancelCh - d.cancelLock.RUnlock() - - select { - case d.headerCh <- headerPack{id, headers}: - return nil - - case <-cancel: - return errNoSyncActive - } + return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter) } // DeliverBodies injects a new batch of block bodies received from a remote node. func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) { - // Update the delivery metrics for both good and failed deliveries - bodyInMeter.Mark(int64(len(transactions))) - defer func() { - if err != nil { - bodyDropMeter.Mark(int64(len(transactions))) - } - }() - // Make sure the downloader is active - if atomic.LoadInt32(&d.synchronising) == 0 { - return errNoSyncActive - } - // Deliver or abort if the sync is canceled while queuing - d.cancelLock.RLock() - cancel := d.cancelCh - d.cancelLock.RUnlock() - - select { - case d.bodyCh <- &bodyPack{id, transactions, uncles}: - return nil - - case <-cancel: - return errNoSyncActive - } + return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter) } // DeliverReceipts injects a new batch of receipts received from a remote node. func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) { + return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter) +} + +// DeliverNodeData injects a new batch of node state data received from a remote node. +func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) { + return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter) +} + +// deliver injects a new batch of data received from a remote node. +func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) { // Update the delivery metrics for both good and failed deliveries - receiptInMeter.Mark(int64(len(receipts))) + inMeter.Mark(int64(packet.Items())) defer func() { if err != nil { - receiptDropMeter.Mark(int64(len(receipts))) + dropMeter.Mark(int64(packet.Items())) } }() // Make sure the downloader is active @@ -1688,7 +1588,7 @@ func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (er d.cancelLock.RUnlock() select { - case d.receiptCh <- &receiptPack{id, receipts}: + case destCh <- packet: return nil case <-cancel: diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 68c4ca26e..8944ae4b0 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -27,11 +27,13 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/trie" ) var ( @@ -115,6 +117,7 @@ func makeChainFork(n, f int, parent *types.Block, parentReceipts types.Receipts) // downloadTester is a test simulator for mocking out local block chain. type downloadTester struct { + stateDb ethdb.Database downloader *Downloader ownHashes []common.Hash // Hash chain belonging to the tester @@ -146,8 +149,10 @@ func newTester(mode SyncMode) *downloadTester { peerReceipts: make(map[string]map[common.Hash]types.Receipts), peerChainTds: make(map[string]map[common.Hash]*big.Int), } - tester.downloader = New(mode, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader, tester.getBlock, - tester.headHeader, tester.headBlock, tester.headFastBlock, tester.getTd, tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.dropPeer) + tester.stateDb, _ = ethdb.NewMemDatabase() + tester.downloader = New(mode, tester.stateDb, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader, + tester.getBlock, tester.headHeader, tester.headBlock, tester.headFastBlock, tester.commitHeadBlock, tester.getTd, + tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.dropPeer) return tester } @@ -213,7 +218,7 @@ func (dl *downloadTester) headHeader() *types.Header { return header } } - return nil + return genesis.Header() } // headBlock retrieves the current head block from the canonical chain. @@ -223,10 +228,12 @@ func (dl *downloadTester) headBlock() *types.Block { for i := len(dl.ownHashes) - 1; i >= 0; i-- { if block := dl.getBlock(dl.ownHashes[i]); block != nil { - return block + if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil { + return block + } } } - return nil + return genesis } // headFastBlock retrieves the current head fast-sync block from the canonical chain. @@ -236,12 +243,20 @@ func (dl *downloadTester) headFastBlock() *types.Block { for i := len(dl.ownHashes) - 1; i >= 0; i-- { if block := dl.getBlock(dl.ownHashes[i]); block != nil { - if _, ok := dl.ownReceipts[block.Hash()]; ok { - return block - } + return block } } - return nil + return genesis +} + +// commitHeadBlock manually sets the head block to a given hash. +func (dl *downloadTester) commitHeadBlock(hash common.Hash) error { + // For now only check that the state trie is correct + if block := dl.getBlock(hash); block != nil { + _, err := trie.NewSecure(block.Root(), dl.stateDb) + return err + } + return fmt.Errorf("non existent block: %x", hash[:4]) } // getTd retrieves the block's total difficulty from the canonical chain. @@ -283,6 +298,7 @@ func (dl *downloadTester) insertBlocks(blocks types.Blocks) (int, error) { dl.ownHashes = append(dl.ownHashes, block.Hash()) dl.ownHeaders[block.Hash()] = block.Header() dl.ownBlocks[block.Hash()] = block + dl.stateDb.Put(block.Root().Bytes(), []byte{}) dl.ownChainTd[block.Hash()] = dl.ownChainTd[block.ParentHash()] } return len(blocks), nil @@ -321,13 +337,13 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha var err error switch version { case 61: - err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil, nil) + err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil, nil, nil) case 62: - err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil) + err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil) case 63: - err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay)) + err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay)) case 64: - err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay)) + err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay)) } if err == nil { // Assign the owned hashes, headers and blocks to the peer (deep copy) @@ -399,7 +415,7 @@ func (dl *downloadTester) peerGetRelHashesFn(id string, delay time.Duration) fun // Delay delivery a bit to allow attacks to unfold go func() { time.Sleep(time.Millisecond) - dl.downloader.DeliverHashes61(id, result) + dl.downloader.DeliverHashes(id, result) }() return nil } @@ -424,7 +440,7 @@ func (dl *downloadTester) peerGetAbsHashesFn(id string, delay time.Duration) fun // Delay delivery a bit to allow attacks to unfold go func() { time.Sleep(time.Millisecond) - dl.downloader.DeliverHashes61(id, result) + dl.downloader.DeliverHashes(id, result) }() return nil } @@ -447,7 +463,7 @@ func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([ result = append(result, block) } } - go dl.downloader.DeliverBlocks61(id, result) + go dl.downloader.DeliverBlocks(id, result) return nil } @@ -553,17 +569,54 @@ func (dl *downloadTester) peerGetReceiptsFn(id string, delay time.Duration) func } } +// peerGetNodeDataFn constructs a getNodeData method associated with a particular +// peer in the download tester. The returned function can be used to retrieve +// batches of node state data from the particularly requested peer. +func (dl *downloadTester) peerGetNodeDataFn(id string, delay time.Duration) func([]common.Hash) error { + return func(hashes []common.Hash) error { + time.Sleep(delay) + + dl.lock.RLock() + defer dl.lock.RUnlock() + + results := make([][]byte, 0, len(hashes)) + for _, hash := range hashes { + if data, err := testdb.Get(hash.Bytes()); err == nil { + results = append(results, data) + } + } + go dl.downloader.DeliverNodeData(id, results) + + return nil + } +} + // assertOwnChain checks if the local chain contains the correct number of items // of the various chain components. func assertOwnChain(t *testing.T, tester *downloadTester, length int) { - headers, blocks, receipts := length, length, length + assertOwnForkedChain(t, tester, 1, []int{length}) +} + +// assertOwnForkedChain checks if the local forked chain contains the correct +// number of items of the various chain components. +func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) { + // Initialize the counters for the first fork + headers, blocks, receipts := lengths[0], lengths[0], lengths[0]-minFullBlocks + if receipts < 0 { + receipts = 1 + } + // Update the counters for each subsequent fork + for _, length := range lengths[1:] { + headers += length - common + blocks += length - common + receipts += length - common - minFullBlocks + } switch tester.downloader.mode { case FullSync: receipts = 1 case LightSync: blocks, receipts = 1, 1 } - if hs := len(tester.ownHeaders); hs != headers { t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers) } @@ -573,6 +626,14 @@ func assertOwnChain(t *testing.T, tester *downloadTester, length int) { if rs := len(tester.ownReceipts); rs != receipts { t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts) } + // Verify the state trie too for fast syncs + if tester.downloader.mode == FastSync { + if index := lengths[len(lengths)-1] - minFullBlocks - 1; index > 0 { + if statedb := state.New(tester.ownHeaders[tester.ownHashes[index]].Root, tester.stateDb); statedb == nil { + t.Fatalf("state reconstruction failed") + } + } + } } // Tests that simple synchronization against a canonical chain works correctly. @@ -647,7 +708,9 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { cached = len(tester.downloader.queue.blockDonePool) if mode == FastSync { if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached { - cached = receipts + if tester.downloader.queue.resultCache[receipts].Header.Number.Uint64() < tester.downloader.queue.fastSyncPivot { + cached = receipts + } } } tester.downloader.queue.lock.RUnlock() @@ -704,7 +767,7 @@ func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) { if err := tester.sync("fork B", nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - assertOwnChain(t, tester, common+2*fork+1) + assertOwnForkedChain(t, tester, common+1, []int{common + fork + 1, common + fork + 1}) } // Tests that an inactive downloader will not accept incoming hashes and blocks. @@ -712,10 +775,10 @@ func TestInactiveDownloader61(t *testing.T) { tester := newTester(FullSync) // Check that neither hashes nor blocks are accepted - if err := tester.downloader.DeliverHashes61("bad peer", []common.Hash{}); err != errNoSyncActive { + if err := tester.downloader.DeliverHashes("bad peer", []common.Hash{}); err != errNoSyncActive { t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) } - if err := tester.downloader.DeliverBlocks61("bad peer", []*types.Block{}); err != errNoSyncActive { + if err := tester.downloader.DeliverBlocks("bad peer", []*types.Block{}); err != errNoSyncActive { t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) } } @@ -809,14 +872,6 @@ func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) { id := fmt.Sprintf("peer #%d", i) tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], headers, blocks, receipts) } - // Synchronise with the middle peer and make sure half of the blocks were retrieved - id := fmt.Sprintf("peer #%d", targetPeers/2) - if err := tester.sync(id, nil); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - assertOwnChain(t, tester, len(tester.peerHashes[id])) - - // Synchronise with the best peer and make sure everything is retrieved if err := tester.sync("peer #0", nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } @@ -870,8 +925,8 @@ func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, F func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) } func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { - // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 + // Create a block chain to download + targetBlocks := 2*blockCacheLimit - 15 hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) tester := newTester(mode) @@ -898,8 +953,8 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { bodiesNeeded++ } } - for _, receipt := range receipts { - if mode == FastSync && len(receipt) > 0 { + for hash, receipt := range receipts { + if mode == FastSync && len(receipt) > 0 && headers[hash].Number.Uint64() <= uint64(targetBlocks-minFullBlocks) { receiptsNeeded++ } } diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go index 92acb6ba8..d6fcfa25c 100644 --- a/eth/downloader/metrics.go +++ b/eth/downloader/metrics.go @@ -47,4 +47,9 @@ var ( receiptReqTimer = metrics.NewTimer("eth/downloader/receipts/req") receiptDropMeter = metrics.NewMeter("eth/downloader/receipts/drop") receiptTimeoutMeter = metrics.NewMeter("eth/downloader/receipts/timeout") + + stateInMeter = metrics.NewMeter("eth/downloader/states/in") + stateReqTimer = metrics.NewTimer("eth/downloader/states/req") + stateDropMeter = metrics.NewMeter("eth/downloader/states/drop") + stateTimeoutMeter = metrics.NewMeter("eth/downloader/states/timeout") ) diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 5fc0db587..5011d5d46 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -41,6 +41,7 @@ type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error type absoluteHeaderFetcherFn func(uint64, int, int, bool) error type blockBodyFetcherFn func([]common.Hash) error type receiptFetcherFn func([]common.Hash) error +type stateFetcherFn func([]common.Hash) error var ( errAlreadyFetching = errors.New("already fetching blocks from peer") @@ -55,12 +56,16 @@ type peer struct { blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) + stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1) rep int32 // Simple peer reputation - blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request - receiptCapacity int32 // Number of receipts allowed to fetch per request - blockStarted time.Time // Time instance when the last block (body)fetch was started - receiptStarted time.Time // Time instance when the last receipt fetch was started + blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request + receiptCapacity int32 // Number of receipts allowed to fetch per request + stateCapacity int32 // Number of node data pieces allowed to fetch per request + + blockStarted time.Time // Time instance when the last block (body)fetch was started + receiptStarted time.Time // Time instance when the last receipt fetch was started + stateStarted time.Time // Time instance when the last node data fetch was started ignored *set.Set // Set of hashes not to request (didn't have previously) @@ -73,6 +78,7 @@ type peer struct { getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts + getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data version int // Eth protocol version number to switch strategies } @@ -82,12 +88,13 @@ type peer struct { func newPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, - getReceipts receiptFetcherFn) *peer { + getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer { return &peer{ id: id, head: head, blockCapacity: 1, receiptCapacity: 1, + stateCapacity: 1, ignored: set.New(), getRelHashes: getRelHashes, @@ -99,6 +106,7 @@ func newPeer(id string, version int, head common.Hash, getBlockBodies: getBlockBodies, getReceipts: getReceipts, + getNodeData: getNodeData, version: version, } @@ -110,6 +118,7 @@ func (p *peer) Reset() { atomic.StoreInt32(&p.receiptIdle, 0) atomic.StoreInt32(&p.blockCapacity, 1) atomic.StoreInt32(&p.receiptCapacity, 1) + atomic.StoreInt32(&p.stateCapacity, 1) p.ignored.Clear() } @@ -167,6 +176,24 @@ func (p *peer) FetchReceipts(request *fetchRequest) error { return nil } +// FetchNodeData sends a node state data retrieval request to the remote peer. +func (p *peer) FetchNodeData(request *fetchRequest) error { + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.stateIdle, 0, 1) { + return errAlreadyFetching + } + p.stateStarted = time.Now() + + // Convert the hash set to a retrievable slice + hashes := make([]common.Hash, 0, len(request.Hashes)) + for hash, _ := range request.Hashes { + hashes = append(hashes, hash) + } + go p.getNodeData(hashes) + + return nil +} + // SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests. // Its block retrieval allowance will also be updated either up- or downwards, // depending on whether the previous fetch completed in time or not. @@ -188,6 +215,13 @@ func (p *peer) SetReceiptsIdle() { p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle) } +// SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval +// requests. Its node data retrieval allowance will also be updated either up- or +// downwards, depending on whether the previous fetch completed in time or not. +func (p *peer) SetNodeDataIdle() { + p.setIdle(p.stateStarted, stateSoftTTL, stateSoftTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle) +} + // setIdle sets the peer to idle, allowing it to execute new retrieval requests. // Its data retrieval allowance will also be updated either up- or downwards, // depending on whether the previous fetch completed in time or not. @@ -230,6 +264,12 @@ func (p *peer) ReceiptCapacity() int { return int(atomic.LoadInt32(&p.receiptCapacity)) } +// NodeDataCapacity retrieves the peers block download allowance based on its +// previously discovered bandwidth capacity. +func (p *peer) NodeDataCapacity() int { + return int(atomic.LoadInt32(&p.stateCapacity)) +} + // Promote increases the peer's reputation. func (p *peer) Promote() { atomic.AddInt32(&p.rep, 1) @@ -340,39 +380,50 @@ func (ps *peerSet) AllPeers() []*peer { // BlockIdlePeers retrieves a flat list of all the currently idle peers within the // active peer set, ordered by their reputation. -func (ps *peerSet) BlockIdlePeers(version int) ([]*peer, int) { - ps.lock.RLock() - defer ps.lock.RUnlock() - - idle, total := make([]*peer, 0, len(ps.peers)), 0 - for _, p := range ps.peers { - if (version == 61 && p.version == 61) || (version >= 62 && p.version >= 62) { - if atomic.LoadInt32(&p.blockIdle) == 0 { - idle = append(idle, p) - } - total++ - } +func (ps *peerSet) BlockIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.blockIdle) == 0 } - for i := 0; i < len(idle); i++ { - for j := i + 1; j < len(idle); j++ { - if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) { - idle[i], idle[j] = idle[j], idle[i] - } - } + return ps.idlePeers(61, 61, idle) +} + +// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within +// the active peer set, ordered by their reputation. +func (ps *peerSet) BodyIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.blockIdle) == 0 } - return idle, total + return ps.idlePeers(62, 64, idle) } -// ReceiptIdlePeers retrieves a flat list of all the currently idle peers within the -// active peer set, ordered by their reputation. +// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers +// within the active peer set, ordered by their reputation. func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.receiptIdle) == 0 + } + return ps.idlePeers(63, 64, idle) +} + +// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle +// peers within the active peer set, ordered by their reputation. +func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.stateIdle) == 0 + } + return ps.idlePeers(63, 64, idle) +} + +// idlePeers retrieves a flat list of all currently idle peers satisfying the +// protocol version constraints, using the provided function to check idleness. +func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) { ps.lock.RLock() defer ps.lock.RUnlock() idle, total := make([]*peer, 0, len(ps.peers)), 0 for _, p := range ps.peers { - if p.version >= 63 { - if atomic.LoadInt32(&p.receiptIdle) == 0 { + if p.version >= minProtocol && p.version <= maxProtocol { + if idleCheck(p) { idle = append(idle, p) } total++ diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index c53ad939e..942ed0d63 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -26,9 +26,13 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/trie" "github.com/rcrowley/go-metrics" "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) @@ -39,13 +43,14 @@ var ( var ( errNoFetchesPending = errors.New("no fetches pending") + errStateSyncPending = errors.New("state trie sync already scheduled") errStaleDelivery = errors.New("stale delivery") ) // fetchRequest is a currently running data retrieval operation. type fetchRequest struct { Peer *peer // Peer to which the request was sent - Hashes map[common.Hash]int // [eth/61] Requested block with their insertion index (priority) + Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority) Headers []*types.Header // [eth/62] Requested headers, sorted by request order Time time.Time // Time when the request was made } @@ -64,6 +69,9 @@ type fetchResult struct { // queue represents hashes that are either need fetching or are being fetched type queue struct { + mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching + fastSyncPivot uint64 // Block number where the fast sync pivots into archive synchronisation mode + hashPool map[common.Hash]int // [eth/61] Pending hashes, mapping to their insertion index (priority) hashQueue *prque.Prque // [eth/61] Priority queue of the block hashes to fetch hashCounter int // [eth/61] Counter indexing the added hashes to ensure retrieval order @@ -80,15 +88,22 @@ type queue struct { receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches + stateTaskIndex int // [eth/63] Counter indexing the added hashes to ensure prioritized retrieval order + stateTaskPool map[common.Hash]int // [eth/63] Pending node data retrieval tasks, mapping to their priority + stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for + statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations + + stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly + stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator + resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultOffset uint64 // Offset of the first cached fetch result in the block-chain - resultParts int // Number of fetch components required to complete an item lock sync.RWMutex } // newQueue creates a new download queue for scheduling block retrieval. -func newQueue() *queue { +func newQueue(stateDb ethdb.Database) *queue { return &queue{ hashPool: make(map[common.Hash]int), hashQueue: prque.New(), @@ -100,6 +115,10 @@ func newQueue() *queue { receiptTaskQueue: prque.New(), receiptPendPool: make(map[string]*fetchRequest), receiptDonePool: make(map[common.Hash]struct{}), + stateTaskPool: make(map[common.Hash]int), + stateTaskQueue: prque.New(), + statePendPool: make(map[string]*fetchRequest), + stateDatabase: stateDb, resultCache: make([]*fetchResult, blockCacheLimit), } } @@ -109,6 +128,9 @@ func (q *queue) Reset() { q.lock.Lock() defer q.lock.Unlock() + q.mode = FullSync + q.fastSyncPivot = 0 + q.hashPool = make(map[common.Hash]int) q.hashQueue.Reset() q.hashCounter = 0 @@ -125,9 +147,14 @@ func (q *queue) Reset() { q.receiptPendPool = make(map[string]*fetchRequest) q.receiptDonePool = make(map[common.Hash]struct{}) + q.stateTaskIndex = 0 + q.stateTaskPool = make(map[common.Hash]int) + q.stateTaskQueue.Reset() + q.statePendPool = make(map[string]*fetchRequest) + q.stateScheduler = nil + q.resultCache = make([]*fetchResult, blockCacheLimit) q.resultOffset = 0 - q.resultParts = 0 } // PendingBlocks retrieves the number of block (body) requests pending for retrieval. @@ -146,12 +173,20 @@ func (q *queue) PendingReceipts() int { return q.receiptTaskQueue.Size() } +// PendingNodeData retrieves the number of node data entries pending for retrieval. +func (q *queue) PendingNodeData() int { + q.lock.RLock() + defer q.lock.RUnlock() + + return q.stateTaskQueue.Size() +} + // InFlight retrieves the number of fetch requests currently in flight. func (q *queue) InFlight() int { q.lock.RLock() defer q.lock.RUnlock() - return len(q.blockPendPool) + len(q.receiptPendPool) + return len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) } // Idle returns if the queue is fully idle or has some data still inside. This @@ -160,8 +195,8 @@ func (q *queue) Idle() bool { q.lock.RLock() defer q.lock.RUnlock() - queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() - pending := len(q.blockPendPool) + len(q.receiptPendPool) + queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size() + pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) cached := len(q.blockDonePool) + len(q.receiptDonePool) return (queued + pending + cached) == 0 @@ -227,7 +262,7 @@ func (q *queue) Schedule61(hashes []common.Hash, fifo bool) []common.Hash { // Schedule adds a set of headers for the download queue for scheduling, returning // the new headers encountered. -func (q *queue) Schedule(headers []*types.Header, from uint64, receipts bool) []*types.Header { +func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { q.lock.Lock() defer q.lock.Unlock() @@ -256,10 +291,21 @@ func (q *queue) Schedule(headers []*types.Header, from uint64, receipts bool) [] // Queue the header for content retrieval q.blockTaskPool[hash] = header q.blockTaskQueue.Push(header, -float32(header.Number.Uint64())) - if receipts { + + if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot { + // Fast phase of the fast sync, retrieve receipts too q.receiptTaskPool[hash] = header q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64())) } + if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot { + // Pivoting point of the fast sync, retrieve the state tries + q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase) + for _, hash := range q.stateScheduler.Missing(0) { + q.stateTaskPool[hash] = q.stateTaskIndex + q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex)) + q.stateTaskIndex++ + } + } inserts = append(inserts, header) q.headerHead = hash from++ @@ -279,6 +325,9 @@ func (q *queue) GetHeadResult() *fetchResult { if q.resultCache[0].Pending > 0 { return nil } + if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot && len(q.stateTaskPool) > 0 { + return nil + } return q.resultCache[0] } @@ -291,9 +340,18 @@ func (q *queue) TakeResults() []*fetchResult { // Accumulate all available results results := []*fetchResult{} for _, result := range q.resultCache { + // Stop if no more results are ready if result == nil || result.Pending > 0 { break } + // The fast sync pivot block may only be processed after state fetch completes + if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot && len(q.stateTaskPool) > 0 { + break + } + // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion + if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 { + break + } results = append(results, result) hash := result.Header.Hash() @@ -312,31 +370,45 @@ func (q *queue) TakeResults() []*fetchResult { return results } -// Reserve61 reserves a set of hashes for the given peer, skipping any previously -// failed download. -func (q *queue) Reserve61(p *peer, count int) *fetchRequest { +// ReserveBlocks reserves a set of block hashes for the given peer, skipping any +// previously failed download. +func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest { + return q.reserveHashes(p, count, q.hashQueue, q.blockPendPool, len(q.resultCache)-len(q.blockDonePool)) +} + +// ReserveNodeData reserves a set of node data hashes for the given peer, skipping +// any previously failed download. +func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest { + return q.reserveHashes(p, count, q.stateTaskQueue, q.statePendPool, 0) +} + +// reserveHashes reserves a set of hashes for the given peer, skipping previously +// failed ones. +func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, maxPending int) *fetchRequest { q.lock.Lock() defer q.lock.Unlock() // Short circuit if the pool has been depleted, or if the peer's already // downloading something (sanity check not to corrupt state) - if q.hashQueue.Empty() { + if taskQueue.Empty() { return nil } - if _, ok := q.blockPendPool[p.id]; ok { + if _, ok := pendPool[p.id]; ok { return nil } // Calculate an upper limit on the hashes we might fetch (i.e. throttling) - space := len(q.resultCache) - len(q.blockDonePool) - for _, request := range q.blockPendPool { - space -= len(request.Hashes) + allowance := maxPending + if allowance > 0 { + for _, request := range pendPool { + allowance -= len(request.Hashes) + } } // Retrieve a batch of hashes, skipping previously failed ones send := make(map[common.Hash]int) skip := make(map[common.Hash]int) - for proc := 0; proc < space && len(send) < count && !q.hashQueue.Empty(); proc++ { - hash, priority := q.hashQueue.Pop() + for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ { + hash, priority := taskQueue.Pop() if p.ignored.Has(hash) { skip[hash.(common.Hash)] = int(priority) } else { @@ -345,7 +417,7 @@ func (q *queue) Reserve61(p *peer, count int) *fetchRequest { } // Merge all the skipped hashes back for hash, index := range skip { - q.hashQueue.Push(hash, float32(index)) + taskQueue.Push(hash, float32(index)) } // Assemble and return the block download request if len(send) == 0 { @@ -356,19 +428,19 @@ func (q *queue) Reserve61(p *peer, count int) *fetchRequest { Hashes: send, Time: time.Now(), } - q.blockPendPool[p.id] = request + pendPool[p.id] = request return request } -// ReserveBlocks reserves a set of body fetches for the given peer, skipping any +// ReserveBodies reserves a set of body fetches for the given peer, skipping any // previously failed downloads. Beside the next batch of needed fetches, it also // returns a flag whether empty blocks were queued requiring processing. -func (q *queue) ReserveBlocks(p *peer, count int) (*fetchRequest, bool, error) { +func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) { noop := func(header *types.Header) bool { return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash } - return q.reserveFetch(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, noop) + return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, noop) } // ReserveReceipts reserves a set of receipt fetches for the given peer, skipping @@ -378,13 +450,13 @@ func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error) noop := func(header *types.Header) bool { return header.ReceiptHash == types.EmptyRootHash } - return q.reserveFetch(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, noop) + return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, noop) } -// reserveFetch reserves a set of data download operations for a given peer, +// reserveHeaders reserves a set of data download operations for a given peer, // skipping any previously failed ones. This method is a generic version used // by the individual special reservation functions. -func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, +func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, noop func(*types.Header) bool) (*fetchRequest, bool, error) { q.lock.Lock() defer q.lock.Unlock() @@ -416,8 +488,12 @@ func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types return nil, false, errInvalidChain } if q.resultCache[index] == nil { + components := 1 + if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot { + components = 2 + } q.resultCache[index] = &fetchResult{ - Pending: q.resultParts, + Pending: components, Header: header, } } @@ -456,30 +532,36 @@ func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types return request, progress, nil } -// Cancel61 aborts a fetch request, returning all pending hashes to the queue. -func (q *queue) Cancel61(request *fetchRequest) { - q.cancel(request, nil, q.blockPendPool) +// CancelBlocks aborts a fetch request, returning all pending hashes to the queue. +func (q *queue) CancelBlocks(request *fetchRequest) { + q.cancel(request, q.hashQueue, q.blockPendPool) } -// CancelBlocks aborts a body fetch request, returning all pending hashes to the +// CancelBodies aborts a body fetch request, returning all pending headers to the // task queue. -func (q *queue) CancelBlocks(request *fetchRequest) { +func (q *queue) CancelBodies(request *fetchRequest) { q.cancel(request, q.blockTaskQueue, q.blockPendPool) } -// CancelReceipts aborts a body fetch request, returning all pending hashes to +// CancelReceipts aborts a body fetch request, returning all pending headers to // the task queue. func (q *queue) CancelReceipts(request *fetchRequest) { q.cancel(request, q.receiptTaskQueue, q.receiptPendPool) } +// CancelNodeData aborts a node state data fetch request, returning all pending +// hashes to the task queue. +func (q *queue) CancelNodeData(request *fetchRequest) { + q.cancel(request, q.stateTaskQueue, q.statePendPool) +} + // Cancel aborts a fetch request, returning all pending hashes to the task queue. func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) { q.lock.Lock() defer q.lock.Unlock() for hash, index := range request.Hashes { - q.hashQueue.Push(hash, float32(index)) + taskQueue.Push(hash, float32(index)) } for _, header := range request.Headers { taskQueue.Push(header, -float32(header.Number.Uint64())) @@ -509,29 +591,41 @@ func (q *queue) Revoke(peerId string) { } delete(q.receiptPendPool, peerId) } + if request, ok := q.statePendPool[peerId]; ok { + for hash, index := range request.Hashes { + q.stateTaskQueue.Push(hash, float32(index)) + } + delete(q.statePendPool, peerId) + } } -// Expire61 checks for in flight requests that exceeded a timeout allowance, +// ExpireBlocks checks for in flight requests that exceeded a timeout allowance, // canceling them and returning the responsible peers for penalization. -func (q *queue) Expire61(timeout time.Duration) []string { - return q.expire(timeout, q.blockPendPool, nil) +func (q *queue) ExpireBlocks(timeout time.Duration) []string { + return q.expire(timeout, q.blockPendPool, q.hashQueue, blockTimeoutMeter) } -// ExpireBlocks checks for in flight block body requests that exceeded a timeout +// ExpireBodies checks for in flight block body requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalization. -func (q *queue) ExpireBlocks(timeout time.Duration) []string { - return q.expire(timeout, q.blockPendPool, q.blockTaskQueue) +func (q *queue) ExpireBodies(timeout time.Duration) []string { + return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter) } // ExpireReceipts checks for in flight receipt requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalization. func (q *queue) ExpireReceipts(timeout time.Duration) []string { - return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue) + return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter) +} + +// ExpireNodeData checks for in flight node data requests that exceeded a timeout +// allowance, canceling them and returning the responsible peers for penalization. +func (q *queue) ExpireNodeData(timeout time.Duration) []string { + return q.expire(timeout, q.statePendPool, q.stateTaskQueue, stateTimeoutMeter) } // expire is the generic check that move expired tasks from a pending pool back // into a task pool, returning all entities caught with expired tasks. -func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque) []string { +func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string { q.lock.Lock() defer q.lock.Unlock() @@ -540,14 +634,11 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, for id, request := range pendPool { if time.Since(request.Time) > timeout { // Update the metrics with the timeout - if len(request.Hashes) > 0 { - blockTimeoutMeter.Mark(1) - } else { - bodyTimeoutMeter.Mark(1) - } + timeoutMeter.Mark(1) + // Return any non satisfied requests to the pool for hash, index := range request.Hashes { - q.hashQueue.Push(hash, float32(index)) + taskQueue.Push(hash, float32(index)) } for _, header := range request.Headers { taskQueue.Push(header, -float32(header.Number.Uint64())) @@ -562,8 +653,8 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, return peers } -// Deliver61 injects a block retrieval response into the download queue. -func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) { +// DeliverBlocks injects a block retrieval response into the download queue. +func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { q.lock.Lock() defer q.lock.Unlock() @@ -626,8 +717,8 @@ func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) { } } -// DeliverBlocks injects a block (body) retrieval response into the results queue. -func (q *queue) DeliverBlocks(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error { +// DeliverBodies injects a block body retrieval response into the results queue. +func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error { reconstruct := func(header *types.Header, index int, result *fetchResult) error { if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash { return errInvalidBody @@ -717,14 +808,84 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ } } +// DeliverNodeData injects a node state data retrieval response into the queue. +func (q *queue) DeliverNodeData(id string, data [][]byte) (int, int, error) { + q.lock.Lock() + defer q.lock.Unlock() + + // Short circuit if the data was never requested + request := q.statePendPool[id] + if request == nil { + return 0, 0, errNoFetchesPending + } + stateReqTimer.UpdateSince(request.Time) + delete(q.statePendPool, id) + + // If no data was retrieved, mark them as unavailable for the origin peer + if len(data) == 0 { + for hash, _ := range request.Hashes { + request.Peer.ignored.Add(hash) + } + } + // Iterate over the downloaded data and verify each of them + errs := make([]error, 0) + processed := 0 + for _, blob := range data { + // Skip any blocks that were not requested + hash := common.BytesToHash(crypto.Sha3(blob)) + if _, ok := request.Hashes[hash]; !ok { + errs = append(errs, fmt.Errorf("non-requested state data %x", hash)) + continue + } + // Inject the next state trie item into the database + if err := q.stateScheduler.Process([]trie.SyncResult{{hash, blob}}); err != nil { + errs = []error{err} + break + } + processed++ + + delete(request.Hashes, hash) + delete(q.stateTaskPool, hash) + } + // Return all failed or missing fetches to the queue + for hash, index := range request.Hashes { + q.stateTaskQueue.Push(hash, float32(index)) + } + // Also enqueue any newly required state trie nodes + discovered := 0 + if len(q.stateTaskPool) < maxQueuedStates { + for _, hash := range q.stateScheduler.Missing(4 * MaxStateFetch) { + q.stateTaskPool[hash] = q.stateTaskIndex + q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex)) + q.stateTaskIndex++ + discovered++ + } + } + // If none of the data items were good, it's a stale delivery + switch { + case len(errs) == 0: + return processed, discovered, nil + + case len(errs) == len(request.Hashes): + return processed, discovered, errStaleDelivery + + default: + return processed, discovered, fmt.Errorf("multiple failures: %v", errs) + } +} + // Prepare configures the result cache to allow accepting and caching inbound // fetch results. -func (q *queue) Prepare(offset uint64, parts int) { +func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64) { q.lock.Lock() defer q.lock.Unlock() if q.resultOffset < offset { q.resultOffset = offset } - q.resultParts = parts + q.fastSyncPivot = 0 + if mode == FastSync { + q.fastSyncPivot = pivot + } + q.mode = mode } diff --git a/eth/downloader/types.go b/eth/downloader/types.go new file mode 100644 index 000000000..221ef38f6 --- /dev/null +++ b/eth/downloader/types.go @@ -0,0 +1,137 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package downloader + +import ( + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// headerCheckFn is a callback type for verifying a header's presence in the local chain. +type headerCheckFn func(common.Hash) bool + +// blockCheckFn is a callback type for verifying a block's presence in the local chain. +type blockCheckFn func(common.Hash) bool + +// headerRetrievalFn is a callback type for retrieving a header from the local chain. +type headerRetrievalFn func(common.Hash) *types.Header + +// blockRetrievalFn is a callback type for retrieving a block from the local chain. +type blockRetrievalFn func(common.Hash) *types.Block + +// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain. +type headHeaderRetrievalFn func() *types.Header + +// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain. +type headBlockRetrievalFn func() *types.Block + +// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain. +type headFastBlockRetrievalFn func() *types.Block + +// headBlockCommitterFn is a callback for directly committing the head block to a certain entity. +type headBlockCommitterFn func(common.Hash) error + +// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block. +type tdRetrievalFn func(common.Hash) *big.Int + +// headerChainInsertFn is a callback type to insert a batch of headers into the local chain. +type headerChainInsertFn func([]*types.Header, bool) (int, error) + +// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain. +type blockChainInsertFn func(types.Blocks) (int, error) + +// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain. +type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error) + +// peerDropFn is a callback type for dropping a peer detected as malicious. +type peerDropFn func(id string) + +// dataPack is a data message returned by a peer for some query. +type dataPack interface { + PeerId() string + Items() int + Stats() string +} + +// hashPack is a batch of block hashes returned by a peer (eth/61). +type hashPack struct { + peerId string + hashes []common.Hash +} + +func (p *hashPack) PeerId() string { return p.peerId } +func (p *hashPack) Items() int { return len(p.hashes) } +func (p *hashPack) Stats() string { return fmt.Sprintf("%d", len(p.hashes)) } + +// blockPack is a batch of blocks returned by a peer (eth/61). +type blockPack struct { + peerId string + blocks []*types.Block +} + +func (p *blockPack) PeerId() string { return p.peerId } +func (p *blockPack) Items() int { return len(p.blocks) } +func (p *blockPack) Stats() string { return fmt.Sprintf("%d", len(p.blocks)) } + +// headerPack is a batch of block headers returned by a peer. +type headerPack struct { + peerId string + headers []*types.Header +} + +func (p *headerPack) PeerId() string { return p.peerId } +func (p *headerPack) Items() int { return len(p.headers) } +func (p *headerPack) Stats() string { return fmt.Sprintf("%d", len(p.headers)) } + +// bodyPack is a batch of block bodies returned by a peer. +type bodyPack struct { + peerId string + transactions [][]*types.Transaction + uncles [][]*types.Header +} + +func (p *bodyPack) PeerId() string { return p.peerId } +func (p *bodyPack) Items() int { + if len(p.transactions) <= len(p.uncles) { + return len(p.transactions) + } + return len(p.uncles) +} +func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) } + +// receiptPack is a batch of receipts returned by a peer. +type receiptPack struct { + peerId string + receipts [][]*types.Receipt +} + +func (p *receiptPack) PeerId() string { return p.peerId } +func (p *receiptPack) Items() int { return len(p.receipts) } +func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) } + +// statePack is a batch of states returned by a peer. +type statePack struct { + peerId string + states [][]byte +} + +func (p *statePack) PeerId() string { return p.peerId } +func (p *statePack) Items() int { return len(p.states) } +func (p *statePack) Stats() string { return fmt.Sprintf("%d", len(p.states)) } diff --git a/eth/handler.go b/eth/handler.go index 1117cb1b7..b0916d50b 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -129,9 +129,9 @@ func NewProtocolManager(mode Mode, networkId int, mux *event.TypeMux, txpool txP case LightMode: syncMode = downloader.LightSync } - manager.downloader = downloader.New(syncMode, manager.eventMux, blockchain.HasHeader, blockchain.HasBlock, blockchain.GetHeader, - blockchain.GetBlock, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.GetTd, - blockchain.InsertHeaderChain, blockchain.InsertChain, blockchain.InsertReceiptChain, manager.removePeer) + manager.downloader = downloader.New(syncMode, chaindb, manager.eventMux, blockchain.HasHeader, blockchain.HasBlock, blockchain.GetHeader, + blockchain.GetBlock, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.FastSyncCommitHead, + blockchain.GetTd, blockchain.InsertHeaderChain, blockchain.InsertChain, blockchain.InsertReceiptChain, manager.removePeer) validator := func(block *types.Block, parent *types.Block) error { return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false) @@ -220,8 +220,8 @@ func (pm *ProtocolManager) handle(p *peer) error { // Register the peer in the downloader. If the downloader considers it banned, we disconnect if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), - p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, - p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts); err != nil { + p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, p.RequestHeadersByHash, + p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil { return err } // Propagate existing transactions. new transactions appearing @@ -307,7 +307,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { break } // Deliver them all to the downloader for queuing - err := pm.downloader.DeliverHashes61(p.id, hashes) + err := pm.downloader.DeliverHashes(p.id, hashes) if err != nil { glog.V(logger.Debug).Infoln(err) } @@ -353,7 +353,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } // Filter out any explicitly requested blocks, deliver the rest to the downloader if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 { - pm.downloader.DeliverBlocks61(p.id, blocks) + pm.downloader.DeliverBlocks(p.id, blocks) } // Block header query, collect the requested headers and reply @@ -515,6 +515,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } return p.SendNodeData(data) + case p.version >= eth63 && msg.Code == NodeDataMsg: + // A batch of node state data arrived to one of our previous requests + var data [][]byte + if err := msg.Decode(&data); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Deliver all to the downloader + if err := pm.downloader.DeliverNodeData(p.id, data); err != nil { + glog.V(logger.Debug).Infof("failed to deliver node state data: %v", err) + } + case p.version >= eth63 && msg.Code == GetReceiptsMsg: // Decode the retrieval message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) diff --git a/eth/peer.go b/eth/peer.go index e24be97f1..68ce903a6 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -191,7 +191,7 @@ func (p *peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error { return p2p.Send(p.rw, BlockBodiesMsg, bodies) } -// SendNodeData sends a batch of arbitrary internal data, corresponding to the +// SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the // hashes requested. func (p *peer) SendNodeData(data [][]byte) error { return p2p.Send(p.rw, NodeDataMsg, data) |