diff options
author | Jeffrey Wilcke <geffobscura@gmail.com> | 2015-07-07 21:12:56 +0800 |
---|---|---|
committer | Jeffrey Wilcke <geffobscura@gmail.com> | 2015-07-07 21:12:56 +0800 |
commit | 193c62fdba3bb5c40daad6652c18d81d43518235 (patch) | |
tree | 4b77244b1ce72160a1434036977c7a473781665c /eth/downloader | |
parent | a2ce7b99501b3273b4cee65cd6784c7d1c4645f7 (diff) | |
parent | d673c34c8d4ae83a3765ed44ae9d0fb7ce1aa3c9 (diff) | |
download | go-tangerine-193c62fdba3bb5c40daad6652c18d81d43518235.tar go-tangerine-193c62fdba3bb5c40daad6652c18d81d43518235.tar.gz go-tangerine-193c62fdba3bb5c40daad6652c18d81d43518235.tar.bz2 go-tangerine-193c62fdba3bb5c40daad6652c18d81d43518235.tar.lz go-tangerine-193c62fdba3bb5c40daad6652c18d81d43518235.tar.xz go-tangerine-193c62fdba3bb5c40daad6652c18d81d43518235.tar.zst go-tangerine-193c62fdba3bb5c40daad6652c18d81d43518235.zip |
Merge branch 'release/0.9.36'
Diffstat (limited to 'eth/downloader')
-rw-r--r-- | eth/downloader/downloader.go | 433 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 317 | ||||
-rw-r--r-- | eth/downloader/events.go | 16 | ||||
-rw-r--r-- | eth/downloader/peer.go | 42 | ||||
-rw-r--r-- | eth/downloader/queue.go | 44 |
5 files changed, 741 insertions, 111 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 39976aae1..5ce98816d 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1,3 +1,19 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. + // Package downloader contains the manual full chain synchronisation. package downloader @@ -19,18 +35,24 @@ import ( "gopkg.in/fatih/set.v0" ) +const ( + eth60 = 60 // Constant to check for old protocol support + eth61 = 61 // Constant to check for new protocol support +) + var ( - MinHashFetch = 512 // Minimum amount of hashes to not consider a peer stalling - MaxHashFetch = 2048 // Amount of hashes to be fetched per retrieval request - MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request + MinHashFetch = 512 // Minimum amount of hashes to not consider a peer stalling + MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request + MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request hashTTL = 5 * time.Second // Time it takes for a hash request to time out blockSoftTTL = 3 * time.Second // Request completion threshold for increasing or decreasing a peer's bandwidth blockHardTTL = 3 * blockSoftTTL // Maximum time allowance before a block request is considered expired crossCheckCycle = time.Second // Period after which to check for expired cross checks - maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out - maxBlockProcess = 256 // Number of blocks to import at once into the chain + maxQueuedHashes = 256 * 1024 // Maximum number of hashes to queue for import (DOS protection) + maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out + maxBlockProcess = 256 // Number of blocks to import at once into the chain ) var ( @@ -58,6 +80,9 @@ type hashCheckFn func(common.Hash) bool // blockRetrievalFn is a callback type for retrieving a block from the local chain. type blockRetrievalFn func(common.Hash) *types.Block +// headRetrievalFn is a callback type for retrieving the head block from the local chain. +type headRetrievalFn func() *types.Block + // chainInsertFn is a callback type to insert a batch of blocks into the local chain. type chainInsertFn func(types.Blocks) (int, error) @@ -98,6 +123,7 @@ type Downloader struct { // Callbacks hasBlock hashCheckFn // Checks if a block is present in the chain getBlock blockRetrievalFn // Retrieves a block from the chain + headBlock headRetrievalFn // Retrieves the head block from the chain insertChain chainInsertFn // Injects a batch of blocks into the chain dropPeer peerDropFn // Drops a peer for misbehaving @@ -109,8 +135,9 @@ type Downloader struct { // Channels newPeerCh chan *peer - hashCh chan hashPack - blockCh chan blockPack + hashCh chan hashPack // Channel receiving inbound hashes + blockCh chan blockPack // Channel receiving inbound blocks + processCh chan bool // Channel to signal the block fetcher of new or finished work cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers @@ -123,7 +150,7 @@ type Block struct { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader { +func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, headBlock headRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader { // Create the base downloader downloader := &Downloader{ mux: mux, @@ -131,11 +158,13 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, in peers: newPeerSet(), hasBlock: hasBlock, getBlock: getBlock, + headBlock: headBlock, insertChain: insertChain, dropPeer: dropPeer, newPeerCh: make(chan *peer, 1), hashCh: make(chan hashPack, 1), blockCh: make(chan blockPack, 1), + processCh: make(chan bool, 1), } // Inject all the known bad hashes downloader.banned = set.New() @@ -175,7 +204,7 @@ func (d *Downloader) Synchronising() bool { // RegisterPeer injects a new download peer into the set of block source to be // used for fetching hashes and blocks from. -func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error { +func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn) error { // If the peer wants to send a banned hash, reject if d.banned.Has(head) { glog.V(logger.Debug).Infoln("Register rejected, head hash banned:", id) @@ -183,7 +212,7 @@ func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFet } // Otherwise try to construct and register the peer glog.V(logger.Detail).Infoln("Registering peer", id) - if err := d.peers.Register(newPeer(id, head, getHashes, getBlocks)); err != nil { + if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks)); err != nil { glog.V(logger.Error).Infoln("Register failed:", err) return err } @@ -289,12 +318,38 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { } }() - glog.V(logger.Debug).Infoln("Synchronizing with the network using:", p.id) - if err = d.fetchHashes(p, hash); err != nil { - return err - } - if err = d.fetchBlocks(); err != nil { - return err + glog.V(logger.Debug).Infof("Synchronizing with the network using: %s, eth/%d", p.id, p.version) + switch p.version { + case eth60: + // Old eth/60 version, use reverse hash retrieval algorithm + if err = d.fetchHashes60(p, hash); err != nil { + return err + } + if err = d.fetchBlocks60(); err != nil { + return err + } + case eth61: + // New eth/61, use forward, concurrent hash and block retrieval algorithm + number, err := d.findAncestor(p) + if err != nil { + return err + } + errc := make(chan error, 2) + go func() { errc <- d.fetchHashes(p, number+1) }() + go func() { errc <- d.fetchBlocks(number + 1) }() + + // If any fetcher fails, cancel the other + if err := <-errc; err != nil { + d.cancel() + <-errc + return err + } + return <-errc + + default: + // Something very wrong, stop right here + glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version) + return errBadPeer } glog.V(logger.Debug).Infoln("Synchronization completed") @@ -326,10 +381,10 @@ func (d *Downloader) Terminate() { d.cancel() } -// fetchHahes starts retrieving hashes backwards from a specific peer and hash, +// fetchHashes60 starts retrieving hashes backwards from a specific peer and hash, // up until it finds a common ancestor. If the source peer times out, alternative // ones are tried for continuation. -func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { +func (d *Downloader) fetchHashes60(p *peer, h common.Hash) error { var ( start = time.Now() active = p // active peer will help determine the current active peer @@ -346,12 +401,12 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { <-timeout.C // timeout channel should be initially empty. getHashes := func(from common.Hash) { - go active.getHashes(from) + go active.getRelHashes(from) timeout.Reset(hashTTL) } // Add the hash to the queue, and start hash retrieval. - d.queue.Insert([]common.Hash{h}) + d.queue.Insert([]common.Hash{h}, false) getHashes(h) attempted[p.id] = true @@ -377,7 +432,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { if d.banned.Has(hash) { glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain", active.id) - d.queue.Insert(hashPack.hashes[:index+1]) + d.queue.Insert(hashPack.hashes[:index+1], false) if err := d.banBlocks(active.id, hash); err != nil { glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err) } @@ -395,7 +450,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { } } // Insert all the new hashes, but only continue if got something useful - inserts := d.queue.Insert(hashPack.hashes) + inserts := d.queue.Insert(hashPack.hashes, false) if len(inserts) == 0 && !done { glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id) return errBadPeer @@ -422,9 +477,9 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { continue } // We're done, prepare the download cache and proceed pulling the blocks - offset := 0 + offset := uint64(0) if block := d.getBlock(head); block != nil { - offset = int(block.NumberU64() + 1) + offset = block.NumberU64() + 1 } d.queue.Prepare(offset) finished = true @@ -481,10 +536,10 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { return nil } -// fetchBlocks iteratively downloads the entire schedules block-chain, taking +// fetchBlocks60 iteratively downloads the entire schedules block-chain, taking // any available peers, reserving a chunk of blocks for each, wait for delivery // and periodically checking for timeouts. -func (d *Downloader) fetchBlocks() error { +func (d *Downloader) fetchBlocks60() error { glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)") start := time.Now() @@ -619,6 +674,332 @@ out: return nil } +// findAncestor tries to locate the common ancestor block of the local chain and +// a remote peers blockchain. In the general case when our node was in sync and +// on the correct chain, checking the top N blocks should already get us a match. +// In the rare scenario when we ended up on a long soft fork (i.e. none of the +// head blocks match), we do a binary search to find the common ancestor. +func (d *Downloader) findAncestor(p *peer) (uint64, error) { + glog.V(logger.Debug).Infof("%v: looking for common ancestor", p) + + // Request out head blocks to short circuit ancestor location + head := d.headBlock().NumberU64() + from := int64(head) - int64(MaxHashFetch) + if from < 0 { + from = 0 + } + go p.getAbsHashes(uint64(from), MaxHashFetch) + + // Wait for the remote response to the head fetch + number, hash := uint64(0), common.Hash{} + timeout := time.After(hashTTL) + + for finished := false; !finished; { + select { + case <-d.cancelCh: + return 0, errCancelHashFetch + + case hashPack := <-d.hashCh: + // Discard anything not from the origin peer + if hashPack.peerId != p.id { + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) + break + } + // Make sure the peer actually gave something valid + hashes := hashPack.hashes + if len(hashes) == 0 { + glog.V(logger.Debug).Infof("%v: empty head hash set", p) + return 0, errEmptyHashSet + } + // Check if a common ancestor was found + finished = true + for i := len(hashes) - 1; i >= 0; i-- { + if d.hasBlock(hashes[i]) { + number, hash = uint64(from)+uint64(i), hashes[i] + break + } + } + + case <-d.blockCh: + // Out of bounds blocks received, ignore them + + case <-timeout: + glog.V(logger.Debug).Infof("%v: head hash timeout", p) + return 0, errTimeout + } + } + // If the head fetch already found an ancestor, return + if !common.EmptyHash(hash) { + glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x]", p, number, hash[:4]) + return number, nil + } + // Ancestor not found, we need to binary search over our chain + start, end := uint64(0), head + for start+1 < end { + // Split our chain interval in two, and request the hash to cross check + check := (start + end) / 2 + + timeout := time.After(hashTTL) + go p.getAbsHashes(uint64(check), 1) + + // Wait until a reply arrives to this request + for arrived := false; !arrived; { + select { + case <-d.cancelCh: + return 0, errCancelHashFetch + + case hashPack := <-d.hashCh: + // Discard anything not from the origin peer + if hashPack.peerId != p.id { + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) + break + } + // Make sure the peer actually gave something valid + hashes := hashPack.hashes + if len(hashes) != 1 { + glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes)) + return 0, errBadPeer + } + arrived = true + + // Modify the search interval based on the response + block := d.getBlock(hashes[0]) + if block == nil { + end = check + break + } + if block.NumberU64() != check { + glog.V(logger.Debug).Infof("%v: non requested hash #%d [%x], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check) + return 0, errBadPeer + } + start = check + + case <-d.blockCh: + // Out of bounds blocks received, ignore them + + case <-timeout: + glog.V(logger.Debug).Infof("%v: search hash timeout", p) + return 0, errTimeout + } + } + } + return start, nil +} + +// fetchHashes keeps retrieving hashes from the requested number, until no more +// are returned, potentially throttling on the way. +func (d *Downloader) fetchHashes(p *peer, from uint64) error { + glog.V(logger.Debug).Infof("%v: downloading hashes from #%d", p, from) + + // Create a timeout timer, and the associated hash fetcher + timeout := time.NewTimer(0) // timer to dump a non-responsive active peer + <-timeout.C // timeout channel should be initially empty + defer timeout.Stop() + + getHashes := func(from uint64) { + glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from) + + go p.getAbsHashes(from, MaxHashFetch) + timeout.Reset(hashTTL) + } + // Start pulling hashes, until all are exhausted + getHashes(from) + for { + select { + case <-d.cancelCh: + return errCancelHashFetch + + case hashPack := <-d.hashCh: + // Make sure the active peer is giving us the hashes + if hashPack.peerId != p.id { + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) + break + } + timeout.Stop() + + // If no more hashes are inbound, notify the block fetcher and return + if len(hashPack.hashes) == 0 { + glog.V(logger.Debug).Infof("%v: no available hashes", p) + + select { + case d.processCh <- false: + case <-d.cancelCh: + } + return nil + } + // Otherwise insert all the new hashes, aborting in case of junk + glog.V(logger.Detail).Infof("%v: inserting %d hashes from #%d", p, len(hashPack.hashes), from) + + inserts := d.queue.Insert(hashPack.hashes, true) + if len(inserts) != len(hashPack.hashes) { + glog.V(logger.Debug).Infof("%v: stale hashes", p) + return errBadPeer + } + // Notify the block fetcher of new hashes, but stop if queue is full + cont := d.queue.Pending() < maxQueuedHashes + select { + case d.processCh <- cont: + default: + } + if !cont { + return nil + } + // Queue not yet full, fetch the next batch + from += uint64(len(hashPack.hashes)) + getHashes(from) + + case <-timeout.C: + glog.V(logger.Debug).Infof("%v: hash request timed out", p) + return errTimeout + } + } +} + +// fetchBlocks iteratively downloads the scheduled hashes, taking any available +// peers, reserving a chunk of blocks for each, waiting for delivery and also +// periodically checking for timeouts. +func (d *Downloader) fetchBlocks(from uint64) error { + glog.V(logger.Debug).Infof("Downloading blocks from #%d", from) + defer glog.V(logger.Debug).Infof("Block download terminated") + + // Create a timeout timer for scheduling expiration tasks + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + update := make(chan struct{}, 1) + + // Prepare the queue and fetch blocks until the hash fetcher's done + d.queue.Prepare(from) + finished := false + + for { + select { + case <-d.cancelCh: + return errCancelBlockFetch + + case blockPack := <-d.blockCh: + // If the peer was previously banned and failed to deliver it's pack + // in a reasonable time frame, ignore it's message. + if peer := d.peers.Peer(blockPack.peerId); peer != nil { + // Deliver the received chunk of blocks, and demote in case of errors + err := d.queue.Deliver(blockPack.peerId, blockPack.blocks) + switch err { + case nil: + // If no blocks were delivered, demote the peer (need the delivery above) + if len(blockPack.blocks) == 0 { + peer.Demote() + peer.SetIdle() + glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) + break + } + // All was successful, promote the peer and potentially start processing + peer.Promote() + peer.SetIdle() + glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) + go d.process() + + case errInvalidChain: + // The hash chain is invalid (blocks are not ordered properly), abort + return err + + case errNoFetchesPending: + // Peer probably timed out with its delivery but came through + // in the end, demote, but allow to to pull from this peer. + peer.Demote() + peer.SetIdle() + glog.V(logger.Detail).Infof("%s: out of bound delivery", peer) + + case errStaleDelivery: + // Delivered something completely else than requested, usually + // caused by a timeout and delivery during a new sync cycle. + // Don't set it to idle as the original request should still be + // in flight. + peer.Demote() + glog.V(logger.Detail).Infof("%s: stale delivery", peer) + + default: + // Peer did something semi-useful, demote but keep it around + peer.Demote() + peer.SetIdle() + glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) + go d.process() + } + } + // Blocks arrived, try to update the progress + select { + case update <- struct{}{}: + default: + } + + case cont := <-d.processCh: + // The hash fetcher sent a continuation flag, check if it's done + if !cont { + finished = true + } + // Hashes arrive, try to update the progress + select { + case update <- struct{}{}: + default: + } + + case <-ticker.C: + // Sanity check update the progress + select { + case update <- struct{}{}: + default: + } + + case <-update: + // Short circuit if we lost all our peers + if d.peers.Len() == 0 { + return errNoPeers + } + // Check for block request timeouts and demote the responsible peers + for _, pid := range d.queue.Expire(blockHardTTL) { + if peer := d.peers.Peer(pid); peer != nil { + peer.Demote() + glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) + } + } + // If there's noting more to fetch, wait or terminate + if d.queue.Pending() == 0 { + if d.queue.InFlight() == 0 && finished { + glog.V(logger.Debug).Infof("Block fetching completed") + return nil + } + break + } + // Send a download request to all idle peers, until throttled + for _, peer := range d.peers.IdlePeers() { + // Short circuit if throttling activated + if d.queue.Throttle() { + break + } + // Reserve a chunk of hashes for a peer. A nil can mean either that + // no more hashes are available, or that the peer is known not to + // have them. + request := d.queue.Reserve(peer, peer.Capacity()) + if request == nil { + continue + } + if glog.V(logger.Detail) { + glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes)) + } + // Fetch the chunk and make sure any errors return the hashes to the queue + if err := peer.Fetch(request); err != nil { + glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer) + d.queue.Cancel(request) + } + } + // Make sure that we have peers available for fetching. If all peers have been tried + // and all failed throw an error + if !d.queue.Throttle() && d.queue.InFlight() == 0 { + return errPeersUnavailable + } + } + } +} + // banBlocks retrieves a batch of blocks from a peer feeding us invalid hashes, // and bans the head of the retrieved batch. // diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 7feca8782..ff2e59d92 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -1,3 +1,19 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. + package downloader import ( @@ -21,7 +37,7 @@ var ( genesis = core.GenesisBlockForTesting(testdb, common.Address{}, big.NewInt(0)) ) -// makeChain creates a chain of n blocks starting at and including +// makeChain creates a chain of n blocks starting at but not including // parent. the returned hash chain is ordered head->parent. func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) { blocks := core.GenerateChain(parent, testdb, n, func(i int, gen *core.BlockGen) { @@ -42,7 +58,7 @@ func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common // h2[:f] are different but have a common suffix of length n-f. func makeChainFork(n, f int, parent *types.Block) (h1, h2 []common.Hash, b1, b2 map[common.Hash]*types.Block) { // Create the common suffix. - h, b := makeChain(n-f-1, 0, parent) + h, b := makeChain(n-f, 0, parent) // Create the forks. h1, b1 = makeChain(f, 1, b[h[0]]) h1 = append(h1, h[1:]...) @@ -75,7 +91,7 @@ func newTester() *downloadTester { peerHashes: make(map[string][]common.Hash), peerBlocks: make(map[string]map[common.Hash]*types.Block), } - tester.downloader = New(new(event.TypeMux), tester.hasBlock, tester.getBlock, tester.insertChain, tester.dropPeer) + tester.downloader = New(new(event.TypeMux), tester.hasBlock, tester.getBlock, tester.headBlock, tester.insertChain, tester.dropPeer) return tester } @@ -83,7 +99,13 @@ func newTester() *downloadTester { // sync starts synchronizing with a remote peer, blocking until it completes. func (dl *downloadTester) sync(id string) error { err := dl.downloader.synchronise(id, dl.peerHashes[id][0]) - for atomic.LoadInt32(&dl.downloader.processing) == 1 { + for { + // If the queue is empty and processing stopped, break + hashes, blocks := dl.downloader.queue.Size() + if hashes+blocks == 0 && atomic.LoadInt32(&dl.downloader.processing) == 0 { + break + } + // Otherwise sleep a bit and retry time.Sleep(time.Millisecond) } return err @@ -99,6 +121,11 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { return dl.ownBlocks[hash] } +// headBlock retrieves the current head block from the canonical chain. +func (dl *downloadTester) headBlock() *types.Block { + return dl.getBlock(dl.ownHashes[len(dl.ownHashes)-1]) +} + // insertChain injects a new batch of blocks into the simulated chain. func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { for i, block := range blocks { @@ -112,15 +139,15 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { } // newPeer registers a new block download source into the downloader. -func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { - return dl.newSlowPeer(id, hashes, blocks, 0) +func (dl *downloadTester) newPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { + return dl.newSlowPeer(id, version, hashes, blocks, 0) } // newSlowPeer registers a new block download source into the downloader, with a // specific delay time on processing the network packets sent to it, simulating // potentially slow network IO. -func (dl *downloadTester) newSlowPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error { - err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id, delay), dl.peerGetBlocksFn(id, delay)) +func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error { + err := dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, version, delay), dl.peerGetBlocksFn(id, delay)) if err == nil { // Assign the owned hashes and blocks to the peer (deep copy) dl.peerHashes[id] = make([]common.Hash, len(hashes)) @@ -141,10 +168,10 @@ func (dl *downloadTester) dropPeer(id string) { dl.downloader.UnregisterPeer(id) } -// peerGetBlocksFn constructs a getHashes function associated with a particular +// peerGetRelHashesFn constructs a GetHashes function associated with a specific // peer in the download tester. The returned function can be used to retrieve // batches of hashes from the particularly requested peer. -func (dl *downloadTester) peerGetHashesFn(id string, delay time.Duration) func(head common.Hash) error { +func (dl *downloadTester) peerGetRelHashesFn(id string, delay time.Duration) func(head common.Hash) error { return func(head common.Hash) error { time.Sleep(delay) @@ -174,13 +201,43 @@ func (dl *downloadTester) peerGetHashesFn(id string, delay time.Duration) func(h } } +// peerGetAbsHashesFn constructs a GetHashesFromNumber function associated with +// a particular peer in the download tester. The returned function can be used to +// retrieve batches of hashes from the particularly requested peer. +func (dl *downloadTester) peerGetAbsHashesFn(id string, version int, delay time.Duration) func(uint64, int) error { + // If the simulated peer runs eth/60, this message is not supported + if version == eth60 { + return func(uint64, int) error { return nil } + } + // Otherwise create a method to request the blocks by number + return func(head uint64, count int) error { + time.Sleep(delay) + + limit := count + if dl.maxHashFetch > 0 { + limit = dl.maxHashFetch + } + // Gather the next batch of hashes + hashes := dl.peerHashes[id] + result := make([]common.Hash, 0, limit) + for i := 0; i < limit && len(hashes)-int(head)-1-i >= 0; i++ { + result = append(result, hashes[len(hashes)-int(head)-1-i]) + } + // Delay delivery a bit to allow attacks to unfold + go func() { + time.Sleep(time.Millisecond) + dl.downloader.DeliverHashes(id, result) + }() + return nil + } +} + // peerGetBlocksFn constructs a getBlocks function associated with a particular // peer in the download tester. The returned function can be used to retrieve // batches of blocks from the particularly requested peer. func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([]common.Hash) error { return func(hashes []common.Hash) error { time.Sleep(delay) - blocks := dl.peerBlocks[id] result := make([]*types.Block, 0, len(hashes)) for _, hash := range hashes { @@ -195,13 +252,13 @@ func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([ } // Tests that simple synchronization, without throttling from a good peer works. -func TestSynchronisation(t *testing.T) { +func TestSynchronisation60(t *testing.T) { // Create a small enough block chain to download and the tester targetBlocks := blockCacheLimit - 15 hashes, blocks := makeChain(targetBlocks, 0, genesis) tester := newTester() - tester.newPeer("peer", hashes, blocks) + tester.newPeer("peer", eth60, hashes, blocks) // Synchronise with the peer and make sure all blocks were retrieved if err := tester.sync("peer"); err != nil { @@ -212,42 +269,79 @@ func TestSynchronisation(t *testing.T) { } } -// Tests that an inactive downloader will not accept incoming hashes and blocks. -func TestInactiveDownloader(t *testing.T) { +// Tests that simple synchronization against a canonical chain works correctly. +// In this test common ancestor lookup should be short circuited and not require +// binary searching. +func TestCanonicalSynchronisation(t *testing.T) { + // Create a small enough block chain to download + targetBlocks := blockCacheLimit - 15 + hashes, blocks := makeChain(targetBlocks, 0, genesis) + tester := newTester() + tester.newPeer("peer", eth61, hashes, blocks) - // Check that neither hashes nor blocks are accepted - if err := tester.downloader.DeliverHashes("bad peer", []common.Hash{}); err != errNoSyncActive { - t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + // Synchronise with the peer and make sure all blocks were retrieved + if err := tester.sync("peer"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) } - if err := tester.downloader.DeliverBlocks("bad peer", []*types.Block{}); err != errNoSyncActive { - t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + if imported := len(tester.ownBlocks); imported != targetBlocks+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) } } -// Tests that a canceled download wipes all previously accumulated state. -func TestCancel(t *testing.T) { - // Create a small enough block chain to download and the tester - targetBlocks := blockCacheLimit - 15 +// Tests that if a large batch of blocks are being downloaded, it is throttled +// until the cached blocks are retrieved. +func TestThrottling60(t *testing.T) { + // Create a long block chain to download and the tester + targetBlocks := 8 * blockCacheLimit hashes, blocks := makeChain(targetBlocks, 0, genesis) tester := newTester() - tester.newPeer("peer", hashes, blocks) + tester.newPeer("peer", eth60, hashes, blocks) - // Make sure canceling works with a pristine downloader - tester.downloader.cancel() - hashCount, blockCount := tester.downloader.queue.Size() - if hashCount > 0 || blockCount > 0 { - t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + // Wrap the importer to allow stepping + done := make(chan int) + tester.downloader.insertChain = func(blocks types.Blocks) (int, error) { + n, err := tester.insertChain(blocks) + done <- n + return n, err } - // Synchronise with the peer, but cancel afterwards - if err := tester.sync("peer"); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + // Start a synchronisation concurrently + errc := make(chan error) + go func() { + errc <- tester.sync("peer") + }() + // Iteratively take some blocks, always checking the retrieval count + for len(tester.ownBlocks) < targetBlocks+1 { + // Wait a bit for sync to throttle itself + var cached int + for start := time.Now(); time.Since(start) < 3*time.Second; { + time.Sleep(25 * time.Millisecond) + + cached = len(tester.downloader.queue.blockPool) + if cached == blockCacheLimit || len(tester.ownBlocks)+cached == targetBlocks+1 { + break + } + } + // Make sure we filled up the cache, then exhaust it + time.Sleep(25 * time.Millisecond) // give it a chance to screw up + if cached != blockCacheLimit && len(tester.ownBlocks)+cached < targetBlocks+1 { + t.Fatalf("block count mismatch: have %v, want %v", cached, blockCacheLimit) + } + <-done // finish previous blocking import + for cached > maxBlockProcess { + cached -= <-done + } + time.Sleep(25 * time.Millisecond) // yield to the insertion } - tester.downloader.cancel() - hashCount, blockCount = tester.downloader.queue.Size() - if hashCount > 0 || blockCount > 0 { - t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + <-done // finish the last blocking import + + // Check that we haven't pulled more blocks than available + if len(tester.ownBlocks) > targetBlocks+1 { + t.Fatalf("target block count mismatch: have %v, want %v", len(tester.ownBlocks), targetBlocks+1) + } + if err := <-errc; err != nil { + t.Fatalf("block synchronization failed: %v", err) } } @@ -259,7 +353,7 @@ func TestThrottling(t *testing.T) { hashes, blocks := makeChain(targetBlocks, 0, genesis) tester := newTester() - tester.newPeer("peer", hashes, blocks) + tester.newPeer("peer", eth61, hashes, blocks) // Wrap the importer to allow stepping done := make(chan int) @@ -307,6 +401,102 @@ func TestThrottling(t *testing.T) { } } +// Tests that simple synchronization against a forked chain works correctly. In +// this test common ancestor lookup should *not* be short circuited, and a full +// binary search should be executed. +func TestForkedSynchronisation(t *testing.T) { + // Create a long enough forked chain + common, fork := MaxHashFetch, 2*MaxHashFetch + hashesA, hashesB, blocksA, blocksB := makeChainFork(common+fork, fork, genesis) + + tester := newTester() + tester.newPeer("fork A", eth61, hashesA, blocksA) + tester.newPeer("fork B", eth61, hashesB, blocksB) + + // Synchronise with the peer and make sure all blocks were retrieved + if err := tester.sync("fork A"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + if imported := len(tester.ownBlocks); imported != common+fork+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, common+fork+1) + } + // Synchronise with the second peer and make sure that fork is pulled too + if err := tester.sync("fork B"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + if imported := len(tester.ownBlocks); imported != common+2*fork+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, common+2*fork+1) + } +} + +// Tests that an inactive downloader will not accept incoming hashes and blocks. +func TestInactiveDownloader(t *testing.T) { + tester := newTester() + + // Check that neither hashes nor blocks are accepted + if err := tester.downloader.DeliverHashes("bad peer", []common.Hash{}); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } + if err := tester.downloader.DeliverBlocks("bad peer", []*types.Block{}); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } +} + +// Tests that a canceled download wipes all previously accumulated state. +func TestCancel60(t *testing.T) { + // Create a small enough block chain to download and the tester + targetBlocks := blockCacheLimit - 15 + hashes, blocks := makeChain(targetBlocks, 0, genesis) + + tester := newTester() + tester.newPeer("peer", eth60, hashes, blocks) + + // Make sure canceling works with a pristine downloader + tester.downloader.cancel() + hashCount, blockCount := tester.downloader.queue.Size() + if hashCount > 0 || blockCount > 0 { + t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + } + // Synchronise with the peer, but cancel afterwards + if err := tester.sync("peer"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + tester.downloader.cancel() + hashCount, blockCount = tester.downloader.queue.Size() + if hashCount > 0 || blockCount > 0 { + t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + } +} + +// Tests that a canceled download wipes all previously accumulated state. +func TestCancel(t *testing.T) { + // Create a small enough block chain to download and the tester + targetBlocks := blockCacheLimit - 15 + if targetBlocks >= MaxHashFetch { + targetBlocks = MaxHashFetch - 15 + } + hashes, blocks := makeChain(targetBlocks, 0, genesis) + + tester := newTester() + tester.newPeer("peer", eth61, hashes, blocks) + + // Make sure canceling works with a pristine downloader + tester.downloader.cancel() + hashCount, blockCount := tester.downloader.queue.Size() + if hashCount > 0 || blockCount > 0 { + t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + } + // Synchronise with the peer, but cancel afterwards + if err := tester.sync("peer"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + tester.downloader.cancel() + hashCount, blockCount = tester.downloader.queue.Size() + if hashCount > 0 || blockCount > 0 { + t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + } +} + // Tests that synchronisation from multiple peers works as intended (multi thread sanity test). func TestMultiSynchronisation(t *testing.T) { // Create various peers with various parts of the chain @@ -317,7 +507,7 @@ func TestMultiSynchronisation(t *testing.T) { tester := newTester() for i := 0; i < targetPeers; i++ { id := fmt.Sprintf("peer #%d", i) - tester.newPeer(id, hashes[i*blockCacheLimit:], blocks) + tester.newPeer(id, eth60, hashes[i*blockCacheLimit:], blocks) } // Synchronise with the middle peer and make sure half of the blocks were retrieved id := fmt.Sprintf("peer #%d", targetPeers/2) @@ -347,8 +537,8 @@ func TestSlowSynchronisation(t *testing.T) { targetIODelay := time.Second hashes, blocks := makeChain(targetBlocks, 0, genesis) - tester.newSlowPeer("fast", hashes, blocks, 0) - tester.newSlowPeer("slow", hashes, blocks, targetIODelay) + tester.newSlowPeer("fast", eth60, hashes, blocks, 0) + tester.newSlowPeer("slow", eth60, hashes, blocks, targetIODelay) // Try to sync with the peers (pull hashes from fast) start := time.Now() @@ -370,13 +560,14 @@ func TestSlowSynchronisation(t *testing.T) { func TestNonExistingParentAttack(t *testing.T) { tester := newTester() + // Forge a single-link chain with a forged header hashes, blocks := makeChain(1, 0, genesis) - tester.newPeer("valid", hashes, blocks) + tester.newPeer("valid", eth60, hashes, blocks) wrongblock := types.NewBlock(&types.Header{}, nil, nil, nil) wrongblock.Td = blocks[hashes[0]].Td hashes, blocks = makeChain(1, 0, wrongblock) - tester.newPeer("attack", hashes, blocks) + tester.newPeer("attack", eth60, hashes, blocks) // Try and sync with the malicious node and check that it fails if err := tester.sync("attack"); err == nil { @@ -401,8 +592,8 @@ func TestRepeatingHashAttack(t *testing.T) { // TODO: Is this thing valid?? // Create a valid chain, but drop the last link hashes, blocks := makeChain(blockCacheLimit, 0, genesis) - tester.newPeer("valid", hashes, blocks) - tester.newPeer("attack", hashes[:len(hashes)-1], blocks) + tester.newPeer("valid", eth60, hashes, blocks) + tester.newPeer("attack", eth60, hashes[:len(hashes)-1], blocks) // Try and sync with the malicious node errc := make(chan error) @@ -431,10 +622,10 @@ func TestNonExistingBlockAttack(t *testing.T) { // Create a valid chain, but forge the last link hashes, blocks := makeChain(blockCacheLimit, 0, genesis) - tester.newPeer("valid", hashes, blocks) + tester.newPeer("valid", eth60, hashes, blocks) hashes[len(hashes)/2] = common.Hash{} - tester.newPeer("attack", hashes, blocks) + tester.newPeer("attack", eth60, hashes, blocks) // Try and sync with the malicious node and check that it fails if err := tester.sync("attack"); err != errPeersUnavailable { @@ -453,7 +644,7 @@ func TestInvalidHashOrderAttack(t *testing.T) { // Create a valid long chain, but reverse some hashes within hashes, blocks := makeChain(4*blockCacheLimit, 0, genesis) - tester.newPeer("valid", hashes, blocks) + tester.newPeer("valid", eth60, hashes, blocks) chunk1 := make([]common.Hash, blockCacheLimit) chunk2 := make([]common.Hash, blockCacheLimit) @@ -462,7 +653,7 @@ func TestInvalidHashOrderAttack(t *testing.T) { copy(hashes[2*blockCacheLimit:], chunk1) copy(hashes[blockCacheLimit:], chunk2) - tester.newPeer("attack", hashes, blocks) + tester.newPeer("attack", eth60, hashes, blocks) // Try and sync with the malicious node and check that it fails if err := tester.sync("attack"); err != errInvalidChain { @@ -489,8 +680,8 @@ func TestMadeupHashChainAttack(t *testing.T) { rand.Read(randomHashes[i][:]) } - tester.newPeer("valid", hashes, blocks) - tester.newPeer("attack", randomHashes, nil) + tester.newPeer("valid", eth60, hashes, blocks) + tester.newPeer("attack", eth60, randomHashes, nil) // Try and sync with the malicious node and check that it fails if err := tester.sync("attack"); err != errCrossCheckFailed { @@ -517,7 +708,7 @@ func TestMadeupHashChainDrippingAttack(t *testing.T) { // Try and sync with the attacker, one hash at a time tester.maxHashFetch = 1 - tester.newPeer("attack", randomHashes, nil) + tester.newPeer("attack", eth60, randomHashes, nil) if err := tester.sync("attack"); err != errStallingPeer { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } @@ -540,7 +731,7 @@ func TestMadeupBlockChainAttack(t *testing.T) { } // Try and sync with the malicious node and check that it fails tester := newTester() - tester.newPeer("attack", gapped, blocks) + tester.newPeer("attack", eth60, gapped, blocks) if err := tester.sync("attack"); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } @@ -548,13 +739,13 @@ func TestMadeupBlockChainAttack(t *testing.T) { blockSoftTTL = defaultBlockTTL crossCheckCycle = defaultCrossCheckCycle - tester.newPeer("valid", hashes, blocks) + tester.newPeer("valid", eth60, hashes, blocks) if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } -// tests that if one/multiple malicious peers try to feed a banned blockchain to +// Tests that if one/multiple malicious peers try to feed a banned blockchain to // the downloader, it will not keep refetching the same chain indefinitely, but // gradually block pieces of it, until its head is also blocked. func TestBannedChainStarvationAttack(t *testing.T) { @@ -565,8 +756,8 @@ func TestBannedChainStarvationAttack(t *testing.T) { // Create the tester and ban the selected hash. tester := newTester() tester.downloader.banned.Add(forkHashes[fork-1]) - tester.newPeer("valid", hashes, blocks) - tester.newPeer("attack", forkHashes, forkBlocks) + tester.newPeer("valid", eth60, hashes, blocks) + tester.newPeer("attack", eth60, forkHashes, forkBlocks) // Iteratively try to sync, and verify that the banned hash list grows until // the head of the invalid chain is blocked too. @@ -586,7 +777,7 @@ func TestBannedChainStarvationAttack(t *testing.T) { banned = bans } // Check that after banning an entire chain, bad peers get dropped - if err := tester.newPeer("new attacker", forkHashes, forkBlocks); err != errBannedHead { + if err := tester.newPeer("new attacker", eth60, forkHashes, forkBlocks); err != errBannedHead { t.Fatalf("peer registration mismatch: have %v, want %v", err, errBannedHead) } if peer := tester.downloader.peers.Peer("new attacker"); peer != nil { @@ -618,8 +809,8 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) { MaxBlockFetch = 4 maxBannedHashes = 256 - tester.newPeer("valid", hashes, blocks) - tester.newPeer("attack", forkHashes, forkBlocks) + tester.newPeer("valid", eth60, hashes, blocks) + tester.newPeer("attack", eth60, forkHashes, forkBlocks) // Iteratively try to sync, and verify that the banned hash list grows until // the head of the invalid chain is blocked too. @@ -664,7 +855,7 @@ func TestOverlappingDeliveryAttack(t *testing.T) { // Register an attacker that always returns non-requested blocks too tester := newTester() - tester.newPeer("attack", hashes, blocks) + tester.newPeer("attack", eth60, hashes, blocks) rawGetBlocks := tester.downloader.peers.Peer("attack").getBlocks tester.downloader.peers.Peer("attack").getBlocks = func(request []common.Hash) error { @@ -712,7 +903,7 @@ func TestHashAttackerDropping(t *testing.T) { for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) - if err := tester.newPeer(id, []common.Hash{genesis.Hash()}, nil); err != nil { + if err := tester.newPeer(id, eth60, []common.Hash{genesis.Hash()}, nil); err != nil { t.Fatalf("test %d: failed to register new peer: %v", i, err) } if _, ok := tester.peerHashes[id]; !ok { @@ -744,7 +935,7 @@ func TestBlockAttackerDropping(t *testing.T) { for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) - if err := tester.newPeer(id, []common.Hash{common.Hash{}}, nil); err != nil { + if err := tester.newPeer(id, eth60, []common.Hash{common.Hash{}}, nil); err != nil { t.Fatalf("test %d: failed to register new peer: %v", i, err) } if _, ok := tester.peerHashes[id]; !ok { diff --git a/eth/downloader/events.go b/eth/downloader/events.go index 333feb976..e5c62e121 100644 --- a/eth/downloader/events.go +++ b/eth/downloader/events.go @@ -1,3 +1,19 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. + package downloader type DoneEvent struct{} diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index f36e133e4..89b40d1ac 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -1,3 +1,19 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. + // Contains the active peer-set of the downloader, maintaining both failures // as well as reputation metrics to prioritize the block retrievals. @@ -15,7 +31,8 @@ import ( "gopkg.in/fatih/set.v0" ) -type hashFetcherFn func(common.Hash) error +type relativeHashFetcherFn func(common.Hash) error +type absoluteHashFetcherFn func(uint64, int) error type blockFetcherFn func([]common.Hash) error var ( @@ -37,20 +54,25 @@ type peer struct { ignored *set.Set // Set of hashes not to request (didn't have previously) - getHashes hashFetcherFn // Method to retrieve a batch of hashes (mockable for testing) - getBlocks blockFetcherFn // Method to retrieve a batch of blocks (mockable for testing) + getRelHashes relativeHashFetcherFn // Method to retrieve a batch of hashes from an origin hash + getAbsHashes absoluteHashFetcherFn // Method to retrieve a batch of hashes from an absolute position + getBlocks blockFetcherFn // Method to retrieve a batch of blocks + + version int // Eth protocol version number to switch strategies } // newPeer create a new downloader peer, with specific hash and block retrieval // mechanisms. -func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer { +func newPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn) *peer { return &peer{ - id: id, - head: head, - capacity: 1, - getHashes: getHashes, - getBlocks: getBlocks, - ignored: set.New(), + id: id, + head: head, + capacity: 1, + getRelHashes: getRelHashes, + getAbsHashes: getAbsHashes, + getBlocks: getBlocks, + ignored: set.New(), + version: version, } } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 903f043eb..a758410a5 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -1,3 +1,19 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. + // Contains the block download scheduler to collect download tasks and schedule // them in an ordered, and throttled way. @@ -40,9 +56,9 @@ type queue struct { pendPool map[string]*fetchRequest // Currently pending block retrieval operations - blockPool map[common.Hash]int // Hash-set of the downloaded data blocks, mapping to cache indexes - blockCache []*Block // Downloaded but not yet delivered blocks - blockOffset int // Offset of the first cached block in the block-chain + blockPool map[common.Hash]uint64 // Hash-set of the downloaded data blocks, mapping to cache indexes + blockCache []*Block // Downloaded but not yet delivered blocks + blockOffset uint64 // Offset of the first cached block in the block-chain lock sync.RWMutex } @@ -53,7 +69,7 @@ func newQueue() *queue { hashPool: make(map[common.Hash]int), hashQueue: prque.New(), pendPool: make(map[string]*fetchRequest), - blockPool: make(map[common.Hash]int), + blockPool: make(map[common.Hash]uint64), blockCache: make([]*Block, blockCacheLimit), } } @@ -69,7 +85,7 @@ func (q *queue) Reset() { q.pendPool = make(map[string]*fetchRequest) - q.blockPool = make(map[common.Hash]int) + q.blockPool = make(map[common.Hash]uint64) q.blockOffset = 0 q.blockCache = make([]*Block, blockCacheLimit) } @@ -130,7 +146,7 @@ func (q *queue) Has(hash common.Hash) bool { // Insert adds a set of hashes for the download queue for scheduling, returning // the new hashes encountered. -func (q *queue) Insert(hashes []common.Hash) []common.Hash { +func (q *queue) Insert(hashes []common.Hash, fifo bool) []common.Hash { q.lock.Lock() defer q.lock.Unlock() @@ -147,7 +163,11 @@ func (q *queue) Insert(hashes []common.Hash) []common.Hash { inserts = append(inserts, hash) q.hashPool[hash] = q.hashCounter - q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first + if fifo { + q.hashQueue.Push(hash, -float32(q.hashCounter)) // Lowest gets schedules first + } else { + q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first + } } return inserts } @@ -175,7 +195,7 @@ func (q *queue) GetBlock(hash common.Hash) *Block { return nil } // Return the block if it's still available in the cache - if q.blockOffset <= index && index < q.blockOffset+len(q.blockCache) { + if q.blockOffset <= index && index < q.blockOffset+uint64(len(q.blockCache)) { return q.blockCache[index-q.blockOffset] } return nil @@ -202,7 +222,7 @@ func (q *queue) TakeBlocks() []*Block { for k, n := len(q.blockCache)-len(blocks), len(q.blockCache); k < n; k++ { q.blockCache[k] = nil } - q.blockOffset += len(blocks) + q.blockOffset += uint64(len(blocks)) return blocks } @@ -318,7 +338,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { continue } // If a requested block falls out of the range, the hash chain is invalid - index := int(block.NumberU64()) - q.blockOffset + index := int(int64(block.NumberU64()) - int64(q.blockOffset)) if index >= len(q.blockCache) || index < 0 { return errInvalidChain } @@ -329,7 +349,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { } delete(request.Hashes, hash) delete(q.hashPool, hash) - q.blockPool[hash] = int(block.NumberU64()) + q.blockPool[hash] = block.NumberU64() } // Return all failed or missing fetches to the queue for hash, index := range request.Hashes { @@ -346,7 +366,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { } // Prepare configures the block cache offset to allow accepting inbound blocks. -func (q *queue) Prepare(offset int) { +func (q *queue) Prepare(offset uint64) { q.lock.Lock() defer q.lock.Unlock() |