From 47a7fe5d22fe2a6be783f6576070814fe951eaaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 14 Aug 2015 21:25:41 +0300 Subject: eth: port the synchronisation algo to eth/62 --- eth/downloader/downloader.go | 674 ++++++++++++++++++++++++++++++++------ eth/downloader/downloader_test.go | 337 ++++++++++++++----- eth/downloader/peer.go | 92 +++++- eth/downloader/queue.go | 239 ++++++++++++-- 4 files changed, 1125 insertions(+), 217 deletions(-) (limited to 'eth/downloader') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index b28879ee6..0e8529756 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -26,12 +26,10 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" - "gopkg.in/fatih/set.v0" ) const ( @@ -40,40 +38,44 @@ const ( ) var ( - MinHashFetch = 512 // Minimum amount of hashes to not consider a peer stalling MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request - MaxHeaderFetch = 256 // Amount of block headers to be fetched per retrieval request + MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request + MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request MaxStateFetch = 384 // Amount of node state values to allow fetching per request MaxReceiptsFetch = 384 // Amount of transaction receipts to allow fetching per request - hashTTL = 5 * time.Second // Time it takes for a hash request to time out - blockSoftTTL = 3 * time.Second // Request completion threshold for increasing or decreasing a peer's bandwidth - blockHardTTL = 3 * blockSoftTTL // Maximum time allowance before a block request is considered expired - crossCheckCycle = time.Second // Period after which to check for expired cross checks + hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out + blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth + blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired + headerTTL = 5 * time.Second // [eth/62] Time it takes for a header request to time out + bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth + bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired - maxQueuedHashes = 256 * 1024 // Maximum number of hashes to queue for import (DOS protection) - maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out - maxBlockProcess = 256 // Number of blocks to import at once into the chain + maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) + maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) + maxBlockProcess = 256 // Number of blocks to import at once into the chain ) var ( - errBusy = errors.New("busy") - errUnknownPeer = errors.New("peer is unknown or unhealthy") - errBadPeer = errors.New("action from bad peer ignored") - errStallingPeer = errors.New("peer is stalling") - errBannedHead = errors.New("peer head hash already banned") - errNoPeers = errors.New("no peers to keep download active") - errPendingQueue = errors.New("pending items in queue") - errTimeout = errors.New("timeout") - errEmptyHashSet = errors.New("empty hash set by peer") - errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") - errAlreadyInPool = errors.New("hash already in pool") - errInvalidChain = errors.New("retrieved hash chain is invalid") - errCrossCheckFailed = errors.New("block cross-check failed") - errCancelHashFetch = errors.New("hash fetching canceled (requested)") - errCancelBlockFetch = errors.New("block downloading canceled (requested)") - errNoSyncActive = errors.New("no sync active") + errBusy = errors.New("busy") + errUnknownPeer = errors.New("peer is unknown or unhealthy") + errBadPeer = errors.New("action from bad peer ignored") + errStallingPeer = errors.New("peer is stalling") + errNoPeers = errors.New("no peers to keep download active") + errPendingQueue = errors.New("pending items in queue") + errTimeout = errors.New("timeout") + errEmptyHashSet = errors.New("empty hash set by peer") + errEmptyHeaderSet = errors.New("empty header set by peer") + errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") + errAlreadyInPool = errors.New("hash already in pool") + errInvalidChain = errors.New("retrieved hash chain is invalid") + errInvalidBody = errors.New("retrieved block body is invalid") + errCancelHashFetch = errors.New("hash fetching canceled (requested)") + errCancelBlockFetch = errors.New("block downloading canceled (requested)") + errCancelHeaderFetch = errors.New("block header fetching canceled (requested)") + errCancelBodyFetch = errors.New("block body downloading canceled (requested)") + errNoSyncActive = errors.New("no sync active") ) // hashCheckFn is a callback type for verifying a hash's presence in the local chain. @@ -91,28 +93,36 @@ type chainInsertFn func(types.Blocks) (int, error) // peerDropFn is a callback type for dropping a peer detected as malicious. type peerDropFn func(id string) +// hashPack is a batch of block hashes returned by a peer (eth/61). +type hashPack struct { + peerId string + hashes []common.Hash +} + +// blockPack is a batch of blocks returned by a peer (eth/61). type blockPack struct { peerId string blocks []*types.Block } -type hashPack struct { - peerId string - hashes []common.Hash +// headerPack is a batch of block headers returned by a peer. +type headerPack struct { + peerId string + headers []*types.Header } -type crossCheck struct { - expire time.Time - parent common.Hash +// bodyPack is a batch of block bodies returned by a peer. +type bodyPack struct { + peerId string + transactions [][]*types.Transaction + uncles [][]*types.Header } type Downloader struct { mux *event.TypeMux - queue *queue // Scheduler for selecting the hashes to download - peers *peerSet // Set of active peers from which download can proceed - checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain - banned *set.Set // Set of hashes we've received and banned + queue *queue // Scheduler for selecting the hashes to download + peers *peerSet // Set of active peers from which download can proceed interrupt int32 // Atomic boolean to signal termination @@ -137,12 +147,18 @@ type Downloader struct { // Channels newPeerCh chan *peer - hashCh chan hashPack // Channel receiving inbound hashes - blockCh chan blockPack // Channel receiving inbound blocks - processCh chan bool // Channel to signal the block fetcher of new or finished work + hashCh chan hashPack // [eth/61] Channel receiving inbound hashes + blockCh chan blockPack // [eth/61] Channel receiving inbound blocks + headerCh chan headerPack // [eth/62] Channel receiving inbound block headers + bodyCh chan bodyPack // [eth/62] Channel receiving inbound block bodies + processCh chan bool // Channel to signal the block fetcher of new or finished work cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers + + // Testing hooks + bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch + chainInsertHook func([]*Block) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) } // Block is an origin-tagged blockchain block. @@ -153,8 +169,7 @@ type Block struct { // New creates a new downloader to fetch hashes and blocks from remote peers. func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, headBlock headRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader { - // Create the base downloader - downloader := &Downloader{ + return &Downloader{ mux: mux, queue: newQueue(), peers: newPeerSet(), @@ -166,14 +181,10 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, he newPeerCh: make(chan *peer, 1), hashCh: make(chan hashPack, 1), blockCh: make(chan blockPack, 1), + headerCh: make(chan headerPack, 1), + bodyCh: make(chan bodyPack, 1), processCh: make(chan bool, 1), } - // Inject all the known bad hashes - downloader.banned = set.New() - for hash, _ := range core.BadHashes { - downloader.banned.Add(hash) - } - return downloader } // Stats retrieves the current status of the downloader. @@ -206,15 +217,12 @@ func (d *Downloader) Synchronising() bool { // RegisterPeer injects a new download peer into the set of block source to be // used for fetching hashes and blocks from. -func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn) error { - // If the peer wants to send a banned hash, reject - if d.banned.Has(head) { - glog.V(logger.Debug).Infoln("Register rejected, head hash banned:", id) - return errBannedHead - } - // Otherwise try to construct and register the peer +func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, + getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading + getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn) error { + glog.V(logger.Detail).Infoln("Registering peer", id) - if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks)); err != nil { + if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies)); err != nil { glog.V(logger.Error).Infoln("Register failed:", err) return err } @@ -235,7 +243,7 @@ func (d *Downloader) UnregisterPeer(id string) error { // Synchronise tries to sync up our local block chain with a remote peer, both // adding various sanity checks as well as wrapping it with various log entries. func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int) { - glog.V(logger.Detail).Infof("Attempting synchronisation: %v, head 0x%x, TD %v", id, head[:4], td) + glog.V(logger.Detail).Infof("Attempting synchronisation: %v, head [%x…], TD %v", id, head[:4], td) switch err := d.synchronise(id, head, td); err { case nil: @@ -244,7 +252,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int) { case errBusy: glog.V(logger.Detail).Infof("Synchronisation already in progress") - case errTimeout, errBadPeer, errStallingPeer, errBannedHead, errEmptyHashSet, errPeersUnavailable, errInvalidChain, errCrossCheckFailed: + case errTimeout, errBadPeer, errStallingPeer, errEmptyHashSet, errEmptyHeaderSet, errPeersUnavailable, errInvalidChain: glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err) d.dropPeer(id) @@ -270,10 +278,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error } defer atomic.StoreInt32(&d.synchronising, 0) - // If the head hash is banned, terminate immediately - if d.banned.Has(hash) { - return errBannedHead - } // Post a user notification of the sync (only once per session) if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { glog.V(logger.Info).Infoln("Block synchronisation started") @@ -285,7 +289,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error // Reset the queue and peer set to clean any internal leftover state d.queue.Reset() d.peers.Reset() - d.checks = make(map[common.Hash]*crossCheck) // Create cancel channel for aborting mid-flight d.cancelLock.Lock() @@ -320,17 +323,37 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e } }() - glog.V(logger.Debug).Infof("Synchronizing with the network using: %s, eth/%d", p.id, p.version) - switch p.version { - case eth61: + glog.V(logger.Debug).Infof("Synchronising with the network using: %s [eth/%d]", p.id, p.version) + defer glog.V(logger.Debug).Infof("Synchronisation terminated") + + switch { + case p.version == eth61: // Old eth/61, use forward, concurrent hash and block retrieval algorithm + number, err := d.findAncestor61(p) + if err != nil { + return err + } + errc := make(chan error, 2) + go func() { errc <- d.fetchHashes61(p, td, number+1) }() + go func() { errc <- d.fetchBlocks61(number + 1) }() + + // If any fetcher fails, cancel the other + if err := <-errc; err != nil { + d.cancel() + <-errc + return err + } + return <-errc + + case p.version >= eth62: + // New eth/62, use forward, concurrent header and block body retrieval algorithm number, err := d.findAncestor(p) if err != nil { return err } errc := make(chan error, 2) - go func() { errc <- d.fetchHashes(p, td, number+1) }() - go func() { errc <- d.fetchBlocks(number + 1) }() + go func() { errc <- d.fetchHeaders(p, td, number+1) }() + go func() { errc <- d.fetchBodies(number + 1) }() // If any fetcher fails, cancel the other if err := <-errc; err != nil { @@ -373,17 +396,17 @@ func (d *Downloader) Terminate() { d.cancel() } -// findAncestor tries to locate the common ancestor block of the local chain and +// findAncestor61 tries to locate the common ancestor block of the local chain and // a remote peers blockchain. In the general case when our node was in sync and // on the correct chain, checking the top N blocks should already get us a match. -// In the rare scenario when we ended up on a long soft fork (i.e. none of the -// head blocks match), we do a binary search to find the common ancestor. -func (d *Downloader) findAncestor(p *peer) (uint64, error) { +// In the rare scenario when we ended up on a long reorganization (i.e. none of +// the head blocks match), we do a binary search to find the common ancestor. +func (d *Downloader) findAncestor61(p *peer) (uint64, error) { glog.V(logger.Debug).Infof("%v: looking for common ancestor", p) // Request out head blocks to short circuit ancestor location head := d.headBlock().NumberU64() - from := int64(head) - int64(MaxHashFetch) + from := int64(head) - int64(MaxHashFetch) + 1 if from < 0 { from = 0 } @@ -422,6 +445,12 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { case <-d.blockCh: // Out of bounds blocks received, ignore them + case <-d.headerCh: + // Out of bounds eth/62 block headers received, ignore them + + case <-d.bodyCh: + // Out of bounds eth/62 block bodies received, ignore them + case <-timeout: glog.V(logger.Debug).Infof("%v: head hash timeout", p) return 0, errTimeout @@ -429,7 +458,7 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { } // If the head fetch already found an ancestor, return if !common.EmptyHash(hash) { - glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x]", p, number, hash[:4]) + glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4]) return number, nil } // Ancestor not found, we need to binary search over our chain @@ -468,7 +497,7 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { break } if block.NumberU64() != check { - glog.V(logger.Debug).Infof("%v: non requested hash #%d [%x], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check) + glog.V(logger.Debug).Infof("%v: non requested hash #%d [%x…], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check) return 0, errBadPeer } start = check @@ -476,6 +505,12 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { case <-d.blockCh: // Out of bounds blocks received, ignore them + case <-d.headerCh: + // Out of bounds eth/62 block headers received, ignore them + + case <-d.bodyCh: + // Out of bounds eth/62 block bodies received, ignore them + case <-timeout: glog.V(logger.Debug).Infof("%v: search hash timeout", p) return 0, errTimeout @@ -485,9 +520,9 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { return start, nil } -// fetchHashes keeps retrieving hashes from the requested number, until no more +// fetchHashes61 keeps retrieving hashes from the requested number, until no more // are returned, potentially throttling on the way. -func (d *Downloader) fetchHashes(p *peer, td *big.Int, from uint64) error { +func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { glog.V(logger.Debug).Infof("%v: downloading hashes from #%d", p, from) // Create a timeout timer, and the associated hash fetcher @@ -510,6 +545,12 @@ func (d *Downloader) fetchHashes(p *peer, td *big.Int, from uint64) error { case <-d.cancelCh: return errCancelHashFetch + case <-d.headerCh: + // Out of bounds eth/62 block headers received, ignore them + + case <-d.bodyCh: + // Out of bounds eth/62 block bodies received, ignore them + case hashPack := <-d.hashCh: // Make sure the active peer is giving us the hashes if hashPack.peerId != p.id { @@ -548,7 +589,7 @@ func (d *Downloader) fetchHashes(p *peer, td *big.Int, from uint64) error { // Otherwise insert all the new hashes, aborting in case of junk glog.V(logger.Detail).Infof("%v: inserting %d hashes from #%d", p, len(hashPack.hashes), from) - inserts := d.queue.Insert(hashPack.hashes, true) + inserts := d.queue.Insert61(hashPack.hashes, true) if len(inserts) != len(hashPack.hashes) { glog.V(logger.Debug).Infof("%v: stale hashes", p) return errBadPeer @@ -573,10 +614,10 @@ func (d *Downloader) fetchHashes(p *peer, td *big.Int, from uint64) error { } } -// fetchBlocks iteratively downloads the scheduled hashes, taking any available +// fetchBlocks61 iteratively downloads the scheduled hashes, taking any available // peers, reserving a chunk of blocks for each, waiting for delivery and also // periodically checking for timeouts. -func (d *Downloader) fetchBlocks(from uint64) error { +func (d *Downloader) fetchBlocks61(from uint64) error { glog.V(logger.Debug).Infof("Downloading blocks from #%d", from) defer glog.V(logger.Debug).Infof("Block download terminated") @@ -595,24 +636,30 @@ func (d *Downloader) fetchBlocks(from uint64) error { case <-d.cancelCh: return errCancelBlockFetch + case <-d.headerCh: + // Out of bounds eth/62 block headers received, ignore them + + case <-d.bodyCh: + // Out of bounds eth/62 block bodies received, ignore them + case blockPack := <-d.blockCh: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. if peer := d.peers.Peer(blockPack.peerId); peer != nil { // Deliver the received chunk of blocks, and demote in case of errors - err := d.queue.Deliver(blockPack.peerId, blockPack.blocks) + err := d.queue.Deliver61(blockPack.peerId, blockPack.blocks) switch err { case nil: // If no blocks were delivered, demote the peer (need the delivery above) if len(blockPack.blocks) == 0 { peer.Demote() - peer.SetIdle() + peer.SetIdle61() glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) break } // All was successful, promote the peer and potentially start processing peer.Promote() - peer.SetIdle() + peer.SetIdle61() glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) go d.process() @@ -624,7 +671,7 @@ func (d *Downloader) fetchBlocks(from uint64) error { // Peer probably timed out with its delivery but came through // in the end, demote, but allow to to pull from this peer. peer.Demote() - peer.SetIdle() + peer.SetIdle61() glog.V(logger.Detail).Infof("%s: out of bound delivery", peer) case errStaleDelivery: @@ -638,7 +685,7 @@ func (d *Downloader) fetchBlocks(from uint64) error { default: // Peer did something semi-useful, demote but keep it around peer.Demote() - peer.SetIdle() + peer.SetIdle61() glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) go d.process() } @@ -696,7 +743,7 @@ func (d *Downloader) fetchBlocks(from uint64) error { // Reserve a chunk of hashes for a peer. A nil can mean either that // no more hashes are available, or that the peer is known not to // have them. - request := d.queue.Reserve(peer, peer.Capacity()) + request := d.queue.Reserve61(peer, peer.Capacity()) if request == nil { continue } @@ -704,7 +751,7 @@ func (d *Downloader) fetchBlocks(from uint64) error { glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes)) } // Fetch the chunk and make sure any errors return the hashes to the queue - if err := peer.Fetch(request); err != nil { + if err := peer.Fetch61(request); err != nil { glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer) d.queue.Cancel(request) } @@ -718,6 +765,401 @@ func (d *Downloader) fetchBlocks(from uint64) error { } } +// findAncestor tries to locate the common ancestor block of the local chain and +// a remote peers blockchain. In the general case when our node was in sync and +// on the correct chain, checking the top N blocks should already get us a match. +// In the rare scenario when we ended up on a long reorganization (i.e. none of +// the head blocks match), we do a binary search to find the common ancestor. +func (d *Downloader) findAncestor(p *peer) (uint64, error) { + glog.V(logger.Debug).Infof("%v: looking for common ancestor", p) + + // Request our head blocks to short circuit ancestor location + head := d.headBlock().NumberU64() + from := int64(head) - int64(MaxHeaderFetch) + 1 + if from < 0 { + from = 0 + } + go p.getAbsHeaders(uint64(from), MaxHeaderFetch, 0, false) + + // Wait for the remote response to the head fetch + number, hash := uint64(0), common.Hash{} + timeout := time.After(hashTTL) + + for finished := false; !finished; { + select { + case <-d.cancelCh: + return 0, errCancelHashFetch + + case headerPack := <-d.headerCh: + // Discard anything not from the origin peer + if headerPack.peerId != p.id { + glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId) + break + } + // Make sure the peer actually gave something valid + headers := headerPack.headers + if len(headers) == 0 { + glog.V(logger.Debug).Infof("%v: empty head header set", p) + return 0, errEmptyHeaderSet + } + // Check if a common ancestor was found + finished = true + for i := len(headers) - 1; i >= 0; i-- { + if d.hasBlock(headers[i].Hash()) { + number, hash = headers[i].Number.Uint64(), headers[i].Hash() + break + } + } + + case <-d.bodyCh: + // Out of bounds block bodies received, ignore them + + case <-d.hashCh: + // Out of bounds eth/61 hashes received, ignore them + + case <-d.blockCh: + // Out of bounds eth/61 blocks received, ignore them + + case <-timeout: + glog.V(logger.Debug).Infof("%v: head header timeout", p) + return 0, errTimeout + } + } + // If the head fetch already found an ancestor, return + if !common.EmptyHash(hash) { + glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4]) + return number, nil + } + // Ancestor not found, we need to binary search over our chain + start, end := uint64(0), head + for start+1 < end { + // Split our chain interval in two, and request the hash to cross check + check := (start + end) / 2 + + timeout := time.After(hashTTL) + go p.getAbsHeaders(uint64(check), 1, 0, false) + + // Wait until a reply arrives to this request + for arrived := false; !arrived; { + select { + case <-d.cancelCh: + return 0, errCancelHashFetch + + case headerPack := <-d.headerCh: + // Discard anything not from the origin peer + if headerPack.peerId != p.id { + glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId) + break + } + // Make sure the peer actually gave something valid + headers := headerPack.headers + if len(headers) != 1 { + glog.V(logger.Debug).Infof("%v: invalid search header set (%d)", p, len(headers)) + return 0, errBadPeer + } + arrived = true + + // Modify the search interval based on the response + block := d.getBlock(headers[0].Hash()) + if block == nil { + end = check + break + } + if block.NumberU64() != check { + glog.V(logger.Debug).Infof("%v: non requested header #%d [%x…], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check) + return 0, errBadPeer + } + start = check + + case <-d.bodyCh: + // Out of bounds block bodies received, ignore them + + case <-d.hashCh: + // Out of bounds eth/61 hashes received, ignore them + + case <-d.blockCh: + // Out of bounds eth/61 blocks received, ignore them + + case <-timeout: + glog.V(logger.Debug).Infof("%v: search header timeout", p) + return 0, errTimeout + } + } + } + return start, nil +} + +// fetchHeaders keeps retrieving headers from the requested number, until no more +// are returned, potentially throttling on the way. +func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { + glog.V(logger.Debug).Infof("%v: downloading headers from #%d", p, from) + defer glog.V(logger.Debug).Infof("%v: header download terminated", p) + + // Create a timeout timer, and the associated hash fetcher + timeout := time.NewTimer(0) // timer to dump a non-responsive active peer + <-timeout.C // timeout channel should be initially empty + defer timeout.Stop() + + getHeaders := func(from uint64) { + glog.V(logger.Detail).Infof("%v: fetching %d headers from #%d", p, MaxHeaderFetch, from) + + go p.getAbsHeaders(from, MaxHeaderFetch, 0, false) + timeout.Reset(headerTTL) + } + // Start pulling headers, until all are exhausted + getHeaders(from) + gotHeaders := false + + for { + select { + case <-d.cancelCh: + return errCancelHeaderFetch + + case <-d.hashCh: + // Out of bounds eth/61 hashes received, ignore them + + case <-d.blockCh: + // Out of bounds eth/61 blocks received, ignore them + + case headerPack := <-d.headerCh: + // Make sure the active peer is giving us the headers + if headerPack.peerId != p.id { + glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", headerPack.peerId) + break + } + timeout.Stop() + + // If no more headers are inbound, notify the body fetcher and return + if len(headerPack.headers) == 0 { + glog.V(logger.Debug).Infof("%v: no available headers", p) + + select { + case d.processCh <- false: + case <-d.cancelCh: + } + // If no headers were retrieved at all, the peer violated it's TD promise that it had a + // better chain compared to ours. The only exception is if it's promised blocks were + // already imported by other means (e.g. fecher): + // + // R , L : Both at block 10 + // R: Mine block 11, and propagate it to L + // L: Queue block 11 for import + // L: Notice that R's head and TD increased compared to ours, start sync + // L: Import of block 11 finishes + // L: Sync begins, and finds common ancestor at 11 + // L: Request new headers up from 11 (R's TD was higher, it must have something) + // R: Nothing to give + if !gotHeaders && td.Cmp(d.headBlock().Td) > 0 { + return errStallingPeer + } + return nil + } + gotHeaders = true + + // Otherwise insert all the new headers, aborting in case of junk + glog.V(logger.Detail).Infof("%v: inserting %d headers from #%d", p, len(headerPack.headers), from) + + inserts := d.queue.Insert(headerPack.headers) + if len(inserts) != len(headerPack.headers) { + glog.V(logger.Debug).Infof("%v: stale headers", p) + return errBadPeer + } + // Notify the block fetcher of new headers, but stop if queue is full + cont := d.queue.Pending() < maxQueuedHeaders + select { + case d.processCh <- cont: + default: + } + if !cont { + return nil + } + // Queue not yet full, fetch the next batch + from += uint64(len(headerPack.headers)) + getHeaders(from) + + case <-timeout.C: + // Header retrieval timed out, consider the peer bad and drop + glog.V(logger.Debug).Infof("%v: header request timed out", p) + d.dropPeer(p.id) + + // Finish the sync gracefully instead of dumping the gathered data though + select { + case d.processCh <- false: + default: + } + return nil + } + } +} + +// fetchBodies iteratively downloads the scheduled block bodies, taking any +// available peers, reserving a chunk of blocks for each, waiting for delivery +// and also periodically checking for timeouts. +func (d *Downloader) fetchBodies(from uint64) error { + glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from) + defer glog.V(logger.Debug).Infof("Block body download terminated") + + // Create a timeout timer for scheduling expiration tasks + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + update := make(chan struct{}, 1) + + // Prepare the queue and fetch block bodies until the block header fetcher's done + d.queue.Prepare(from) + finished := false + + for { + select { + case <-d.cancelCh: + return errCancelBlockFetch + + case <-d.hashCh: + // Out of bounds eth/61 hashes received, ignore them + + case <-d.blockCh: + // Out of bounds eth/61 blocks received, ignore them + + case bodyPack := <-d.bodyCh: + // If the peer was previously banned and failed to deliver it's pack + // in a reasonable time frame, ignore it's message. + if peer := d.peers.Peer(bodyPack.peerId); peer != nil { + // Deliver the received chunk of bodies, and demote in case of errors + err := d.queue.Deliver(bodyPack.peerId, bodyPack.transactions, bodyPack.uncles) + switch err { + case nil: + // If no blocks were delivered, demote the peer (need the delivery above) + if len(bodyPack.transactions) == 0 || len(bodyPack.uncles) == 0 { + peer.Demote() + peer.SetIdle() + glog.V(logger.Detail).Infof("%s: no block bodies delivered", peer) + break + } + // All was successful, promote the peer and potentially start processing + peer.Promote() + peer.SetIdle() + glog.V(logger.Detail).Infof("%s: delivered %d:%d block bodies", peer, len(bodyPack.transactions), len(bodyPack.uncles)) + go d.process() + + case errInvalidChain: + // The hash chain is invalid (blocks are not ordered properly), abort + return err + + case errInvalidBody: + // The peer delivered something very bad, drop immediately + glog.V(logger.Error).Infof("%s: delivered invalid block, dropping", peer) + d.dropPeer(peer.id) + + case errNoFetchesPending: + // Peer probably timed out with its delivery but came through + // in the end, demote, but allow to to pull from this peer. + peer.Demote() + peer.SetIdle() + glog.V(logger.Detail).Infof("%s: out of bound delivery", peer) + + case errStaleDelivery: + // Delivered something completely else than requested, usually + // caused by a timeout and delivery during a new sync cycle. + // Don't set it to idle as the original request should still be + // in flight. + peer.Demote() + glog.V(logger.Detail).Infof("%s: stale delivery", peer) + + default: + // Peer did something semi-useful, demote but keep it around + peer.Demote() + peer.SetIdle() + glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) + go d.process() + } + } + // Blocks assembled, try to update the progress + select { + case update <- struct{}{}: + default: + } + + case cont := <-d.processCh: + // The header fetcher sent a continuation flag, check if it's done + if !cont { + finished = true + } + // Headers arrive, try to update the progress + select { + case update <- struct{}{}: + default: + } + + case <-ticker.C: + // Sanity check update the progress + select { + case update <- struct{}{}: + default: + } + + case <-update: + // Short circuit if we lost all our peers + if d.peers.Len() == 0 { + return errNoPeers + } + // Check for block body request timeouts and demote the responsible peers + for _, pid := range d.queue.Expire(bodyHardTTL) { + if peer := d.peers.Peer(pid); peer != nil { + peer.Demote() + glog.V(logger.Detail).Infof("%s: block body delivery timeout", peer) + } + } + // If there's noting more to fetch, wait or terminate + if d.queue.Pending() == 0 { + if d.queue.InFlight() == 0 && finished { + glog.V(logger.Debug).Infof("Block body fetching completed") + return nil + } + break + } + // Send a download request to all idle peers, until throttled + queuedEmptyBlocks, throttled := false, false + for _, peer := range d.peers.IdlePeers() { + // Short circuit if throttling activated + if d.queue.Throttle() { + throttled = true + break + } + // Reserve a chunk of hashes for a peer. A nil can mean either that + // no more hashes are available, or that the peer is known not to + // have them. + request, process, err := d.queue.Reserve(peer, peer.Capacity()) + if err != nil { + return err + } + if process { + queuedEmptyBlocks = true + go d.process() + } + if request == nil { + continue + } + if glog.V(logger.Detail) { + glog.Infof("%s: requesting %d block bodies", peer, len(request.Headers)) + } + // Fetch the chunk and make sure any errors return the hashes to the queue + if d.bodyFetchHook != nil { + d.bodyFetchHook(request.Headers) + } + if err := peer.Fetch(request); err != nil { + glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer) + d.queue.Cancel(request) + } + } + // Make sure that we have peers available for fetching. If all peers have been tried + // and all failed throw an error + if !queuedEmptyBlocks && !throttled && d.queue.InFlight() == 0 { + return errPeersUnavailable + } + } + } +} + // process takes blocks from the queue and tries to import them into the chain. // // The algorithmic flow is as follows: @@ -763,6 +1205,9 @@ func (d *Downloader) process() { if len(blocks) == 0 { return } + if d.chainInsertHook != nil { + d.chainInsertHook(blocks) + } // Reset the import statistics d.importLock.Lock() d.importStart = time.Now() @@ -796,9 +1241,31 @@ func (d *Downloader) process() { } } -// DeliverBlocks injects a new batch of blocks received from a remote node. +// DeliverHashes61 injects a new batch of hashes received from a remote node into +// the download schedule. This is usually invoked through the BlockHashesMsg by +// the protocol handler. +func (d *Downloader) DeliverHashes61(id string, hashes []common.Hash) error { + // Make sure the downloader is active + if atomic.LoadInt32(&d.synchronising) == 0 { + return errNoSyncActive + } + // Deliver or abort if the sync is canceled while queuing + d.cancelLock.RLock() + cancel := d.cancelCh + d.cancelLock.RUnlock() + + select { + case d.hashCh <- hashPack{id, hashes}: + return nil + + case <-cancel: + return errNoSyncActive + } +} + +// DeliverBlocks61 injects a new batch of blocks received from a remote node. // This is usually invoked through the BlocksMsg by the protocol handler. -func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error { +func (d *Downloader) DeliverBlocks61(id string, blocks []*types.Block) error { // Make sure the downloader is active if atomic.LoadInt32(&d.synchronising) == 0 { return errNoSyncActive @@ -817,10 +1284,9 @@ func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error { } } -// DeliverHashes injects a new batch of hashes received from a remote node into -// the download schedule. This is usually invoked through the BlockHashesMsg by -// the protocol handler. -func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error { +// DeliverHeaders injects a new batch of blck headers received from a remote +// node into the download schedule. +func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) error { // Make sure the downloader is active if atomic.LoadInt32(&d.synchronising) == 0 { return errNoSyncActive @@ -831,7 +1297,27 @@ func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error { d.cancelLock.RUnlock() select { - case d.hashCh <- hashPack{id, hashes}: + case d.headerCh <- headerPack{id, headers}: + return nil + + case <-cancel: + return errNoSyncActive + } +} + +// DeliverBodies injects a new batch of block bodies received from a remote node. +func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) error { + // Make sure the downloader is active + if atomic.LoadInt32(&d.synchronising) == 0 { + return errNoSyncActive + } + // Deliver or abort if the sync is canceled while queuing + d.cancelLock.RLock() + cancel := d.cancelCh + d.cancelLock.RUnlock() + + select { + case d.bodyCh <- bodyPack{id, transactions, uncles}: return nil case <-cancel: diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 7e3456433..8d009b671 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -27,20 +27,39 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/params" ) var ( - testdb, _ = ethdb.NewMemDatabase() - genesis = core.GenesisBlockForTesting(testdb, common.Address{}, big.NewInt(0)) + testdb, _ = ethdb.NewMemDatabase() + testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + testAddress = crypto.PubkeyToAddress(testKey.PublicKey) + genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000)) ) -// makeChain creates a chain of n blocks starting at but not including -// parent. the returned hash chain is ordered head->parent. +// makeChain creates a chain of n blocks starting at and including parent. +// the returned hash chain is ordered head->parent. In addition, every 3rd block +// contains a transaction and every 5th an uncle to allow testing correct block +// reassembly. func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) { - blocks := core.GenerateChain(parent, testdb, n, func(i int, gen *core.BlockGen) { - gen.SetCoinbase(common.Address{seed}) + blocks := core.GenerateChain(parent, testdb, n, func(i int, block *core.BlockGen) { + block.SetCoinbase(common.Address{seed}) + + // If the block number is multiple of 3, send a bonus transaction to the miner + if parent == genesis && i%3 == 0 { + tx, err := types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(testKey) + if err != nil { + panic(err) + } + block.AddTx(tx) + } + // If the block number is a multiple of 5, add a bonus uncle to the block + if i%5 == 0 { + block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 1).Hash(), Number: big.NewInt(int64(i - 1))}) + } }) hashes := make([]common.Hash, n+1) hashes[len(hashes)-1] = parent.Hash() @@ -78,8 +97,6 @@ type downloadTester struct { ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester peerHashes map[string][]common.Hash // Hash chain belonging to different test peers peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers - - maxHashFetch int // Overrides the maximum number of retrieved hashes } // newTester creates a new downloader test mocker. @@ -156,7 +173,9 @@ func (dl *downloadTester) newPeer(id string, version int, hashes []common.Hash, // specific delay time on processing the network packets sent to it, simulating // potentially slow network IO. func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error { - err := dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, version, delay), dl.peerGetBlocksFn(id, delay)) + err := dl.downloader.RegisterPeer(id, version, hashes[0], + dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), + nil, dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay)) if err == nil { // Assign the owned hashes and blocks to the peer (deep copy) dl.peerHashes[id] = make([]common.Hash, len(hashes)) @@ -184,13 +203,9 @@ func (dl *downloadTester) peerGetRelHashesFn(id string, delay time.Duration) fun return func(head common.Hash) error { time.Sleep(delay) - limit := MaxHashFetch - if dl.maxHashFetch > 0 { - limit = dl.maxHashFetch - } // Gather the next batch of hashes hashes := dl.peerHashes[id] - result := make([]common.Hash, 0, limit) + result := make([]common.Hash, 0, MaxHashFetch) for i, hash := range hashes { if hash == head { i++ @@ -204,7 +219,7 @@ func (dl *downloadTester) peerGetRelHashesFn(id string, delay time.Duration) fun // Delay delivery a bit to allow attacks to unfold go func() { time.Sleep(time.Millisecond) - dl.downloader.DeliverHashes(id, result) + dl.downloader.DeliverHashes61(id, result) }() return nil } @@ -213,24 +228,20 @@ func (dl *downloadTester) peerGetRelHashesFn(id string, delay time.Duration) fun // peerGetAbsHashesFn constructs a GetHashesFromNumber function associated with // a particular peer in the download tester. The returned function can be used to // retrieve batches of hashes from the particularly requested peer. -func (dl *downloadTester) peerGetAbsHashesFn(id string, version int, delay time.Duration) func(uint64, int) error { +func (dl *downloadTester) peerGetAbsHashesFn(id string, delay time.Duration) func(uint64, int) error { return func(head uint64, count int) error { time.Sleep(delay) - limit := count - if dl.maxHashFetch > 0 { - limit = dl.maxHashFetch - } // Gather the next batch of hashes hashes := dl.peerHashes[id] - result := make([]common.Hash, 0, limit) - for i := 0; i < limit && len(hashes)-int(head)-1-i >= 0; i++ { + result := make([]common.Hash, 0, count) + for i := 0; i < count && len(hashes)-int(head)-1-i >= 0; i++ { result = append(result, hashes[len(hashes)-int(head)-1-i]) } // Delay delivery a bit to allow attacks to unfold go func() { time.Sleep(time.Millisecond) - dl.downloader.DeliverHashes(id, result) + dl.downloader.DeliverHashes61(id, result) }() return nil } @@ -249,7 +260,55 @@ func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([ result = append(result, block) } } - go dl.downloader.DeliverBlocks(id, result) + go dl.downloader.DeliverBlocks61(id, result) + + return nil + } +} + +// peerGetAbsHeadersFn constructs a GetBlockHeaders function based on a numbered +// origin; associated with a particular peer in the download tester. The returned +// function can be used to retrieve batches of headers from the particular peer. +func (dl *downloadTester) peerGetAbsHeadersFn(id string, delay time.Duration) func(uint64, int, int, bool) error { + return func(origin uint64, amount int, skip int, reverse bool) error { + time.Sleep(delay) + + // Gather the next batch of hashes + hashes := dl.peerHashes[id] + blocks := dl.peerBlocks[id] + result := make([]*types.Header, 0, amount) + for i := 0; i < amount && len(hashes)-int(origin)-1-i >= 0; i++ { + if block, ok := blocks[hashes[len(hashes)-int(origin)-1-i]]; ok { + result = append(result, block.Header()) + } + } + // Delay delivery a bit to allow attacks to unfold + go func() { + time.Sleep(time.Millisecond) + dl.downloader.DeliverHeaders(id, result) + }() + return nil + } +} + +// peerGetBodiesFn constructs a getBlockBodies method associated with a particular +// peer in the download tester. The returned function can be used to retrieve +// batches of block bodies from the particularly requested peer. +func (dl *downloadTester) peerGetBodiesFn(id string, delay time.Duration) func([]common.Hash) error { + return func(hashes []common.Hash) error { + time.Sleep(delay) + blocks := dl.peerBlocks[id] + + transactions := make([][]*types.Transaction, 0, len(hashes)) + uncles := make([][]*types.Header, 0, len(hashes)) + + for _, hash := range hashes { + if block, ok := blocks[hash]; ok { + transactions = append(transactions, block.Transactions()) + uncles = append(uncles, block.Uncles()) + } + } + go dl.downloader.DeliverBodies(id, transactions, uncles) return nil } @@ -258,13 +317,18 @@ func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([ // Tests that simple synchronization against a canonical chain works correctly. // In this test common ancestor lookup should be short circuited and not require // binary searching. -func TestCanonicalSynchronisation61(t *testing.T) { +func TestCanonicalSynchronisation61(t *testing.T) { testCanonicalSynchronisation(t, 61) } +func TestCanonicalSynchronisation62(t *testing.T) { testCanonicalSynchronisation(t, 62) } +func TestCanonicalSynchronisation63(t *testing.T) { testCanonicalSynchronisation(t, 63) } +func TestCanonicalSynchronisation64(t *testing.T) { testCanonicalSynchronisation(t, 64) } + +func testCanonicalSynchronisation(t *testing.T, protocol int) { // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 hashes, blocks := makeChain(targetBlocks, 0, genesis) tester := newTester() - tester.newPeer("peer", eth61, hashes, blocks) + tester.newPeer("peer", protocol, hashes, blocks) // Synchronise with the peer and make sure all blocks were retrieved if err := tester.sync("peer", nil); err != nil { @@ -277,7 +341,10 @@ func TestCanonicalSynchronisation61(t *testing.T) { // Tests that if a large batch of blocks are being downloaded, it is throttled // until the cached blocks are retrieved. -func TestThrottling61(t *testing.T) { testThrottling(t, eth61) } +func TestThrottling61(t *testing.T) { testThrottling(t, 61) } +func TestThrottling62(t *testing.T) { testThrottling(t, 62) } +func TestThrottling63(t *testing.T) { testThrottling(t, 63) } +func TestThrottling64(t *testing.T) { testThrottling(t, 64) } func testThrottling(t *testing.T, protocol int) { // Create a long block chain to download and the tester @@ -288,11 +355,10 @@ func testThrottling(t *testing.T, protocol int) { tester.newPeer("peer", protocol, hashes, blocks) // Wrap the importer to allow stepping - done := make(chan int) - tester.downloader.insertChain = func(blocks types.Blocks) (int, error) { - n, err := tester.insertChain(blocks) - done <- n - return n, err + blocked, proceed := uint32(0), make(chan struct{}) + tester.downloader.chainInsertHook = func(blocks []*Block) { + atomic.StoreUint32(&blocked, uint32(len(blocks))) + <-proceed } // Start a synchronisation concurrently errc := make(chan error) @@ -303,27 +369,25 @@ func testThrottling(t *testing.T, protocol int) { for len(tester.ownBlocks) < targetBlocks+1 { // Wait a bit for sync to throttle itself var cached int - for start := time.Now(); time.Since(start) < 3*time.Second; { + for start := time.Now(); time.Since(start) < time.Second; { time.Sleep(25 * time.Millisecond) cached = len(tester.downloader.queue.blockPool) - if cached == blockCacheLimit || len(tester.ownBlocks)+cached == targetBlocks+1 { + if cached == blockCacheLimit || len(tester.ownBlocks)+cached+int(atomic.LoadUint32(&blocked)) == targetBlocks+1 { break } } // Make sure we filled up the cache, then exhaust it time.Sleep(25 * time.Millisecond) // give it a chance to screw up - if cached != blockCacheLimit && len(tester.ownBlocks)+cached < targetBlocks+1 { - t.Fatalf("block count mismatch: have %v, want %v", cached, blockCacheLimit) + if cached != blockCacheLimit && len(tester.ownBlocks)+cached+int(atomic.LoadUint32(&blocked)) != targetBlocks+1 { + t.Fatalf("block count mismatch: have %v, want %v (owned %v, target %v)", cached, blockCacheLimit, len(tester.ownBlocks), targetBlocks+1) } - <-done // finish previous blocking import - for cached > maxBlockProcess { - cached -= <-done + // Permit the blocked blocks to import + if atomic.LoadUint32(&blocked) > 0 { + atomic.StoreUint32(&blocked, uint32(0)) + proceed <- struct{}{} } - time.Sleep(25 * time.Millisecond) // yield to the insertion } - <-done // finish the last blocking import - // Check that we haven't pulled more blocks than available if len(tester.ownBlocks) > targetBlocks+1 { t.Fatalf("target block count mismatch: have %v, want %v", len(tester.ownBlocks), targetBlocks+1) @@ -336,14 +400,19 @@ func testThrottling(t *testing.T, protocol int) { // Tests that simple synchronization against a forked chain works correctly. In // this test common ancestor lookup should *not* be short circuited, and a full // binary search should be executed. -func TestForkedSynchronisation61(t *testing.T) { +func TestForkedSynchronisation61(t *testing.T) { testForkedSynchronisation(t, 61) } +func TestForkedSynchronisation62(t *testing.T) { testForkedSynchronisation(t, 62) } +func TestForkedSynchronisation63(t *testing.T) { testForkedSynchronisation(t, 63) } +func TestForkedSynchronisation64(t *testing.T) { testForkedSynchronisation(t, 64) } + +func testForkedSynchronisation(t *testing.T, protocol int) { // Create a long enough forked chain common, fork := MaxHashFetch, 2*MaxHashFetch hashesA, hashesB, blocksA, blocksB := makeChainFork(common+fork, fork, genesis) tester := newTester() - tester.newPeer("fork A", eth61, hashesA, blocksA) - tester.newPeer("fork B", eth61, hashesB, blocksB) + tester.newPeer("fork A", protocol, hashesA, blocksA) + tester.newPeer("fork B", protocol, hashesB, blocksB) // Synchronise with the peer and make sure all blocks were retrieved if err := tester.sync("fork A", nil); err != nil { @@ -362,20 +431,36 @@ func TestForkedSynchronisation61(t *testing.T) { } // Tests that an inactive downloader will not accept incoming hashes and blocks. -func TestInactiveDownloader(t *testing.T) { +func TestInactiveDownloader61(t *testing.T) { tester := newTester() // Check that neither hashes nor blocks are accepted - if err := tester.downloader.DeliverHashes("bad peer", []common.Hash{}); err != errNoSyncActive { + if err := tester.downloader.DeliverHashes61("bad peer", []common.Hash{}); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } + if err := tester.downloader.DeliverBlocks61("bad peer", []*types.Block{}); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } +} + +// Tests that an inactive downloader will not accept incoming block headers and bodies. +func TestInactiveDownloader62(t *testing.T) { + tester := newTester() + + // Check that neither block headers nor bodies are accepted + if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive { t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) } - if err := tester.downloader.DeliverBlocks("bad peer", []*types.Block{}); err != errNoSyncActive { + if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive { t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) } } // Tests that a canceled download wipes all previously accumulated state. -func TestCancel61(t *testing.T) { testCancel(t, eth61) } +func TestCancel61(t *testing.T) { testCancel(t, 61) } +func TestCancel62(t *testing.T) { testCancel(t, 62) } +func TestCancel63(t *testing.T) { testCancel(t, 63) } +func TestCancel64(t *testing.T) { testCancel(t, 64) } func testCancel(t *testing.T, protocol int) { // Create a small enough block chain to download and the tester @@ -383,6 +468,9 @@ func testCancel(t *testing.T, protocol int) { if targetBlocks >= MaxHashFetch { targetBlocks = MaxHashFetch - 15 } + if targetBlocks >= MaxHeaderFetch { + targetBlocks = MaxHeaderFetch - 15 + } hashes, blocks := makeChain(targetBlocks, 0, genesis) tester := newTester() @@ -390,27 +478,30 @@ func testCancel(t *testing.T, protocol int) { // Make sure canceling works with a pristine downloader tester.downloader.cancel() - hashCount, blockCount := tester.downloader.queue.Size() - if hashCount > 0 || blockCount > 0 { - t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + downloading, importing := tester.downloader.queue.Size() + if downloading > 0 || importing > 0 { + t.Errorf("download or import count mismatch: %d downloading, %d importing, want 0", downloading, importing) } // Synchronise with the peer, but cancel afterwards if err := tester.sync("peer", nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } tester.downloader.cancel() - hashCount, blockCount = tester.downloader.queue.Size() - if hashCount > 0 || blockCount > 0 { - t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + downloading, importing = tester.downloader.queue.Size() + if downloading > 0 || importing > 0 { + t.Errorf("download or import count mismatch: %d downloading, %d importing, want 0", downloading, importing) } } // Tests that synchronisation from multiple peers works as intended (multi thread sanity test). -func TestMultiSynchronisation61(t *testing.T) { testMultiSynchronisation(t, eth61) } +func TestMultiSynchronisation61(t *testing.T) { testMultiSynchronisation(t, 61) } +func TestMultiSynchronisation62(t *testing.T) { testMultiSynchronisation(t, 62) } +func TestMultiSynchronisation63(t *testing.T) { testMultiSynchronisation(t, 63) } +func TestMultiSynchronisation64(t *testing.T) { testMultiSynchronisation(t, 64) } func testMultiSynchronisation(t *testing.T, protocol int) { // Create various peers with various parts of the chain - targetPeers := 16 + targetPeers := 8 targetBlocks := targetPeers*blockCacheLimit - 15 hashes, blocks := makeChain(targetBlocks, 0, genesis) @@ -436,45 +527,130 @@ func testMultiSynchronisation(t *testing.T, protocol int) { } } +// Tests that if a block is empty (i.e. header only), no body request should be +// made, and instead the header should be assembled into a whole block in itself. +func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) } +func TestEmptyBlockShortCircuit63(t *testing.T) { testEmptyBlockShortCircuit(t, 63) } +func TestEmptyBlockShortCircuit64(t *testing.T) { testEmptyBlockShortCircuit(t, 64) } + +func testEmptyBlockShortCircuit(t *testing.T, protocol int) { + // Create a small enough block chain to download + targetBlocks := blockCacheLimit - 15 + hashes, blocks := makeChain(targetBlocks, 0, genesis) + + tester := newTester() + tester.newPeer("peer", protocol, hashes, blocks) + + // Instrument the downloader to signal body requests + requested := int32(0) + tester.downloader.bodyFetchHook = func(headers []*types.Header) { + atomic.AddInt32(&requested, int32(len(headers))) + } + // Synchronise with the peer and make sure all blocks were retrieved + if err := tester.sync("peer", nil); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + if imported := len(tester.ownBlocks); imported != targetBlocks+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) + } + // Validate the number of block bodies that should have been requested + needed := 0 + for _, block := range blocks { + if block != genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) { + needed++ + } + } + if int(requested) != needed { + t.Fatalf("block body retrieval count mismatch: have %v, want %v", requested, needed) + } +} + +// Tests that if a peer sends an invalid body for a requested block, it gets +// dropped immediately by the downloader. +func TestInvalidBlockBodyAttack62(t *testing.T) { testInvalidBlockBodyAttack(t, 62) } +func TestInvalidBlockBodyAttack63(t *testing.T) { testInvalidBlockBodyAttack(t, 63) } +func TestInvalidBlockBodyAttack64(t *testing.T) { testInvalidBlockBodyAttack(t, 64) } + +func testInvalidBlockBodyAttack(t *testing.T, protocol int) { + // Create two peers, one feeding invalid block bodies + targetBlocks := 4*blockCacheLimit - 15 + hashes, validBlocks := makeChain(targetBlocks, 0, genesis) + + invalidBlocks := make(map[common.Hash]*types.Block) + for hash, block := range validBlocks { + invalidBlocks[hash] = types.NewBlockWithHeader(block.Header()) + } + + tester := newTester() + tester.newPeer("valid", protocol, hashes, validBlocks) + tester.newPeer("attack", protocol, hashes, invalidBlocks) + + // Synchronise with the valid peer (will pull contents from the attacker too) + if err := tester.sync("valid", nil); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + if imported := len(tester.ownBlocks); imported != len(hashes) { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(hashes)) + } + // Make sure the attacker was detected and dropped in the mean time + if _, ok := tester.peerHashes["attack"]; ok { + t.Fatalf("block body attacker not detected/dropped") + } +} + // Tests that a peer advertising an high TD doesn't get to stall the downloader // afterwards by not sending any useful hashes. -func TestHighTDStarvationAttack61(t *testing.T) { +func TestHighTDStarvationAttack61(t *testing.T) { testHighTDStarvationAttack(t, 61) } +func TestHighTDStarvationAttack62(t *testing.T) { testHighTDStarvationAttack(t, 62) } +func TestHighTDStarvationAttack63(t *testing.T) { testHighTDStarvationAttack(t, 63) } +func TestHighTDStarvationAttack64(t *testing.T) { testHighTDStarvationAttack(t, 64) } + +func testHighTDStarvationAttack(t *testing.T, protocol int) { tester := newTester() - tester.newPeer("attack", eth61, []common.Hash{genesis.Hash()}, nil) + hashes, blocks := makeChain(0, 0, genesis) + + tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, blocks) if err := tester.sync("attack", big.NewInt(1000000)); err != errStallingPeer { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } } // Tests that misbehaving peers are disconnected, whilst behaving ones are not. -func TestHashAttackerDropping(t *testing.T) { +func TestBlockHeaderAttackerDropping61(t *testing.T) { testBlockHeaderAttackerDropping(t, 61) } +func TestBlockHeaderAttackerDropping62(t *testing.T) { testBlockHeaderAttackerDropping(t, 62) } +func TestBlockHeaderAttackerDropping63(t *testing.T) { testBlockHeaderAttackerDropping(t, 63) } +func TestBlockHeaderAttackerDropping64(t *testing.T) { testBlockHeaderAttackerDropping(t, 64) } + +func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { // Define the disconnection requirement for individual hash fetch errors tests := []struct { result error drop bool }{ - {nil, false}, // Sync succeeded, all is well - {errBusy, false}, // Sync is already in progress, no problem - {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop - {errBadPeer, true}, // Peer was deemed bad for some reason, drop it - {errStallingPeer, true}, // Peer was detected to be stalling, drop it - {errBannedHead, true}, // Peer's head hash is a known bad hash, drop it - {errNoPeers, false}, // No peers to download from, soft race, no issue - {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue - {errTimeout, true}, // No hashes received in due time, drop the peer - {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end - {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser - {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop - {errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop - {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {nil, false}, // Sync succeeded, all is well + {errBusy, false}, // Sync is already in progress, no problem + {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop + {errBadPeer, true}, // Peer was deemed bad for some reason, drop it + {errStallingPeer, true}, // Peer was detected to be stalling, drop it + {errNoPeers, false}, // No peers to download from, soft race, no issue + {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue + {errTimeout, true}, // No hashes received in due time, drop the peer + {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end + {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end + {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser + {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop + {errInvalidBody, false}, // A bad peer was detected, but not the sync origin + {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop } // Run the tests and check disconnection status tester := newTester() for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) - if err := tester.newPeer(id, eth61, []common.Hash{genesis.Hash()}, nil); err != nil { + if err := tester.newPeer(id, protocol, []common.Hash{genesis.Hash()}, nil); err != nil { t.Fatalf("test %d: failed to register new peer: %v", i, err) } if _, ok := tester.peerHashes[id]; !ok { @@ -491,7 +667,12 @@ func TestHashAttackerDropping(t *testing.T) { } // Tests that feeding bad blocks will result in a peer drop. -func TestBlockAttackerDropping(t *testing.T) { +func TestBlockBodyAttackerDropping61(t *testing.T) { testBlockBodyAttackerDropping(t, 61) } +func TestBlockBodyAttackerDropping62(t *testing.T) { testBlockBodyAttackerDropping(t, 62) } +func TestBlockBodyAttackerDropping63(t *testing.T) { testBlockBodyAttackerDropping(t, 63) } +func TestBlockBodyAttackerDropping64(t *testing.T) { testBlockBodyAttackerDropping(t, 64) } + +func testBlockBodyAttackerDropping(t *testing.T, protocol int) { // Define the disconnection requirement for individual block import errors tests := []struct { failure bool @@ -506,7 +687,7 @@ func TestBlockAttackerDropping(t *testing.T) { for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) - if err := tester.newPeer(id, eth61, []common.Hash{common.Hash{}}, nil); err != nil { + if err := tester.newPeer(id, protocol, []common.Hash{common.Hash{}}, nil); err != nil { t.Fatalf("test %d: failed to register new peer: %v", i, err) } if _, ok := tester.peerHashes[id]; !ok { diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 4273b9168..8fd1f9a99 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -31,10 +31,16 @@ import ( "gopkg.in/fatih/set.v0" ) +// Hash and block fetchers belonging to eth/61 and below type relativeHashFetcherFn func(common.Hash) error type absoluteHashFetcherFn func(uint64, int) error type blockFetcherFn func([]common.Hash) error +// Block header and body fethers belonging to eth/62 and above +type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error +type absoluteHeaderFetcherFn func(uint64, int, int, bool) error +type blockBodyFetcherFn func([]common.Hash) error + var ( errAlreadyFetching = errors.New("already fetching blocks from peer") errAlreadyRegistered = errors.New("peer is already registered") @@ -54,25 +60,37 @@ type peer struct { ignored *set.Set // Set of hashes not to request (didn't have previously) - getRelHashes relativeHashFetcherFn // Method to retrieve a batch of hashes from an origin hash - getAbsHashes absoluteHashFetcherFn // Method to retrieve a batch of hashes from an absolute position - getBlocks blockFetcherFn // Method to retrieve a batch of blocks + getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash + getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position + getBlocks blockFetcherFn // [eth/61] Method to retrieve a batch of blocks + + getRelHeaders relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash + getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position + getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies version int // Eth protocol version number to switch strategies } // newPeer create a new downloader peer, with specific hash and block retrieval // mechanisms. -func newPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn) *peer { +func newPeer(id string, version int, head common.Hash, + getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading + getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn) *peer { return &peer{ - id: id, - head: head, - capacity: 1, + id: id, + head: head, + capacity: 1, + ignored: set.New(), + getRelHashes: getRelHashes, getAbsHashes: getAbsHashes, getBlocks: getBlocks, - ignored: set.New(), - version: version, + + getRelHeaders: getRelHeaders, + getAbsHeaders: getAbsHeaders, + getBlockBodies: getBlockBodies, + + version: version, } } @@ -83,8 +101,8 @@ func (p *peer) Reset() { p.ignored.Clear() } -// Fetch sends a block retrieval request to the remote peer. -func (p *peer) Fetch(request *fetchRequest) error { +// Fetch61 sends a block retrieval request to the remote peer. +func (p *peer) Fetch61(request *fetchRequest) error { // Short circuit if the peer is already fetching if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { return errAlreadyFetching @@ -101,10 +119,28 @@ func (p *peer) Fetch(request *fetchRequest) error { return nil } -// SetIdle sets the peer to idle, allowing it to execute new retrieval requests. +// Fetch sends a block body retrieval request to the remote peer. +func (p *peer) Fetch(request *fetchRequest) error { + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { + return errAlreadyFetching + } + p.started = time.Now() + + // Convert the header set to a retrievable slice + hashes := make([]common.Hash, 0, len(request.Headers)) + for _, header := range request.Headers { + hashes = append(hashes, header.Hash()) + } + go p.getBlockBodies(hashes) + + return nil +} + +// SetIdle61 sets the peer to idle, allowing it to execute new retrieval requests. // Its block retrieval allowance will also be updated either up- or downwards, // depending on whether the previous fetch completed in time or not. -func (p *peer) SetIdle() { +func (p *peer) SetIdle61() { // Update the peer's download allowance based on previous performance scale := 2.0 if time.Since(p.started) > blockSoftTTL { @@ -131,6 +167,36 @@ func (p *peer) SetIdle() { atomic.StoreInt32(&p.idle, 0) } +// SetIdle sets the peer to idle, allowing it to execute new retrieval requests. +// Its block body retrieval allowance will also be updated either up- or downwards, +// depending on whether the previous fetch completed in time or not. +func (p *peer) SetIdle() { + // Update the peer's download allowance based on previous performance + scale := 2.0 + if time.Since(p.started) > bodySoftTTL { + scale = 0.5 + if time.Since(p.started) > bodyHardTTL { + scale = 1 / float64(MaxBodyFetch) // reduces capacity to 1 + } + } + for { + // Calculate the new download bandwidth allowance + prev := atomic.LoadInt32(&p.capacity) + next := int32(math.Max(1, math.Min(float64(MaxBodyFetch), float64(prev)*scale))) + + // Try to update the old value + if atomic.CompareAndSwapInt32(&p.capacity, prev, next) { + // If we're having problems at 1 capacity, try to find better peers + if next == 1 { + p.Demote() + } + break + } + } + // Set the peer to idle to allow further block requests + atomic.StoreInt32(&p.idle, 0) +} + // Capacity retrieves the peers block download allowance based on its previously // discovered bandwidth capacity. func (p *peer) Capacity() int { diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 96e08e144..a527414ff 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -43,16 +43,20 @@ var ( // fetchRequest is a currently running block retrieval operation. type fetchRequest struct { - Peer *peer // Peer to which the request was sent - Hashes map[common.Hash]int // Requested hashes with their insertion index (priority) - Time time.Time // Time when the request was made + Peer *peer // Peer to which the request was sent + Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority) + Headers []*types.Header // [eth/62] Requested headers, sorted by request order + Time time.Time // Time when the request was made } // queue represents hashes that are either need fetching or are being fetched type queue struct { - hashPool map[common.Hash]int // Pending hashes, mapping to their insertion index (priority) - hashQueue *prque.Prque // Priority queue of the block hashes to fetch - hashCounter int // Counter indexing the added hashes to ensure retrieval order + hashPool map[common.Hash]int // [eth/61] Pending hashes, mapping to their insertion index (priority) + hashQueue *prque.Prque // [eth/61] Priority queue of the block hashes to fetch + hashCounter int // [eth/61] Counter indexing the added hashes to ensure retrieval order + + headerPool map[common.Hash]*types.Header // [eth/62] Pending headers, mapping from their hashes + headerQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the bodies for pendPool map[string]*fetchRequest // Currently pending block retrieval operations @@ -66,11 +70,13 @@ type queue struct { // newQueue creates a new download queue for scheduling block retrieval. func newQueue() *queue { return &queue{ - hashPool: make(map[common.Hash]int), - hashQueue: prque.New(), - pendPool: make(map[string]*fetchRequest), - blockPool: make(map[common.Hash]uint64), - blockCache: make([]*Block, blockCacheLimit), + hashPool: make(map[common.Hash]int), + hashQueue: prque.New(), + headerPool: make(map[common.Hash]*types.Header), + headerQueue: prque.New(), + pendPool: make(map[string]*fetchRequest), + blockPool: make(map[common.Hash]uint64), + blockCache: make([]*Block, blockCacheLimit), } } @@ -83,6 +89,9 @@ func (q *queue) Reset() { q.hashQueue.Reset() q.hashCounter = 0 + q.headerPool = make(map[common.Hash]*types.Header) + q.headerQueue.Reset() + q.pendPool = make(map[string]*fetchRequest) q.blockPool = make(map[common.Hash]uint64) @@ -90,21 +99,21 @@ func (q *queue) Reset() { q.blockCache = make([]*Block, blockCacheLimit) } -// Size retrieves the number of hashes in the queue, returning separately for +// Size retrieves the number of blocks in the queue, returning separately for // pending and already downloaded. func (q *queue) Size() (int, int) { q.lock.RLock() defer q.lock.RUnlock() - return len(q.hashPool), len(q.blockPool) + return len(q.hashPool) + len(q.headerPool), len(q.blockPool) } -// Pending retrieves the number of hashes pending for retrieval. +// Pending retrieves the number of blocks pending for retrieval. func (q *queue) Pending() int { q.lock.RLock() defer q.lock.RUnlock() - return q.hashQueue.Size() + return q.hashQueue.Size() + q.headerQueue.Size() } // InFlight retrieves the number of fetch requests currently in flight. @@ -124,7 +133,7 @@ func (q *queue) Throttle() bool { // Calculate the currently in-flight block requests pending := 0 for _, request := range q.pendPool { - pending += len(request.Hashes) + pending += len(request.Hashes) + len(request.Headers) } // Throttle if more blocks are in-flight than free space in the cache return pending >= len(q.blockCache)-len(q.blockPool) @@ -138,15 +147,18 @@ func (q *queue) Has(hash common.Hash) bool { if _, ok := q.hashPool[hash]; ok { return true } + if _, ok := q.headerPool[hash]; ok { + return true + } if _, ok := q.blockPool[hash]; ok { return true } return false } -// Insert adds a set of hashes for the download queue for scheduling, returning +// Insert61 adds a set of hashes for the download queue for scheduling, returning // the new hashes encountered. -func (q *queue) Insert(hashes []common.Hash, fifo bool) []common.Hash { +func (q *queue) Insert61(hashes []common.Hash, fifo bool) []common.Hash { q.lock.Lock() defer q.lock.Unlock() @@ -172,6 +184,29 @@ func (q *queue) Insert(hashes []common.Hash, fifo bool) []common.Hash { return inserts } +// Insert adds a set of headers for the download queue for scheduling, returning +// the new headers encountered. +func (q *queue) Insert(headers []*types.Header) []*types.Header { + q.lock.Lock() + defer q.lock.Unlock() + + // Insert all the headers prioritized by the contained block number + inserts := make([]*types.Header, 0, len(headers)) + for _, header := range headers { + // Make sure no duplicate requests are executed + hash := header.Hash() + if _, ok := q.headerPool[hash]; ok { + glog.V(logger.Warn).Infof("Header %x already scheduled", hash) + continue + } + // Queue the header for body retrieval + inserts = append(inserts, header) + q.headerPool[hash] = header + q.headerQueue.Push(header, -float32(header.Number.Uint64())) + } + return inserts +} + // GetHeadBlock retrieves the first block from the cache, or nil if it hasn't // been downloaded yet (or simply non existent). func (q *queue) GetHeadBlock() *Block { @@ -227,9 +262,9 @@ func (q *queue) TakeBlocks() []*Block { return blocks } -// Reserve reserves a set of hashes for the given peer, skipping any previously +// Reserve61 reserves a set of hashes for the given peer, skipping any previously // failed download. -func (q *queue) Reserve(p *peer, count int) *fetchRequest { +func (q *queue) Reserve61(p *peer, count int) *fetchRequest { q.lock.Lock() defer q.lock.Unlock() @@ -276,6 +311,68 @@ func (q *queue) Reserve(p *peer, count int) *fetchRequest { return request } +// Reserve reserves a set of headers for the given peer, skipping any previously +// failed download. Beside the next batch of needed fetches, it also returns a +// flag whether empty blocks were queued requiring processing. +func (q *queue) Reserve(p *peer, count int) (*fetchRequest, bool, error) { + q.lock.Lock() + defer q.lock.Unlock() + + // Short circuit if the pool has been depleted, or if the peer's already + // downloading something (sanity check not to corrupt state) + if q.headerQueue.Empty() { + return nil, false, nil + } + if _, ok := q.pendPool[p.id]; ok { + return nil, false, nil + } + // Calculate an upper limit on the bodies we might fetch (i.e. throttling) + space := len(q.blockCache) - len(q.blockPool) + for _, request := range q.pendPool { + space -= len(request.Headers) + } + // Retrieve a batch of headers, skipping previously failed ones + send := make([]*types.Header, 0, count) + skip := make([]*types.Header, 0) + + process := false + for proc := 0; proc < space && len(send) < count && !q.headerQueue.Empty(); proc++ { + header := q.headerQueue.PopItem().(*types.Header) + + // If the header defines an empty block, deliver straight + if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) { + if err := q.enqueue("", types.NewBlockWithHeader(header)); err != nil { + return nil, false, errInvalidChain + } + delete(q.headerPool, header.Hash()) + process, space, proc = true, space-1, proc-1 + continue + } + // If it's a content block, add to the body fetch request + if p.ignored.Has(header.Hash()) { + skip = append(skip, header) + } else { + send = append(send, header) + } + } + // Merge all the skipped headers back + for _, header := range skip { + q.headerQueue.Push(header, -float32(header.Number.Uint64())) + } + // Assemble and return the block download request + if len(send) == 0 { + return nil, process, nil + } + request := &fetchRequest{ + Peer: p, + Headers: send, + Time: time.Now(), + } + q.pendPool[p.id] = request + + return request, process, nil +} + // Cancel aborts a fetch request, returning all pending hashes to the queue. func (q *queue) Cancel(request *fetchRequest) { q.lock.Lock() @@ -284,6 +381,9 @@ func (q *queue) Cancel(request *fetchRequest) { for hash, index := range request.Hashes { q.hashQueue.Push(hash, float32(index)) } + for _, header := range request.Headers { + q.headerQueue.Push(header, -float32(header.Number.Uint64())) + } delete(q.pendPool, request.Peer.id) } @@ -310,8 +410,8 @@ func (q *queue) Expire(timeout time.Duration) []string { return peers } -// Deliver injects a block retrieval response into the download queue. -func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { +// Deliver61 injects a block retrieval response into the download queue. +func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) { q.lock.Lock() defer q.lock.Unlock() @@ -337,19 +437,12 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { errs = append(errs, fmt.Errorf("non-requested block %x", hash)) continue } - // If a requested block falls out of the range, the hash chain is invalid - index := int(int64(block.NumberU64()) - int64(q.blockOffset)) - if index >= len(q.blockCache) || index < 0 { - return errInvalidChain - } - // Otherwise merge the block and mark the hash block - q.blockCache[index] = &Block{ - RawBlock: block, - OriginPeer: id, + // Queue the block up for processing + if err := q.enqueue(id, block); err != nil { + return err } delete(request.Hashes, hash) delete(q.hashPool, hash) - q.blockPool[hash] = block.NumberU64() } // Return all failed or missing fetches to the queue for hash, index := range request.Hashes { @@ -365,6 +458,88 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { return nil } +// Deliver injects a block body retrieval response into the download queue. +func (q *queue) Deliver(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error { + q.lock.Lock() + defer q.lock.Unlock() + + // Short circuit if the block bodies were never requested + request := q.pendPool[id] + if request == nil { + return errNoFetchesPending + } + delete(q.pendPool, id) + + // If no block bodies were retrieved, mark them as unavailable for the origin peer + if len(txLists) == 0 || len(uncleLists) == 0 { + for hash, _ := range request.Headers { + request.Peer.ignored.Add(hash) + } + } + // Assemble each of the block bodies with their headers and queue for processing + errs := make([]error, 0) + for i, header := range request.Headers { + // Short circuit block assembly if no more bodies are found + if i >= len(txLists) || i >= len(uncleLists) { + break + } + // Reconstruct the next block if contents match up + if types.DeriveSha(types.Transactions(txLists[i])) != header.TxHash || types.CalcUncleHash(uncleLists[i]) != header.UncleHash { + errs = []error{errInvalidBody} + break + } + block := types.NewBlockWithHeader(header).WithBody(txLists[i], uncleLists[i]) + + // Queue the block up for processing + if err := q.enqueue(id, block); err != nil { + errs = []error{err} + break + } + request.Headers[i] = nil + delete(q.headerPool, header.Hash()) + } + // Return all failed or missing fetches to the queue + for _, header := range request.Headers { + if header != nil { + q.headerQueue.Push(header, -float32(header.Number.Uint64())) + } + } + // If none of the blocks were good, it's a stale delivery + switch { + case len(errs) == 0: + return nil + + case len(errs) == 1 && errs[0] == errInvalidBody: + return errInvalidBody + + case len(errs) == 1 && errs[0] == errInvalidChain: + return errInvalidChain + + case len(errs) == len(request.Headers): + return errStaleDelivery + + default: + return fmt.Errorf("multiple failures: %v", errs) + } +} + +// enqueue inserts a new block into the final delivery queue, waiting for pickup +// by the processor. +func (q *queue) enqueue(origin string, block *types.Block) error { + // If a requested block falls out of the range, the hash chain is invalid + index := int(int64(block.NumberU64()) - int64(q.blockOffset)) + if index >= len(q.blockCache) || index < 0 { + return errInvalidChain + } + // Otherwise merge the block and mark the hash done + q.blockCache[index] = &Block{ + RawBlock: block, + OriginPeer: origin, + } + q.blockPool[block.Header().Hash()] = block.NumberU64() + return nil +} + // Prepare configures the block cache offset to allow accepting inbound blocks. func (q *queue) Prepare(offset uint64) { q.lock.Lock() -- cgit v1.2.3