diff options
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 568 |
1 files changed, 234 insertions, 334 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 24ba3da17..96177ae8a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -19,7 +19,6 @@ package downloader import ( "errors" - "fmt" "math" "math/big" "strings" @@ -29,9 +28,11 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "github.com/rcrowley/go-metrics" ) var ( @@ -39,8 +40,8 @@ var ( MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request - MaxStateFetch = 384 // Amount of node state values to allow fetching per request MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request + MaxStateFetch = 384 // Amount of node state values to allow fetching per request hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth @@ -49,10 +50,13 @@ var ( bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth - receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a block body request is considered expired + receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired + stateSoftTTL = 2 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth + stateHardTTL = 3 * stateSoftTTL // [eth/63] Maximum time allowance before a node data request is considered expired maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) + maxQueuedStates = 256 * 1024 // [eth/63] Maximum number of state requests to queue (DOS protection) maxResultsProcess = 256 // Number of download results to import at once into the chain headerCheckFrequency = 64 // Verification frequency of the downloaded headers during fast sync @@ -84,98 +88,6 @@ var ( errNoSyncActive = errors.New("no sync active") ) -// headerCheckFn is a callback type for verifying a header's presence in the local chain. -type headerCheckFn func(common.Hash) bool - -// blockCheckFn is a callback type for verifying a block's presence in the local chain. -type blockCheckFn func(common.Hash) bool - -// headerRetrievalFn is a callback type for retrieving a header from the local chain. -type headerRetrievalFn func(common.Hash) *types.Header - -// blockRetrievalFn is a callback type for retrieving a block from the local chain. -type blockRetrievalFn func(common.Hash) *types.Block - -// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain. -type headHeaderRetrievalFn func() *types.Header - -// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain. -type headBlockRetrievalFn func() *types.Block - -// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain. -type headFastBlockRetrievalFn func() *types.Block - -// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block. -type tdRetrievalFn func(common.Hash) *big.Int - -// headerChainInsertFn is a callback type to insert a batch of headers into the local chain. -type headerChainInsertFn func([]*types.Header, bool) (int, error) - -// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain. -type blockChainInsertFn func(types.Blocks) (int, error) - -// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain. -type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error) - -// peerDropFn is a callback type for dropping a peer detected as malicious. -type peerDropFn func(id string) - -// dataPack is a data message returned by a peer for some query. -type dataPack interface { - PeerId() string - Empty() bool - Stats() string -} - -// hashPack is a batch of block hashes returned by a peer (eth/61). -type hashPack struct { - peerId string - hashes []common.Hash -} - -// blockPack is a batch of blocks returned by a peer (eth/61). -type blockPack struct { - peerId string - blocks []*types.Block -} - -// headerPack is a batch of block headers returned by a peer. -type headerPack struct { - peerId string - headers []*types.Header -} - -// bodyPack is a batch of block bodies returned by a peer. -type bodyPack struct { - peerId string - transactions [][]*types.Transaction - uncles [][]*types.Header -} - -// PeerId retrieves the origin peer who sent this block body packet. -func (p *bodyPack) PeerId() string { return p.peerId } - -// Empty returns whether the no block bodies were delivered. -func (p *bodyPack) Empty() bool { return len(p.transactions) == 0 || len(p.uncles) == 0 } - -// Stats creates a textual stats report for logging purposes. -func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) } - -// receiptPack is a batch of receipts returned by a peer. -type receiptPack struct { - peerId string - receipts [][]*types.Receipt -} - -// PeerId retrieves the origin peer who sent this receipt packet. -func (p *receiptPack) PeerId() string { return p.peerId } - -// Empty returns whether the no receipts were delivered. -func (p *receiptPack) Empty() bool { return len(p.receipts) == 0 } - -// Stats creates a textual stats report for logging purposes. -func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) } - type Downloader struct { mode SyncMode // Synchronisation mode defining the strategies used mux *event.TypeMux // Event multiplexer to announce sync operation events @@ -186,23 +98,26 @@ type Downloader struct { interrupt int32 // Atomic boolean to signal termination // Statistics - syncStatsOrigin uint64 // Origin block number where syncing started at - syncStatsHeight uint64 // Highest block number known when syncing started - syncStatsLock sync.RWMutex // Lock protecting the sync stats fields + syncStatsChainOrigin uint64 // Origin block number where syncing started at + syncStatsChainHeight uint64 // Highest block number known when syncing started + syncStatsStateTotal uint64 // Total number of node state entries known so far + syncStatsStateDone uint64 // Number of state trie entries already pulled + syncStatsLock sync.RWMutex // Lock protecting the sync stats fields // Callbacks - hasHeader headerCheckFn // Checks if a header is present in the chain - hasBlock blockCheckFn // Checks if a block is present in the chain - getHeader headerRetrievalFn // Retrieves a header from the chain - getBlock blockRetrievalFn // Retrieves a block from the chain - headHeader headHeaderRetrievalFn // Retrieves the head header from the chain - headBlock headBlockRetrievalFn // Retrieves the head block from the chain - headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain - getTd tdRetrievalFn // Retrieves the TD of a block from the chain - insertHeaders headerChainInsertFn // Injects a batch of headers into the chain - insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain - insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain - dropPeer peerDropFn // Drops a peer for misbehaving + hasHeader headerCheckFn // Checks if a header is present in the chain + hasBlock blockCheckFn // Checks if a block is present in the chain + getHeader headerRetrievalFn // Retrieves a header from the chain + getBlock blockRetrievalFn // Retrieves a block from the chain + headHeader headHeaderRetrievalFn // Retrieves the head header from the chain + headBlock headBlockRetrievalFn // Retrieves the head block from the chain + headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain + commitHeadBlock headBlockCommitterFn // Commits a manually assembled block as the chain head + getTd tdRetrievalFn // Retrieves the TD of a block from the chain + insertHeaders headerChainInsertFn // Injects a batch of headers into the chain + insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain + insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain + dropPeer peerDropFn // Drops a peer for misbehaving // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing @@ -212,14 +127,16 @@ type Downloader struct { // Channels newPeerCh chan *peer - hashCh chan hashPack // [eth/61] Channel receiving inbound hashes - blockCh chan blockPack // [eth/61] Channel receiving inbound blocks - headerCh chan headerPack // [eth/62] Channel receiving inbound block headers - bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies - receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts - blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks - bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks - receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks + hashCh chan dataPack // [eth/61] Channel receiving inbound hashes + blockCh chan dataPack // [eth/61] Channel receiving inbound blocks + headerCh chan dataPack // [eth/62] Channel receiving inbound block headers + bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies + receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts + stateCh chan dataPack // [eth/63] Channel receiving inbound node state data + blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks + bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks + receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks + stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers @@ -232,36 +149,40 @@ type Downloader struct { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mode SyncMode, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn, getBlock blockRetrievalFn, - headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn, getTd tdRetrievalFn, - insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader { +func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn, + getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn, + commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, + insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader { return &Downloader{ - mode: mode, - mux: mux, - queue: newQueue(), - peers: newPeerSet(), - hasHeader: hasHeader, - hasBlock: hasBlock, - getHeader: getHeader, - getBlock: getBlock, - headHeader: headHeader, - headBlock: headBlock, - headFastBlock: headFastBlock, - getTd: getTd, - insertHeaders: insertHeaders, - insertBlocks: insertBlocks, - insertReceipts: insertReceipts, - dropPeer: dropPeer, - newPeerCh: make(chan *peer, 1), - hashCh: make(chan hashPack, 1), - blockCh: make(chan blockPack, 1), - headerCh: make(chan headerPack, 1), - bodyCh: make(chan dataPack, 1), - receiptCh: make(chan dataPack, 1), - blockWakeCh: make(chan bool, 1), - bodyWakeCh: make(chan bool, 1), - receiptWakeCh: make(chan bool, 1), + mode: mode, + mux: mux, + queue: newQueue(stateDb), + peers: newPeerSet(), + hasHeader: hasHeader, + hasBlock: hasBlock, + getHeader: getHeader, + getBlock: getBlock, + headHeader: headHeader, + headBlock: headBlock, + headFastBlock: headFastBlock, + commitHeadBlock: commitHeadBlock, + getTd: getTd, + insertHeaders: insertHeaders, + insertBlocks: insertBlocks, + insertReceipts: insertReceipts, + dropPeer: dropPeer, + newPeerCh: make(chan *peer, 1), + hashCh: make(chan dataPack, 1), + blockCh: make(chan dataPack, 1), + headerCh: make(chan dataPack, 1), + bodyCh: make(chan dataPack, 1), + receiptCh: make(chan dataPack, 1), + stateCh: make(chan dataPack, 1), + blockWakeCh: make(chan bool, 1), + bodyWakeCh: make(chan bool, 1), + receiptWakeCh: make(chan bool, 1), + stateWakeCh: make(chan bool, 1), } } @@ -272,7 +193,7 @@ func (d *Downloader) Boundaries() (uint64, uint64) { d.syncStatsLock.RLock() defer d.syncStatsLock.RUnlock() - return d.syncStatsOrigin, d.syncStatsHeight + return d.syncStatsChainOrigin, d.syncStatsChainHeight } // Synchronising returns whether the downloader is currently retrieving blocks. @@ -284,10 +205,11 @@ func (d *Downloader) Synchronising() bool { // used for fetching hashes and blocks from. func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading - getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getReceipts receiptFetcherFn) error { + getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, + getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error { glog.V(logger.Detail).Infoln("Registering peer", id) - if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts)); err != nil { + if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil { glog.V(logger.Error).Infoln("Register failed:", err) return err } @@ -357,12 +279,18 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error d.queue.Reset() d.peers.Reset() - for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh} { + for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { select { case <-ch: default: } } + // Reset and ephemeral sync statistics + d.syncStatsLock.Lock() + d.syncStatsStateTotal = 0 + d.syncStatsStateDone = 0 + d.syncStatsLock.Unlock() + // Create cancel channel for aborting mid-flight d.cancelLock.Lock() d.cancelCh = make(chan struct{}) @@ -414,17 +342,17 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e return err } d.syncStatsLock.Lock() - if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin { - d.syncStatsOrigin = origin + if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin { + d.syncStatsChainOrigin = origin } - d.syncStatsHeight = latest + d.syncStatsChainHeight = latest d.syncStatsLock.Unlock() // Initiate the sync using a concurrent hash and block retrieval algorithm if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - d.queue.Prepare(origin+1, 1) + d.queue.Prepare(origin+1, d.mode, 0) errc := make(chan error, 2) go func() { errc <- d.fetchHashes61(p, td, origin+1) }() @@ -449,26 +377,27 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e return err } d.syncStatsLock.Lock() - if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin { - d.syncStatsOrigin = origin + if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin { + d.syncStatsChainOrigin = origin } - d.syncStatsHeight = latest + d.syncStatsChainHeight = latest d.syncStatsLock.Unlock() // Initiate the sync using a concurrent header and content retrieval algorithm - parts := 1 - if d.mode == FastSync { - parts = 2 // receipts are fetched too + pivot := uint64(0) + if latest > uint64(minFullBlocks) { + pivot = latest - uint64(minFullBlocks) } - d.queue.Prepare(origin+1, parts) + d.queue.Prepare(origin+1, d.mode, pivot) if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - errc := make(chan error, 3) + errc := make(chan error, 4) go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved + go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync - go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal sync + go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync // If any fetcher fails, cancel the others var fail error @@ -538,14 +467,14 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) { case <-d.hashCh: // Out of bounds hashes received, ignore them - case blockPack := <-d.blockCh: + case packet := <-d.blockCh: // Discard anything not from the origin peer - if blockPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", blockPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", packet.PeerId()) break } // Make sure the peer actually gave something valid - blocks := blockPack.blocks + blocks := packet.(*blockPack).blocks if len(blocks) != 1 { glog.V(logger.Debug).Infof("%v: invalid number of head blocks: %d != 1", p, len(blocks)) return 0, errBadPeer @@ -584,14 +513,14 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) { case <-d.cancelCh: return 0, errCancelHashFetch - case hashPack := <-d.hashCh: + case packet := <-d.hashCh: // Discard anything not from the origin peer - if hashPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId()) break } // Make sure the peer actually gave something valid - hashes := hashPack.hashes + hashes := packet.(*hashPack).hashes if len(hashes) == 0 { glog.V(logger.Debug).Infof("%v: empty head hash set", p) return 0, errEmptyHashSet @@ -639,14 +568,14 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) { case <-d.cancelCh: return 0, errCancelHashFetch - case hashPack := <-d.hashCh: + case packet := <-d.hashCh: // Discard anything not from the origin peer - if hashPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId()) break } // Make sure the peer actually gave something valid - hashes := hashPack.hashes + hashes := packet.(*hashPack).hashes if len(hashes) != 1 { glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes)) return 0, errBadPeer @@ -716,17 +645,17 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { case <-d.bodyCh: // Out of bounds eth/62 block bodies received, ignore them - case hashPack := <-d.hashCh: + case packet := <-d.hashCh: // Make sure the active peer is giving us the hashes - if hashPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId()) break } hashReqTimer.UpdateSince(request) timeout.Stop() // If no more hashes are inbound, notify the block fetcher and return - if len(hashPack.hashes) == 0 { + if packet.Items() == 0 { glog.V(logger.Debug).Infof("%v: no available hashes", p) select { @@ -751,12 +680,13 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { return nil } gotHashes = true + hashes := packet.(*hashPack).hashes // Otherwise insert all the new hashes, aborting in case of junk - glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashPack.hashes), from) + glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashes), from) - inserts := d.queue.Schedule61(hashPack.hashes, true) - if len(inserts) != len(hashPack.hashes) { + inserts := d.queue.Schedule61(hashes, true) + if len(inserts) != len(hashes) { glog.V(logger.Debug).Infof("%v: stale hashes", p) return errBadPeer } @@ -776,7 +706,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { return nil } // Queue not yet full, fetch the next batch - from += uint64(len(hashPack.hashes)) + from += uint64(len(hashes)) getHashes(from) case <-timeout.C: @@ -813,16 +743,17 @@ func (d *Downloader) fetchBlocks61(from uint64) error { case <-d.bodyCh: // Out of bounds eth/62 block bodies received, ignore them - case blockPack := <-d.blockCh: + case packet := <-d.blockCh: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. - if peer := d.peers.Peer(blockPack.peerId); peer != nil { + if peer := d.peers.Peer(packet.PeerId()); peer != nil { // Deliver the received chunk of blocks, and demote in case of errors - err := d.queue.Deliver61(blockPack.peerId, blockPack.blocks) + blocks := packet.(*blockPack).blocks + err := d.queue.DeliverBlocks(peer.id, blocks) switch err { case nil: // If no blocks were delivered, demote the peer (need the delivery above) - if len(blockPack.blocks) == 0 { + if len(blocks) == 0 { peer.Demote() peer.SetBlocksIdle() glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) @@ -831,7 +762,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // All was successful, promote the peer and potentially start processing peer.Promote() peer.SetBlocksIdle() - glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) + glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks)) go d.process() case errInvalidChain: @@ -891,7 +822,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { return errNoPeers } // Check for block request timeouts and demote the responsible peers - for _, pid := range d.queue.Expire61(blockHardTTL) { + for _, pid := range d.queue.ExpireBlocks(blockHardTTL) { if peer := d.peers.Peer(pid); peer != nil { peer.Demote() glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) @@ -907,7 +838,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { } // Send a download request to all idle peers, until throttled throttled := false - idles, total := d.peers.BlockIdlePeers(61) + idles, total := d.peers.BlockIdlePeers() for _, peer := range idles { // Short circuit if throttling activated @@ -918,7 +849,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // Reserve a chunk of hashes for a peer. A nil can mean either that // no more hashes are available, or that the peer is known not to // have them. - request := d.queue.Reserve61(peer, peer.BlockCapacity()) + request := d.queue.ReserveBlocks(peer, peer.BlockCapacity()) if request == nil { continue } @@ -928,7 +859,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // Fetch the chunk and make sure any errors return the hashes to the queue if err := peer.Fetch61(request); err != nil { glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer) - d.queue.Cancel61(request) + d.queue.CancelBlocks(request) } } // Make sure that we have peers available for fetching. If all peers have been tried @@ -954,14 +885,14 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) { case <-d.cancelCh: return 0, errCancelBlockFetch - case headerPack := <-d.headerCh: + case packet := <-d.headerCh: // Discard anything not from the origin peer - if headerPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId()) break } // Make sure the peer actually gave something valid - headers := headerPack.headers + headers := packet.(*headerPack).headers if len(headers) != 1 { glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers)) return 0, errBadPeer @@ -1014,14 +945,14 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { case <-d.cancelCh: return 0, errCancelHashFetch - case headerPack := <-d.headerCh: + case packet := <-d.headerCh: // Discard anything not from the origin peer - if headerPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId()) break } // Make sure the peer actually gave something valid - headers := headerPack.headers + headers := packet.(*headerPack).headers if len(headers) == 0 { glog.V(logger.Debug).Infof("%v: empty head header set", p) return 0, errEmptyHeaderSet @@ -1069,14 +1000,14 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { case <-d.cancelCh: return 0, errCancelHashFetch - case headerPack := <-d.headerCh: + case packer := <-d.headerCh: // Discard anything not from the origin peer - if headerPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId) + if packer.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packer.PeerId()) break } // Make sure the peer actually gave something valid - headers := headerPack.headers + headers := packer.(*headerPack).headers if len(headers) != 1 { glog.V(logger.Debug).Infof("%v: invalid search header set (%d)", p, len(headers)) return 0, errBadPeer @@ -1150,20 +1081,20 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { case <-d.blockCh: // Out of bounds eth/61 blocks received, ignore them - case headerPack := <-d.headerCh: + case packet := <-d.headerCh: // Make sure the active peer is giving us the headers - if headerPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", headerPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", packet.PeerId()) break } headerReqTimer.UpdateSince(request) timeout.Stop() // If no more headers are inbound, notify the content fetchers and return - if len(headerPack.headers) == 0 { + if packet.Items() == 0 { glog.V(logger.Debug).Infof("%v: no available headers", p) - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { select { case ch <- false: case <-d.cancelCh: @@ -1187,26 +1118,27 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { return nil } gotHeaders = true + headers := packet.(*headerPack).headers // Otherwise insert all the new headers, aborting in case of junk - glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headerPack.headers), from) + glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from) if d.mode == FastSync || d.mode == LightSync { - if n, err := d.insertHeaders(headerPack.headers, false); err != nil { - glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headerPack.headers[n].Number, headerPack.headers[n].Hash().Bytes()[:4], err) + if n, err := d.insertHeaders(headers, false); err != nil { + glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err) return errInvalidChain } } if d.mode == FullSync || d.mode == FastSync { - inserts := d.queue.Schedule(headerPack.headers, from, d.mode == FastSync) - if len(inserts) != len(headerPack.headers) { + inserts := d.queue.Schedule(headers, from) + if len(inserts) != len(headers) { glog.V(logger.Debug).Infof("%v: stale headers", p) return errBadPeer } } // Notify the content fetchers of new headers, but stop if queue is full cont := d.queue.PendingBlocks() < maxQueuedHeaders || d.queue.PendingReceipts() < maxQueuedHeaders - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { if cont { // We still have headers to fetch, send continuation wake signal (potential) select { @@ -1223,7 +1155,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { } } // Queue not yet full, fetch the next batch - from += uint64(len(headerPack.headers)) + from += uint64(len(headers)) getHeaders(from) case <-timeout.C: @@ -1233,7 +1165,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { d.dropPeer(p.id) // Finish the sync gracefully instead of dumping the gathered data though - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { select { case ch <- false: case <-d.cancelCh: @@ -1251,19 +1183,19 @@ func (d *Downloader) fetchBodies(from uint64) error { glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from) var ( - deliver = func(packet interface{}) error { + deliver = func(packet dataPack) error { pack := packet.(*bodyPack) - return d.queue.DeliverBlocks(pack.peerId, pack.transactions, pack.uncles) + return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles) } - expire = func() []string { return d.queue.ExpireBlocks(bodyHardTTL) } + expire = func() []string { return d.queue.ExpireBodies(bodyHardTTL) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) } capacity = func(p *peer) int { return p.BlockCapacity() } - getIdles = func() ([]*peer, int) { return d.peers.BlockIdlePeers(62) } + getIdles = func() ([]*peer, int) { return d.peers.BodyIdlePeers() } setIdle = func(p *peer) { p.SetBlocksIdle() } ) - err := d.fetchParts(from, errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, - d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBlocks, d.bodyFetchHook, - fetch, d.queue.CancelBlocks, capacity, getIdles, setIdle, "Body") + err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, + d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBodies, d.bodyFetchHook, + fetch, d.queue.CancelBodies, capacity, getIdles, setIdle, "Body") glog.V(logger.Debug).Infof("Block body download terminated: %v", err) return err @@ -1276,7 +1208,7 @@ func (d *Downloader) fetchReceipts(from uint64) error { glog.V(logger.Debug).Infof("Downloading receipts from #%d", from) var ( - deliver = func(packet interface{}) error { + deliver = func(packet dataPack) error { pack := packet.(*receiptPack) return d.queue.DeliverReceipts(pack.peerId, pack.receipts) } @@ -1285,7 +1217,7 @@ func (d *Downloader) fetchReceipts(from uint64) error { capacity = func(p *peer) int { return p.ReceiptCapacity() } setIdle = func(p *peer) { p.SetReceiptsIdle() } ) - err := d.fetchParts(from, errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, + err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, d.queue.PendingReceipts, d.queue.ThrottleReceipts, d.queue.ReserveReceipts, d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt") @@ -1293,10 +1225,46 @@ func (d *Downloader) fetchReceipts(from uint64) error { return err } +// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any +// available peers, reserving a chunk of nodes for each, waiting for delivery and +// also periodically checking for timeouts. +func (d *Downloader) fetchNodeData() error { + glog.V(logger.Debug).Infof("Downloading node state data") + + var ( + deliver = func(packet dataPack) error { + start := time.Now() + done, found, err := d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states) + + d.syncStatsLock.Lock() + totalDone, totalKnown := d.syncStatsStateDone+uint64(done), d.syncStatsStateTotal+uint64(found) + d.syncStatsStateDone, d.syncStatsStateTotal = totalDone, totalKnown + d.syncStatsLock.Unlock() + + glog.V(logger.Info).Infof("imported %d [%d / %d] state entries in %v.", done, totalDone, totalKnown, time.Since(start)) + return err + } + expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) } + throttle = func() bool { return false } + reserve = func(p *peer, count int) (*fetchRequest, bool, error) { + return d.queue.ReserveNodeData(p, count), false, nil + } + fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) } + capacity = func(p *peer) int { return p.NodeDataCapacity() } + setIdle = func(p *peer) { p.SetNodeDataIdle() } + ) + err := d.fetchParts(errCancelReceiptFetch, d.stateCh, deliver, d.stateWakeCh, expire, + d.queue.PendingNodeData, throttle, reserve, nil, fetch, d.queue.CancelNodeData, + capacity, d.peers.ReceiptIdlePeers, setIdle, "State") + + glog.V(logger.Debug).Infof("Node state data download terminated: %v", err) + return err +} + // fetchParts iteratively downloads scheduled block parts, taking any available // peers, reserving a chunk of fetch requests for each, waiting for delivery and // also periodically checking for timeouts. -func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan dataPack, deliver func(packet interface{}) error, wakeCh chan bool, +func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool, expire func() []string, pending func() int, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, idle func() ([]*peer, int), setIdle func(*peer), kind string) error { @@ -1327,7 +1295,7 @@ func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan da switch err := deliver(packet); err { case nil: // If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!) - if packet.Empty() { + if packet.Items() == 0 { peer.Demote() setIdle(peer) glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind)) @@ -1441,7 +1409,11 @@ func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan da continue } if glog.V(logger.Detail) { - glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number) + if len(request.Headers) > 0 { + glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number) + } else { + glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind)) + } } // Fetch the chunk and make sure any errors return the hashes to the queue if fetchHook != nil { @@ -1528,7 +1500,9 @@ func (d *Downloader) process() { blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) case d.mode == FastSync: blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) - receipts = append(receipts, result.Receipts) + if result.Header.Number.Uint64() <= d.queue.fastSyncPivot { + receipts = append(receipts, result.Receipts) + } case d.mode == LightSync: headers = append(headers, result.Header) } @@ -1539,12 +1513,16 @@ func (d *Downloader) process() { index int ) switch { - case d.mode == FullSync: - index, err = d.insertBlocks(blocks) - case d.mode == FastSync: - index, err = d.insertReceipts(blocks, receipts) - case d.mode == LightSync: + case len(headers) > 0: index, err = d.insertHeaders(headers, true) + + case len(receipts) > 0: + index, err = d.insertReceipts(blocks, receipts) + if err == nil && blocks[len(blocks)-1].NumberU64() == d.queue.fastSyncPivot { + err = d.commitHeadBlock(blocks[len(blocks)-1].Hash()) + } + default: + index, err = d.insertBlocks(blocks) } if err != nil { glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err) @@ -1557,125 +1535,47 @@ func (d *Downloader) process() { } } -// DeliverHashes61 injects a new batch of hashes received from a remote node into +// DeliverHashes injects a new batch of hashes received from a remote node into // the download schedule. This is usually invoked through the BlockHashesMsg by // the protocol handler. -func (d *Downloader) DeliverHashes61(id string, hashes []common.Hash) (err error) { - // Update the delivery metrics for both good and failed deliveries - hashInMeter.Mark(int64(len(hashes))) - defer func() { - if err != nil { - hashDropMeter.Mark(int64(len(hashes))) - } - }() - // Make sure the downloader is active - if atomic.LoadInt32(&d.synchronising) == 0 { - return errNoSyncActive - } - // Deliver or abort if the sync is canceled while queuing - d.cancelLock.RLock() - cancel := d.cancelCh - d.cancelLock.RUnlock() - - select { - case d.hashCh <- hashPack{id, hashes}: - return nil - - case <-cancel: - return errNoSyncActive - } +func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) (err error) { + return d.deliver(id, d.hashCh, &hashPack{id, hashes}, hashInMeter, hashDropMeter) } -// DeliverBlocks61 injects a new batch of blocks received from a remote node. +// DeliverBlocks injects a new batch of blocks received from a remote node. // This is usually invoked through the BlocksMsg by the protocol handler. -func (d *Downloader) DeliverBlocks61(id string, blocks []*types.Block) (err error) { - // Update the delivery metrics for both good and failed deliveries - blockInMeter.Mark(int64(len(blocks))) - defer func() { - if err != nil { - blockDropMeter.Mark(int64(len(blocks))) - } - }() - // Make sure the downloader is active - if atomic.LoadInt32(&d.synchronising) == 0 { - return errNoSyncActive - } - // Deliver or abort if the sync is canceled while queuing - d.cancelLock.RLock() - cancel := d.cancelCh - d.cancelLock.RUnlock() - - select { - case d.blockCh <- blockPack{id, blocks}: - return nil - - case <-cancel: - return errNoSyncActive - } +func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) (err error) { + return d.deliver(id, d.blockCh, &blockPack{id, blocks}, blockInMeter, blockDropMeter) } // DeliverHeaders injects a new batch of blck headers received from a remote // node into the download schedule. func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) { - // Update the delivery metrics for both good and failed deliveries - headerInMeter.Mark(int64(len(headers))) - defer func() { - if err != nil { - headerDropMeter.Mark(int64(len(headers))) - } - }() - // Make sure the downloader is active - if atomic.LoadInt32(&d.synchronising) == 0 { - return errNoSyncActive - } - // Deliver or abort if the sync is canceled while queuing - d.cancelLock.RLock() - cancel := d.cancelCh - d.cancelLock.RUnlock() - - select { - case d.headerCh <- headerPack{id, headers}: - return nil - - case <-cancel: - return errNoSyncActive - } + return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter) } // DeliverBodies injects a new batch of block bodies received from a remote node. func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) { - // Update the delivery metrics for both good and failed deliveries - bodyInMeter.Mark(int64(len(transactions))) - defer func() { - if err != nil { - bodyDropMeter.Mark(int64(len(transactions))) - } - }() - // Make sure the downloader is active - if atomic.LoadInt32(&d.synchronising) == 0 { - return errNoSyncActive - } - // Deliver or abort if the sync is canceled while queuing - d.cancelLock.RLock() - cancel := d.cancelCh - d.cancelLock.RUnlock() - - select { - case d.bodyCh <- &bodyPack{id, transactions, uncles}: - return nil - - case <-cancel: - return errNoSyncActive - } + return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter) } // DeliverReceipts injects a new batch of receipts received from a remote node. func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) { + return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter) +} + +// DeliverNodeData injects a new batch of node state data received from a remote node. +func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) { + return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter) +} + +// deliver injects a new batch of data received from a remote node. +func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) { // Update the delivery metrics for both good and failed deliveries - receiptInMeter.Mark(int64(len(receipts))) + inMeter.Mark(int64(packet.Items())) defer func() { if err != nil { - receiptDropMeter.Mark(int64(len(receipts))) + dropMeter.Mark(int64(packet.Items())) } }() // Make sure the downloader is active @@ -1688,7 +1588,7 @@ func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (er d.cancelLock.RUnlock() select { - case d.receiptCh <- &receiptPack{id, receipts}: + case destCh <- packet: return nil case <-cancel: |