diff options
Diffstat (limited to 'eth/downloader')
-rw-r--r-- | eth/downloader/downloader.go | 667 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 739 | ||||
-rw-r--r-- | eth/downloader/metrics.go | 5 | ||||
-rw-r--r-- | eth/downloader/modes.go | 26 | ||||
-rw-r--r-- | eth/downloader/peer.go | 192 | ||||
-rw-r--r-- | eth/downloader/queue.go | 534 |
6 files changed, 1388 insertions, 775 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 64fb1b57b..7ae7aa221 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -19,8 +19,10 @@ package downloader import ( "errors" + "fmt" "math" "math/big" + "strings" "sync" "sync/atomic" "time" @@ -32,70 +34,96 @@ import ( "github.com/ethereum/go-ethereum/logger/glog" ) -const ( - eth61 = 61 // Constant to check for old protocol support - eth62 = 62 // Constant to check for new protocol support -) - var ( - MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request - MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request - MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request - MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request - MaxStateFetch = 384 // Amount of node state values to allow fetching per request - MaxReceiptsFetch = 384 // Amount of transaction receipts to allow fetching per request - - hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out - blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth - blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired - headerTTL = 5 * time.Second // [eth/62] Time it takes for a header request to time out - bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth - bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired - - maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) - maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) - maxBlockProcess = 256 // Number of blocks to import at once into the chain + MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request + MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request + MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request + MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request + MaxStateFetch = 384 // Amount of node state values to allow fetching per request + MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request + + hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out + blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth + blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired + headerTTL = 5 * time.Second // [eth/62] Time it takes for a header request to time out + bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth + bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired + receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth + receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a block body request is considered expired + + maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) + maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) + maxResultsProcess = 256 // Number of download results to import at once into the chain + + headerCheckFrequency = 64 // Verification frequency of the downloaded headers during fast sync + minCheckedHeaders = 1024 // Number of headers to verify fully when approaching the chain head + minFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync ) var ( - errBusy = errors.New("busy") - errUnknownPeer = errors.New("peer is unknown or unhealthy") - errBadPeer = errors.New("action from bad peer ignored") - errStallingPeer = errors.New("peer is stalling") - errNoPeers = errors.New("no peers to keep download active") - errPendingQueue = errors.New("pending items in queue") - errTimeout = errors.New("timeout") - errEmptyHashSet = errors.New("empty hash set by peer") - errEmptyHeaderSet = errors.New("empty header set by peer") - errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") - errAlreadyInPool = errors.New("hash already in pool") - errInvalidChain = errors.New("retrieved hash chain is invalid") - errInvalidBody = errors.New("retrieved block body is invalid") - errCancelHashFetch = errors.New("hash fetching canceled (requested)") - errCancelBlockFetch = errors.New("block downloading canceled (requested)") - errCancelHeaderFetch = errors.New("block header fetching canceled (requested)") - errCancelBodyFetch = errors.New("block body downloading canceled (requested)") - errNoSyncActive = errors.New("no sync active") + errBusy = errors.New("busy") + errUnknownPeer = errors.New("peer is unknown or unhealthy") + errBadPeer = errors.New("action from bad peer ignored") + errStallingPeer = errors.New("peer is stalling") + errNoPeers = errors.New("no peers to keep download active") + errPendingQueue = errors.New("pending items in queue") + errTimeout = errors.New("timeout") + errEmptyHashSet = errors.New("empty hash set by peer") + errEmptyHeaderSet = errors.New("empty header set by peer") + errPeersUnavailable = errors.New("no peers available or all tried for download") + errAlreadyInPool = errors.New("hash already in pool") + errInvalidChain = errors.New("retrieved hash chain is invalid") + errInvalidBlock = errors.New("retrieved block is invalid") + errInvalidBody = errors.New("retrieved block body is invalid") + errInvalidReceipt = errors.New("retrieved receipt is invalid") + errCancelHashFetch = errors.New("hash download canceled (requested)") + errCancelBlockFetch = errors.New("block download canceled (requested)") + errCancelHeaderFetch = errors.New("block header download canceled (requested)") + errCancelBodyFetch = errors.New("block body download canceled (requested)") + errCancelReceiptFetch = errors.New("receipt download canceled (requested)") + errNoSyncActive = errors.New("no sync active") ) -// hashCheckFn is a callback type for verifying a hash's presence in the local chain. -type hashCheckFn func(common.Hash) bool +// headerCheckFn is a callback type for verifying a header's presence in the local chain. +type headerCheckFn func(common.Hash) bool + +// blockCheckFn is a callback type for verifying a block's presence in the local chain. +type blockCheckFn func(common.Hash) bool + +// headerRetrievalFn is a callback type for retrieving a header from the local chain. +type headerRetrievalFn func(common.Hash) *types.Header // blockRetrievalFn is a callback type for retrieving a block from the local chain. type blockRetrievalFn func(common.Hash) *types.Block -// headRetrievalFn is a callback type for retrieving the head block from the local chain. -type headRetrievalFn func() *types.Block +// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain. +type headHeaderRetrievalFn func() *types.Header + +// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain. +type headBlockRetrievalFn func() *types.Block // tdRetrievalFn is a callback type for retrieving the total difficulty of a local block. type tdRetrievalFn func(common.Hash) *big.Int -// chainInsertFn is a callback type to insert a batch of blocks into the local chain. -type chainInsertFn func(types.Blocks) (int, error) +// headerChainInsertFn is a callback type to insert a batch of headers into the local chain. +type headerChainInsertFn func([]*types.Header, bool) (int, error) + +// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain. +type blockChainInsertFn func(types.Blocks) (int, error) + +// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain. +type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error) // peerDropFn is a callback type for dropping a peer detected as malicious. type peerDropFn func(id string) +// dataPack is a data message returned by a peer for some query. +type dataPack interface { + PeerId() string + Empty() bool + Stats() string +} + // hashPack is a batch of block hashes returned by a peer (eth/61). type hashPack struct { peerId string @@ -121,8 +149,33 @@ type bodyPack struct { uncles [][]*types.Header } +// PeerId retrieves the origin peer who sent this block body packet. +func (p *bodyPack) PeerId() string { return p.peerId } + +// Empty returns whether the no block bodies were delivered. +func (p *bodyPack) Empty() bool { return len(p.transactions) == 0 || len(p.uncles) == 0 } + +// Stats creates a textual stats report for logging purposes. +func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) } + +// receiptPack is a batch of receipts returned by a peer. +type receiptPack struct { + peerId string + receipts [][]*types.Receipt +} + +// PeerId retrieves the origin peer who sent this receipt packet. +func (p *receiptPack) PeerId() string { return p.peerId } + +// Empty returns whether the no receipts were delivered. +func (p *receiptPack) Empty() bool { return len(p.receipts) == 0 } + +// Stats creates a textual stats report for logging purposes. +func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) } + type Downloader struct { - mux *event.TypeMux + mode SyncMode // Synchronisation mode defining the strategies used + mux *event.TypeMux // Event multiplexer to announce sync operation events queue *queue // Scheduler for selecting the hashes to download peers *peerSet // Set of active peers from which download can proceed @@ -135,12 +188,17 @@ type Downloader struct { syncStatsLock sync.RWMutex // Lock protecting the sync stats fields // Callbacks - hasBlock hashCheckFn // Checks if a block is present in the chain - getBlock blockRetrievalFn // Retrieves a block from the chain - headBlock headRetrievalFn // Retrieves the head block from the chain - getTd tdRetrievalFn // Retrieves the TD of a block from the chain - insertChain chainInsertFn // Injects a batch of blocks into the chain - dropPeer peerDropFn // Drops a peer for misbehaving + hasHeader headerCheckFn // Checks if a header is present in the chain + hasBlock blockCheckFn // Checks if a block is present in the chain + getHeader headerRetrievalFn // Retrieves a header from the chain + getBlock blockRetrievalFn // Retrieves a block from the chain + headHeader headHeaderRetrievalFn // Retrieves the head header from the chain + headBlock headBlockRetrievalFn // Retrieves the head block from the chain + getTd tdRetrievalFn // Retrieves the TD of a block from the chain + insertHeaders headerChainInsertFn // Injects a batch of headers into the chain + insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain + insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain + dropPeer peerDropFn // Drops a peer for misbehaving // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing @@ -149,46 +207,56 @@ type Downloader struct { notified int32 // Channels - newPeerCh chan *peer - hashCh chan hashPack // [eth/61] Channel receiving inbound hashes - blockCh chan blockPack // [eth/61] Channel receiving inbound blocks - headerCh chan headerPack // [eth/62] Channel receiving inbound block headers - bodyCh chan bodyPack // [eth/62] Channel receiving inbound block bodies - wakeCh chan bool // Channel to signal the block/body fetcher of new tasks + newPeerCh chan *peer + hashCh chan hashPack // [eth/61] Channel receiving inbound hashes + blockCh chan blockPack // [eth/61] Channel receiving inbound blocks + headerCh chan headerPack // [eth/62] Channel receiving inbound block headers + bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies + receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts + blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks + bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks + receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers // Testing hooks - syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run - bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch - chainInsertHook func([]*Block) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) -} - -// Block is an origin-tagged blockchain block. -type Block struct { - RawBlock *types.Block - OriginPeer string + syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run + bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch + receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch + chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, headBlock headRetrievalFn, getTd tdRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader { +func New(mode SyncMode, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn, getBlock blockRetrievalFn, + headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, + insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader { + return &Downloader{ - mux: mux, - queue: newQueue(), - peers: newPeerSet(), - hasBlock: hasBlock, - getBlock: getBlock, - headBlock: headBlock, - getTd: getTd, - insertChain: insertChain, - dropPeer: dropPeer, - newPeerCh: make(chan *peer, 1), - hashCh: make(chan hashPack, 1), - blockCh: make(chan blockPack, 1), - headerCh: make(chan headerPack, 1), - bodyCh: make(chan bodyPack, 1), - wakeCh: make(chan bool, 1), + mode: mode, + mux: mux, + queue: newQueue(), + peers: newPeerSet(), + hasHeader: hasHeader, + hasBlock: hasBlock, + getHeader: getHeader, + getBlock: getBlock, + headHeader: headHeader, + headBlock: headBlock, + getTd: getTd, + insertHeaders: insertHeaders, + insertBlocks: insertBlocks, + insertReceipts: insertReceipts, + dropPeer: dropPeer, + newPeerCh: make(chan *peer, 1), + hashCh: make(chan hashPack, 1), + blockCh: make(chan blockPack, 1), + headerCh: make(chan headerPack, 1), + bodyCh: make(chan dataPack, 1), + receiptCh: make(chan dataPack, 1), + blockWakeCh: make(chan bool, 1), + bodyWakeCh: make(chan bool, 1), + receiptWakeCh: make(chan bool, 1), } } @@ -211,10 +279,10 @@ func (d *Downloader) Synchronising() bool { // used for fetching hashes and blocks from. func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading - getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn) error { + getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getReceipts receiptFetcherFn) error { glog.V(logger.Detail).Infoln("Registering peer", id) - if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies)); err != nil { + if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts)); err != nil { glog.V(logger.Error).Infoln("Register failed:", err) return err } @@ -222,13 +290,15 @@ func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, } // UnregisterPeer remove a peer from the known list, preventing any action from -// the specified peer. +// the specified peer. An effort is also made to return any pending fetches into +// the queue. func (d *Downloader) UnregisterPeer(id string) error { glog.V(logger.Detail).Infoln("Unregistering peer", id) if err := d.peers.Unregister(id); err != nil { glog.V(logger.Error).Infoln("Unregister failed:", err) return err } + d.queue.Revoke(id) return nil } @@ -275,16 +345,18 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error glog.V(logger.Info).Infoln("Block synchronisation started") } // Abort if the queue still contains some leftover data - if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { + if d.queue.GetHeadResult() != nil { return errPendingQueue } - // Reset the queue and peer set to clean any internal leftover state + // Reset the queue, peer set and wake channels to clean any internal leftover state d.queue.Reset() d.peers.Reset() - select { - case <-d.wakeCh: - default: + for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh} { + select { + case <-ch: + default: + } } // Create cancel channel for aborting mid-flight d.cancelLock.Lock() @@ -299,12 +371,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error return d.syncWithPeer(p, hash, td) } +/* // Has checks if the downloader knows about a particular hash, meaning that its // either already downloaded of pending retrieval. func (d *Downloader) Has(hash common.Hash) bool { return d.queue.Has(hash) } - +*/ // syncWithPeer starts a block synchronization based on the hash chain from the // specified peer and head hash. func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) { @@ -323,7 +396,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e defer glog.V(logger.Debug).Infof("Synchronisation terminated") switch { - case p.version == eth61: + case p.version == 61: // Look up the sync boundaries: the common ancestor and the target block latest, err := d.fetchHeight61(p) if err != nil { @@ -344,6 +417,8 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e if d.syncInitHook != nil { d.syncInitHook(origin, latest) } + d.queue.Prepare(origin+1, 1) + errc := make(chan error, 2) go func() { errc <- d.fetchHashes61(p, td, origin+1) }() go func() { errc <- d.fetchBlocks61(origin + 1) }() @@ -356,7 +431,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e } return <-errc - case p.version >= eth62: + case p.version >= 62: // Look up the sync boundaries: the common ancestor and the target block latest, err := d.fetchHeight(p) if err != nil { @@ -373,21 +448,32 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e d.syncStatsHeight = latest d.syncStatsLock.Unlock() - // Initiate the sync using a concurrent hash and block retrieval algorithm + // Initiate the sync using a concurrent header and content retrieval algorithm + parts := 1 + if d.mode == FastSync { + parts = 2 // receipts are fetched too + } + d.queue.Prepare(origin+1, parts) + if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - errc := make(chan error, 2) - go func() { errc <- d.fetchHeaders(p, td, origin+1) }() - go func() { errc <- d.fetchBodies(origin + 1) }() - - // If any fetcher fails, cancel the other - if err := <-errc; err != nil { - d.cancel() - <-errc - return err + errc := make(chan error, 3) + go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved + go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync + go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal sync + + // If any fetcher fails, cancel the others + var fail error + for i := 0; i < cap(errc); i++ { + if err := <-errc; err != nil { + if fail == nil { + fail = err + d.cancel() + } + } } - return <-errc + return fail default: // Something very wrong, stop right here @@ -637,7 +723,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { glog.V(logger.Debug).Infof("%v: no available hashes", p) select { - case d.wakeCh <- false: + case d.blockWakeCh <- false: case <-d.cancelCh: } // If no hashes were retrieved at all, the peer violated it's TD promise that it had a @@ -660,24 +746,24 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { gotHashes = true // Otherwise insert all the new hashes, aborting in case of junk - glog.V(logger.Detail).Infof("%v: inserting %d hashes from #%d", p, len(hashPack.hashes), from) + glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashPack.hashes), from) - inserts := d.queue.Insert61(hashPack.hashes, true) + inserts := d.queue.Schedule61(hashPack.hashes, true) if len(inserts) != len(hashPack.hashes) { glog.V(logger.Debug).Infof("%v: stale hashes", p) return errBadPeer } // Notify the block fetcher of new hashes, but stop if queue is full - if d.queue.Pending() < maxQueuedHashes { + if d.queue.PendingBlocks() < maxQueuedHashes { // We still have hashes to fetch, send continuation wake signal (potential) select { - case d.wakeCh <- true: + case d.blockWakeCh <- true: default: } } else { // Hash limit reached, send a termination wake signal (enforced) select { - case d.wakeCh <- false: + case d.blockWakeCh <- false: case <-d.cancelCh: } return nil @@ -707,10 +793,8 @@ func (d *Downloader) fetchBlocks61(from uint64) error { update := make(chan struct{}, 1) - // Prepare the queue and fetch blocks until the hash fetcher's done - d.queue.Prepare(from) + // Fetch blocks until the hash fetcher's done finished := false - for { select { case <-d.cancelCh: @@ -733,13 +817,13 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // If no blocks were delivered, demote the peer (need the delivery above) if len(blockPack.blocks) == 0 { peer.Demote() - peer.SetIdle61() + peer.SetBlocksIdle() glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) break } // All was successful, promote the peer and potentially start processing peer.Promote() - peer.SetIdle61() + peer.SetBlocksIdle() glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) go d.process() @@ -751,7 +835,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // Peer probably timed out with its delivery but came through // in the end, demote, but allow to to pull from this peer. peer.Demote() - peer.SetIdle61() + peer.SetBlocksIdle() glog.V(logger.Detail).Infof("%s: out of bound delivery", peer) case errStaleDelivery: @@ -765,7 +849,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { default: // Peer did something semi-useful, demote but keep it around peer.Demote() - peer.SetIdle61() + peer.SetBlocksIdle() glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) go d.process() } @@ -776,7 +860,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { default: } - case cont := <-d.wakeCh: + case cont := <-d.blockWakeCh: // The hash fetcher sent a continuation flag, check if it's done if !cont { finished = true @@ -800,14 +884,14 @@ func (d *Downloader) fetchBlocks61(from uint64) error { return errNoPeers } // Check for block request timeouts and demote the responsible peers - for _, pid := range d.queue.Expire(blockHardTTL) { + for _, pid := range d.queue.Expire61(blockHardTTL) { if peer := d.peers.Peer(pid); peer != nil { peer.Demote() glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) } } - // If there's noting more to fetch, wait or terminate - if d.queue.Pending() == 0 { + // If there's nothing more to fetch, wait or terminate + if d.queue.PendingBlocks() == 0 { if d.queue.InFlight() == 0 && finished { glog.V(logger.Debug).Infof("Block fetching completed") return nil @@ -816,16 +900,18 @@ func (d *Downloader) fetchBlocks61(from uint64) error { } // Send a download request to all idle peers, until throttled throttled := false - for _, peer := range d.peers.IdlePeers(eth61) { + idles, total := d.peers.BlockIdlePeers(61) + + for _, peer := range idles { // Short circuit if throttling activated - if d.queue.Throttle() { + if d.queue.ThrottleBlocks() { throttled = true break } // Reserve a chunk of hashes for a peer. A nil can mean either that // no more hashes are available, or that the peer is known not to // have them. - request := d.queue.Reserve61(peer, peer.Capacity()) + request := d.queue.Reserve61(peer, peer.BlockCapacity()) if request == nil { continue } @@ -835,12 +921,12 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // Fetch the chunk and make sure any errors return the hashes to the queue if err := peer.Fetch61(request); err != nil { glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer) - d.queue.Cancel(request) + d.queue.Cancel61(request) } } // Make sure that we have peers available for fetching. If all peers have been tried // and all failed throw an error - if !throttled && d.queue.InFlight() == 0 { + if !throttled && d.queue.InFlight() == 0 && len(idles) == total { return errPeersUnavailable } } @@ -891,16 +977,19 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) { } } -// findAncestor tries to locate the common ancestor block of the local chain and +// findAncestor tries to locate the common ancestor link of the local chain and // a remote peers blockchain. In the general case when our node was in sync and -// on the correct chain, checking the top N blocks should already get us a match. +// on the correct chain, checking the top N links should already get us a match. // In the rare scenario when we ended up on a long reorganization (i.e. none of -// the head blocks match), we do a binary search to find the common ancestor. +// the head links match), we do a binary search to find the common ancestor. func (d *Downloader) findAncestor(p *peer) (uint64, error) { glog.V(logger.Debug).Infof("%v: looking for common ancestor", p) - // Request our head blocks to short circuit ancestor location - head := d.headBlock().NumberU64() + // Request our head headers to short circuit ancestor location + head := d.headHeader().Number.Uint64() + if d.mode == FullSync { + head = d.headBlock().NumberU64() + } from := int64(head) - int64(MaxHeaderFetch) + 1 if from < 0 { from = 0 @@ -931,7 +1020,7 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { // Check if a common ancestor was found finished = true for i := len(headers) - 1; i >= 0; i-- { - if d.hasBlock(headers[i].Hash()) { + if (d.mode == FullSync && d.hasBlock(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) { number, hash = headers[i].Number.Uint64(), headers[i].Hash() break } @@ -986,13 +1075,13 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { arrived = true // Modify the search interval based on the response - block := d.getBlock(headers[0].Hash()) - if block == nil { + if (d.mode == FullSync && !d.hasBlock(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) { end = check break } - if block.NumberU64() != check { - glog.V(logger.Debug).Infof("%v: non requested header #%d [%x…], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check) + header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists + if header.Number.Uint64() != check { + glog.V(logger.Debug).Infof("%v: non requested header #%d [%x…], instead of #%d", p, header.Number, header.Hash().Bytes()[:4], check) return 0, errBadPeer } start = check @@ -1017,6 +1106,9 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { // fetchHeaders keeps retrieving headers from the requested number, until no more // are returned, potentially throttling on the way. +// +// The queue parameter can be used to switch between queuing headers for block +// body download too, or directly import as pure header chains. func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { glog.V(logger.Debug).Infof("%v: downloading headers from #%d", p, from) defer glog.V(logger.Debug).Infof("%v: header download terminated", p) @@ -1058,13 +1150,15 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { headerReqTimer.UpdateSince(request) timeout.Stop() - // If no more headers are inbound, notify the body fetcher and return + // If no more headers are inbound, notify the content fetchers and return if len(headerPack.headers) == 0 { glog.V(logger.Debug).Infof("%v: no available headers", p) - select { - case d.wakeCh <- false: - case <-d.cancelCh: + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + select { + case ch <- false: + case <-d.cancelCh: + } } // If no headers were retrieved at all, the peer violated it's TD promise that it had a // better chain compared to ours. The only exception is if it's promised blocks were @@ -1086,27 +1180,37 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { gotHeaders = true // Otherwise insert all the new headers, aborting in case of junk - glog.V(logger.Detail).Infof("%v: inserting %d headers from #%d", p, len(headerPack.headers), from) + glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headerPack.headers), from) - inserts := d.queue.Insert(headerPack.headers, from) - if len(inserts) != len(headerPack.headers) { - glog.V(logger.Debug).Infof("%v: stale headers", p) - return errBadPeer - } - // Notify the block fetcher of new headers, but stop if queue is full - if d.queue.Pending() < maxQueuedHeaders { - // We still have headers to fetch, send continuation wake signal (potential) - select { - case d.wakeCh <- true: - default: + if d.mode == FullSync || d.mode == FastSync { + inserts := d.queue.Schedule(headerPack.headers, from, d.mode == FastSync) + if len(inserts) != len(headerPack.headers) { + glog.V(logger.Debug).Infof("%v: stale headers", p) + return errBadPeer } } else { - // Header limit reached, send a termination wake signal (enforced) - select { - case d.wakeCh <- false: - case <-d.cancelCh: + if n, err := d.insertHeaders(headerPack.headers, true); err != nil { + glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headerPack.headers[n].Number, headerPack.headers[n].Hash().Bytes()[:4], err) + return errInvalidChain + } + } + // Notify the content fetchers of new headers, but stop if queue is full + cont := d.queue.PendingBlocks() < maxQueuedHeaders || d.queue.PendingReceipts() < maxQueuedHeaders + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + if cont { + // We still have headers to fetch, send continuation wake signal (potential) + select { + case ch <- true: + default: + } + } else { + // Header limit reached, send a termination wake signal (enforced) + select { + case ch <- false: + case <-d.cancelCh: + } + return nil } - return nil } // Queue not yet full, fetch the next batch from += uint64(len(headerPack.headers)) @@ -1119,9 +1223,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { d.dropPeer(p.id) // Finish the sync gracefully instead of dumping the gathered data though - select { - case d.wakeCh <- false: - case <-d.cancelCh: + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + select { + case ch <- false: + case <-d.cancelCh: + } } return nil } @@ -1133,22 +1239,69 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { // and also periodically checking for timeouts. func (d *Downloader) fetchBodies(from uint64) error { glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from) - defer glog.V(logger.Debug).Infof("Block body download terminated") - // Create a timeout timer for scheduling expiration tasks + var ( + deliver = func(packet interface{}) error { + pack := packet.(*bodyPack) + return d.queue.DeliverBlocks(pack.peerId, pack.transactions, pack.uncles) + } + expire = func() []string { return d.queue.ExpireBlocks(bodyHardTTL) } + fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) } + capacity = func(p *peer) int { return p.BlockCapacity() } + getIdles = func() ([]*peer, int) { return d.peers.BlockIdlePeers(62) } + setIdle = func(p *peer) { p.SetBlocksIdle() } + ) + err := d.fetchParts(from, errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, + d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBlocks, d.bodyFetchHook, + fetch, d.queue.CancelBlocks, capacity, getIdles, setIdle, "Body") + + glog.V(logger.Debug).Infof("Block body download terminated: %v", err) + return err +} + +// fetchReceipts iteratively downloads the scheduled block receipts, taking any +// available peers, reserving a chunk of receipts for each, waiting for delivery +// and also periodically checking for timeouts. +func (d *Downloader) fetchReceipts(from uint64) error { + glog.V(logger.Debug).Infof("Downloading receipts from #%d", from) + + var ( + deliver = func(packet interface{}) error { + pack := packet.(*receiptPack) + return d.queue.DeliverReceipts(pack.peerId, pack.receipts) + } + expire = func() []string { return d.queue.ExpireReceipts(bodyHardTTL) } + fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) } + capacity = func(p *peer) int { return p.ReceiptCapacity() } + setIdle = func(p *peer) { p.SetReceiptsIdle() } + ) + err := d.fetchParts(from, errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, + d.queue.PendingReceipts, d.queue.ThrottleReceipts, d.queue.ReserveReceipts, d.receiptFetchHook, + fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt") + + glog.V(logger.Debug).Infof("Receipt download terminated: %v", err) + return err +} + +// fetchParts iteratively downloads scheduled block parts, taking any available +// peers, reserving a chunk of fetch requests for each, waiting for delivery and +// also periodically checking for timeouts. +func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan dataPack, deliver func(packet interface{}) error, wakeCh chan bool, + expire func() []string, pending func() int, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header), + fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, idle func() ([]*peer, int), setIdle func(*peer), kind string) error { + + // Create a ticker to detect expired retreival tasks ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() update := make(chan struct{}, 1) - // Prepare the queue and fetch block bodies until the block header fetcher's done - d.queue.Prepare(from) + // Prepare the queue and fetch block parts until the block header fetcher's done finished := false - for { select { case <-d.cancelCh: - return errCancelBlockFetch + return errCancel case <-d.hashCh: // Out of bounds eth/61 hashes received, ignore them @@ -1156,42 +1309,41 @@ func (d *Downloader) fetchBodies(from uint64) error { case <-d.blockCh: // Out of bounds eth/61 blocks received, ignore them - case bodyPack := <-d.bodyCh: + case packet := <-deliveryCh: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. - if peer := d.peers.Peer(bodyPack.peerId); peer != nil { - // Deliver the received chunk of bodies, and demote in case of errors - err := d.queue.Deliver(bodyPack.peerId, bodyPack.transactions, bodyPack.uncles) - switch err { + if peer := d.peers.Peer(packet.PeerId()); peer != nil { + // Deliver the received chunk of data, and demote in case of errors + switch err := deliver(packet); err { case nil: - // If no blocks were delivered, demote the peer (need the delivery above) - if len(bodyPack.transactions) == 0 || len(bodyPack.uncles) == 0 { + // If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!) + if packet.Empty() { peer.Demote() - peer.SetIdle() - glog.V(logger.Detail).Infof("%s: no block bodies delivered", peer) + setIdle(peer) + glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind)) break } // All was successful, promote the peer and potentially start processing peer.Promote() - peer.SetIdle() - glog.V(logger.Detail).Infof("%s: delivered %d:%d block bodies", peer, len(bodyPack.transactions), len(bodyPack.uncles)) + setIdle(peer) + glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind)) go d.process() case errInvalidChain: // The hash chain is invalid (blocks are not ordered properly), abort return err - case errInvalidBody: + case errInvalidBody, errInvalidReceipt: // The peer delivered something very bad, drop immediately - glog.V(logger.Error).Infof("%s: delivered invalid block, dropping", peer) + glog.V(logger.Error).Infof("%s: delivered invalid %s, dropping", peer, strings.ToLower(kind)) d.dropPeer(peer.id) case errNoFetchesPending: // Peer probably timed out with its delivery but came through // in the end, demote, but allow to to pull from this peer. peer.Demote() - peer.SetIdle() - glog.V(logger.Detail).Infof("%s: out of bound delivery", peer) + setIdle(peer) + glog.V(logger.Detail).Infof("%s: out of bound %s delivery", peer, strings.ToLower(kind)) case errStaleDelivery: // Delivered something completely else than requested, usually @@ -1199,13 +1351,13 @@ func (d *Downloader) fetchBodies(from uint64) error { // Don't set it to idle as the original request should still be // in flight. peer.Demote() - glog.V(logger.Detail).Infof("%s: stale delivery", peer) + glog.V(logger.Detail).Infof("%s: %s stale delivery", peer, strings.ToLower(kind)) default: // Peer did something semi-useful, demote but keep it around peer.Demote() - peer.SetIdle() - glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) + setIdle(peer) + glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err) go d.process() } } @@ -1215,7 +1367,7 @@ func (d *Downloader) fetchBodies(from uint64) error { default: } - case cont := <-d.wakeCh: + case cont := <-wakeCh: // The header fetcher sent a continuation flag, check if it's done if !cont { finished = true @@ -1238,65 +1390,69 @@ func (d *Downloader) fetchBodies(from uint64) error { if d.peers.Len() == 0 { return errNoPeers } - // Check for block body request timeouts and demote the responsible peers - for _, pid := range d.queue.Expire(bodyHardTTL) { + // Check for fetch request timeouts and demote the responsible peers + for _, pid := range expire() { if peer := d.peers.Peer(pid); peer != nil { peer.Demote() - glog.V(logger.Detail).Infof("%s: block body delivery timeout", peer) + glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind)) } } - // If there's noting more to fetch, wait or terminate - if d.queue.Pending() == 0 { + // If there's nothing more to fetch, wait or terminate + if pending() == 0 { if d.queue.InFlight() == 0 && finished { - glog.V(logger.Debug).Infof("Block body fetching completed") + glog.V(logger.Debug).Infof("%s fetching completed", kind) return nil } break } // Send a download request to all idle peers, until throttled - queuedEmptyBlocks, throttled := false, false - for _, peer := range d.peers.IdlePeers(eth62) { + progressed, throttled := false, false + idles, total := idle() + + for _, peer := range idles { // Short circuit if throttling activated - if d.queue.Throttle() { + if throttle() { throttled = true break } - // Reserve a chunk of hashes for a peer. A nil can mean either that - // no more hashes are available, or that the peer is known not to + // Reserve a chunk of fetches for a peer. A nil can mean either that + // no more headers are available, or that the peer is known not to // have them. - request, process, err := d.queue.Reserve(peer, peer.Capacity()) + request, progress, err := reserve(peer, capacity(peer)) if err != nil { return err } - if process { - queuedEmptyBlocks = true + if progress { + progressed = true go d.process() } if request == nil { continue } if glog.V(logger.Detail) { - glog.Infof("%s: requesting %d block bodies", peer, len(request.Headers)) + glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number) } // Fetch the chunk and make sure any errors return the hashes to the queue - if d.bodyFetchHook != nil { - d.bodyFetchHook(request.Headers) + if fetchHook != nil { + fetchHook(request.Headers) } - if err := peer.Fetch(request); err != nil { - glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer) - d.queue.Cancel(request) + if err := fetch(peer, request); err != nil { + glog.V(logger.Error).Infof("%v: %s fetch failed, rescheduling", peer, strings.ToLower(kind)) + cancel(request) } } // Make sure that we have peers available for fetching. If all peers have been tried // and all failed throw an error - if !queuedEmptyBlocks && !throttled && d.queue.InFlight() == 0 { + if !progressed && !throttled && d.queue.InFlight() == 0 && len(idles) == total { return errPeersUnavailable } } } } -// process takes blocks from the queue and tries to import them into the chain. +// process takes fetch results from the queue and tries to import them into the +// chain. The type of import operation will depend on the result contents: +// - // // The algorithmic flow is as follows: // - The `processing` flag is swapped to 1 to ensure singleton access @@ -1317,10 +1473,10 @@ func (d *Downloader) process() { } // If the processor just exited, but there are freshly pending items, try to // reenter. This is needed because the goroutine spinned up for processing - // the fresh blocks might have been rejected entry to to this present thread + // the fresh results might have been rejected entry to to this present thread // not yet releasing the `processing` state. defer func() { - if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadBlock() != nil { + if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil { d.process() } }() @@ -1328,38 +1484,64 @@ func (d *Downloader) process() { // the import statistics to zero. defer atomic.StoreInt32(&d.processing, 0) - // Repeat the processing as long as there are blocks to import + // Repeat the processing as long as there are results to process for { - // Fetch the next batch of blocks - blocks := d.queue.TakeBlocks() - if len(blocks) == 0 { + // Fetch the next batch of results + results := d.queue.TakeResults() + if len(results) == 0 { return } if d.chainInsertHook != nil { - d.chainInsertHook(blocks) + d.chainInsertHook(results) } // Actually import the blocks - glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number()) - for len(blocks) != 0 { + if glog.V(logger.Debug) { + first, last := results[0].Header, results[len(results)-1].Header + glog.V(logger.Debug).Infof("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4]) + } + for len(results) != 0 { // Check for any termination requests if atomic.LoadInt32(&d.interrupt) == 1 { return } - // Retrieve the first batch of blocks to insert - max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess))) - raw := make(types.Blocks, 0, max) - for _, block := range blocks[:max] { - raw = append(raw, block.RawBlock) + // Retrieve the a batch of results to import + var ( + headers = make([]*types.Header, 0, maxResultsProcess) + blocks = make([]*types.Block, 0, maxResultsProcess) + receipts = make([]types.Receipts, 0, maxResultsProcess) + ) + items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) + for _, result := range results[:items] { + switch { + case d.mode == FullSync: + blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) + case d.mode == FastSync: + blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) + receipts = append(receipts, result.Receipts) + case d.mode == LightSync: + headers = append(headers, result.Header) + } + } + // Try to process the results, aborting if there's an error + var ( + err error + index int + ) + switch { + case d.mode == FullSync: + index, err = d.insertBlocks(blocks) + case d.mode == FastSync: + index, err = d.insertReceipts(blocks, receipts) + case d.mode == LightSync: + index, err = d.insertHeaders(headers, true) } - // Try to inset the blocks, drop the originating peer if there's an error - index, err := d.insertChain(raw) if err != nil { - glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err) - d.dropPeer(blocks[index].OriginPeer) + glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash(), err) d.cancel() return } - blocks = blocks[max:] + // Shift the results to the next batch + results = results[items:] } } } @@ -1468,7 +1650,34 @@ func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transactio d.cancelLock.RUnlock() select { - case d.bodyCh <- bodyPack{id, transactions, uncles}: + case d.bodyCh <- &bodyPack{id, transactions, uncles}: + return nil + + case <-cancel: + return errNoSyncActive + } +} + +// DeliverReceipts injects a new batch of receipts received from a remote node. +func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) { + // Update the delivery metrics for both good and failed deliveries + receiptInMeter.Mark(int64(len(receipts))) + defer func() { + if err != nil { + receiptDropMeter.Mark(int64(len(receipts))) + } + }() + // Make sure the downloader is active + if atomic.LoadInt32(&d.synchronising) == 0 { + return errNoSyncActive + } + // Deliver or abort if the sync is canceled while queuing + d.cancelLock.RLock() + cancel := d.cancelCh + d.cancelLock.RUnlock() + + select { + case d.receiptCh <- &receiptPack{id, receipts}: return nil case <-cancel: diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 96096527e..18bdb56dd 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -45,7 +45,8 @@ var ( // the returned hash chain is ordered head->parent. In addition, every 3rd block // contains a transaction and every 5th an uncle to allow testing correct block // reassembly. -func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) { +func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Block) { + // Generate the block chain blocks := core.GenerateChain(parent, testdb, n, func(i int, block *core.BlockGen) { block.SetCoinbase(common.Address{seed}) @@ -62,59 +63,80 @@ func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 1).Hash(), Number: big.NewInt(int64(i - 1))}) } }) + // Convert the block-chain into a hash-chain and header/block maps hashes := make([]common.Hash, n+1) hashes[len(hashes)-1] = parent.Hash() + + headerm := make(map[common.Hash]*types.Header, n+1) + headerm[parent.Hash()] = parent.Header() + blockm := make(map[common.Hash]*types.Block, n+1) blockm[parent.Hash()] = parent + for i, b := range blocks { hashes[len(hashes)-i-2] = b.Hash() + headerm[b.Hash()] = b.Header() blockm[b.Hash()] = b } - return hashes, blockm + return hashes, headerm, blockm } // makeChainFork creates two chains of length n, such that h1[:f] and // h2[:f] are different but have a common suffix of length n-f. -func makeChainFork(n, f int, parent *types.Block) (h1, h2 []common.Hash, b1, b2 map[common.Hash]*types.Block) { - // Create the common suffix. - h, b := makeChain(n-f, 0, parent) - // Create the forks. - h1, b1 = makeChain(f, 1, b[h[0]]) - h1 = append(h1, h[1:]...) - h2, b2 = makeChain(f, 2, b[h[0]]) - h2 = append(h2, h[1:]...) - for hash, block := range b { - b1[hash] = block - b2[hash] = block - } - return h1, h2, b1, b2 +func makeChainFork(n, f int, parent *types.Block) ([]common.Hash, []common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]*types.Block) { + // Create the common suffix + hashes, headers, blocks := makeChain(n-f, 0, parent) + + // Create the forks + hashes1, headers1, blocks1 := makeChain(f, 1, blocks[hashes[0]]) + hashes1 = append(hashes1, hashes[1:]...) + + hashes2, headers2, blocks2 := makeChain(f, 2, blocks[hashes[0]]) + hashes2 = append(hashes2, hashes[1:]...) + + for hash, header := range headers { + headers1[hash] = header + headers2[hash] = header + } + for hash, block := range blocks { + blocks1[hash] = block + blocks2[hash] = block + } + return hashes1, hashes2, headers1, headers2, blocks1, blocks2 } // downloadTester is a test simulator for mocking out local block chain. type downloadTester struct { downloader *Downloader - ownHashes []common.Hash // Hash chain belonging to the tester - ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester - ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain - peerHashes map[string][]common.Hash // Hash chain belonging to different test peers - peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers - peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains + ownHashes []common.Hash // Hash chain belonging to the tester + ownHeaders map[common.Hash]*types.Header // Headers belonging to the tester + ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester + ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester + ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain + peerHashes map[string][]common.Hash // Hash chain belonging to different test peers + peerHeaders map[string]map[common.Hash]*types.Header // Headers belonging to different test peers + peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers + peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains lock sync.RWMutex } // newTester creates a new downloader test mocker. -func newTester() *downloadTester { +func newTester(mode SyncMode) *downloadTester { tester := &downloadTester{ ownHashes: []common.Hash{genesis.Hash()}, + ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()}, ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis}, + ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): genesis.Receipts()}, ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()}, peerHashes: make(map[string][]common.Hash), + peerHeaders: make(map[string]map[common.Hash]*types.Header), peerBlocks: make(map[string]map[common.Hash]*types.Block), peerChainTds: make(map[string]map[common.Hash]*big.Int), } - tester.downloader = New(new(event.TypeMux), tester.hasBlock, tester.getBlock, tester.headBlock, tester.getTd, tester.insertChain, tester.dropPeer) + tester.downloader = New(mode, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader, tester.getBlock, + tester.headHeader, tester.headBlock, tester.getTd, tester.insertHeaders, tester.insertBlocks, tester.insertConfirmedBlocks, tester.dropPeer) return tester } @@ -135,8 +157,7 @@ func (dl *downloadTester) sync(id string, td *big.Int) error { err := dl.downloader.synchronise(id, hash, td) for { // If the queue is empty and processing stopped, break - hashes, blocks := dl.downloader.queue.Size() - if hashes+blocks == 0 && atomic.LoadInt32(&dl.downloader.processing) == 0 { + if dl.downloader.queue.Idle() && atomic.LoadInt32(&dl.downloader.processing) == 0 { break } // Otherwise sleep a bit and retry @@ -145,12 +166,22 @@ func (dl *downloadTester) sync(id string, td *big.Int) error { return err } -// hasBlock checks if a block is pres ent in the testers canonical chain. +// hasHeader checks if a header is present in the testers canonical chain. +func (dl *downloadTester) hasHeader(hash common.Hash) bool { + return dl.getHeader(hash) != nil +} + +// hasBlock checks if a block is present in the testers canonical chain. func (dl *downloadTester) hasBlock(hash common.Hash) bool { + return dl.getBlock(hash) != nil +} + +// getHeader retrieves a header from the testers canonical chain. +func (dl *downloadTester) getHeader(hash common.Hash) *types.Header { dl.lock.RLock() defer dl.lock.RUnlock() - return dl.getBlock(hash) != nil + return dl.ownHeaders[hash] } // getBlock retrieves a block from the testers canonical chain. @@ -161,12 +192,25 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { return dl.ownBlocks[hash] } +// headHeader retrieves the current head header from the canonical chain. +func (dl *downloadTester) headHeader() *types.Header { + dl.lock.RLock() + defer dl.lock.RUnlock() + + return dl.getHeader(dl.ownHashes[len(dl.ownHashes)-1]) +} + // headBlock retrieves the current head block from the canonical chain. func (dl *downloadTester) headBlock() *types.Block { dl.lock.RLock() defer dl.lock.RUnlock() - return dl.getBlock(dl.ownHashes[len(dl.ownHashes)-1]) + for i := len(dl.ownHashes) - 1; i >= 0; i-- { + if block := dl.getBlock(dl.ownHashes[i]); block != nil { + return block + } + } + return nil } // getTd retrieves the block's total difficulty from the canonical chain. @@ -177,8 +221,24 @@ func (dl *downloadTester) getTd(hash common.Hash) *big.Int { return dl.ownChainTd[hash] } -// insertChain injects a new batch of blocks into the simulated chain. -func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { +// insertHeaders injects a new batch of headers into the simulated chain. +func (dl *downloadTester) insertHeaders(headers []*types.Header, verify bool) (int, error) { + dl.lock.Lock() + defer dl.lock.Unlock() + + for i, header := range headers { + if _, ok := dl.ownHeaders[header.ParentHash]; !ok { + return i, errors.New("unknown parent") + } + dl.ownHashes = append(dl.ownHashes, header.Hash()) + dl.ownHeaders[header.Hash()] = header + dl.ownChainTd[header.Hash()] = dl.ownChainTd[header.ParentHash] + } + return len(headers), nil +} + +// insertBlocks injects a new batch of blocks into the simulated chain. +func (dl *downloadTester) insertBlocks(blocks types.Blocks) (int, error) { dl.lock.Lock() defer dl.lock.Unlock() @@ -187,47 +247,74 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { return i, errors.New("unknown parent") } dl.ownHashes = append(dl.ownHashes, block.Hash()) + dl.ownHeaders[block.Hash()] = block.Header() dl.ownBlocks[block.Hash()] = block dl.ownChainTd[block.Hash()] = dl.ownChainTd[block.ParentHash()] } return len(blocks), nil } +// insertBlocks injects a new batch of blocks into the simulated chain. +func (dl *downloadTester) insertConfirmedBlocks(blocks types.Blocks, receipts []types.Receipts) (int, error) { + dl.lock.Lock() + defer dl.lock.Unlock() + + for i := 0; i < len(blocks) && i < len(receipts); i++ { + if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok { + return i, errors.New("unknown parent") + } + dl.ownHashes = append(dl.ownHashes, blocks[i].Hash()) + dl.ownHeaders[blocks[i].Hash()] = blocks[i].Header() + dl.ownBlocks[blocks[i].Hash()] = blocks[i] + dl.ownReceipts[blocks[i].Hash()] = blocks[i].Receipts() + dl.ownChainTd[blocks[i].Hash()] = dl.ownChainTd[blocks[i].ParentHash()] + } + return len(blocks), nil +} + // newPeer registers a new block download source into the downloader. -func (dl *downloadTester) newPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { - return dl.newSlowPeer(id, version, hashes, blocks, 0) +func (dl *downloadTester) newPeer(id string, version int, hashes []common.Hash, headers map[common.Hash]*types.Header, blocks map[common.Hash]*types.Block) error { + return dl.newSlowPeer(id, version, hashes, headers, blocks, 0) } // newSlowPeer registers a new block download source into the downloader, with a // specific delay time on processing the network packets sent to it, simulating // potentially slow network IO. -func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error { +func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Hash, headers map[common.Hash]*types.Header, blocks map[common.Hash]*types.Block, delay time.Duration) error { dl.lock.Lock() defer dl.lock.Unlock() var err error switch version { case 61: - err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil) + err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil, nil) case 62: - err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay)) + err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil) case 63: - err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay)) + err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay)) case 64: - err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay)) + err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay)) } if err == nil { - // Assign the owned hashes and blocks to the peer (deep copy) + // Assign the owned hashes, headers and blocks to the peer (deep copy) dl.peerHashes[id] = make([]common.Hash, len(hashes)) copy(dl.peerHashes[id], hashes) + dl.peerHeaders[id] = make(map[common.Hash]*types.Header) dl.peerBlocks[id] = make(map[common.Hash]*types.Block) dl.peerChainTds[id] = make(map[common.Hash]*big.Int) + for _, hash := range hashes { + if header, ok := headers[hash]; ok { + dl.peerHeaders[id][hash] = header + if _, ok := dl.peerHeaders[id][header.ParentHash]; ok { + dl.peerChainTds[id][hash] = new(big.Int).Add(header.Difficulty, dl.peerChainTds[id][header.ParentHash]) + } + } if block, ok := blocks[hash]; ok { dl.peerBlocks[id][hash] = block - if parent, ok := dl.peerBlocks[id][block.ParentHash()]; ok { - dl.peerChainTds[id][hash] = new(big.Int).Add(block.Difficulty(), dl.peerChainTds[id][parent.Hash()]) + if _, ok := dl.peerBlocks[id][block.ParentHash()]; ok { + dl.peerChainTds[id][hash] = new(big.Int).Add(block.Difficulty(), dl.peerChainTds[id][block.ParentHash()]) } } } @@ -241,6 +328,7 @@ func (dl *downloadTester) dropPeer(id string) { defer dl.lock.Unlock() delete(dl.peerHashes, id) + delete(dl.peerHeaders, id) delete(dl.peerBlocks, id) delete(dl.peerChainTds, id) @@ -358,13 +446,13 @@ func (dl *downloadTester) peerGetAbsHeadersFn(id string, delay time.Duration) fu dl.lock.RLock() defer dl.lock.RUnlock() - // Gather the next batch of hashes + // Gather the next batch of headers hashes := dl.peerHashes[id] - blocks := dl.peerBlocks[id] + headers := dl.peerHeaders[id] result := make([]*types.Header, 0, amount) for i := 0; i < amount && len(hashes)-int(origin)-1-i >= 0; i++ { - if block, ok := blocks[hashes[len(hashes)-int(origin)-1-i]]; ok { - result = append(result, block.Header()) + if header, ok := headers[hashes[len(hashes)-int(origin)-1-i]]; ok { + result = append(result, header) } } // Delay delivery a bit to allow attacks to unfold @@ -403,50 +491,99 @@ func (dl *downloadTester) peerGetBodiesFn(id string, delay time.Duration) func([ } } +// peerGetReceiptsFn constructs a getReceipts method associated with a particular +// peer in the download tester. The returned function can be used to retrieve +// batches of block receipts from the particularly requested peer. +func (dl *downloadTester) peerGetReceiptsFn(id string, delay time.Duration) func([]common.Hash) error { + return func(hashes []common.Hash) error { + time.Sleep(delay) + + dl.lock.RLock() + defer dl.lock.RUnlock() + + blocks := dl.peerBlocks[id] + + receipts := make([][]*types.Receipt, 0, len(hashes)) + for _, hash := range hashes { + if block, ok := blocks[hash]; ok { + receipts = append(receipts, block.Receipts()) + } + } + go dl.downloader.DeliverReceipts(id, receipts) + + return nil + } +} + +// assertOwnChain checks if the local chain contains the correct number of items +// of the various chain components. +func assertOwnChain(t *testing.T, tester *downloadTester, length int) { + headers, blocks, receipts := length, length, length + switch tester.downloader.mode { + case FullSync: + receipts = 1 + case LightSync: + blocks, receipts = 1, 1 + } + + if hs := len(tester.ownHeaders); hs != headers { + t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers) + } + if bs := len(tester.ownBlocks); bs != blocks { + t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, blocks) + } + if rs := len(tester.ownReceipts); rs != receipts { + t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts) + } +} + // Tests that simple synchronization against a canonical chain works correctly. // In this test common ancestor lookup should be short circuited and not require // binary searching. -func TestCanonicalSynchronisation61(t *testing.T) { testCanonicalSynchronisation(t, 61) } -func TestCanonicalSynchronisation62(t *testing.T) { testCanonicalSynchronisation(t, 62) } -func TestCanonicalSynchronisation63(t *testing.T) { testCanonicalSynchronisation(t, 63) } -func TestCanonicalSynchronisation64(t *testing.T) { testCanonicalSynchronisation(t, 64) } - -func testCanonicalSynchronisation(t *testing.T, protocol int) { +func TestCanonicalSynchronisation61(t *testing.T) { testCanonicalSynchronisation(t, 61, FullSync) } +func TestCanonicalSynchronisation62(t *testing.T) { testCanonicalSynchronisation(t, 62, FullSync) } +func TestCanonicalSynchronisation63Full(t *testing.T) { testCanonicalSynchronisation(t, 63, FullSync) } +func TestCanonicalSynchronisation63Fast(t *testing.T) { testCanonicalSynchronisation(t, 63, FastSync) } +func TestCanonicalSynchronisation64Full(t *testing.T) { testCanonicalSynchronisation(t, 64, FullSync) } +func TestCanonicalSynchronisation64Fast(t *testing.T) { testCanonicalSynchronisation(t, 64, FastSync) } +func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronisation(t, 64, LightSync) } + +func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) { // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 - hashes, blocks := makeChain(targetBlocks, 0, genesis) + hashes, headers, blocks := makeChain(targetBlocks, 0, genesis) - tester := newTester() - tester.newPeer("peer", protocol, hashes, blocks) + tester := newTester(mode) + tester.newPeer("peer", protocol, hashes, headers, blocks) - // Synchronise with the peer and make sure all blocks were retrieved + // Synchronise with the peer and make sure all relevant data was retrieved if err := tester.sync("peer", nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - if imported := len(tester.ownBlocks); imported != targetBlocks+1 { - t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) - } + assertOwnChain(t, tester, targetBlocks+1) } // Tests that if a large batch of blocks are being downloaded, it is throttled // until the cached blocks are retrieved. -func TestThrottling61(t *testing.T) { testThrottling(t, 61) } -func TestThrottling62(t *testing.T) { testThrottling(t, 62) } -func TestThrottling63(t *testing.T) { testThrottling(t, 63) } -func TestThrottling64(t *testing.T) { testThrottling(t, 64) } - -func testThrottling(t *testing.T, protocol int) { +func TestThrottling61(t *testing.T) { testThrottling(t, 61, FullSync) } +func TestThrottling62(t *testing.T) { testThrottling(t, 62, FullSync) } +func TestThrottling63Full(t *testing.T) { testThrottling(t, 63, FullSync) } +func TestThrottling63Fast(t *testing.T) { testThrottling(t, 63, FastSync) } +func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) } +func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) } + +func testThrottling(t *testing.T, protocol int, mode SyncMode) { // Create a long block chain to download and the tester targetBlocks := 8 * blockCacheLimit - hashes, blocks := makeChain(targetBlocks, 0, genesis) + hashes, headers, blocks := makeChain(targetBlocks, 0, genesis) - tester := newTester() - tester.newPeer("peer", protocol, hashes, blocks) + tester := newTester(mode) + tester.newPeer("peer", protocol, hashes, headers, blocks) // Wrap the importer to allow stepping blocked, proceed := uint32(0), make(chan struct{}) - tester.downloader.chainInsertHook = func(blocks []*Block) { - atomic.StoreUint32(&blocked, uint32(len(blocks))) + tester.downloader.chainInsertHook = func(results []*fetchResult) { + atomic.StoreUint32(&blocked, uint32(len(results))) <-proceed } // Start a synchronisation concurrently @@ -469,7 +606,12 @@ func testThrottling(t *testing.T, protocol int) { time.Sleep(25 * time.Millisecond) tester.downloader.queue.lock.RLock() - cached = len(tester.downloader.queue.blockPool) + cached = len(tester.downloader.queue.blockDonePool) + if mode == FastSync { + if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached { + cached = receipts + } + } tester.downloader.queue.lock.RUnlock() if cached == blockCacheLimit || len(tester.ownBlocks)+cached+int(atomic.LoadUint32(&blocked)) == targetBlocks+1 { @@ -488,9 +630,7 @@ func testThrottling(t *testing.T, protocol int) { } } // Check that we haven't pulled more blocks than available - if len(tester.ownBlocks) > targetBlocks+1 { - t.Fatalf("target block count mismatch: have %v, want %v", len(tester.ownBlocks), targetBlocks+1) - } + assertOwnChain(t, tester, targetBlocks+1) if err := <-errc; err != nil { t.Fatalf("block synchronization failed: %v", err) } @@ -499,39 +639,39 @@ func testThrottling(t *testing.T, protocol int) { // Tests that simple synchronization against a forked chain works correctly. In // this test common ancestor lookup should *not* be short circuited, and a full // binary search should be executed. -func TestForkedSynchronisation61(t *testing.T) { testForkedSynchronisation(t, 61) } -func TestForkedSynchronisation62(t *testing.T) { testForkedSynchronisation(t, 62) } -func TestForkedSynchronisation63(t *testing.T) { testForkedSynchronisation(t, 63) } -func TestForkedSynchronisation64(t *testing.T) { testForkedSynchronisation(t, 64) } - -func testForkedSynchronisation(t *testing.T, protocol int) { +func TestForkedSynchronisation61(t *testing.T) { testForkedSynchronisation(t, 61, FullSync) } +func TestForkedSynchronisation62(t *testing.T) { testForkedSynchronisation(t, 62, FullSync) } +func TestForkedSynchronisation63Full(t *testing.T) { testForkedSynchronisation(t, 63, FullSync) } +func TestForkedSynchronisation63Fast(t *testing.T) { testForkedSynchronisation(t, 63, FastSync) } +func TestForkedSynchronisation64Full(t *testing.T) { testForkedSynchronisation(t, 64, FullSync) } +func TestForkedSynchronisation64Fast(t *testing.T) { testForkedSynchronisation(t, 64, FastSync) } +func TestForkedSynchronisation64Light(t *testing.T) { testForkedSynchronisation(t, 64, LightSync) } + +func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) { // Create a long enough forked chain common, fork := MaxHashFetch, 2*MaxHashFetch - hashesA, hashesB, blocksA, blocksB := makeChainFork(common+fork, fork, genesis) + hashesA, hashesB, headersA, headersB, blocksA, blocksB := makeChainFork(common+fork, fork, genesis) - tester := newTester() - tester.newPeer("fork A", protocol, hashesA, blocksA) - tester.newPeer("fork B", protocol, hashesB, blocksB) + tester := newTester(mode) + tester.newPeer("fork A", protocol, hashesA, headersA, blocksA) + tester.newPeer("fork B", protocol, hashesB, headersB, blocksB) // Synchronise with the peer and make sure all blocks were retrieved if err := tester.sync("fork A", nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - if imported := len(tester.ownBlocks); imported != common+fork+1 { - t.Fatalf("synchronised block mismatch: have %v, want %v", imported, common+fork+1) - } + assertOwnChain(t, tester, common+fork+1) + // Synchronise with the second peer and make sure that fork is pulled too if err := tester.sync("fork B", nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - if imported := len(tester.ownBlocks); imported != common+2*fork+1 { - t.Fatalf("synchronised block mismatch: have %v, want %v", imported, common+2*fork+1) - } + assertOwnChain(t, tester, common+2*fork+1) } // Tests that an inactive downloader will not accept incoming hashes and blocks. func TestInactiveDownloader61(t *testing.T) { - tester := newTester() + tester := newTester(FullSync) // Check that neither hashes nor blocks are accepted if err := tester.downloader.DeliverHashes61("bad peer", []common.Hash{}); err != errNoSyncActive { @@ -542,9 +682,10 @@ func TestInactiveDownloader61(t *testing.T) { } } -// Tests that an inactive downloader will not accept incoming block headers and bodies. +// Tests that an inactive downloader will not accept incoming block headers and +// bodies. func TestInactiveDownloader62(t *testing.T) { - tester := newTester() + tester := newTester(FullSync) // Check that neither block headers nor bodies are accepted if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive { @@ -555,13 +696,33 @@ func TestInactiveDownloader62(t *testing.T) { } } -// Tests that a canceled download wipes all previously accumulated state. -func TestCancel61(t *testing.T) { testCancel(t, 61) } -func TestCancel62(t *testing.T) { testCancel(t, 62) } -func TestCancel63(t *testing.T) { testCancel(t, 63) } -func TestCancel64(t *testing.T) { testCancel(t, 64) } +// Tests that an inactive downloader will not accept incoming block headers, +// bodies and receipts. +func TestInactiveDownloader63(t *testing.T) { + tester := newTester(FullSync) + + // Check that neither block headers nor bodies are accepted + if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } + if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } + if err := tester.downloader.DeliverReceipts("bad peer", [][]*types.Receipt{}); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } +} -func testCancel(t *testing.T, protocol int) { +// Tests that a canceled download wipes all previously accumulated state. +func TestCancel61(t *testing.T) { testCancel(t, 61, FullSync) } +func TestCancel62(t *testing.T) { testCancel(t, 62, FullSync) } +func TestCancel63Full(t *testing.T) { testCancel(t, 63, FullSync) } +func TestCancel63Fast(t *testing.T) { testCancel(t, 63, FastSync) } +func TestCancel64Full(t *testing.T) { testCancel(t, 64, FullSync) } +func TestCancel64Fast(t *testing.T) { testCancel(t, 64, FastSync) } +func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) } + +func testCancel(t *testing.T, protocol int, mode SyncMode) { // Create a small enough block chain to download and the tester targetBlocks := blockCacheLimit - 15 if targetBlocks >= MaxHashFetch { @@ -570,80 +731,81 @@ func testCancel(t *testing.T, protocol int) { if targetBlocks >= MaxHeaderFetch { targetBlocks = MaxHeaderFetch - 15 } - hashes, blocks := makeChain(targetBlocks, 0, genesis) + hashes, headers, blocks := makeChain(targetBlocks, 0, genesis) - tester := newTester() - tester.newPeer("peer", protocol, hashes, blocks) + tester := newTester(mode) + tester.newPeer("peer", protocol, hashes, headers, blocks) // Make sure canceling works with a pristine downloader tester.downloader.cancel() - downloading, importing := tester.downloader.queue.Size() - if downloading > 0 || importing > 0 { - t.Errorf("download or import count mismatch: %d downloading, %d importing, want 0", downloading, importing) + if !tester.downloader.queue.Idle() { + t.Errorf("download queue not idle") } // Synchronise with the peer, but cancel afterwards if err := tester.sync("peer", nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } tester.downloader.cancel() - downloading, importing = tester.downloader.queue.Size() - if downloading > 0 || importing > 0 { - t.Errorf("download or import count mismatch: %d downloading, %d importing, want 0", downloading, importing) + if !tester.downloader.queue.Idle() { + t.Errorf("download queue not idle") } } // Tests that synchronisation from multiple peers works as intended (multi thread sanity test). -func TestMultiSynchronisation61(t *testing.T) { testMultiSynchronisation(t, 61) } -func TestMultiSynchronisation62(t *testing.T) { testMultiSynchronisation(t, 62) } -func TestMultiSynchronisation63(t *testing.T) { testMultiSynchronisation(t, 63) } -func TestMultiSynchronisation64(t *testing.T) { testMultiSynchronisation(t, 64) } - -func testMultiSynchronisation(t *testing.T, protocol int) { +func TestMultiSynchronisation61(t *testing.T) { testMultiSynchronisation(t, 61, FullSync) } +func TestMultiSynchronisation62(t *testing.T) { testMultiSynchronisation(t, 62, FullSync) } +func TestMultiSynchronisation63Full(t *testing.T) { testMultiSynchronisation(t, 63, FullSync) } +func TestMultiSynchronisation63Fast(t *testing.T) { testMultiSynchronisation(t, 63, FastSync) } +func TestMultiSynchronisation64Full(t *testing.T) { testMultiSynchronisation(t, 64, FullSync) } +func TestMultiSynchronisation64Fast(t *testing.T) { testMultiSynchronisation(t, 64, FastSync) } +func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) } + +func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) { // Create various peers with various parts of the chain targetPeers := 8 targetBlocks := targetPeers*blockCacheLimit - 15 - hashes, blocks := makeChain(targetBlocks, 0, genesis) + hashes, headers, blocks := makeChain(targetBlocks, 0, genesis) - tester := newTester() + tester := newTester(mode) for i := 0; i < targetPeers; i++ { id := fmt.Sprintf("peer #%d", i) - tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], blocks) + tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], headers, blocks) } // Synchronise with the middle peer and make sure half of the blocks were retrieved id := fmt.Sprintf("peer #%d", targetPeers/2) if err := tester.sync(id, nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - if imported := len(tester.ownBlocks); imported != len(tester.peerHashes[id]) { - t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(tester.peerHashes[id])) - } + assertOwnChain(t, tester, len(tester.peerHashes[id])) + // Synchronise with the best peer and make sure everything is retrieved if err := tester.sync("peer #0", nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - if imported := len(tester.ownBlocks); imported != targetBlocks+1 { - t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) - } + assertOwnChain(t, tester, targetBlocks+1) } // Tests that synchronisations behave well in multi-version protocol environments // and not wreak havok on other nodes in the network. -func TestMultiProtocolSynchronisation61(t *testing.T) { testMultiProtocolSynchronisation(t, 61) } -func TestMultiProtocolSynchronisation62(t *testing.T) { testMultiProtocolSynchronisation(t, 62) } -func TestMultiProtocolSynchronisation63(t *testing.T) { testMultiProtocolSynchronisation(t, 63) } -func TestMultiProtocolSynchronisation64(t *testing.T) { testMultiProtocolSynchronisation(t, 64) } - -func testMultiProtocolSynchronisation(t *testing.T, protocol int) { +func TestMultiProtoSynchronisation61(t *testing.T) { testMultiProtoSync(t, 61, FullSync) } +func TestMultiProtoSynchronisation62(t *testing.T) { testMultiProtoSync(t, 62, FullSync) } +func TestMultiProtoSynchronisation63Full(t *testing.T) { testMultiProtoSync(t, 63, FullSync) } +func TestMultiProtoSynchronisation63Fast(t *testing.T) { testMultiProtoSync(t, 63, FastSync) } +func TestMultiProtoSynchronisation64Full(t *testing.T) { testMultiProtoSync(t, 64, FullSync) } +func TestMultiProtoSynchronisation64Fast(t *testing.T) { testMultiProtoSync(t, 64, FastSync) } +func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) } + +func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) { // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 - hashes, blocks := makeChain(targetBlocks, 0, genesis) + hashes, headers, blocks := makeChain(targetBlocks, 0, genesis) // Create peers of every type - tester := newTester() - tester.newPeer("peer 61", 61, hashes, blocks) - tester.newPeer("peer 62", 62, hashes, blocks) - tester.newPeer("peer 63", 63, hashes, blocks) - tester.newPeer("peer 64", 64, hashes, blocks) + tester := newTester(mode) + tester.newPeer("peer 61", 61, hashes, headers, blocks) + tester.newPeer("peer 62", 62, hashes, headers, blocks) + tester.newPeer("peer 63", 63, hashes, headers, blocks) + tester.newPeer("peer 64", 64, hashes, headers, blocks) // Synchronise with the requestd peer and make sure all blocks were retrieved if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil); err != nil { @@ -661,150 +823,181 @@ func testMultiProtocolSynchronisation(t *testing.T, protocol int) { } } -// Tests that if a block is empty (i.e. header only), no body request should be +// Tests that if a block is empty (e.g. header only), no body request should be // made, and instead the header should be assembled into a whole block in itself. -func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) } -func TestEmptyBlockShortCircuit63(t *testing.T) { testEmptyBlockShortCircuit(t, 63) } -func TestEmptyBlockShortCircuit64(t *testing.T) { testEmptyBlockShortCircuit(t, 64) } - -func testEmptyBlockShortCircuit(t *testing.T, protocol int) { +func TestEmptyShortCircuit62(t *testing.T) { testEmptyShortCircuit(t, 62, FullSync) } +func TestEmptyShortCircuit63Full(t *testing.T) { testEmptyShortCircuit(t, 63, FullSync) } +func TestEmptyShortCircuit63Fast(t *testing.T) { testEmptyShortCircuit(t, 63, FastSync) } +func TestEmptyShortCircuit64Full(t *testing.T) { testEmptyShortCircuit(t, 64, FullSync) } +func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, FastSync) } +func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) } + +func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 - hashes, blocks := makeChain(targetBlocks, 0, genesis) + hashes, headers, blocks := makeChain(targetBlocks, 0, genesis) - tester := newTester() - tester.newPeer("peer", protocol, hashes, blocks) + tester := newTester(mode) + tester.newPeer("peer", protocol, hashes, headers, blocks) // Instrument the downloader to signal body requests - requested := int32(0) + bodies, receipts := int32(0), int32(0) tester.downloader.bodyFetchHook = func(headers []*types.Header) { - atomic.AddInt32(&requested, int32(len(headers))) + atomic.AddInt32(&bodies, int32(len(headers))) + } + tester.downloader.receiptFetchHook = func(headers []*types.Header) { + atomic.AddInt32(&receipts, int32(len(headers))) } // Synchronise with the peer and make sure all blocks were retrieved if err := tester.sync("peer", nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - if imported := len(tester.ownBlocks); imported != targetBlocks+1 { - t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) - } + assertOwnChain(t, tester, targetBlocks+1) + // Validate the number of block bodies that should have been requested - needed := 0 + bodiesNeeded, receiptsNeeded := 0, 0 for _, block := range blocks { - if block != genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) { - needed++ + if mode != LightSync && block != genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) { + bodiesNeeded++ } + if mode == FastSync && block != genesis && len(block.Receipts()) > 0 { + receiptsNeeded++ + } + } + if int(bodies) != bodiesNeeded { + t.Errorf("body retrieval count mismatch: have %v, want %v", bodies, bodiesNeeded) } - if int(requested) != needed { - t.Fatalf("block body retrieval count mismatch: have %v, want %v", requested, needed) + if int(receipts) != receiptsNeeded { + t.Errorf("receipt retrieval count mismatch: have %v, want %v", receipts, receiptsNeeded) } } // Tests that headers are enqueued continuously, preventing malicious nodes from // stalling the downloader by feeding gapped header chains. -func TestMissingHeaderAttack62(t *testing.T) { testMissingHeaderAttack(t, 62) } -func TestMissingHeaderAttack63(t *testing.T) { testMissingHeaderAttack(t, 63) } -func TestMissingHeaderAttack64(t *testing.T) { testMissingHeaderAttack(t, 64) } - -func testMissingHeaderAttack(t *testing.T, protocol int) { +func TestMissingHeaderAttack62(t *testing.T) { testMissingHeaderAttack(t, 62, FullSync) } +func TestMissingHeaderAttack63Full(t *testing.T) { testMissingHeaderAttack(t, 63, FullSync) } +func TestMissingHeaderAttack63Fast(t *testing.T) { testMissingHeaderAttack(t, 63, FastSync) } +func TestMissingHeaderAttack64Full(t *testing.T) { testMissingHeaderAttack(t, 64, FullSync) } +func TestMissingHeaderAttack64Fast(t *testing.T) { testMissingHeaderAttack(t, 64, FastSync) } +func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) } + +func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) { // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 - hashes, blocks := makeChain(targetBlocks, 0, genesis) + hashes, headers, blocks := makeChain(targetBlocks, 0, genesis) - tester := newTester() + tester := newTester(mode) // Attempt a full sync with an attacker feeding gapped headers - tester.newPeer("attack", protocol, hashes, blocks) + tester.newPeer("attack", protocol, hashes, headers, blocks) missing := targetBlocks / 2 + delete(tester.peerHeaders["attack"], hashes[missing]) delete(tester.peerBlocks["attack"], hashes[missing]) if err := tester.sync("attack", nil); err == nil { t.Fatalf("succeeded attacker synchronisation") } // Synchronise with the valid peer and make sure sync succeeds - tester.newPeer("valid", protocol, hashes, blocks) + tester.newPeer("valid", protocol, hashes, headers, blocks) if err := tester.sync("valid", nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - if imported := len(tester.ownBlocks); imported != len(hashes) { - t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(hashes)) - } + assertOwnChain(t, tester, targetBlocks+1) } // Tests that if requested headers are shifted (i.e. first is missing), the queue // detects the invalid numbering. -func TestShiftedHeaderAttack62(t *testing.T) { testShiftedHeaderAttack(t, 62) } -func TestShiftedHeaderAttack63(t *testing.T) { testShiftedHeaderAttack(t, 63) } -func TestShiftedHeaderAttack64(t *testing.T) { testShiftedHeaderAttack(t, 64) } - -func testShiftedHeaderAttack(t *testing.T, protocol int) { +func TestShiftedHeaderAttack62(t *testing.T) { testShiftedHeaderAttack(t, 62, FullSync) } +func TestShiftedHeaderAttack63Full(t *testing.T) { testShiftedHeaderAttack(t, 63, FullSync) } +func TestShiftedHeaderAttack63Fast(t *testing.T) { testShiftedHeaderAttack(t, 63, FastSync) } +func TestShiftedHeaderAttack64Full(t *testing.T) { testShiftedHeaderAttack(t, 64, FullSync) } +func TestShiftedHeaderAttack64Fast(t *testing.T) { testShiftedHeaderAttack(t, 64, FastSync) } +func TestShiftedHeaderAttack64Light(t *testing.T) { testShiftedHeaderAttack(t, 64, LightSync) } + +func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) { // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 - hashes, blocks := makeChain(targetBlocks, 0, genesis) + hashes, headers, blocks := makeChain(targetBlocks, 0, genesis) - tester := newTester() + tester := newTester(mode) // Attempt a full sync with an attacker feeding shifted headers - tester.newPeer("attack", protocol, hashes, blocks) + tester.newPeer("attack", protocol, hashes, headers, blocks) + delete(tester.peerHeaders["attack"], hashes[len(hashes)-2]) delete(tester.peerBlocks["attack"], hashes[len(hashes)-2]) if err := tester.sync("attack", nil); err == nil { t.Fatalf("succeeded attacker synchronisation") } // Synchronise with the valid peer and make sure sync succeeds - tester.newPeer("valid", protocol, hashes, blocks) + tester.newPeer("valid", protocol, hashes, headers, blocks) if err := tester.sync("valid", nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - if imported := len(tester.ownBlocks); imported != len(hashes) { - t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(hashes)) - } + assertOwnChain(t, tester, targetBlocks+1) } -// Tests that if a peer sends an invalid body for a requested block, it gets -// dropped immediately by the downloader. -func TestInvalidBlockBodyAttack62(t *testing.T) { testInvalidBlockBodyAttack(t, 62) } -func TestInvalidBlockBodyAttack63(t *testing.T) { testInvalidBlockBodyAttack(t, 63) } -func TestInvalidBlockBodyAttack64(t *testing.T) { testInvalidBlockBodyAttack(t, 64) } +// Tests that if a peer sends an invalid block piece (body or receipt) for a +// requested block, it gets dropped immediately by the downloader. +func TestInvalidContentAttack62(t *testing.T) { testInvalidContentAttack(t, 62, FullSync) } +func TestInvalidContentAttack63Full(t *testing.T) { testInvalidContentAttack(t, 63, FullSync) } +func TestInvalidContentAttack63Fast(t *testing.T) { testInvalidContentAttack(t, 63, FastSync) } +func TestInvalidContentAttack64Full(t *testing.T) { testInvalidContentAttack(t, 64, FullSync) } +func TestInvalidContentAttack64Fast(t *testing.T) { testInvalidContentAttack(t, 64, FastSync) } +func TestInvalidContentAttack64Light(t *testing.T) { testInvalidContentAttack(t, 64, LightSync) } -func testInvalidBlockBodyAttack(t *testing.T, protocol int) { +func testInvalidContentAttack(t *testing.T, protocol int, mode SyncMode) { // Create two peers, one feeding invalid block bodies targetBlocks := 4*blockCacheLimit - 15 - hashes, validBlocks := makeChain(targetBlocks, 0, genesis) + hashes, headers, validBlocks := makeChain(targetBlocks, 0, genesis) invalidBlocks := make(map[common.Hash]*types.Block) for hash, block := range validBlocks { invalidBlocks[hash] = types.NewBlockWithHeader(block.Header()) } + invalidReceipts := make(map[common.Hash]*types.Block) + for hash, block := range validBlocks { + invalidReceipts[hash] = types.NewBlockWithHeader(block.Header()).WithBody(block.Transactions(), block.Uncles()) + } - tester := newTester() - tester.newPeer("valid", protocol, hashes, validBlocks) - tester.newPeer("attack", protocol, hashes, invalidBlocks) - + tester := newTester(mode) + tester.newPeer("valid", protocol, hashes, headers, validBlocks) + if mode != LightSync { + tester.newPeer("body attack", protocol, hashes, headers, invalidBlocks) + } + if mode == FastSync { + tester.newPeer("receipt attack", protocol, hashes, headers, invalidReceipts) + } // Synchronise with the valid peer (will pull contents from the attacker too) if err := tester.sync("valid", nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - if imported := len(tester.ownBlocks); imported != len(hashes) { - t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(hashes)) - } + assertOwnChain(t, tester, targetBlocks+1) + // Make sure the attacker was detected and dropped in the mean time - if _, ok := tester.peerHashes["attack"]; ok { + if _, ok := tester.peerHashes["body attack"]; ok { t.Fatalf("block body attacker not detected/dropped") } + if _, ok := tester.peerHashes["receipt attack"]; ok { + t.Fatalf("receipt attacker not detected/dropped") + } } // Tests that a peer advertising an high TD doesn't get to stall the downloader // afterwards by not sending any useful hashes. -func TestHighTDStarvationAttack61(t *testing.T) { testHighTDStarvationAttack(t, 61) } -func TestHighTDStarvationAttack62(t *testing.T) { testHighTDStarvationAttack(t, 62) } -func TestHighTDStarvationAttack63(t *testing.T) { testHighTDStarvationAttack(t, 63) } -func TestHighTDStarvationAttack64(t *testing.T) { testHighTDStarvationAttack(t, 64) } - -func testHighTDStarvationAttack(t *testing.T, protocol int) { - tester := newTester() - hashes, blocks := makeChain(0, 0, genesis) - - tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, blocks) +func TestHighTDStarvationAttack61(t *testing.T) { testHighTDStarvationAttack(t, 61, FullSync) } +func TestHighTDStarvationAttack62(t *testing.T) { testHighTDStarvationAttack(t, 62, FullSync) } +func TestHighTDStarvationAttack63Full(t *testing.T) { testHighTDStarvationAttack(t, 63, FullSync) } +func TestHighTDStarvationAttack63Fast(t *testing.T) { testHighTDStarvationAttack(t, 63, FastSync) } +func TestHighTDStarvationAttack64Full(t *testing.T) { testHighTDStarvationAttack(t, 64, FullSync) } +func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttack(t, 64, FastSync) } +func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) } + +func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) { + tester := newTester(mode) + hashes, headers, blocks := makeChain(0, 0, genesis) + + tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, headers, blocks) if err := tester.sync("attack", big.NewInt(1000000)); err != errStallingPeer { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } @@ -834,18 +1027,20 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop + {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin {errInvalidBody, false}, // A bad peer was detected, but not the sync origin + {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop } // Run the tests and check disconnection status - tester := newTester() + tester := newTester(FullSync) for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) - if err := tester.newPeer(id, protocol, []common.Hash{genesis.Hash()}, nil); err != nil { + if err := tester.newPeer(id, protocol, []common.Hash{genesis.Hash()}, nil, nil); err != nil { t.Fatalf("test %d: failed to register new peer: %v", i, err) } if _, ok := tester.peerHashes[id]; !ok { @@ -861,67 +1056,26 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { } } -// Tests that feeding bad blocks will result in a peer drop. -func TestBlockBodyAttackerDropping61(t *testing.T) { testBlockBodyAttackerDropping(t, 61) } -func TestBlockBodyAttackerDropping62(t *testing.T) { testBlockBodyAttackerDropping(t, 62) } -func TestBlockBodyAttackerDropping63(t *testing.T) { testBlockBodyAttackerDropping(t, 63) } -func TestBlockBodyAttackerDropping64(t *testing.T) { testBlockBodyAttackerDropping(t, 64) } - -func testBlockBodyAttackerDropping(t *testing.T, protocol int) { - // Define the disconnection requirement for individual block import errors - tests := []struct { - failure bool - drop bool - }{ - {true, true}, - {false, false}, - } - - // Run the tests and check disconnection status - tester := newTester() - for i, tt := range tests { - // Register a new peer and ensure it's presence - id := fmt.Sprintf("test %d", i) - if err := tester.newPeer(id, protocol, []common.Hash{common.Hash{}}, nil); err != nil { - t.Fatalf("test %d: failed to register new peer: %v", i, err) - } - if _, ok := tester.peerHashes[id]; !ok { - t.Fatalf("test %d: registered peer not found", i) - } - // Assemble a good or bad block, depending of the test - raw := core.GenerateChain(genesis, testdb, 1, nil)[0] - if tt.failure { - parent := types.NewBlock(&types.Header{}, nil, nil, nil) - raw = core.GenerateChain(parent, testdb, 1, nil)[0] - } - block := &Block{OriginPeer: id, RawBlock: raw} - - // Simulate block processing and check the result - tester.downloader.queue.blockCache[0] = block - tester.downloader.process() - if _, ok := tester.peerHashes[id]; !ok != tt.drop { - t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.failure, !ok, tt.drop) - } - } -} - // Tests that synchronisation boundaries (origin block number and highest block // number) is tracked and updated correctly. -func TestSyncBoundaries61(t *testing.T) { testSyncBoundaries(t, 61) } -func TestSyncBoundaries62(t *testing.T) { testSyncBoundaries(t, 62) } -func TestSyncBoundaries63(t *testing.T) { testSyncBoundaries(t, 63) } -func TestSyncBoundaries64(t *testing.T) { testSyncBoundaries(t, 64) } - -func testSyncBoundaries(t *testing.T, protocol int) { +func TestSyncBoundaries61(t *testing.T) { testSyncBoundaries(t, 61, FullSync) } +func TestSyncBoundaries62(t *testing.T) { testSyncBoundaries(t, 62, FullSync) } +func TestSyncBoundaries63Full(t *testing.T) { testSyncBoundaries(t, 63, FullSync) } +func TestSyncBoundaries63Fast(t *testing.T) { testSyncBoundaries(t, 63, FastSync) } +func TestSyncBoundaries64Full(t *testing.T) { testSyncBoundaries(t, 64, FullSync) } +func TestSyncBoundaries64Fast(t *testing.T) { testSyncBoundaries(t, 64, FastSync) } +func TestSyncBoundaries64Light(t *testing.T) { testSyncBoundaries(t, 64, LightSync) } + +func testSyncBoundaries(t *testing.T, protocol int, mode SyncMode) { // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 - hashes, blocks := makeChain(targetBlocks, 0, genesis) + hashes, headers, blocks := makeChain(targetBlocks, 0, genesis) // Set a sync init hook to catch boundary changes starting := make(chan struct{}) progress := make(chan struct{}) - tester := newTester() + tester := newTester(mode) tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress @@ -931,7 +1085,7 @@ func testSyncBoundaries(t *testing.T, protocol int) { t.Fatalf("Pristine boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, 0) } // Synchronise half the blocks and check initial boundaries - tester.newPeer("peer-half", protocol, hashes[targetBlocks/2:], blocks) + tester.newPeer("peer-half", protocol, hashes[targetBlocks/2:], headers, blocks) pending := new(sync.WaitGroup) pending.Add(1) @@ -949,7 +1103,7 @@ func testSyncBoundaries(t *testing.T, protocol int) { pending.Wait() // Synchronise all the blocks and check continuation boundaries - tester.newPeer("peer-full", protocol, hashes, blocks) + tester.newPeer("peer-full", protocol, hashes, headers, blocks) pending.Add(1) go func() { @@ -969,21 +1123,24 @@ func testSyncBoundaries(t *testing.T, protocol int) { // Tests that synchronisation boundaries (origin block number and highest block // number) is tracked and updated correctly in case of a fork (or manual head // revertal). -func TestForkedSyncBoundaries61(t *testing.T) { testForkedSyncBoundaries(t, 61) } -func TestForkedSyncBoundaries62(t *testing.T) { testForkedSyncBoundaries(t, 62) } -func TestForkedSyncBoundaries63(t *testing.T) { testForkedSyncBoundaries(t, 63) } -func TestForkedSyncBoundaries64(t *testing.T) { testForkedSyncBoundaries(t, 64) } - -func testForkedSyncBoundaries(t *testing.T, protocol int) { +func TestForkedSyncBoundaries61(t *testing.T) { testForkedSyncBoundaries(t, 61, FullSync) } +func TestForkedSyncBoundaries62(t *testing.T) { testForkedSyncBoundaries(t, 62, FullSync) } +func TestForkedSyncBoundaries63Full(t *testing.T) { testForkedSyncBoundaries(t, 63, FullSync) } +func TestForkedSyncBoundaries63Fast(t *testing.T) { testForkedSyncBoundaries(t, 63, FastSync) } +func TestForkedSyncBoundaries64Full(t *testing.T) { testForkedSyncBoundaries(t, 64, FullSync) } +func TestForkedSyncBoundaries64Fast(t *testing.T) { testForkedSyncBoundaries(t, 64, FastSync) } +func TestForkedSyncBoundaries64Light(t *testing.T) { testForkedSyncBoundaries(t, 64, LightSync) } + +func testForkedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) { // Create a forked chain to simulate origin revertal common, fork := MaxHashFetch, 2*MaxHashFetch - hashesA, hashesB, blocksA, blocksB := makeChainFork(common+fork, fork, genesis) + hashesA, hashesB, headersA, headersB, blocksA, blocksB := makeChainFork(common+fork, fork, genesis) // Set a sync init hook to catch boundary changes starting := make(chan struct{}) progress := make(chan struct{}) - tester := newTester() + tester := newTester(mode) tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress @@ -993,7 +1150,7 @@ func testForkedSyncBoundaries(t *testing.T, protocol int) { t.Fatalf("Pristine boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, 0) } // Synchronise with one of the forks and check boundaries - tester.newPeer("fork A", protocol, hashesA, blocksA) + tester.newPeer("fork A", protocol, hashesA, headersA, blocksA) pending := new(sync.WaitGroup) pending.Add(1) @@ -1014,7 +1171,7 @@ func testForkedSyncBoundaries(t *testing.T, protocol int) { tester.downloader.syncStatsOrigin = tester.downloader.syncStatsHeight // Synchronise with the second fork and check boundary resets - tester.newPeer("fork B", protocol, hashesB, blocksB) + tester.newPeer("fork B", protocol, hashesB, headersB, blocksB) pending.Add(1) go func() { @@ -1034,21 +1191,24 @@ func testForkedSyncBoundaries(t *testing.T, protocol int) { // Tests that if synchronisation is aborted due to some failure, then the boundary // origin is not updated in the next sync cycle, as it should be considered the // continuation of the previous sync and not a new instance. -func TestFailedSyncBoundaries61(t *testing.T) { testFailedSyncBoundaries(t, 61) } -func TestFailedSyncBoundaries62(t *testing.T) { testFailedSyncBoundaries(t, 62) } -func TestFailedSyncBoundaries63(t *testing.T) { testFailedSyncBoundaries(t, 63) } -func TestFailedSyncBoundaries64(t *testing.T) { testFailedSyncBoundaries(t, 64) } - -func testFailedSyncBoundaries(t *testing.T, protocol int) { +func TestFailedSyncBoundaries61(t *testing.T) { testFailedSyncBoundaries(t, 61, FullSync) } +func TestFailedSyncBoundaries62(t *testing.T) { testFailedSyncBoundaries(t, 62, FullSync) } +func TestFailedSyncBoundaries63Full(t *testing.T) { testFailedSyncBoundaries(t, 63, FullSync) } +func TestFailedSyncBoundaries63Fast(t *testing.T) { testFailedSyncBoundaries(t, 63, FastSync) } +func TestFailedSyncBoundaries64Full(t *testing.T) { testFailedSyncBoundaries(t, 64, FullSync) } +func TestFailedSyncBoundaries64Fast(t *testing.T) { testFailedSyncBoundaries(t, 64, FastSync) } +func TestFailedSyncBoundaries64Light(t *testing.T) { testFailedSyncBoundaries(t, 64, LightSync) } + +func testFailedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) { // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 - hashes, blocks := makeChain(targetBlocks, 0, genesis) + hashes, headers, blocks := makeChain(targetBlocks, 0, genesis) // Set a sync init hook to catch boundary changes starting := make(chan struct{}) progress := make(chan struct{}) - tester := newTester() + tester := newTester(mode) tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress @@ -1058,8 +1218,9 @@ func testFailedSyncBoundaries(t *testing.T, protocol int) { t.Fatalf("Pristine boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, 0) } // Attempt a full sync with a faulty peer - tester.newPeer("faulty", protocol, hashes, blocks) + tester.newPeer("faulty", protocol, hashes, headers, blocks) missing := targetBlocks / 2 + delete(tester.peerHeaders["faulty"], hashes[missing]) delete(tester.peerBlocks["faulty"], hashes[missing]) pending := new(sync.WaitGroup) @@ -1079,7 +1240,7 @@ func testFailedSyncBoundaries(t *testing.T, protocol int) { pending.Wait() // Synchronise with a good peer and check that the boundary origin remind the same after a failure - tester.newPeer("valid", protocol, hashes, blocks) + tester.newPeer("valid", protocol, hashes, headers, blocks) pending.Add(1) go func() { @@ -1098,21 +1259,24 @@ func testFailedSyncBoundaries(t *testing.T, protocol int) { // Tests that if an attacker fakes a chain height, after the attack is detected, // the boundary height is successfully reduced at the next sync invocation. -func TestFakedSyncBoundaries61(t *testing.T) { testFakedSyncBoundaries(t, 61) } -func TestFakedSyncBoundaries62(t *testing.T) { testFakedSyncBoundaries(t, 62) } -func TestFakedSyncBoundaries63(t *testing.T) { testFakedSyncBoundaries(t, 63) } -func TestFakedSyncBoundaries64(t *testing.T) { testFakedSyncBoundaries(t, 64) } - -func testFakedSyncBoundaries(t *testing.T, protocol int) { +func TestFakedSyncBoundaries61(t *testing.T) { testFakedSyncBoundaries(t, 61, FullSync) } +func TestFakedSyncBoundaries62(t *testing.T) { testFakedSyncBoundaries(t, 62, FullSync) } +func TestFakedSyncBoundaries63Full(t *testing.T) { testFakedSyncBoundaries(t, 63, FullSync) } +func TestFakedSyncBoundaries63Fast(t *testing.T) { testFakedSyncBoundaries(t, 63, FastSync) } +func TestFakedSyncBoundaries64Full(t *testing.T) { testFakedSyncBoundaries(t, 64, FullSync) } +func TestFakedSyncBoundaries64Fast(t *testing.T) { testFakedSyncBoundaries(t, 64, FastSync) } +func TestFakedSyncBoundaries64Light(t *testing.T) { testFakedSyncBoundaries(t, 64, LightSync) } + +func testFakedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) { // Create a small block chain targetBlocks := blockCacheLimit - 15 - hashes, blocks := makeChain(targetBlocks+3, 0, genesis) + hashes, headers, blocks := makeChain(targetBlocks+3, 0, genesis) // Set a sync init hook to catch boundary changes starting := make(chan struct{}) progress := make(chan struct{}) - tester := newTester() + tester := newTester(mode) tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress @@ -1122,8 +1286,9 @@ func testFakedSyncBoundaries(t *testing.T, protocol int) { t.Fatalf("Pristine boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, 0) } // Create and sync with an attacker that promises a higher chain than available - tester.newPeer("attack", protocol, hashes, blocks) + tester.newPeer("attack", protocol, hashes, headers, blocks) for i := 1; i < 3; i++ { + delete(tester.peerHeaders["attack"], hashes[i]) delete(tester.peerBlocks["attack"], hashes[i]) } @@ -1144,7 +1309,7 @@ func testFakedSyncBoundaries(t *testing.T, protocol int) { pending.Wait() // Synchronise with a good peer and check that the boundary height has been reduced to the true value - tester.newPeer("valid", protocol, hashes[3:], blocks) + tester.newPeer("valid", protocol, hashes[3:], headers, blocks) pending.Add(1) go func() { diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go index fd926affd..92acb6ba8 100644 --- a/eth/downloader/metrics.go +++ b/eth/downloader/metrics.go @@ -42,4 +42,9 @@ var ( bodyReqTimer = metrics.NewTimer("eth/downloader/bodies/req") bodyDropMeter = metrics.NewMeter("eth/downloader/bodies/drop") bodyTimeoutMeter = metrics.NewMeter("eth/downloader/bodies/timeout") + + receiptInMeter = metrics.NewMeter("eth/downloader/receipts/in") + receiptReqTimer = metrics.NewTimer("eth/downloader/receipts/req") + receiptDropMeter = metrics.NewMeter("eth/downloader/receipts/drop") + receiptTimeoutMeter = metrics.NewMeter("eth/downloader/receipts/timeout") ) diff --git a/eth/downloader/modes.go b/eth/downloader/modes.go new file mode 100644 index 000000000..8916dbb79 --- /dev/null +++ b/eth/downloader/modes.go @@ -0,0 +1,26 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package downloader + +// SyncMode represents the synchronisation mode of the downloader. +type SyncMode int + +const ( + FullSync SyncMode = iota // Synchronise the entire block-chain history from full blocks + FastSync // Quikcly download the headers, full sync only at the chain head + LightSync // Download only the headers and terminate afterwards +) diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index c1d20ac61..5fc0db587 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -36,10 +36,11 @@ type relativeHashFetcherFn func(common.Hash) error type absoluteHashFetcherFn func(uint64, int) error type blockFetcherFn func([]common.Hash) error -// Block header and body fethers belonging to eth/62 and above +// Block header and body fetchers belonging to eth/62 and above type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error type absoluteHeaderFetcherFn func(uint64, int, int, bool) error type blockBodyFetcherFn func([]common.Hash) error +type receiptFetcherFn func([]common.Hash) error var ( errAlreadyFetching = errors.New("already fetching blocks from peer") @@ -52,11 +53,14 @@ type peer struct { id string // Unique identifier of the peer head common.Hash // Hash of the peers latest known block - idle int32 // Current activity state of the peer (idle = 0, active = 1) - rep int32 // Simple peer reputation + blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) + receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) + rep int32 // Simple peer reputation - capacity int32 // Number of blocks allowed to fetch per request - started time.Time // Time instance when the last fetch was started + blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request + receiptCapacity int32 // Number of receipts allowed to fetch per request + blockStarted time.Time // Time instance when the last block (body)fetch was started + receiptStarted time.Time // Time instance when the last receipt fetch was started ignored *set.Set // Set of hashes not to request (didn't have previously) @@ -68,6 +72,8 @@ type peer struct { getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies + getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts + version int // Eth protocol version number to switch strategies } @@ -75,12 +81,14 @@ type peer struct { // mechanisms. func newPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading - getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn) *peer { + getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, + getReceipts receiptFetcherFn) *peer { return &peer{ - id: id, - head: head, - capacity: 1, - ignored: set.New(), + id: id, + head: head, + blockCapacity: 1, + receiptCapacity: 1, + ignored: set.New(), getRelHashes: getRelHashes, getAbsHashes: getAbsHashes, @@ -90,24 +98,28 @@ func newPeer(id string, version int, head common.Hash, getAbsHeaders: getAbsHeaders, getBlockBodies: getBlockBodies, + getReceipts: getReceipts, + version: version, } } // Reset clears the internal state of a peer entity. func (p *peer) Reset() { - atomic.StoreInt32(&p.idle, 0) - atomic.StoreInt32(&p.capacity, 1) + atomic.StoreInt32(&p.blockIdle, 0) + atomic.StoreInt32(&p.receiptIdle, 0) + atomic.StoreInt32(&p.blockCapacity, 1) + atomic.StoreInt32(&p.receiptCapacity, 1) p.ignored.Clear() } // Fetch61 sends a block retrieval request to the remote peer. func (p *peer) Fetch61(request *fetchRequest) error { // Short circuit if the peer is already fetching - if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { + if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) { return errAlreadyFetching } - p.started = time.Now() + p.blockStarted = time.Now() // Convert the hash set to a retrievable slice hashes := make([]common.Hash, 0, len(request.Hashes)) @@ -119,13 +131,13 @@ func (p *peer) Fetch61(request *fetchRequest) error { return nil } -// Fetch sends a block body retrieval request to the remote peer. -func (p *peer) Fetch(request *fetchRequest) error { +// FetchBodies sends a block body retrieval request to the remote peer. +func (p *peer) FetchBodies(request *fetchRequest) error { // Short circuit if the peer is already fetching - if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { + if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) { return errAlreadyFetching } - p.started = time.Now() + p.blockStarted = time.Now() // Convert the header set to a retrievable slice hashes := make([]common.Hash, 0, len(request.Headers)) @@ -137,55 +149,64 @@ func (p *peer) Fetch(request *fetchRequest) error { return nil } -// SetIdle61 sets the peer to idle, allowing it to execute new retrieval requests. -// Its block retrieval allowance will also be updated either up- or downwards, -// depending on whether the previous fetch completed in time or not. -func (p *peer) SetIdle61() { - // Update the peer's download allowance based on previous performance - scale := 2.0 - if time.Since(p.started) > blockSoftTTL { - scale = 0.5 - if time.Since(p.started) > blockHardTTL { - scale = 1 / float64(MaxBlockFetch) // reduces capacity to 1 - } +// FetchReceipts sends a receipt retrieval request to the remote peer. +func (p *peer) FetchReceipts(request *fetchRequest) error { + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.receiptIdle, 0, 1) { + return errAlreadyFetching } - for { - // Calculate the new download bandwidth allowance - prev := atomic.LoadInt32(&p.capacity) - next := int32(math.Max(1, math.Min(float64(MaxBlockFetch), float64(prev)*scale))) + p.receiptStarted = time.Now() - // Try to update the old value - if atomic.CompareAndSwapInt32(&p.capacity, prev, next) { - // If we're having problems at 1 capacity, try to find better peers - if next == 1 { - p.Demote() - } - break - } + // Convert the header set to a retrievable slice + hashes := make([]common.Hash, 0, len(request.Headers)) + for _, header := range request.Headers { + hashes = append(hashes, header.Hash()) } - // Set the peer to idle to allow further block requests - atomic.StoreInt32(&p.idle, 0) + go p.getReceipts(hashes) + + return nil +} + +// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests. +// Its block retrieval allowance will also be updated either up- or downwards, +// depending on whether the previous fetch completed in time or not. +func (p *peer) SetBlocksIdle() { + p.setIdle(p.blockStarted, blockSoftTTL, blockHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle) } -// SetIdle sets the peer to idle, allowing it to execute new retrieval requests. +// SetBodiesIdle sets the peer to idle, allowing it to execute new retrieval requests. // Its block body retrieval allowance will also be updated either up- or downwards, // depending on whether the previous fetch completed in time or not. -func (p *peer) SetIdle() { +func (p *peer) SetBodiesIdle() { + p.setIdle(p.blockStarted, bodySoftTTL, bodyHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle) +} + +// SetReceiptsIdle sets the peer to idle, allowing it to execute new retrieval requests. +// Its receipt retrieval allowance will also be updated either up- or downwards, +// depending on whether the previous fetch completed in time or not. +func (p *peer) SetReceiptsIdle() { + p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle) +} + +// setIdle sets the peer to idle, allowing it to execute new retrieval requests. +// Its data retrieval allowance will also be updated either up- or downwards, +// depending on whether the previous fetch completed in time or not. +func (p *peer) setIdle(started time.Time, softTTL, hardTTL time.Duration, maxFetch int, capacity, idle *int32) { // Update the peer's download allowance based on previous performance scale := 2.0 - if time.Since(p.started) > bodySoftTTL { + if time.Since(started) > softTTL { scale = 0.5 - if time.Since(p.started) > bodyHardTTL { - scale = 1 / float64(MaxBodyFetch) // reduces capacity to 1 + if time.Since(started) > hardTTL { + scale = 1 / float64(maxFetch) // reduces capacity to 1 } } for { // Calculate the new download bandwidth allowance - prev := atomic.LoadInt32(&p.capacity) - next := int32(math.Max(1, math.Min(float64(MaxBodyFetch), float64(prev)*scale))) + prev := atomic.LoadInt32(capacity) + next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale))) // Try to update the old value - if atomic.CompareAndSwapInt32(&p.capacity, prev, next) { + if atomic.CompareAndSwapInt32(capacity, prev, next) { // If we're having problems at 1 capacity, try to find better peers if next == 1 { p.Demote() @@ -193,14 +214,20 @@ func (p *peer) SetIdle() { break } } - // Set the peer to idle to allow further block requests - atomic.StoreInt32(&p.idle, 0) + // Set the peer to idle to allow further fetch requests + atomic.StoreInt32(idle, 0) +} + +// BlockCapacity retrieves the peers block download allowance based on its +// previously discovered bandwidth capacity. +func (p *peer) BlockCapacity() int { + return int(atomic.LoadInt32(&p.blockCapacity)) } -// Capacity retrieves the peers block download allowance based on its previously -// discovered bandwidth capacity. -func (p *peer) Capacity() int { - return int(atomic.LoadInt32(&p.capacity)) +// ReceiptCapacity retrieves the peers block download allowance based on its +// previously discovered bandwidth capacity. +func (p *peer) ReceiptCapacity() int { + return int(atomic.LoadInt32(&p.receiptCapacity)) } // Promote increases the peer's reputation. @@ -226,7 +253,8 @@ func (p *peer) Demote() { func (p *peer) String() string { return fmt.Sprintf("Peer %s [%s]", p.id, fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+ - fmt.Sprintf("capacity %3d, ", atomic.LoadInt32(&p.capacity))+ + fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+ + fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+ fmt.Sprintf("ignored %4d", p.ignored.Size()), ) } @@ -310,26 +338,52 @@ func (ps *peerSet) AllPeers() []*peer { return list } -// IdlePeers retrieves a flat list of all the currently idle peers within the +// BlockIdlePeers retrieves a flat list of all the currently idle peers within the // active peer set, ordered by their reputation. -func (ps *peerSet) IdlePeers(version int) []*peer { +func (ps *peerSet) BlockIdlePeers(version int) ([]*peer, int) { ps.lock.RLock() defer ps.lock.RUnlock() - list := make([]*peer, 0, len(ps.peers)) + idle, total := make([]*peer, 0, len(ps.peers)), 0 for _, p := range ps.peers { - if (version == eth61 && p.version == eth61) || (version >= eth62 && p.version >= eth62) { - if atomic.LoadInt32(&p.idle) == 0 { - list = append(list, p) + if (version == 61 && p.version == 61) || (version >= 62 && p.version >= 62) { + if atomic.LoadInt32(&p.blockIdle) == 0 { + idle = append(idle, p) } + total++ } } - for i := 0; i < len(list); i++ { - for j := i + 1; j < len(list); j++ { - if atomic.LoadInt32(&list[i].rep) < atomic.LoadInt32(&list[j].rep) { - list[i], list[j] = list[j], list[i] + for i := 0; i < len(idle); i++ { + for j := i + 1; j < len(idle); j++ { + if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) { + idle[i], idle[j] = idle[j], idle[i] } } } - return list + return idle, total +} + +// ReceiptIdlePeers retrieves a flat list of all the currently idle peers within the +// active peer set, ordered by their reputation. +func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) { + ps.lock.RLock() + defer ps.lock.RUnlock() + + idle, total := make([]*peer, 0, len(ps.peers)), 0 + for _, p := range ps.peers { + if p.version >= 63 { + if atomic.LoadInt32(&p.receiptIdle) == 0 { + idle = append(idle, p) + } + total++ + } + } + for i := 0; i < len(idle); i++ { + for j := i + 1; j < len(idle); j++ { + if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) { + idle[i], idle[j] = idle[j], idle[i] + } + } + } + return idle, total } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 49d1046fb..c53ad939e 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -29,11 +29,12 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "github.com/rcrowley/go-metrics" "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) var ( - blockCacheLimit = 8 * MaxBlockFetch // Maximum number of blocks to cache before throttling the download + blockCacheLimit = 1024 // Maximum number of blocks to cache before throttling the download ) var ( @@ -41,29 +42,47 @@ var ( errStaleDelivery = errors.New("stale delivery") ) -// fetchRequest is a currently running block retrieval operation. +// fetchRequest is a currently running data retrieval operation. type fetchRequest struct { Peer *peer // Peer to which the request was sent - Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority) + Hashes map[common.Hash]int // [eth/61] Requested block with their insertion index (priority) Headers []*types.Header // [eth/62] Requested headers, sorted by request order Time time.Time // Time when the request was made } +// fetchResult is the assembly collecting partial results from potentially more +// than one fetcher routines, until all outstanding retrievals complete and the +// result as a whole can be processed. +type fetchResult struct { + Pending int // Number of data fetches still pending + + Header *types.Header + Uncles []*types.Header + Transactions types.Transactions + Receipts types.Receipts +} + // queue represents hashes that are either need fetching or are being fetched type queue struct { hashPool map[common.Hash]int // [eth/61] Pending hashes, mapping to their insertion index (priority) hashQueue *prque.Prque // [eth/61] Priority queue of the block hashes to fetch hashCounter int // [eth/61] Counter indexing the added hashes to ensure retrieval order - headerPool map[common.Hash]*types.Header // [eth/62] Pending headers, mapping from their hashes - headerQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the bodies for - headerHead common.Hash // [eth/62] Hash of the last queued header to verify order + headerHead common.Hash // [eth/62] Hash of the last queued header to verify order - pendPool map[string]*fetchRequest // Currently pending block retrieval operations + blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers + blockTaskQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for + blockPendPool map[string]*fetchRequest // [eth/62] Currently pending block (body) retrieval operations + blockDonePool map[common.Hash]struct{} // [eth/62] Set of the completed block (body) fetches - blockPool map[common.Hash]uint64 // Hash-set of the downloaded data blocks, mapping to cache indexes - blockCache []*Block // Downloaded but not yet delivered blocks - blockOffset uint64 // Offset of the first cached block in the block-chain + receiptTaskPool map[common.Hash]*types.Header // [eth/63] Pending receipt retrieval tasks, mapping hashes to headers + receiptTaskQueue *prque.Prque // [eth/63] Priority queue of the headers to fetch the receipts for + receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations + receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches + + resultCache []*fetchResult // Downloaded but not yet delivered fetch results + resultOffset uint64 // Offset of the first cached fetch result in the block-chain + resultParts int // Number of fetch components required to complete an item lock sync.RWMutex } @@ -71,13 +90,17 @@ type queue struct { // newQueue creates a new download queue for scheduling block retrieval. func newQueue() *queue { return &queue{ - hashPool: make(map[common.Hash]int), - hashQueue: prque.New(), - headerPool: make(map[common.Hash]*types.Header), - headerQueue: prque.New(), - pendPool: make(map[string]*fetchRequest), - blockPool: make(map[common.Hash]uint64), - blockCache: make([]*Block, blockCacheLimit), + hashPool: make(map[common.Hash]int), + hashQueue: prque.New(), + blockTaskPool: make(map[common.Hash]*types.Header), + blockTaskQueue: prque.New(), + blockPendPool: make(map[string]*fetchRequest), + blockDonePool: make(map[common.Hash]struct{}), + receiptTaskPool: make(map[common.Hash]*types.Header), + receiptTaskQueue: prque.New(), + receiptPendPool: make(map[string]*fetchRequest), + receiptDonePool: make(map[common.Hash]struct{}), + resultCache: make([]*fetchResult, blockCacheLimit), } } @@ -90,32 +113,37 @@ func (q *queue) Reset() { q.hashQueue.Reset() q.hashCounter = 0 - q.headerPool = make(map[common.Hash]*types.Header) - q.headerQueue.Reset() q.headerHead = common.Hash{} - q.pendPool = make(map[string]*fetchRequest) + q.blockTaskPool = make(map[common.Hash]*types.Header) + q.blockTaskQueue.Reset() + q.blockPendPool = make(map[string]*fetchRequest) + q.blockDonePool = make(map[common.Hash]struct{}) + + q.receiptTaskPool = make(map[common.Hash]*types.Header) + q.receiptTaskQueue.Reset() + q.receiptPendPool = make(map[string]*fetchRequest) + q.receiptDonePool = make(map[common.Hash]struct{}) - q.blockPool = make(map[common.Hash]uint64) - q.blockOffset = 0 - q.blockCache = make([]*Block, blockCacheLimit) + q.resultCache = make([]*fetchResult, blockCacheLimit) + q.resultOffset = 0 + q.resultParts = 0 } -// Size retrieves the number of blocks in the queue, returning separately for -// pending and already downloaded. -func (q *queue) Size() (int, int) { +// PendingBlocks retrieves the number of block (body) requests pending for retrieval. +func (q *queue) PendingBlocks() int { q.lock.RLock() defer q.lock.RUnlock() - return len(q.hashPool) + len(q.headerPool), len(q.blockPool) + return q.hashQueue.Size() + q.blockTaskQueue.Size() } -// Pending retrieves the number of blocks pending for retrieval. -func (q *queue) Pending() int { +// PendingReceipts retrieves the number of block receipts pending for retrieval. +func (q *queue) PendingReceipts() int { q.lock.RLock() defer q.lock.RUnlock() - return q.hashQueue.Size() + q.headerQueue.Size() + return q.receiptTaskQueue.Size() } // InFlight retrieves the number of fetch requests currently in flight. @@ -123,44 +151,55 @@ func (q *queue) InFlight() int { q.lock.RLock() defer q.lock.RUnlock() - return len(q.pendPool) + return len(q.blockPendPool) + len(q.receiptPendPool) +} + +// Idle returns if the queue is fully idle or has some data still inside. This +// method is used by the tester to detect termination events. +func (q *queue) Idle() bool { + q.lock.RLock() + defer q.lock.RUnlock() + + queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + pending := len(q.blockPendPool) + len(q.receiptPendPool) + cached := len(q.blockDonePool) + len(q.receiptDonePool) + + return (queued + pending + cached) == 0 } -// Throttle checks if the download should be throttled (active block fetches -// exceed block cache). -func (q *queue) Throttle() bool { +// ThrottleBlocks checks if the download should be throttled (active block (body) +// fetches exceed block cache). +func (q *queue) ThrottleBlocks() bool { q.lock.RLock() defer q.lock.RUnlock() - // Calculate the currently in-flight block requests + // Calculate the currently in-flight block (body) requests pending := 0 - for _, request := range q.pendPool { + for _, request := range q.blockPendPool { pending += len(request.Hashes) + len(request.Headers) } - // Throttle if more blocks are in-flight than free space in the cache - return pending >= len(q.blockCache)-len(q.blockPool) + // Throttle if more blocks (bodies) are in-flight than free space in the cache + return pending >= len(q.resultCache)-len(q.blockDonePool) } -// Has checks if a hash is within the download queue or not. -func (q *queue) Has(hash common.Hash) bool { +// ThrottleReceipts checks if the download should be throttled (active receipt +// fetches exceed block cache). +func (q *queue) ThrottleReceipts() bool { q.lock.RLock() defer q.lock.RUnlock() - if _, ok := q.hashPool[hash]; ok { - return true - } - if _, ok := q.headerPool[hash]; ok { - return true - } - if _, ok := q.blockPool[hash]; ok { - return true + // Calculate the currently in-flight receipt requests + pending := 0 + for _, request := range q.receiptPendPool { + pending += len(request.Headers) } - return false + // Throttle if more receipts are in-flight than free space in the cache + return pending >= len(q.resultCache)-len(q.receiptDonePool) } -// Insert61 adds a set of hashes for the download queue for scheduling, returning +// Schedule61 adds a set of hashes for the download queue for scheduling, returning // the new hashes encountered. -func (q *queue) Insert61(hashes []common.Hash, fifo bool) []common.Hash { +func (q *queue) Schedule61(hashes []common.Hash, fifo bool) []common.Hash { q.lock.Lock() defer q.lock.Unlock() @@ -186,22 +225,17 @@ func (q *queue) Insert61(hashes []common.Hash, fifo bool) []common.Hash { return inserts } -// Insert adds a set of headers for the download queue for scheduling, returning +// Schedule adds a set of headers for the download queue for scheduling, returning // the new headers encountered. -func (q *queue) Insert(headers []*types.Header, from uint64) []*types.Header { +func (q *queue) Schedule(headers []*types.Header, from uint64, receipts bool) []*types.Header { q.lock.Lock() defer q.lock.Unlock() // Insert all the headers prioritized by the contained block number inserts := make([]*types.Header, 0, len(headers)) for _, header := range headers { - // Make sure no duplicate requests are executed - hash := header.Hash() - if _, ok := q.headerPool[hash]; ok { - glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled", header.Number.Uint64(), hash[:4]) - continue - } // Make sure chain order is honored and preserved throughout + hash := header.Hash() if header.Number == nil || header.Number.Uint64() != from { glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ordering, expected %d", header.Number, hash[:4], from) break @@ -210,69 +244,72 @@ func (q *queue) Insert(headers []*types.Header, from uint64) []*types.Header { glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ancestry", header.Number, hash[:4]) break } - // Queue the header for body retrieval + // Make sure no duplicate requests are executed + if _, ok := q.blockTaskPool[hash]; ok { + glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for block fetch", header.Number.Uint64(), hash[:4]) + continue + } + if _, ok := q.receiptTaskPool[hash]; ok { + glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for receipt fetch", header.Number.Uint64(), hash[:4]) + continue + } + // Queue the header for content retrieval + q.blockTaskPool[hash] = header + q.blockTaskQueue.Push(header, -float32(header.Number.Uint64())) + if receipts { + q.receiptTaskPool[hash] = header + q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64())) + } inserts = append(inserts, header) - q.headerPool[hash] = header - q.headerQueue.Push(header, -float32(header.Number.Uint64())) q.headerHead = hash from++ } return inserts } -// GetHeadBlock retrieves the first block from the cache, or nil if it hasn't +// GetHeadResult retrieves the first fetch result from the cache, or nil if it hasn't // been downloaded yet (or simply non existent). -func (q *queue) GetHeadBlock() *Block { +func (q *queue) GetHeadResult() *fetchResult { q.lock.RLock() defer q.lock.RUnlock() - if len(q.blockCache) == 0 { + if len(q.resultCache) == 0 || q.resultCache[0] == nil { return nil } - return q.blockCache[0] -} - -// GetBlock retrieves a downloaded block, or nil if non-existent. -func (q *queue) GetBlock(hash common.Hash) *Block { - q.lock.RLock() - defer q.lock.RUnlock() - - // Short circuit if the block hasn't been downloaded yet - index, ok := q.blockPool[hash] - if !ok { + if q.resultCache[0].Pending > 0 { return nil } - // Return the block if it's still available in the cache - if q.blockOffset <= index && index < q.blockOffset+uint64(len(q.blockCache)) { - return q.blockCache[index-q.blockOffset] - } - return nil + return q.resultCache[0] } -// TakeBlocks retrieves and permanently removes a batch of blocks from the cache. -func (q *queue) TakeBlocks() []*Block { +// TakeResults retrieves and permanently removes a batch of fetch results from +// the cache. +func (q *queue) TakeResults() []*fetchResult { q.lock.Lock() defer q.lock.Unlock() - // Accumulate all available blocks - blocks := []*Block{} - for _, block := range q.blockCache { - if block == nil { + // Accumulate all available results + results := []*fetchResult{} + for _, result := range q.resultCache { + if result == nil || result.Pending > 0 { break } - blocks = append(blocks, block) - delete(q.blockPool, block.RawBlock.Hash()) + results = append(results, result) + + hash := result.Header.Hash() + delete(q.blockDonePool, hash) + delete(q.receiptDonePool, hash) } - // Delete the blocks from the slice and let them be garbage collected - // without this slice trick the blocks would stay in memory until nil - // would be assigned to q.blocks - copy(q.blockCache, q.blockCache[len(blocks):]) - for k, n := len(q.blockCache)-len(blocks), len(q.blockCache); k < n; k++ { - q.blockCache[k] = nil + // Delete the results from the slice and let them be garbage collected + // without this slice trick the results would stay in memory until nil + // would be assigned to them. + copy(q.resultCache, q.resultCache[len(results):]) + for k, n := len(q.resultCache)-len(results), len(q.resultCache); k < n; k++ { + q.resultCache[k] = nil } - q.blockOffset += uint64(len(blocks)) + q.resultOffset += uint64(len(results)) - return blocks + return results } // Reserve61 reserves a set of hashes for the given peer, skipping any previously @@ -286,12 +323,12 @@ func (q *queue) Reserve61(p *peer, count int) *fetchRequest { if q.hashQueue.Empty() { return nil } - if _, ok := q.pendPool[p.id]; ok { + if _, ok := q.blockPendPool[p.id]; ok { return nil } // Calculate an upper limit on the hashes we might fetch (i.e. throttling) - space := len(q.blockCache) - len(q.blockPool) - for _, request := range q.pendPool { + space := len(q.resultCache) - len(q.blockDonePool) + for _, request := range q.blockPendPool { space -= len(request.Hashes) } // Retrieve a batch of hashes, skipping previously failed ones @@ -319,49 +356,82 @@ func (q *queue) Reserve61(p *peer, count int) *fetchRequest { Hashes: send, Time: time.Now(), } - q.pendPool[p.id] = request + q.blockPendPool[p.id] = request return request } -// Reserve reserves a set of headers for the given peer, skipping any previously -// failed download. Beside the next batch of needed fetches, it also returns a -// flag whether empty blocks were queued requiring processing. -func (q *queue) Reserve(p *peer, count int) (*fetchRequest, bool, error) { +// ReserveBlocks reserves a set of body fetches for the given peer, skipping any +// previously failed downloads. Beside the next batch of needed fetches, it also +// returns a flag whether empty blocks were queued requiring processing. +func (q *queue) ReserveBlocks(p *peer, count int) (*fetchRequest, bool, error) { + noop := func(header *types.Header) bool { + return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash + } + return q.reserveFetch(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, noop) +} + +// ReserveReceipts reserves a set of receipt fetches for the given peer, skipping +// any previously failed downloads. Beside the next batch of needed fetches, it +// also returns a flag whether empty receipts were queued requiring importing. +func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error) { + noop := func(header *types.Header) bool { + return header.ReceiptHash == types.EmptyRootHash + } + return q.reserveFetch(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, noop) +} + +// reserveFetch reserves a set of data download operations for a given peer, +// skipping any previously failed ones. This method is a generic version used +// by the individual special reservation functions. +func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, + pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, noop func(*types.Header) bool) (*fetchRequest, bool, error) { q.lock.Lock() defer q.lock.Unlock() // Short circuit if the pool has been depleted, or if the peer's already // downloading something (sanity check not to corrupt state) - if q.headerQueue.Empty() { + if taskQueue.Empty() { return nil, false, nil } - if _, ok := q.pendPool[p.id]; ok { + if _, ok := pendPool[p.id]; ok { return nil, false, nil } - // Calculate an upper limit on the bodies we might fetch (i.e. throttling) - space := len(q.blockCache) - len(q.blockPool) - for _, request := range q.pendPool { + // Calculate an upper limit on the items we might fetch (i.e. throttling) + space := len(q.resultCache) - len(donePool) + for _, request := range pendPool { space -= len(request.Headers) } - // Retrieve a batch of headers, skipping previously failed ones + // Retrieve a batch of tasks, skipping previously failed ones send := make([]*types.Header, 0, count) skip := make([]*types.Header, 0) - process := false - for proc := 0; proc < space && len(send) < count && !q.headerQueue.Empty(); proc++ { - header := q.headerQueue.PopItem().(*types.Header) + progress := false + for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ { + header := taskQueue.PopItem().(*types.Header) - // If the header defines an empty block, deliver straight - if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) { - if err := q.enqueue("", types.NewBlockWithHeader(header)); err != nil { - return nil, false, errInvalidChain + // If we're the first to request this task, initialize the result container + index := int(header.Number.Int64() - int64(q.resultOffset)) + if index >= len(q.resultCache) || index < 0 { + return nil, false, errInvalidChain + } + if q.resultCache[index] == nil { + q.resultCache[index] = &fetchResult{ + Pending: q.resultParts, + Header: header, } - delete(q.headerPool, header.Hash()) - process, space, proc = true, space-1, proc-1 + } + // If this fetch task is a noop, skip this fetch operation + if noop(header) { + donePool[header.Hash()] = struct{}{} + delete(taskPool, header.Hash()) + + space, proc = space-1, proc-1 + q.resultCache[index].Pending-- + progress = true continue } - // If it's a content block, add to the body fetch request + // Otherwise if not a known unknown block, add to the retrieve list if p.ignored.Has(header.Hash()) { skip = append(skip, header) } else { @@ -370,24 +440,41 @@ func (q *queue) Reserve(p *peer, count int) (*fetchRequest, bool, error) { } // Merge all the skipped headers back for _, header := range skip { - q.headerQueue.Push(header, -float32(header.Number.Uint64())) + taskQueue.Push(header, -float32(header.Number.Uint64())) } // Assemble and return the block download request if len(send) == 0 { - return nil, process, nil + return nil, progress, nil } request := &fetchRequest{ Peer: p, Headers: send, Time: time.Now(), } - q.pendPool[p.id] = request + pendPool[p.id] = request + + return request, progress, nil +} + +// Cancel61 aborts a fetch request, returning all pending hashes to the queue. +func (q *queue) Cancel61(request *fetchRequest) { + q.cancel(request, nil, q.blockPendPool) +} + +// CancelBlocks aborts a body fetch request, returning all pending hashes to the +// task queue. +func (q *queue) CancelBlocks(request *fetchRequest) { + q.cancel(request, q.blockTaskQueue, q.blockPendPool) +} - return request, process, nil +// CancelReceipts aborts a body fetch request, returning all pending hashes to +// the task queue. +func (q *queue) CancelReceipts(request *fetchRequest) { + q.cancel(request, q.receiptTaskQueue, q.receiptPendPool) } -// Cancel aborts a fetch request, returning all pending hashes to the queue. -func (q *queue) Cancel(request *fetchRequest) { +// Cancel aborts a fetch request, returning all pending hashes to the task queue. +func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) { q.lock.Lock() defer q.lock.Unlock() @@ -395,20 +482,62 @@ func (q *queue) Cancel(request *fetchRequest) { q.hashQueue.Push(hash, float32(index)) } for _, header := range request.Headers { - q.headerQueue.Push(header, -float32(header.Number.Uint64())) + taskQueue.Push(header, -float32(header.Number.Uint64())) } - delete(q.pendPool, request.Peer.id) + delete(pendPool, request.Peer.id) } -// Expire checks for in flight requests that exceeded a timeout allowance, +// Revoke cancels all pending requests belonging to a given peer. This method is +// meant to be called during a peer drop to quickly reassign owned data fetches +// to remaining nodes. +func (q *queue) Revoke(peerId string) { + q.lock.Lock() + defer q.lock.Unlock() + + if request, ok := q.blockPendPool[peerId]; ok { + for hash, index := range request.Hashes { + q.hashQueue.Push(hash, float32(index)) + } + for _, header := range request.Headers { + q.blockTaskQueue.Push(header, -float32(header.Number.Uint64())) + } + delete(q.blockPendPool, peerId) + } + if request, ok := q.receiptPendPool[peerId]; ok { + for _, header := range request.Headers { + q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64())) + } + delete(q.receiptPendPool, peerId) + } +} + +// Expire61 checks for in flight requests that exceeded a timeout allowance, // canceling them and returning the responsible peers for penalization. -func (q *queue) Expire(timeout time.Duration) []string { +func (q *queue) Expire61(timeout time.Duration) []string { + return q.expire(timeout, q.blockPendPool, nil) +} + +// ExpireBlocks checks for in flight block body requests that exceeded a timeout +// allowance, canceling them and returning the responsible peers for penalization. +func (q *queue) ExpireBlocks(timeout time.Duration) []string { + return q.expire(timeout, q.blockPendPool, q.blockTaskQueue) +} + +// ExpireReceipts checks for in flight receipt requests that exceeded a timeout +// allowance, canceling them and returning the responsible peers for penalization. +func (q *queue) ExpireReceipts(timeout time.Duration) []string { + return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue) +} + +// expire is the generic check that move expired tasks from a pending pool back +// into a task pool, returning all entities caught with expired tasks. +func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque) []string { q.lock.Lock() defer q.lock.Unlock() // Iterate over the expired requests and return each to the queue peers := []string{} - for id, request := range q.pendPool { + for id, request := range pendPool { if time.Since(request.Time) > timeout { // Update the metrics with the timeout if len(request.Hashes) > 0 { @@ -421,14 +550,14 @@ func (q *queue) Expire(timeout time.Duration) []string { q.hashQueue.Push(hash, float32(index)) } for _, header := range request.Headers { - q.headerQueue.Push(header, -float32(header.Number.Uint64())) + taskQueue.Push(header, -float32(header.Number.Uint64())) } peers = append(peers, id) } } // Remove the expired requests from the pending pool for _, id := range peers { - delete(q.pendPool, id) + delete(pendPool, id) } return peers } @@ -439,12 +568,12 @@ func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) { defer q.lock.Unlock() // Short circuit if the blocks were never requested - request := q.pendPool[id] + request := q.blockPendPool[id] if request == nil { return errNoFetchesPending } blockReqTimer.UpdateSince(request.Time) - delete(q.pendPool, id) + delete(q.blockPendPool, id) // If no blocks were retrieved, mark them as unavailable for the origin peer if len(blocks) == 0 { @@ -461,10 +590,19 @@ func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) { errs = append(errs, fmt.Errorf("non-requested block %x", hash)) continue } - // Queue the block up for processing - if err := q.enqueue(id, block); err != nil { - return err + // Reconstruct the next result if contents match up + index := int(block.Number().Int64() - int64(q.resultOffset)) + if index >= len(q.resultCache) || index < 0 { + errs = []error{errInvalidChain} + break + } + q.resultCache[index] = &fetchResult{ + Header: block.Header(), + Transactions: block.Transactions(), + Uncles: block.Uncles(), } + q.blockDonePool[block.Hash()] = struct{}{} + delete(request.Hashes, hash) delete(q.hashPool, hash) } @@ -473,60 +611,94 @@ func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) { q.hashQueue.Push(hash, float32(index)) } // If none of the blocks were good, it's a stale delivery - if len(errs) != 0 { - if len(errs) == len(blocks) { - return errStaleDelivery - } + switch { + case len(errs) == 0: + return nil + + case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock): + return errs[0] + + case len(errs) == len(request.Headers): + return errStaleDelivery + + default: return fmt.Errorf("multiple failures: %v", errs) } - return nil } -// Deliver injects a block body retrieval response into the download queue. -func (q *queue) Deliver(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error { +// DeliverBlocks injects a block (body) retrieval response into the results queue. +func (q *queue) DeliverBlocks(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error { + reconstruct := func(header *types.Header, index int, result *fetchResult) error { + if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash { + return errInvalidBody + } + result.Transactions = txLists[index] + result.Uncles = uncleLists[index] + return nil + } + return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), reconstruct) +} + +// DeliverReceipts injects a receipt retrieval response into the results queue. +func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error { + reconstruct := func(header *types.Header, index int, result *fetchResult) error { + if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash { + return errInvalidReceipt + } + result.Receipts = receiptList[index] + return nil + } + return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), reconstruct) +} + +// deliver injects a data retrieval response into the results queue. +func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, + donePool map[common.Hash]struct{}, reqTimer metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) error { q.lock.Lock() defer q.lock.Unlock() - // Short circuit if the block bodies were never requested - request := q.pendPool[id] + // Short circuit if the data was never requested + request := pendPool[id] if request == nil { return errNoFetchesPending } - bodyReqTimer.UpdateSince(request.Time) - delete(q.pendPool, id) + reqTimer.UpdateSince(request.Time) + delete(pendPool, id) - // If no block bodies were retrieved, mark them as unavailable for the origin peer - if len(txLists) == 0 || len(uncleLists) == 0 { + // If no data items were retrieved, mark them as unavailable for the origin peer + if results == 0 { for hash, _ := range request.Headers { request.Peer.ignored.Add(hash) } } - // Assemble each of the block bodies with their headers and queue for processing + // Assemble each of the results with their headers and retrieved data parts errs := make([]error, 0) for i, header := range request.Headers { - // Short circuit block assembly if no more bodies are found - if i >= len(txLists) || i >= len(uncleLists) { + // Short circuit assembly if no more fetch results are found + if i >= results { break } - // Reconstruct the next block if contents match up - if types.DeriveSha(types.Transactions(txLists[i])) != header.TxHash || types.CalcUncleHash(uncleLists[i]) != header.UncleHash { - errs = []error{errInvalidBody} + // Reconstruct the next result if contents match up + index := int(header.Number.Int64() - int64(q.resultOffset)) + if index >= len(q.resultCache) || index < 0 || q.resultCache[index] == nil { + errs = []error{errInvalidChain} break } - block := types.NewBlockWithHeader(header).WithBody(txLists[i], uncleLists[i]) - - // Queue the block up for processing - if err := q.enqueue(id, block); err != nil { + if err := reconstruct(header, i, q.resultCache[index]); err != nil { errs = []error{err} break } + donePool[header.Hash()] = struct{}{} + q.resultCache[index].Pending-- + + // Clean up a successful fetch request.Headers[i] = nil - delete(q.headerPool, header.Hash()) + delete(taskPool, header.Hash()) } // Return all failed or missing fetches to the queue for _, header := range request.Headers { if header != nil { - q.headerQueue.Push(header, -float32(header.Number.Uint64())) + taskQueue.Push(header, -float32(header.Number.Uint64())) } } // If none of the blocks were good, it's a stale delivery @@ -534,11 +706,8 @@ func (q *queue) Deliver(id string, txLists [][]*types.Transaction, uncleLists [] case len(errs) == 0: return nil - case len(errs) == 1 && errs[0] == errInvalidBody: - return errInvalidBody - - case len(errs) == 1 && errs[0] == errInvalidChain: - return errInvalidChain + case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBody || errs[0] == errInvalidReceipt): + return errs[0] case len(errs) == len(request.Headers): return errStaleDelivery @@ -548,29 +717,14 @@ func (q *queue) Deliver(id string, txLists [][]*types.Transaction, uncleLists [] } } -// enqueue inserts a new block into the final delivery queue, waiting for pickup -// by the processor. -func (q *queue) enqueue(origin string, block *types.Block) error { - // If a requested block falls out of the range, the hash chain is invalid - index := int(int64(block.NumberU64()) - int64(q.blockOffset)) - if index >= len(q.blockCache) || index < 0 { - return errInvalidChain - } - // Otherwise merge the block and mark the hash done - q.blockCache[index] = &Block{ - RawBlock: block, - OriginPeer: origin, - } - q.blockPool[block.Header().Hash()] = block.NumberU64() - return nil -} - -// Prepare configures the block cache offset to allow accepting inbound blocks. -func (q *queue) Prepare(offset uint64) { +// Prepare configures the result cache to allow accepting and caching inbound +// fetch results. +func (q *queue) Prepare(offset uint64, parts int) { q.lock.Lock() defer q.lock.Unlock() - if q.blockOffset < offset { - q.blockOffset = offset + if q.resultOffset < offset { + q.resultOffset = offset } + q.resultParts = parts } |