aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader')
-rw-r--r--eth/downloader/downloader.go667
-rw-r--r--eth/downloader/downloader_test.go739
-rw-r--r--eth/downloader/metrics.go5
-rw-r--r--eth/downloader/modes.go26
-rw-r--r--eth/downloader/peer.go192
-rw-r--r--eth/downloader/queue.go534
6 files changed, 1388 insertions, 775 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 64fb1b57b..7ae7aa221 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -19,8 +19,10 @@ package downloader
import (
"errors"
+ "fmt"
"math"
"math/big"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -32,70 +34,96 @@ import (
"github.com/ethereum/go-ethereum/logger/glog"
)
-const (
- eth61 = 61 // Constant to check for old protocol support
- eth62 = 62 // Constant to check for new protocol support
-)
-
var (
- MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request
- MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
- MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
- MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request
- MaxStateFetch = 384 // Amount of node state values to allow fetching per request
- MaxReceiptsFetch = 384 // Amount of transaction receipts to allow fetching per request
-
- hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out
- blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth
- blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired
- headerTTL = 5 * time.Second // [eth/62] Time it takes for a header request to time out
- bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth
- bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired
-
- maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
- maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
- maxBlockProcess = 256 // Number of blocks to import at once into the chain
+ MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request
+ MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
+ MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
+ MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request
+ MaxStateFetch = 384 // Amount of node state values to allow fetching per request
+ MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
+
+ hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out
+ blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth
+ blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired
+ headerTTL = 5 * time.Second // [eth/62] Time it takes for a header request to time out
+ bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth
+ bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired
+ receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
+ receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a block body request is considered expired
+
+ maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
+ maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
+ maxResultsProcess = 256 // Number of download results to import at once into the chain
+
+ headerCheckFrequency = 64 // Verification frequency of the downloaded headers during fast sync
+ minCheckedHeaders = 1024 // Number of headers to verify fully when approaching the chain head
+ minFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync
)
var (
- errBusy = errors.New("busy")
- errUnknownPeer = errors.New("peer is unknown or unhealthy")
- errBadPeer = errors.New("action from bad peer ignored")
- errStallingPeer = errors.New("peer is stalling")
- errNoPeers = errors.New("no peers to keep download active")
- errPendingQueue = errors.New("pending items in queue")
- errTimeout = errors.New("timeout")
- errEmptyHashSet = errors.New("empty hash set by peer")
- errEmptyHeaderSet = errors.New("empty header set by peer")
- errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
- errAlreadyInPool = errors.New("hash already in pool")
- errInvalidChain = errors.New("retrieved hash chain is invalid")
- errInvalidBody = errors.New("retrieved block body is invalid")
- errCancelHashFetch = errors.New("hash fetching canceled (requested)")
- errCancelBlockFetch = errors.New("block downloading canceled (requested)")
- errCancelHeaderFetch = errors.New("block header fetching canceled (requested)")
- errCancelBodyFetch = errors.New("block body downloading canceled (requested)")
- errNoSyncActive = errors.New("no sync active")
+ errBusy = errors.New("busy")
+ errUnknownPeer = errors.New("peer is unknown or unhealthy")
+ errBadPeer = errors.New("action from bad peer ignored")
+ errStallingPeer = errors.New("peer is stalling")
+ errNoPeers = errors.New("no peers to keep download active")
+ errPendingQueue = errors.New("pending items in queue")
+ errTimeout = errors.New("timeout")
+ errEmptyHashSet = errors.New("empty hash set by peer")
+ errEmptyHeaderSet = errors.New("empty header set by peer")
+ errPeersUnavailable = errors.New("no peers available or all tried for download")
+ errAlreadyInPool = errors.New("hash already in pool")
+ errInvalidChain = errors.New("retrieved hash chain is invalid")
+ errInvalidBlock = errors.New("retrieved block is invalid")
+ errInvalidBody = errors.New("retrieved block body is invalid")
+ errInvalidReceipt = errors.New("retrieved receipt is invalid")
+ errCancelHashFetch = errors.New("hash download canceled (requested)")
+ errCancelBlockFetch = errors.New("block download canceled (requested)")
+ errCancelHeaderFetch = errors.New("block header download canceled (requested)")
+ errCancelBodyFetch = errors.New("block body download canceled (requested)")
+ errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
+ errNoSyncActive = errors.New("no sync active")
)
-// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
-type hashCheckFn func(common.Hash) bool
+// headerCheckFn is a callback type for verifying a header's presence in the local chain.
+type headerCheckFn func(common.Hash) bool
+
+// blockCheckFn is a callback type for verifying a block's presence in the local chain.
+type blockCheckFn func(common.Hash) bool
+
+// headerRetrievalFn is a callback type for retrieving a header from the local chain.
+type headerRetrievalFn func(common.Hash) *types.Header
// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block
-// headRetrievalFn is a callback type for retrieving the head block from the local chain.
-type headRetrievalFn func() *types.Block
+// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain.
+type headHeaderRetrievalFn func() *types.Header
+
+// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain.
+type headBlockRetrievalFn func() *types.Block
// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block.
type tdRetrievalFn func(common.Hash) *big.Int
-// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
-type chainInsertFn func(types.Blocks) (int, error)
+// headerChainInsertFn is a callback type to insert a batch of headers into the local chain.
+type headerChainInsertFn func([]*types.Header, bool) (int, error)
+
+// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain.
+type blockChainInsertFn func(types.Blocks) (int, error)
+
+// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain.
+type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error)
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
+// dataPack is a data message returned by a peer for some query.
+type dataPack interface {
+ PeerId() string
+ Empty() bool
+ Stats() string
+}
+
// hashPack is a batch of block hashes returned by a peer (eth/61).
type hashPack struct {
peerId string
@@ -121,8 +149,33 @@ type bodyPack struct {
uncles [][]*types.Header
}
+// PeerId retrieves the origin peer who sent this block body packet.
+func (p *bodyPack) PeerId() string { return p.peerId }
+
+// Empty returns whether the no block bodies were delivered.
+func (p *bodyPack) Empty() bool { return len(p.transactions) == 0 || len(p.uncles) == 0 }
+
+// Stats creates a textual stats report for logging purposes.
+func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) }
+
+// receiptPack is a batch of receipts returned by a peer.
+type receiptPack struct {
+ peerId string
+ receipts [][]*types.Receipt
+}
+
+// PeerId retrieves the origin peer who sent this receipt packet.
+func (p *receiptPack) PeerId() string { return p.peerId }
+
+// Empty returns whether the no receipts were delivered.
+func (p *receiptPack) Empty() bool { return len(p.receipts) == 0 }
+
+// Stats creates a textual stats report for logging purposes.
+func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) }
+
type Downloader struct {
- mux *event.TypeMux
+ mode SyncMode // Synchronisation mode defining the strategies used
+ mux *event.TypeMux // Event multiplexer to announce sync operation events
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed
@@ -135,12 +188,17 @@ type Downloader struct {
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
// Callbacks
- hasBlock hashCheckFn // Checks if a block is present in the chain
- getBlock blockRetrievalFn // Retrieves a block from the chain
- headBlock headRetrievalFn // Retrieves the head block from the chain
- getTd tdRetrievalFn // Retrieves the TD of a block from the chain
- insertChain chainInsertFn // Injects a batch of blocks into the chain
- dropPeer peerDropFn // Drops a peer for misbehaving
+ hasHeader headerCheckFn // Checks if a header is present in the chain
+ hasBlock blockCheckFn // Checks if a block is present in the chain
+ getHeader headerRetrievalFn // Retrieves a header from the chain
+ getBlock blockRetrievalFn // Retrieves a block from the chain
+ headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
+ headBlock headBlockRetrievalFn // Retrieves the head block from the chain
+ getTd tdRetrievalFn // Retrieves the TD of a block from the chain
+ insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
+ insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
+ insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
+ dropPeer peerDropFn // Drops a peer for misbehaving
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
@@ -149,46 +207,56 @@ type Downloader struct {
notified int32
// Channels
- newPeerCh chan *peer
- hashCh chan hashPack // [eth/61] Channel receiving inbound hashes
- blockCh chan blockPack // [eth/61] Channel receiving inbound blocks
- headerCh chan headerPack // [eth/62] Channel receiving inbound block headers
- bodyCh chan bodyPack // [eth/62] Channel receiving inbound block bodies
- wakeCh chan bool // Channel to signal the block/body fetcher of new tasks
+ newPeerCh chan *peer
+ hashCh chan hashPack // [eth/61] Channel receiving inbound hashes
+ blockCh chan blockPack // [eth/61] Channel receiving inbound blocks
+ headerCh chan headerPack // [eth/62] Channel receiving inbound block headers
+ bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
+ receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
+ blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks
+ bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
+ receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
// Testing hooks
- syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
- bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
- chainInsertHook func([]*Block) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
-}
-
-// Block is an origin-tagged blockchain block.
-type Block struct {
- RawBlock *types.Block
- OriginPeer string
+ syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
+ bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
+ receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
+ chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
-func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, headBlock headRetrievalFn, getTd tdRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader {
+func New(mode SyncMode, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn, getBlock blockRetrievalFn,
+ headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn,
+ insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader {
+
return &Downloader{
- mux: mux,
- queue: newQueue(),
- peers: newPeerSet(),
- hasBlock: hasBlock,
- getBlock: getBlock,
- headBlock: headBlock,
- getTd: getTd,
- insertChain: insertChain,
- dropPeer: dropPeer,
- newPeerCh: make(chan *peer, 1),
- hashCh: make(chan hashPack, 1),
- blockCh: make(chan blockPack, 1),
- headerCh: make(chan headerPack, 1),
- bodyCh: make(chan bodyPack, 1),
- wakeCh: make(chan bool, 1),
+ mode: mode,
+ mux: mux,
+ queue: newQueue(),
+ peers: newPeerSet(),
+ hasHeader: hasHeader,
+ hasBlock: hasBlock,
+ getHeader: getHeader,
+ getBlock: getBlock,
+ headHeader: headHeader,
+ headBlock: headBlock,
+ getTd: getTd,
+ insertHeaders: insertHeaders,
+ insertBlocks: insertBlocks,
+ insertReceipts: insertReceipts,
+ dropPeer: dropPeer,
+ newPeerCh: make(chan *peer, 1),
+ hashCh: make(chan hashPack, 1),
+ blockCh: make(chan blockPack, 1),
+ headerCh: make(chan headerPack, 1),
+ bodyCh: make(chan dataPack, 1),
+ receiptCh: make(chan dataPack, 1),
+ blockWakeCh: make(chan bool, 1),
+ bodyWakeCh: make(chan bool, 1),
+ receiptWakeCh: make(chan bool, 1),
}
}
@@ -211,10 +279,10 @@ func (d *Downloader) Synchronising() bool {
// used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
- getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn) error {
+ getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getReceipts receiptFetcherFn) error {
glog.V(logger.Detail).Infoln("Registering peer", id)
- if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies)); err != nil {
+ if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts)); err != nil {
glog.V(logger.Error).Infoln("Register failed:", err)
return err
}
@@ -222,13 +290,15 @@ func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
}
// UnregisterPeer remove a peer from the known list, preventing any action from
-// the specified peer.
+// the specified peer. An effort is also made to return any pending fetches into
+// the queue.
func (d *Downloader) UnregisterPeer(id string) error {
glog.V(logger.Detail).Infoln("Unregistering peer", id)
if err := d.peers.Unregister(id); err != nil {
glog.V(logger.Error).Infoln("Unregister failed:", err)
return err
}
+ d.queue.Revoke(id)
return nil
}
@@ -275,16 +345,18 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error
glog.V(logger.Info).Infoln("Block synchronisation started")
}
// Abort if the queue still contains some leftover data
- if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
+ if d.queue.GetHeadResult() != nil {
return errPendingQueue
}
- // Reset the queue and peer set to clean any internal leftover state
+ // Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset()
d.peers.Reset()
- select {
- case <-d.wakeCh:
- default:
+ for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh} {
+ select {
+ case <-ch:
+ default:
+ }
}
// Create cancel channel for aborting mid-flight
d.cancelLock.Lock()
@@ -299,12 +371,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error
return d.syncWithPeer(p, hash, td)
}
+/*
// Has checks if the downloader knows about a particular hash, meaning that its
// either already downloaded of pending retrieval.
func (d *Downloader) Has(hash common.Hash) bool {
return d.queue.Has(hash)
}
-
+*/
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) {
@@ -323,7 +396,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
defer glog.V(logger.Debug).Infof("Synchronisation terminated")
switch {
- case p.version == eth61:
+ case p.version == 61:
// Look up the sync boundaries: the common ancestor and the target block
latest, err := d.fetchHeight61(p)
if err != nil {
@@ -344,6 +417,8 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
+ d.queue.Prepare(origin+1, 1)
+
errc := make(chan error, 2)
go func() { errc <- d.fetchHashes61(p, td, origin+1) }()
go func() { errc <- d.fetchBlocks61(origin + 1) }()
@@ -356,7 +431,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
}
return <-errc
- case p.version >= eth62:
+ case p.version >= 62:
// Look up the sync boundaries: the common ancestor and the target block
latest, err := d.fetchHeight(p)
if err != nil {
@@ -373,21 +448,32 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
d.syncStatsHeight = latest
d.syncStatsLock.Unlock()
- // Initiate the sync using a concurrent hash and block retrieval algorithm
+ // Initiate the sync using a concurrent header and content retrieval algorithm
+ parts := 1
+ if d.mode == FastSync {
+ parts = 2 // receipts are fetched too
+ }
+ d.queue.Prepare(origin+1, parts)
+
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- errc := make(chan error, 2)
- go func() { errc <- d.fetchHeaders(p, td, origin+1) }()
- go func() { errc <- d.fetchBodies(origin + 1) }()
-
- // If any fetcher fails, cancel the other
- if err := <-errc; err != nil {
- d.cancel()
- <-errc
- return err
+ errc := make(chan error, 3)
+ go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved
+ go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync
+ go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal sync
+
+ // If any fetcher fails, cancel the others
+ var fail error
+ for i := 0; i < cap(errc); i++ {
+ if err := <-errc; err != nil {
+ if fail == nil {
+ fail = err
+ d.cancel()
+ }
+ }
}
- return <-errc
+ return fail
default:
// Something very wrong, stop right here
@@ -637,7 +723,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
glog.V(logger.Debug).Infof("%v: no available hashes", p)
select {
- case d.wakeCh <- false:
+ case d.blockWakeCh <- false:
case <-d.cancelCh:
}
// If no hashes were retrieved at all, the peer violated it's TD promise that it had a
@@ -660,24 +746,24 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
gotHashes = true
// Otherwise insert all the new hashes, aborting in case of junk
- glog.V(logger.Detail).Infof("%v: inserting %d hashes from #%d", p, len(hashPack.hashes), from)
+ glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashPack.hashes), from)
- inserts := d.queue.Insert61(hashPack.hashes, true)
+ inserts := d.queue.Schedule61(hashPack.hashes, true)
if len(inserts) != len(hashPack.hashes) {
glog.V(logger.Debug).Infof("%v: stale hashes", p)
return errBadPeer
}
// Notify the block fetcher of new hashes, but stop if queue is full
- if d.queue.Pending() < maxQueuedHashes {
+ if d.queue.PendingBlocks() < maxQueuedHashes {
// We still have hashes to fetch, send continuation wake signal (potential)
select {
- case d.wakeCh <- true:
+ case d.blockWakeCh <- true:
default:
}
} else {
// Hash limit reached, send a termination wake signal (enforced)
select {
- case d.wakeCh <- false:
+ case d.blockWakeCh <- false:
case <-d.cancelCh:
}
return nil
@@ -707,10 +793,8 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
update := make(chan struct{}, 1)
- // Prepare the queue and fetch blocks until the hash fetcher's done
- d.queue.Prepare(from)
+ // Fetch blocks until the hash fetcher's done
finished := false
-
for {
select {
case <-d.cancelCh:
@@ -733,13 +817,13 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// If no blocks were delivered, demote the peer (need the delivery above)
if len(blockPack.blocks) == 0 {
peer.Demote()
- peer.SetIdle61()
+ peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
break
}
// All was successful, promote the peer and potentially start processing
peer.Promote()
- peer.SetIdle61()
+ peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
go d.process()
@@ -751,7 +835,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// Peer probably timed out with its delivery but came through
// in the end, demote, but allow to to pull from this peer.
peer.Demote()
- peer.SetIdle61()
+ peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)
case errStaleDelivery:
@@ -765,7 +849,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
default:
// Peer did something semi-useful, demote but keep it around
peer.Demote()
- peer.SetIdle61()
+ peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
go d.process()
}
@@ -776,7 +860,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
default:
}
- case cont := <-d.wakeCh:
+ case cont := <-d.blockWakeCh:
// The hash fetcher sent a continuation flag, check if it's done
if !cont {
finished = true
@@ -800,14 +884,14 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
return errNoPeers
}
// Check for block request timeouts and demote the responsible peers
- for _, pid := range d.queue.Expire(blockHardTTL) {
+ for _, pid := range d.queue.Expire61(blockHardTTL) {
if peer := d.peers.Peer(pid); peer != nil {
peer.Demote()
glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
}
}
- // If there's noting more to fetch, wait or terminate
- if d.queue.Pending() == 0 {
+ // If there's nothing more to fetch, wait or terminate
+ if d.queue.PendingBlocks() == 0 {
if d.queue.InFlight() == 0 && finished {
glog.V(logger.Debug).Infof("Block fetching completed")
return nil
@@ -816,16 +900,18 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
}
// Send a download request to all idle peers, until throttled
throttled := false
- for _, peer := range d.peers.IdlePeers(eth61) {
+ idles, total := d.peers.BlockIdlePeers(61)
+
+ for _, peer := range idles {
// Short circuit if throttling activated
- if d.queue.Throttle() {
+ if d.queue.ThrottleBlocks() {
throttled = true
break
}
// Reserve a chunk of hashes for a peer. A nil can mean either that
// no more hashes are available, or that the peer is known not to
// have them.
- request := d.queue.Reserve61(peer, peer.Capacity())
+ request := d.queue.Reserve61(peer, peer.BlockCapacity())
if request == nil {
continue
}
@@ -835,12 +921,12 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// Fetch the chunk and make sure any errors return the hashes to the queue
if err := peer.Fetch61(request); err != nil {
glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer)
- d.queue.Cancel(request)
+ d.queue.Cancel61(request)
}
}
// Make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error
- if !throttled && d.queue.InFlight() == 0 {
+ if !throttled && d.queue.InFlight() == 0 && len(idles) == total {
return errPeersUnavailable
}
}
@@ -891,16 +977,19 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
}
}
-// findAncestor tries to locate the common ancestor block of the local chain and
+// findAncestor tries to locate the common ancestor link of the local chain and
// a remote peers blockchain. In the general case when our node was in sync and
-// on the correct chain, checking the top N blocks should already get us a match.
+// on the correct chain, checking the top N links should already get us a match.
// In the rare scenario when we ended up on a long reorganization (i.e. none of
-// the head blocks match), we do a binary search to find the common ancestor.
+// the head links match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor(p *peer) (uint64, error) {
glog.V(logger.Debug).Infof("%v: looking for common ancestor", p)
- // Request our head blocks to short circuit ancestor location
- head := d.headBlock().NumberU64()
+ // Request our head headers to short circuit ancestor location
+ head := d.headHeader().Number.Uint64()
+ if d.mode == FullSync {
+ head = d.headBlock().NumberU64()
+ }
from := int64(head) - int64(MaxHeaderFetch) + 1
if from < 0 {
from = 0
@@ -931,7 +1020,7 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
// Check if a common ancestor was found
finished = true
for i := len(headers) - 1; i >= 0; i-- {
- if d.hasBlock(headers[i].Hash()) {
+ if (d.mode == FullSync && d.hasBlock(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
number, hash = headers[i].Number.Uint64(), headers[i].Hash()
break
}
@@ -986,13 +1075,13 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
arrived = true
// Modify the search interval based on the response
- block := d.getBlock(headers[0].Hash())
- if block == nil {
+ if (d.mode == FullSync && !d.hasBlock(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) {
end = check
break
}
- if block.NumberU64() != check {
- glog.V(logger.Debug).Infof("%v: non requested header #%d [%x…], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check)
+ header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists
+ if header.Number.Uint64() != check {
+ glog.V(logger.Debug).Infof("%v: non requested header #%d [%x…], instead of #%d", p, header.Number, header.Hash().Bytes()[:4], check)
return 0, errBadPeer
}
start = check
@@ -1017,6 +1106,9 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
// fetchHeaders keeps retrieving headers from the requested number, until no more
// are returned, potentially throttling on the way.
+//
+// The queue parameter can be used to switch between queuing headers for block
+// body download too, or directly import as pure header chains.
func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
glog.V(logger.Debug).Infof("%v: downloading headers from #%d", p, from)
defer glog.V(logger.Debug).Infof("%v: header download terminated", p)
@@ -1058,13 +1150,15 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
headerReqTimer.UpdateSince(request)
timeout.Stop()
- // If no more headers are inbound, notify the body fetcher and return
+ // If no more headers are inbound, notify the content fetchers and return
if len(headerPack.headers) == 0 {
glog.V(logger.Debug).Infof("%v: no available headers", p)
- select {
- case d.wakeCh <- false:
- case <-d.cancelCh:
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+ select {
+ case ch <- false:
+ case <-d.cancelCh:
+ }
}
// If no headers were retrieved at all, the peer violated it's TD promise that it had a
// better chain compared to ours. The only exception is if it's promised blocks were
@@ -1086,27 +1180,37 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
gotHeaders = true
// Otherwise insert all the new headers, aborting in case of junk
- glog.V(logger.Detail).Infof("%v: inserting %d headers from #%d", p, len(headerPack.headers), from)
+ glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headerPack.headers), from)
- inserts := d.queue.Insert(headerPack.headers, from)
- if len(inserts) != len(headerPack.headers) {
- glog.V(logger.Debug).Infof("%v: stale headers", p)
- return errBadPeer
- }
- // Notify the block fetcher of new headers, but stop if queue is full
- if d.queue.Pending() < maxQueuedHeaders {
- // We still have headers to fetch, send continuation wake signal (potential)
- select {
- case d.wakeCh <- true:
- default:
+ if d.mode == FullSync || d.mode == FastSync {
+ inserts := d.queue.Schedule(headerPack.headers, from, d.mode == FastSync)
+ if len(inserts) != len(headerPack.headers) {
+ glog.V(logger.Debug).Infof("%v: stale headers", p)
+ return errBadPeer
}
} else {
- // Header limit reached, send a termination wake signal (enforced)
- select {
- case d.wakeCh <- false:
- case <-d.cancelCh:
+ if n, err := d.insertHeaders(headerPack.headers, true); err != nil {
+ glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headerPack.headers[n].Number, headerPack.headers[n].Hash().Bytes()[:4], err)
+ return errInvalidChain
+ }
+ }
+ // Notify the content fetchers of new headers, but stop if queue is full
+ cont := d.queue.PendingBlocks() < maxQueuedHeaders || d.queue.PendingReceipts() < maxQueuedHeaders
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+ if cont {
+ // We still have headers to fetch, send continuation wake signal (potential)
+ select {
+ case ch <- true:
+ default:
+ }
+ } else {
+ // Header limit reached, send a termination wake signal (enforced)
+ select {
+ case ch <- false:
+ case <-d.cancelCh:
+ }
+ return nil
}
- return nil
}
// Queue not yet full, fetch the next batch
from += uint64(len(headerPack.headers))
@@ -1119,9 +1223,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
d.dropPeer(p.id)
// Finish the sync gracefully instead of dumping the gathered data though
- select {
- case d.wakeCh <- false:
- case <-d.cancelCh:
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+ select {
+ case ch <- false:
+ case <-d.cancelCh:
+ }
}
return nil
}
@@ -1133,22 +1239,69 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
- defer glog.V(logger.Debug).Infof("Block body download terminated")
- // Create a timeout timer for scheduling expiration tasks
+ var (
+ deliver = func(packet interface{}) error {
+ pack := packet.(*bodyPack)
+ return d.queue.DeliverBlocks(pack.peerId, pack.transactions, pack.uncles)
+ }
+ expire = func() []string { return d.queue.ExpireBlocks(bodyHardTTL) }
+ fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
+ capacity = func(p *peer) int { return p.BlockCapacity() }
+ getIdles = func() ([]*peer, int) { return d.peers.BlockIdlePeers(62) }
+ setIdle = func(p *peer) { p.SetBlocksIdle() }
+ )
+ err := d.fetchParts(from, errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
+ d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBlocks, d.bodyFetchHook,
+ fetch, d.queue.CancelBlocks, capacity, getIdles, setIdle, "Body")
+
+ glog.V(logger.Debug).Infof("Block body download terminated: %v", err)
+ return err
+}
+
+// fetchReceipts iteratively downloads the scheduled block receipts, taking any
+// available peers, reserving a chunk of receipts for each, waiting for delivery
+// and also periodically checking for timeouts.
+func (d *Downloader) fetchReceipts(from uint64) error {
+ glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
+
+ var (
+ deliver = func(packet interface{}) error {
+ pack := packet.(*receiptPack)
+ return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
+ }
+ expire = func() []string { return d.queue.ExpireReceipts(bodyHardTTL) }
+ fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
+ capacity = func(p *peer) int { return p.ReceiptCapacity() }
+ setIdle = func(p *peer) { p.SetReceiptsIdle() }
+ )
+ err := d.fetchParts(from, errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
+ d.queue.PendingReceipts, d.queue.ThrottleReceipts, d.queue.ReserveReceipts, d.receiptFetchHook,
+ fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt")
+
+ glog.V(logger.Debug).Infof("Receipt download terminated: %v", err)
+ return err
+}
+
+// fetchParts iteratively downloads scheduled block parts, taking any available
+// peers, reserving a chunk of fetch requests for each, waiting for delivery and
+// also periodically checking for timeouts.
+func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan dataPack, deliver func(packet interface{}) error, wakeCh chan bool,
+ expire func() []string, pending func() int, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header),
+ fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, idle func() ([]*peer, int), setIdle func(*peer), kind string) error {
+
+ // Create a ticker to detect expired retreival tasks
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
update := make(chan struct{}, 1)
- // Prepare the queue and fetch block bodies until the block header fetcher's done
- d.queue.Prepare(from)
+ // Prepare the queue and fetch block parts until the block header fetcher's done
finished := false
-
for {
select {
case <-d.cancelCh:
- return errCancelBlockFetch
+ return errCancel
case <-d.hashCh:
// Out of bounds eth/61 hashes received, ignore them
@@ -1156,42 +1309,41 @@ func (d *Downloader) fetchBodies(from uint64) error {
case <-d.blockCh:
// Out of bounds eth/61 blocks received, ignore them
- case bodyPack := <-d.bodyCh:
+ case packet := <-deliveryCh:
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
- if peer := d.peers.Peer(bodyPack.peerId); peer != nil {
- // Deliver the received chunk of bodies, and demote in case of errors
- err := d.queue.Deliver(bodyPack.peerId, bodyPack.transactions, bodyPack.uncles)
- switch err {
+ if peer := d.peers.Peer(packet.PeerId()); peer != nil {
+ // Deliver the received chunk of data, and demote in case of errors
+ switch err := deliver(packet); err {
case nil:
- // If no blocks were delivered, demote the peer (need the delivery above)
- if len(bodyPack.transactions) == 0 || len(bodyPack.uncles) == 0 {
+ // If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!)
+ if packet.Empty() {
peer.Demote()
- peer.SetIdle()
- glog.V(logger.Detail).Infof("%s: no block bodies delivered", peer)
+ setIdle(peer)
+ glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
break
}
// All was successful, promote the peer and potentially start processing
peer.Promote()
- peer.SetIdle()
- glog.V(logger.Detail).Infof("%s: delivered %d:%d block bodies", peer, len(bodyPack.transactions), len(bodyPack.uncles))
+ setIdle(peer)
+ glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
go d.process()
case errInvalidChain:
// The hash chain is invalid (blocks are not ordered properly), abort
return err
- case errInvalidBody:
+ case errInvalidBody, errInvalidReceipt:
// The peer delivered something very bad, drop immediately
- glog.V(logger.Error).Infof("%s: delivered invalid block, dropping", peer)
+ glog.V(logger.Error).Infof("%s: delivered invalid %s, dropping", peer, strings.ToLower(kind))
d.dropPeer(peer.id)
case errNoFetchesPending:
// Peer probably timed out with its delivery but came through
// in the end, demote, but allow to to pull from this peer.
peer.Demote()
- peer.SetIdle()
- glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)
+ setIdle(peer)
+ glog.V(logger.Detail).Infof("%s: out of bound %s delivery", peer, strings.ToLower(kind))
case errStaleDelivery:
// Delivered something completely else than requested, usually
@@ -1199,13 +1351,13 @@ func (d *Downloader) fetchBodies(from uint64) error {
// Don't set it to idle as the original request should still be
// in flight.
peer.Demote()
- glog.V(logger.Detail).Infof("%s: stale delivery", peer)
+ glog.V(logger.Detail).Infof("%s: %s stale delivery", peer, strings.ToLower(kind))
default:
// Peer did something semi-useful, demote but keep it around
peer.Demote()
- peer.SetIdle()
- glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
+ setIdle(peer)
+ glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err)
go d.process()
}
}
@@ -1215,7 +1367,7 @@ func (d *Downloader) fetchBodies(from uint64) error {
default:
}
- case cont := <-d.wakeCh:
+ case cont := <-wakeCh:
// The header fetcher sent a continuation flag, check if it's done
if !cont {
finished = true
@@ -1238,65 +1390,69 @@ func (d *Downloader) fetchBodies(from uint64) error {
if d.peers.Len() == 0 {
return errNoPeers
}
- // Check for block body request timeouts and demote the responsible peers
- for _, pid := range d.queue.Expire(bodyHardTTL) {
+ // Check for fetch request timeouts and demote the responsible peers
+ for _, pid := range expire() {
if peer := d.peers.Peer(pid); peer != nil {
peer.Demote()
- glog.V(logger.Detail).Infof("%s: block body delivery timeout", peer)
+ glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
}
}
- // If there's noting more to fetch, wait or terminate
- if d.queue.Pending() == 0 {
+ // If there's nothing more to fetch, wait or terminate
+ if pending() == 0 {
if d.queue.InFlight() == 0 && finished {
- glog.V(logger.Debug).Infof("Block body fetching completed")
+ glog.V(logger.Debug).Infof("%s fetching completed", kind)
return nil
}
break
}
// Send a download request to all idle peers, until throttled
- queuedEmptyBlocks, throttled := false, false
- for _, peer := range d.peers.IdlePeers(eth62) {
+ progressed, throttled := false, false
+ idles, total := idle()
+
+ for _, peer := range idles {
// Short circuit if throttling activated
- if d.queue.Throttle() {
+ if throttle() {
throttled = true
break
}
- // Reserve a chunk of hashes for a peer. A nil can mean either that
- // no more hashes are available, or that the peer is known not to
+ // Reserve a chunk of fetches for a peer. A nil can mean either that
+ // no more headers are available, or that the peer is known not to
// have them.
- request, process, err := d.queue.Reserve(peer, peer.Capacity())
+ request, progress, err := reserve(peer, capacity(peer))
if err != nil {
return err
}
- if process {
- queuedEmptyBlocks = true
+ if progress {
+ progressed = true
go d.process()
}
if request == nil {
continue
}
if glog.V(logger.Detail) {
- glog.Infof("%s: requesting %d block bodies", peer, len(request.Headers))
+ glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
}
// Fetch the chunk and make sure any errors return the hashes to the queue
- if d.bodyFetchHook != nil {
- d.bodyFetchHook(request.Headers)
+ if fetchHook != nil {
+ fetchHook(request.Headers)
}
- if err := peer.Fetch(request); err != nil {
- glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer)
- d.queue.Cancel(request)
+ if err := fetch(peer, request); err != nil {
+ glog.V(logger.Error).Infof("%v: %s fetch failed, rescheduling", peer, strings.ToLower(kind))
+ cancel(request)
}
}
// Make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error
- if !queuedEmptyBlocks && !throttled && d.queue.InFlight() == 0 {
+ if !progressed && !throttled && d.queue.InFlight() == 0 && len(idles) == total {
return errPeersUnavailable
}
}
}
}
-// process takes blocks from the queue and tries to import them into the chain.
+// process takes fetch results from the queue and tries to import them into the
+// chain. The type of import operation will depend on the result contents:
+// -
//
// The algorithmic flow is as follows:
// - The `processing` flag is swapped to 1 to ensure singleton access
@@ -1317,10 +1473,10 @@ func (d *Downloader) process() {
}
// If the processor just exited, but there are freshly pending items, try to
// reenter. This is needed because the goroutine spinned up for processing
- // the fresh blocks might have been rejected entry to to this present thread
+ // the fresh results might have been rejected entry to to this present thread
// not yet releasing the `processing` state.
defer func() {
- if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadBlock() != nil {
+ if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil {
d.process()
}
}()
@@ -1328,38 +1484,64 @@ func (d *Downloader) process() {
// the import statistics to zero.
defer atomic.StoreInt32(&d.processing, 0)
- // Repeat the processing as long as there are blocks to import
+ // Repeat the processing as long as there are results to process
for {
- // Fetch the next batch of blocks
- blocks := d.queue.TakeBlocks()
- if len(blocks) == 0 {
+ // Fetch the next batch of results
+ results := d.queue.TakeResults()
+ if len(results) == 0 {
return
}
if d.chainInsertHook != nil {
- d.chainInsertHook(blocks)
+ d.chainInsertHook(results)
}
// Actually import the blocks
- glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
- for len(blocks) != 0 {
+ if glog.V(logger.Debug) {
+ first, last := results[0].Header, results[len(results)-1].Header
+ glog.V(logger.Debug).Infof("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4])
+ }
+ for len(results) != 0 {
// Check for any termination requests
if atomic.LoadInt32(&d.interrupt) == 1 {
return
}
- // Retrieve the first batch of blocks to insert
- max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess)))
- raw := make(types.Blocks, 0, max)
- for _, block := range blocks[:max] {
- raw = append(raw, block.RawBlock)
+ // Retrieve the a batch of results to import
+ var (
+ headers = make([]*types.Header, 0, maxResultsProcess)
+ blocks = make([]*types.Block, 0, maxResultsProcess)
+ receipts = make([]types.Receipts, 0, maxResultsProcess)
+ )
+ items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
+ for _, result := range results[:items] {
+ switch {
+ case d.mode == FullSync:
+ blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
+ case d.mode == FastSync:
+ blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
+ receipts = append(receipts, result.Receipts)
+ case d.mode == LightSync:
+ headers = append(headers, result.Header)
+ }
+ }
+ // Try to process the results, aborting if there's an error
+ var (
+ err error
+ index int
+ )
+ switch {
+ case d.mode == FullSync:
+ index, err = d.insertBlocks(blocks)
+ case d.mode == FastSync:
+ index, err = d.insertReceipts(blocks, receipts)
+ case d.mode == LightSync:
+ index, err = d.insertHeaders(headers, true)
}
- // Try to inset the blocks, drop the originating peer if there's an error
- index, err := d.insertChain(raw)
if err != nil {
- glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err)
- d.dropPeer(blocks[index].OriginPeer)
+ glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash(), err)
d.cancel()
return
}
- blocks = blocks[max:]
+ // Shift the results to the next batch
+ results = results[items:]
}
}
}
@@ -1468,7 +1650,34 @@ func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transactio
d.cancelLock.RUnlock()
select {
- case d.bodyCh <- bodyPack{id, transactions, uncles}:
+ case d.bodyCh <- &bodyPack{id, transactions, uncles}:
+ return nil
+
+ case <-cancel:
+ return errNoSyncActive
+ }
+}
+
+// DeliverReceipts injects a new batch of receipts received from a remote node.
+func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
+ // Update the delivery metrics for both good and failed deliveries
+ receiptInMeter.Mark(int64(len(receipts)))
+ defer func() {
+ if err != nil {
+ receiptDropMeter.Mark(int64(len(receipts)))
+ }
+ }()
+ // Make sure the downloader is active
+ if atomic.LoadInt32(&d.synchronising) == 0 {
+ return errNoSyncActive
+ }
+ // Deliver or abort if the sync is canceled while queuing
+ d.cancelLock.RLock()
+ cancel := d.cancelCh
+ d.cancelLock.RUnlock()
+
+ select {
+ case d.receiptCh <- &receiptPack{id, receipts}:
return nil
case <-cancel:
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 96096527e..18bdb56dd 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -45,7 +45,8 @@ var (
// the returned hash chain is ordered head->parent. In addition, every 3rd block
// contains a transaction and every 5th an uncle to allow testing correct block
// reassembly.
-func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) {
+func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Block) {
+ // Generate the block chain
blocks := core.GenerateChain(parent, testdb, n, func(i int, block *core.BlockGen) {
block.SetCoinbase(common.Address{seed})
@@ -62,59 +63,80 @@ func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common
block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 1).Hash(), Number: big.NewInt(int64(i - 1))})
}
})
+ // Convert the block-chain into a hash-chain and header/block maps
hashes := make([]common.Hash, n+1)
hashes[len(hashes)-1] = parent.Hash()
+
+ headerm := make(map[common.Hash]*types.Header, n+1)
+ headerm[parent.Hash()] = parent.Header()
+
blockm := make(map[common.Hash]*types.Block, n+1)
blockm[parent.Hash()] = parent
+
for i, b := range blocks {
hashes[len(hashes)-i-2] = b.Hash()
+ headerm[b.Hash()] = b.Header()
blockm[b.Hash()] = b
}
- return hashes, blockm
+ return hashes, headerm, blockm
}
// makeChainFork creates two chains of length n, such that h1[:f] and
// h2[:f] are different but have a common suffix of length n-f.
-func makeChainFork(n, f int, parent *types.Block) (h1, h2 []common.Hash, b1, b2 map[common.Hash]*types.Block) {
- // Create the common suffix.
- h, b := makeChain(n-f, 0, parent)
- // Create the forks.
- h1, b1 = makeChain(f, 1, b[h[0]])
- h1 = append(h1, h[1:]...)
- h2, b2 = makeChain(f, 2, b[h[0]])
- h2 = append(h2, h[1:]...)
- for hash, block := range b {
- b1[hash] = block
- b2[hash] = block
- }
- return h1, h2, b1, b2
+func makeChainFork(n, f int, parent *types.Block) ([]common.Hash, []common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]*types.Block) {
+ // Create the common suffix
+ hashes, headers, blocks := makeChain(n-f, 0, parent)
+
+ // Create the forks
+ hashes1, headers1, blocks1 := makeChain(f, 1, blocks[hashes[0]])
+ hashes1 = append(hashes1, hashes[1:]...)
+
+ hashes2, headers2, blocks2 := makeChain(f, 2, blocks[hashes[0]])
+ hashes2 = append(hashes2, hashes[1:]...)
+
+ for hash, header := range headers {
+ headers1[hash] = header
+ headers2[hash] = header
+ }
+ for hash, block := range blocks {
+ blocks1[hash] = block
+ blocks2[hash] = block
+ }
+ return hashes1, hashes2, headers1, headers2, blocks1, blocks2
}
// downloadTester is a test simulator for mocking out local block chain.
type downloadTester struct {
downloader *Downloader
- ownHashes []common.Hash // Hash chain belonging to the tester
- ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester
- ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain
- peerHashes map[string][]common.Hash // Hash chain belonging to different test peers
- peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers
- peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains
+ ownHashes []common.Hash // Hash chain belonging to the tester
+ ownHeaders map[common.Hash]*types.Header // Headers belonging to the tester
+ ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester
+ ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester
+ ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain
+ peerHashes map[string][]common.Hash // Hash chain belonging to different test peers
+ peerHeaders map[string]map[common.Hash]*types.Header // Headers belonging to different test peers
+ peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers
+ peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains
lock sync.RWMutex
}
// newTester creates a new downloader test mocker.
-func newTester() *downloadTester {
+func newTester(mode SyncMode) *downloadTester {
tester := &downloadTester{
ownHashes: []common.Hash{genesis.Hash()},
+ ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
+ ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): genesis.Receipts()},
ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()},
peerHashes: make(map[string][]common.Hash),
+ peerHeaders: make(map[string]map[common.Hash]*types.Header),
peerBlocks: make(map[string]map[common.Hash]*types.Block),
peerChainTds: make(map[string]map[common.Hash]*big.Int),
}
- tester.downloader = New(new(event.TypeMux), tester.hasBlock, tester.getBlock, tester.headBlock, tester.getTd, tester.insertChain, tester.dropPeer)
+ tester.downloader = New(mode, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader, tester.getBlock,
+ tester.headHeader, tester.headBlock, tester.getTd, tester.insertHeaders, tester.insertBlocks, tester.insertConfirmedBlocks, tester.dropPeer)
return tester
}
@@ -135,8 +157,7 @@ func (dl *downloadTester) sync(id string, td *big.Int) error {
err := dl.downloader.synchronise(id, hash, td)
for {
// If the queue is empty and processing stopped, break
- hashes, blocks := dl.downloader.queue.Size()
- if hashes+blocks == 0 && atomic.LoadInt32(&dl.downloader.processing) == 0 {
+ if dl.downloader.queue.Idle() && atomic.LoadInt32(&dl.downloader.processing) == 0 {
break
}
// Otherwise sleep a bit and retry
@@ -145,12 +166,22 @@ func (dl *downloadTester) sync(id string, td *big.Int) error {
return err
}
-// hasBlock checks if a block is pres ent in the testers canonical chain.
+// hasHeader checks if a header is present in the testers canonical chain.
+func (dl *downloadTester) hasHeader(hash common.Hash) bool {
+ return dl.getHeader(hash) != nil
+}
+
+// hasBlock checks if a block is present in the testers canonical chain.
func (dl *downloadTester) hasBlock(hash common.Hash) bool {
+ return dl.getBlock(hash) != nil
+}
+
+// getHeader retrieves a header from the testers canonical chain.
+func (dl *downloadTester) getHeader(hash common.Hash) *types.Header {
dl.lock.RLock()
defer dl.lock.RUnlock()
- return dl.getBlock(hash) != nil
+ return dl.ownHeaders[hash]
}
// getBlock retrieves a block from the testers canonical chain.
@@ -161,12 +192,25 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
return dl.ownBlocks[hash]
}
+// headHeader retrieves the current head header from the canonical chain.
+func (dl *downloadTester) headHeader() *types.Header {
+ dl.lock.RLock()
+ defer dl.lock.RUnlock()
+
+ return dl.getHeader(dl.ownHashes[len(dl.ownHashes)-1])
+}
+
// headBlock retrieves the current head block from the canonical chain.
func (dl *downloadTester) headBlock() *types.Block {
dl.lock.RLock()
defer dl.lock.RUnlock()
- return dl.getBlock(dl.ownHashes[len(dl.ownHashes)-1])
+ for i := len(dl.ownHashes) - 1; i >= 0; i-- {
+ if block := dl.getBlock(dl.ownHashes[i]); block != nil {
+ return block
+ }
+ }
+ return nil
}
// getTd retrieves the block's total difficulty from the canonical chain.
@@ -177,8 +221,24 @@ func (dl *downloadTester) getTd(hash common.Hash) *big.Int {
return dl.ownChainTd[hash]
}
-// insertChain injects a new batch of blocks into the simulated chain.
-func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) {
+// insertHeaders injects a new batch of headers into the simulated chain.
+func (dl *downloadTester) insertHeaders(headers []*types.Header, verify bool) (int, error) {
+ dl.lock.Lock()
+ defer dl.lock.Unlock()
+
+ for i, header := range headers {
+ if _, ok := dl.ownHeaders[header.ParentHash]; !ok {
+ return i, errors.New("unknown parent")
+ }
+ dl.ownHashes = append(dl.ownHashes, header.Hash())
+ dl.ownHeaders[header.Hash()] = header
+ dl.ownChainTd[header.Hash()] = dl.ownChainTd[header.ParentHash]
+ }
+ return len(headers), nil
+}
+
+// insertBlocks injects a new batch of blocks into the simulated chain.
+func (dl *downloadTester) insertBlocks(blocks types.Blocks) (int, error) {
dl.lock.Lock()
defer dl.lock.Unlock()
@@ -187,47 +247,74 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) {
return i, errors.New("unknown parent")
}
dl.ownHashes = append(dl.ownHashes, block.Hash())
+ dl.ownHeaders[block.Hash()] = block.Header()
dl.ownBlocks[block.Hash()] = block
dl.ownChainTd[block.Hash()] = dl.ownChainTd[block.ParentHash()]
}
return len(blocks), nil
}
+// insertBlocks injects a new batch of blocks into the simulated chain.
+func (dl *downloadTester) insertConfirmedBlocks(blocks types.Blocks, receipts []types.Receipts) (int, error) {
+ dl.lock.Lock()
+ defer dl.lock.Unlock()
+
+ for i := 0; i < len(blocks) && i < len(receipts); i++ {
+ if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok {
+ return i, errors.New("unknown parent")
+ }
+ dl.ownHashes = append(dl.ownHashes, blocks[i].Hash())
+ dl.ownHeaders[blocks[i].Hash()] = blocks[i].Header()
+ dl.ownBlocks[blocks[i].Hash()] = blocks[i]
+ dl.ownReceipts[blocks[i].Hash()] = blocks[i].Receipts()
+ dl.ownChainTd[blocks[i].Hash()] = dl.ownChainTd[blocks[i].ParentHash()]
+ }
+ return len(blocks), nil
+}
+
// newPeer registers a new block download source into the downloader.
-func (dl *downloadTester) newPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block) error {
- return dl.newSlowPeer(id, version, hashes, blocks, 0)
+func (dl *downloadTester) newPeer(id string, version int, hashes []common.Hash, headers map[common.Hash]*types.Header, blocks map[common.Hash]*types.Block) error {
+ return dl.newSlowPeer(id, version, hashes, headers, blocks, 0)
}
// newSlowPeer registers a new block download source into the downloader, with a
// specific delay time on processing the network packets sent to it, simulating
// potentially slow network IO.
-func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error {
+func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Hash, headers map[common.Hash]*types.Header, blocks map[common.Hash]*types.Block, delay time.Duration) error {
dl.lock.Lock()
defer dl.lock.Unlock()
var err error
switch version {
case 61:
- err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil)
+ err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil, nil)
case 62:
- err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay))
+ err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil)
case 63:
- err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay))
+ err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay))
case 64:
- err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay))
+ err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay))
}
if err == nil {
- // Assign the owned hashes and blocks to the peer (deep copy)
+ // Assign the owned hashes, headers and blocks to the peer (deep copy)
dl.peerHashes[id] = make([]common.Hash, len(hashes))
copy(dl.peerHashes[id], hashes)
+ dl.peerHeaders[id] = make(map[common.Hash]*types.Header)
dl.peerBlocks[id] = make(map[common.Hash]*types.Block)
dl.peerChainTds[id] = make(map[common.Hash]*big.Int)
+
for _, hash := range hashes {
+ if header, ok := headers[hash]; ok {
+ dl.peerHeaders[id][hash] = header
+ if _, ok := dl.peerHeaders[id][header.ParentHash]; ok {
+ dl.peerChainTds[id][hash] = new(big.Int).Add(header.Difficulty, dl.peerChainTds[id][header.ParentHash])
+ }
+ }
if block, ok := blocks[hash]; ok {
dl.peerBlocks[id][hash] = block
- if parent, ok := dl.peerBlocks[id][block.ParentHash()]; ok {
- dl.peerChainTds[id][hash] = new(big.Int).Add(block.Difficulty(), dl.peerChainTds[id][parent.Hash()])
+ if _, ok := dl.peerBlocks[id][block.ParentHash()]; ok {
+ dl.peerChainTds[id][hash] = new(big.Int).Add(block.Difficulty(), dl.peerChainTds[id][block.ParentHash()])
}
}
}
@@ -241,6 +328,7 @@ func (dl *downloadTester) dropPeer(id string) {
defer dl.lock.Unlock()
delete(dl.peerHashes, id)
+ delete(dl.peerHeaders, id)
delete(dl.peerBlocks, id)
delete(dl.peerChainTds, id)
@@ -358,13 +446,13 @@ func (dl *downloadTester) peerGetAbsHeadersFn(id string, delay time.Duration) fu
dl.lock.RLock()
defer dl.lock.RUnlock()
- // Gather the next batch of hashes
+ // Gather the next batch of headers
hashes := dl.peerHashes[id]
- blocks := dl.peerBlocks[id]
+ headers := dl.peerHeaders[id]
result := make([]*types.Header, 0, amount)
for i := 0; i < amount && len(hashes)-int(origin)-1-i >= 0; i++ {
- if block, ok := blocks[hashes[len(hashes)-int(origin)-1-i]]; ok {
- result = append(result, block.Header())
+ if header, ok := headers[hashes[len(hashes)-int(origin)-1-i]]; ok {
+ result = append(result, header)
}
}
// Delay delivery a bit to allow attacks to unfold
@@ -403,50 +491,99 @@ func (dl *downloadTester) peerGetBodiesFn(id string, delay time.Duration) func([
}
}
+// peerGetReceiptsFn constructs a getReceipts method associated with a particular
+// peer in the download tester. The returned function can be used to retrieve
+// batches of block receipts from the particularly requested peer.
+func (dl *downloadTester) peerGetReceiptsFn(id string, delay time.Duration) func([]common.Hash) error {
+ return func(hashes []common.Hash) error {
+ time.Sleep(delay)
+
+ dl.lock.RLock()
+ defer dl.lock.RUnlock()
+
+ blocks := dl.peerBlocks[id]
+
+ receipts := make([][]*types.Receipt, 0, len(hashes))
+ for _, hash := range hashes {
+ if block, ok := blocks[hash]; ok {
+ receipts = append(receipts, block.Receipts())
+ }
+ }
+ go dl.downloader.DeliverReceipts(id, receipts)
+
+ return nil
+ }
+}
+
+// assertOwnChain checks if the local chain contains the correct number of items
+// of the various chain components.
+func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
+ headers, blocks, receipts := length, length, length
+ switch tester.downloader.mode {
+ case FullSync:
+ receipts = 1
+ case LightSync:
+ blocks, receipts = 1, 1
+ }
+
+ if hs := len(tester.ownHeaders); hs != headers {
+ t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers)
+ }
+ if bs := len(tester.ownBlocks); bs != blocks {
+ t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, blocks)
+ }
+ if rs := len(tester.ownReceipts); rs != receipts {
+ t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts)
+ }
+}
+
// Tests that simple synchronization against a canonical chain works correctly.
// In this test common ancestor lookup should be short circuited and not require
// binary searching.
-func TestCanonicalSynchronisation61(t *testing.T) { testCanonicalSynchronisation(t, 61) }
-func TestCanonicalSynchronisation62(t *testing.T) { testCanonicalSynchronisation(t, 62) }
-func TestCanonicalSynchronisation63(t *testing.T) { testCanonicalSynchronisation(t, 63) }
-func TestCanonicalSynchronisation64(t *testing.T) { testCanonicalSynchronisation(t, 64) }
-
-func testCanonicalSynchronisation(t *testing.T, protocol int) {
+func TestCanonicalSynchronisation61(t *testing.T) { testCanonicalSynchronisation(t, 61, FullSync) }
+func TestCanonicalSynchronisation62(t *testing.T) { testCanonicalSynchronisation(t, 62, FullSync) }
+func TestCanonicalSynchronisation63Full(t *testing.T) { testCanonicalSynchronisation(t, 63, FullSync) }
+func TestCanonicalSynchronisation63Fast(t *testing.T) { testCanonicalSynchronisation(t, 63, FastSync) }
+func TestCanonicalSynchronisation64Full(t *testing.T) { testCanonicalSynchronisation(t, 64, FullSync) }
+func TestCanonicalSynchronisation64Fast(t *testing.T) { testCanonicalSynchronisation(t, 64, FastSync) }
+func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronisation(t, 64, LightSync) }
+
+func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
- tester := newTester()
- tester.newPeer("peer", protocol, hashes, blocks)
+ tester := newTester(mode)
+ tester.newPeer("peer", protocol, hashes, headers, blocks)
- // Synchronise with the peer and make sure all blocks were retrieved
+ // Synchronise with the peer and make sure all relevant data was retrieved
if err := tester.sync("peer", nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
- t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
- }
+ assertOwnChain(t, tester, targetBlocks+1)
}
// Tests that if a large batch of blocks are being downloaded, it is throttled
// until the cached blocks are retrieved.
-func TestThrottling61(t *testing.T) { testThrottling(t, 61) }
-func TestThrottling62(t *testing.T) { testThrottling(t, 62) }
-func TestThrottling63(t *testing.T) { testThrottling(t, 63) }
-func TestThrottling64(t *testing.T) { testThrottling(t, 64) }
-
-func testThrottling(t *testing.T, protocol int) {
+func TestThrottling61(t *testing.T) { testThrottling(t, 61, FullSync) }
+func TestThrottling62(t *testing.T) { testThrottling(t, 62, FullSync) }
+func TestThrottling63Full(t *testing.T) { testThrottling(t, 63, FullSync) }
+func TestThrottling63Fast(t *testing.T) { testThrottling(t, 63, FastSync) }
+func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) }
+func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
+
+func testThrottling(t *testing.T, protocol int, mode SyncMode) {
// Create a long block chain to download and the tester
targetBlocks := 8 * blockCacheLimit
- hashes, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
- tester := newTester()
- tester.newPeer("peer", protocol, hashes, blocks)
+ tester := newTester(mode)
+ tester.newPeer("peer", protocol, hashes, headers, blocks)
// Wrap the importer to allow stepping
blocked, proceed := uint32(0), make(chan struct{})
- tester.downloader.chainInsertHook = func(blocks []*Block) {
- atomic.StoreUint32(&blocked, uint32(len(blocks)))
+ tester.downloader.chainInsertHook = func(results []*fetchResult) {
+ atomic.StoreUint32(&blocked, uint32(len(results)))
<-proceed
}
// Start a synchronisation concurrently
@@ -469,7 +606,12 @@ func testThrottling(t *testing.T, protocol int) {
time.Sleep(25 * time.Millisecond)
tester.downloader.queue.lock.RLock()
- cached = len(tester.downloader.queue.blockPool)
+ cached = len(tester.downloader.queue.blockDonePool)
+ if mode == FastSync {
+ if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
+ cached = receipts
+ }
+ }
tester.downloader.queue.lock.RUnlock()
if cached == blockCacheLimit || len(tester.ownBlocks)+cached+int(atomic.LoadUint32(&blocked)) == targetBlocks+1 {
@@ -488,9 +630,7 @@ func testThrottling(t *testing.T, protocol int) {
}
}
// Check that we haven't pulled more blocks than available
- if len(tester.ownBlocks) > targetBlocks+1 {
- t.Fatalf("target block count mismatch: have %v, want %v", len(tester.ownBlocks), targetBlocks+1)
- }
+ assertOwnChain(t, tester, targetBlocks+1)
if err := <-errc; err != nil {
t.Fatalf("block synchronization failed: %v", err)
}
@@ -499,39 +639,39 @@ func testThrottling(t *testing.T, protocol int) {
// Tests that simple synchronization against a forked chain works correctly. In
// this test common ancestor lookup should *not* be short circuited, and a full
// binary search should be executed.
-func TestForkedSynchronisation61(t *testing.T) { testForkedSynchronisation(t, 61) }
-func TestForkedSynchronisation62(t *testing.T) { testForkedSynchronisation(t, 62) }
-func TestForkedSynchronisation63(t *testing.T) { testForkedSynchronisation(t, 63) }
-func TestForkedSynchronisation64(t *testing.T) { testForkedSynchronisation(t, 64) }
-
-func testForkedSynchronisation(t *testing.T, protocol int) {
+func TestForkedSynchronisation61(t *testing.T) { testForkedSynchronisation(t, 61, FullSync) }
+func TestForkedSynchronisation62(t *testing.T) { testForkedSynchronisation(t, 62, FullSync) }
+func TestForkedSynchronisation63Full(t *testing.T) { testForkedSynchronisation(t, 63, FullSync) }
+func TestForkedSynchronisation63Fast(t *testing.T) { testForkedSynchronisation(t, 63, FastSync) }
+func TestForkedSynchronisation64Full(t *testing.T) { testForkedSynchronisation(t, 64, FullSync) }
+func TestForkedSynchronisation64Fast(t *testing.T) { testForkedSynchronisation(t, 64, FastSync) }
+func TestForkedSynchronisation64Light(t *testing.T) { testForkedSynchronisation(t, 64, LightSync) }
+
+func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) {
// Create a long enough forked chain
common, fork := MaxHashFetch, 2*MaxHashFetch
- hashesA, hashesB, blocksA, blocksB := makeChainFork(common+fork, fork, genesis)
+ hashesA, hashesB, headersA, headersB, blocksA, blocksB := makeChainFork(common+fork, fork, genesis)
- tester := newTester()
- tester.newPeer("fork A", protocol, hashesA, blocksA)
- tester.newPeer("fork B", protocol, hashesB, blocksB)
+ tester := newTester(mode)
+ tester.newPeer("fork A", protocol, hashesA, headersA, blocksA)
+ tester.newPeer("fork B", protocol, hashesB, headersB, blocksB)
// Synchronise with the peer and make sure all blocks were retrieved
if err := tester.sync("fork A", nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- if imported := len(tester.ownBlocks); imported != common+fork+1 {
- t.Fatalf("synchronised block mismatch: have %v, want %v", imported, common+fork+1)
- }
+ assertOwnChain(t, tester, common+fork+1)
+
// Synchronise with the second peer and make sure that fork is pulled too
if err := tester.sync("fork B", nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- if imported := len(tester.ownBlocks); imported != common+2*fork+1 {
- t.Fatalf("synchronised block mismatch: have %v, want %v", imported, common+2*fork+1)
- }
+ assertOwnChain(t, tester, common+2*fork+1)
}
// Tests that an inactive downloader will not accept incoming hashes and blocks.
func TestInactiveDownloader61(t *testing.T) {
- tester := newTester()
+ tester := newTester(FullSync)
// Check that neither hashes nor blocks are accepted
if err := tester.downloader.DeliverHashes61("bad peer", []common.Hash{}); err != errNoSyncActive {
@@ -542,9 +682,10 @@ func TestInactiveDownloader61(t *testing.T) {
}
}
-// Tests that an inactive downloader will not accept incoming block headers and bodies.
+// Tests that an inactive downloader will not accept incoming block headers and
+// bodies.
func TestInactiveDownloader62(t *testing.T) {
- tester := newTester()
+ tester := newTester(FullSync)
// Check that neither block headers nor bodies are accepted
if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
@@ -555,13 +696,33 @@ func TestInactiveDownloader62(t *testing.T) {
}
}
-// Tests that a canceled download wipes all previously accumulated state.
-func TestCancel61(t *testing.T) { testCancel(t, 61) }
-func TestCancel62(t *testing.T) { testCancel(t, 62) }
-func TestCancel63(t *testing.T) { testCancel(t, 63) }
-func TestCancel64(t *testing.T) { testCancel(t, 64) }
+// Tests that an inactive downloader will not accept incoming block headers,
+// bodies and receipts.
+func TestInactiveDownloader63(t *testing.T) {
+ tester := newTester(FullSync)
+
+ // Check that neither block headers nor bodies are accepted
+ if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
+ t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
+ }
+ if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive {
+ t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
+ }
+ if err := tester.downloader.DeliverReceipts("bad peer", [][]*types.Receipt{}); err != errNoSyncActive {
+ t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
+ }
+}
-func testCancel(t *testing.T, protocol int) {
+// Tests that a canceled download wipes all previously accumulated state.
+func TestCancel61(t *testing.T) { testCancel(t, 61, FullSync) }
+func TestCancel62(t *testing.T) { testCancel(t, 62, FullSync) }
+func TestCancel63Full(t *testing.T) { testCancel(t, 63, FullSync) }
+func TestCancel63Fast(t *testing.T) { testCancel(t, 63, FastSync) }
+func TestCancel64Full(t *testing.T) { testCancel(t, 64, FullSync) }
+func TestCancel64Fast(t *testing.T) { testCancel(t, 64, FastSync) }
+func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) }
+
+func testCancel(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download and the tester
targetBlocks := blockCacheLimit - 15
if targetBlocks >= MaxHashFetch {
@@ -570,80 +731,81 @@ func testCancel(t *testing.T, protocol int) {
if targetBlocks >= MaxHeaderFetch {
targetBlocks = MaxHeaderFetch - 15
}
- hashes, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
- tester := newTester()
- tester.newPeer("peer", protocol, hashes, blocks)
+ tester := newTester(mode)
+ tester.newPeer("peer", protocol, hashes, headers, blocks)
// Make sure canceling works with a pristine downloader
tester.downloader.cancel()
- downloading, importing := tester.downloader.queue.Size()
- if downloading > 0 || importing > 0 {
- t.Errorf("download or import count mismatch: %d downloading, %d importing, want 0", downloading, importing)
+ if !tester.downloader.queue.Idle() {
+ t.Errorf("download queue not idle")
}
// Synchronise with the peer, but cancel afterwards
if err := tester.sync("peer", nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
tester.downloader.cancel()
- downloading, importing = tester.downloader.queue.Size()
- if downloading > 0 || importing > 0 {
- t.Errorf("download or import count mismatch: %d downloading, %d importing, want 0", downloading, importing)
+ if !tester.downloader.queue.Idle() {
+ t.Errorf("download queue not idle")
}
}
// Tests that synchronisation from multiple peers works as intended (multi thread sanity test).
-func TestMultiSynchronisation61(t *testing.T) { testMultiSynchronisation(t, 61) }
-func TestMultiSynchronisation62(t *testing.T) { testMultiSynchronisation(t, 62) }
-func TestMultiSynchronisation63(t *testing.T) { testMultiSynchronisation(t, 63) }
-func TestMultiSynchronisation64(t *testing.T) { testMultiSynchronisation(t, 64) }
-
-func testMultiSynchronisation(t *testing.T, protocol int) {
+func TestMultiSynchronisation61(t *testing.T) { testMultiSynchronisation(t, 61, FullSync) }
+func TestMultiSynchronisation62(t *testing.T) { testMultiSynchronisation(t, 62, FullSync) }
+func TestMultiSynchronisation63Full(t *testing.T) { testMultiSynchronisation(t, 63, FullSync) }
+func TestMultiSynchronisation63Fast(t *testing.T) { testMultiSynchronisation(t, 63, FastSync) }
+func TestMultiSynchronisation64Full(t *testing.T) { testMultiSynchronisation(t, 64, FullSync) }
+func TestMultiSynchronisation64Fast(t *testing.T) { testMultiSynchronisation(t, 64, FastSync) }
+func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) }
+
+func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
// Create various peers with various parts of the chain
targetPeers := 8
targetBlocks := targetPeers*blockCacheLimit - 15
- hashes, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
- tester := newTester()
+ tester := newTester(mode)
for i := 0; i < targetPeers; i++ {
id := fmt.Sprintf("peer #%d", i)
- tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], blocks)
+ tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], headers, blocks)
}
// Synchronise with the middle peer and make sure half of the blocks were retrieved
id := fmt.Sprintf("peer #%d", targetPeers/2)
if err := tester.sync(id, nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- if imported := len(tester.ownBlocks); imported != len(tester.peerHashes[id]) {
- t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(tester.peerHashes[id]))
- }
+ assertOwnChain(t, tester, len(tester.peerHashes[id]))
+
// Synchronise with the best peer and make sure everything is retrieved
if err := tester.sync("peer #0", nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
- t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
- }
+ assertOwnChain(t, tester, targetBlocks+1)
}
// Tests that synchronisations behave well in multi-version protocol environments
// and not wreak havok on other nodes in the network.
-func TestMultiProtocolSynchronisation61(t *testing.T) { testMultiProtocolSynchronisation(t, 61) }
-func TestMultiProtocolSynchronisation62(t *testing.T) { testMultiProtocolSynchronisation(t, 62) }
-func TestMultiProtocolSynchronisation63(t *testing.T) { testMultiProtocolSynchronisation(t, 63) }
-func TestMultiProtocolSynchronisation64(t *testing.T) { testMultiProtocolSynchronisation(t, 64) }
-
-func testMultiProtocolSynchronisation(t *testing.T, protocol int) {
+func TestMultiProtoSynchronisation61(t *testing.T) { testMultiProtoSync(t, 61, FullSync) }
+func TestMultiProtoSynchronisation62(t *testing.T) { testMultiProtoSync(t, 62, FullSync) }
+func TestMultiProtoSynchronisation63Full(t *testing.T) { testMultiProtoSync(t, 63, FullSync) }
+func TestMultiProtoSynchronisation63Fast(t *testing.T) { testMultiProtoSync(t, 63, FastSync) }
+func TestMultiProtoSynchronisation64Full(t *testing.T) { testMultiProtoSync(t, 64, FullSync) }
+func TestMultiProtoSynchronisation64Fast(t *testing.T) { testMultiProtoSync(t, 64, FastSync) }
+func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) }
+
+func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
// Create peers of every type
- tester := newTester()
- tester.newPeer("peer 61", 61, hashes, blocks)
- tester.newPeer("peer 62", 62, hashes, blocks)
- tester.newPeer("peer 63", 63, hashes, blocks)
- tester.newPeer("peer 64", 64, hashes, blocks)
+ tester := newTester(mode)
+ tester.newPeer("peer 61", 61, hashes, headers, blocks)
+ tester.newPeer("peer 62", 62, hashes, headers, blocks)
+ tester.newPeer("peer 63", 63, hashes, headers, blocks)
+ tester.newPeer("peer 64", 64, hashes, headers, blocks)
// Synchronise with the requestd peer and make sure all blocks were retrieved
if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil); err != nil {
@@ -661,150 +823,181 @@ func testMultiProtocolSynchronisation(t *testing.T, protocol int) {
}
}
-// Tests that if a block is empty (i.e. header only), no body request should be
+// Tests that if a block is empty (e.g. header only), no body request should be
// made, and instead the header should be assembled into a whole block in itself.
-func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) }
-func TestEmptyBlockShortCircuit63(t *testing.T) { testEmptyBlockShortCircuit(t, 63) }
-func TestEmptyBlockShortCircuit64(t *testing.T) { testEmptyBlockShortCircuit(t, 64) }
-
-func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
+func TestEmptyShortCircuit62(t *testing.T) { testEmptyShortCircuit(t, 62, FullSync) }
+func TestEmptyShortCircuit63Full(t *testing.T) { testEmptyShortCircuit(t, 63, FullSync) }
+func TestEmptyShortCircuit63Fast(t *testing.T) { testEmptyShortCircuit(t, 63, FastSync) }
+func TestEmptyShortCircuit64Full(t *testing.T) { testEmptyShortCircuit(t, 64, FullSync) }
+func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, FastSync) }
+func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) }
+
+func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
- tester := newTester()
- tester.newPeer("peer", protocol, hashes, blocks)
+ tester := newTester(mode)
+ tester.newPeer("peer", protocol, hashes, headers, blocks)
// Instrument the downloader to signal body requests
- requested := int32(0)
+ bodies, receipts := int32(0), int32(0)
tester.downloader.bodyFetchHook = func(headers []*types.Header) {
- atomic.AddInt32(&requested, int32(len(headers)))
+ atomic.AddInt32(&bodies, int32(len(headers)))
+ }
+ tester.downloader.receiptFetchHook = func(headers []*types.Header) {
+ atomic.AddInt32(&receipts, int32(len(headers)))
}
// Synchronise with the peer and make sure all blocks were retrieved
if err := tester.sync("peer", nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
- t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
- }
+ assertOwnChain(t, tester, targetBlocks+1)
+
// Validate the number of block bodies that should have been requested
- needed := 0
+ bodiesNeeded, receiptsNeeded := 0, 0
for _, block := range blocks {
- if block != genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) {
- needed++
+ if mode != LightSync && block != genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) {
+ bodiesNeeded++
}
+ if mode == FastSync && block != genesis && len(block.Receipts()) > 0 {
+ receiptsNeeded++
+ }
+ }
+ if int(bodies) != bodiesNeeded {
+ t.Errorf("body retrieval count mismatch: have %v, want %v", bodies, bodiesNeeded)
}
- if int(requested) != needed {
- t.Fatalf("block body retrieval count mismatch: have %v, want %v", requested, needed)
+ if int(receipts) != receiptsNeeded {
+ t.Errorf("receipt retrieval count mismatch: have %v, want %v", receipts, receiptsNeeded)
}
}
// Tests that headers are enqueued continuously, preventing malicious nodes from
// stalling the downloader by feeding gapped header chains.
-func TestMissingHeaderAttack62(t *testing.T) { testMissingHeaderAttack(t, 62) }
-func TestMissingHeaderAttack63(t *testing.T) { testMissingHeaderAttack(t, 63) }
-func TestMissingHeaderAttack64(t *testing.T) { testMissingHeaderAttack(t, 64) }
-
-func testMissingHeaderAttack(t *testing.T, protocol int) {
+func TestMissingHeaderAttack62(t *testing.T) { testMissingHeaderAttack(t, 62, FullSync) }
+func TestMissingHeaderAttack63Full(t *testing.T) { testMissingHeaderAttack(t, 63, FullSync) }
+func TestMissingHeaderAttack63Fast(t *testing.T) { testMissingHeaderAttack(t, 63, FastSync) }
+func TestMissingHeaderAttack64Full(t *testing.T) { testMissingHeaderAttack(t, 64, FullSync) }
+func TestMissingHeaderAttack64Fast(t *testing.T) { testMissingHeaderAttack(t, 64, FastSync) }
+func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) }
+
+func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
- tester := newTester()
+ tester := newTester(mode)
// Attempt a full sync with an attacker feeding gapped headers
- tester.newPeer("attack", protocol, hashes, blocks)
+ tester.newPeer("attack", protocol, hashes, headers, blocks)
missing := targetBlocks / 2
+ delete(tester.peerHeaders["attack"], hashes[missing])
delete(tester.peerBlocks["attack"], hashes[missing])
if err := tester.sync("attack", nil); err == nil {
t.Fatalf("succeeded attacker synchronisation")
}
// Synchronise with the valid peer and make sure sync succeeds
- tester.newPeer("valid", protocol, hashes, blocks)
+ tester.newPeer("valid", protocol, hashes, headers, blocks)
if err := tester.sync("valid", nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- if imported := len(tester.ownBlocks); imported != len(hashes) {
- t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(hashes))
- }
+ assertOwnChain(t, tester, targetBlocks+1)
}
// Tests that if requested headers are shifted (i.e. first is missing), the queue
// detects the invalid numbering.
-func TestShiftedHeaderAttack62(t *testing.T) { testShiftedHeaderAttack(t, 62) }
-func TestShiftedHeaderAttack63(t *testing.T) { testShiftedHeaderAttack(t, 63) }
-func TestShiftedHeaderAttack64(t *testing.T) { testShiftedHeaderAttack(t, 64) }
-
-func testShiftedHeaderAttack(t *testing.T, protocol int) {
+func TestShiftedHeaderAttack62(t *testing.T) { testShiftedHeaderAttack(t, 62, FullSync) }
+func TestShiftedHeaderAttack63Full(t *testing.T) { testShiftedHeaderAttack(t, 63, FullSync) }
+func TestShiftedHeaderAttack63Fast(t *testing.T) { testShiftedHeaderAttack(t, 63, FastSync) }
+func TestShiftedHeaderAttack64Full(t *testing.T) { testShiftedHeaderAttack(t, 64, FullSync) }
+func TestShiftedHeaderAttack64Fast(t *testing.T) { testShiftedHeaderAttack(t, 64, FastSync) }
+func TestShiftedHeaderAttack64Light(t *testing.T) { testShiftedHeaderAttack(t, 64, LightSync) }
+
+func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
- tester := newTester()
+ tester := newTester(mode)
// Attempt a full sync with an attacker feeding shifted headers
- tester.newPeer("attack", protocol, hashes, blocks)
+ tester.newPeer("attack", protocol, hashes, headers, blocks)
+ delete(tester.peerHeaders["attack"], hashes[len(hashes)-2])
delete(tester.peerBlocks["attack"], hashes[len(hashes)-2])
if err := tester.sync("attack", nil); err == nil {
t.Fatalf("succeeded attacker synchronisation")
}
// Synchronise with the valid peer and make sure sync succeeds
- tester.newPeer("valid", protocol, hashes, blocks)
+ tester.newPeer("valid", protocol, hashes, headers, blocks)
if err := tester.sync("valid", nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- if imported := len(tester.ownBlocks); imported != len(hashes) {
- t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(hashes))
- }
+ assertOwnChain(t, tester, targetBlocks+1)
}
-// Tests that if a peer sends an invalid body for a requested block, it gets
-// dropped immediately by the downloader.
-func TestInvalidBlockBodyAttack62(t *testing.T) { testInvalidBlockBodyAttack(t, 62) }
-func TestInvalidBlockBodyAttack63(t *testing.T) { testInvalidBlockBodyAttack(t, 63) }
-func TestInvalidBlockBodyAttack64(t *testing.T) { testInvalidBlockBodyAttack(t, 64) }
+// Tests that if a peer sends an invalid block piece (body or receipt) for a
+// requested block, it gets dropped immediately by the downloader.
+func TestInvalidContentAttack62(t *testing.T) { testInvalidContentAttack(t, 62, FullSync) }
+func TestInvalidContentAttack63Full(t *testing.T) { testInvalidContentAttack(t, 63, FullSync) }
+func TestInvalidContentAttack63Fast(t *testing.T) { testInvalidContentAttack(t, 63, FastSync) }
+func TestInvalidContentAttack64Full(t *testing.T) { testInvalidContentAttack(t, 64, FullSync) }
+func TestInvalidContentAttack64Fast(t *testing.T) { testInvalidContentAttack(t, 64, FastSync) }
+func TestInvalidContentAttack64Light(t *testing.T) { testInvalidContentAttack(t, 64, LightSync) }
-func testInvalidBlockBodyAttack(t *testing.T, protocol int) {
+func testInvalidContentAttack(t *testing.T, protocol int, mode SyncMode) {
// Create two peers, one feeding invalid block bodies
targetBlocks := 4*blockCacheLimit - 15
- hashes, validBlocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, validBlocks := makeChain(targetBlocks, 0, genesis)
invalidBlocks := make(map[common.Hash]*types.Block)
for hash, block := range validBlocks {
invalidBlocks[hash] = types.NewBlockWithHeader(block.Header())
}
+ invalidReceipts := make(map[common.Hash]*types.Block)
+ for hash, block := range validBlocks {
+ invalidReceipts[hash] = types.NewBlockWithHeader(block.Header()).WithBody(block.Transactions(), block.Uncles())
+ }
- tester := newTester()
- tester.newPeer("valid", protocol, hashes, validBlocks)
- tester.newPeer("attack", protocol, hashes, invalidBlocks)
-
+ tester := newTester(mode)
+ tester.newPeer("valid", protocol, hashes, headers, validBlocks)
+ if mode != LightSync {
+ tester.newPeer("body attack", protocol, hashes, headers, invalidBlocks)
+ }
+ if mode == FastSync {
+ tester.newPeer("receipt attack", protocol, hashes, headers, invalidReceipts)
+ }
// Synchronise with the valid peer (will pull contents from the attacker too)
if err := tester.sync("valid", nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- if imported := len(tester.ownBlocks); imported != len(hashes) {
- t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(hashes))
- }
+ assertOwnChain(t, tester, targetBlocks+1)
+
// Make sure the attacker was detected and dropped in the mean time
- if _, ok := tester.peerHashes["attack"]; ok {
+ if _, ok := tester.peerHashes["body attack"]; ok {
t.Fatalf("block body attacker not detected/dropped")
}
+ if _, ok := tester.peerHashes["receipt attack"]; ok {
+ t.Fatalf("receipt attacker not detected/dropped")
+ }
}
// Tests that a peer advertising an high TD doesn't get to stall the downloader
// afterwards by not sending any useful hashes.
-func TestHighTDStarvationAttack61(t *testing.T) { testHighTDStarvationAttack(t, 61) }
-func TestHighTDStarvationAttack62(t *testing.T) { testHighTDStarvationAttack(t, 62) }
-func TestHighTDStarvationAttack63(t *testing.T) { testHighTDStarvationAttack(t, 63) }
-func TestHighTDStarvationAttack64(t *testing.T) { testHighTDStarvationAttack(t, 64) }
-
-func testHighTDStarvationAttack(t *testing.T, protocol int) {
- tester := newTester()
- hashes, blocks := makeChain(0, 0, genesis)
-
- tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, blocks)
+func TestHighTDStarvationAttack61(t *testing.T) { testHighTDStarvationAttack(t, 61, FullSync) }
+func TestHighTDStarvationAttack62(t *testing.T) { testHighTDStarvationAttack(t, 62, FullSync) }
+func TestHighTDStarvationAttack63Full(t *testing.T) { testHighTDStarvationAttack(t, 63, FullSync) }
+func TestHighTDStarvationAttack63Fast(t *testing.T) { testHighTDStarvationAttack(t, 63, FastSync) }
+func TestHighTDStarvationAttack64Full(t *testing.T) { testHighTDStarvationAttack(t, 64, FullSync) }
+func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttack(t, 64, FastSync) }
+func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) }
+
+func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
+ tester := newTester(mode)
+ hashes, headers, blocks := makeChain(0, 0, genesis)
+
+ tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, headers, blocks)
if err := tester.sync("attack", big.NewInt(1000000)); err != errStallingPeer {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
}
@@ -834,18 +1027,20 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
{errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
{errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
{errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
+ {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
{errInvalidBody, false}, // A bad peer was detected, but not the sync origin
+ {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
{errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
}
// Run the tests and check disconnection status
- tester := newTester()
+ tester := newTester(FullSync)
for i, tt := range tests {
// Register a new peer and ensure it's presence
id := fmt.Sprintf("test %d", i)
- if err := tester.newPeer(id, protocol, []common.Hash{genesis.Hash()}, nil); err != nil {
+ if err := tester.newPeer(id, protocol, []common.Hash{genesis.Hash()}, nil, nil); err != nil {
t.Fatalf("test %d: failed to register new peer: %v", i, err)
}
if _, ok := tester.peerHashes[id]; !ok {
@@ -861,67 +1056,26 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
}
}
-// Tests that feeding bad blocks will result in a peer drop.
-func TestBlockBodyAttackerDropping61(t *testing.T) { testBlockBodyAttackerDropping(t, 61) }
-func TestBlockBodyAttackerDropping62(t *testing.T) { testBlockBodyAttackerDropping(t, 62) }
-func TestBlockBodyAttackerDropping63(t *testing.T) { testBlockBodyAttackerDropping(t, 63) }
-func TestBlockBodyAttackerDropping64(t *testing.T) { testBlockBodyAttackerDropping(t, 64) }
-
-func testBlockBodyAttackerDropping(t *testing.T, protocol int) {
- // Define the disconnection requirement for individual block import errors
- tests := []struct {
- failure bool
- drop bool
- }{
- {true, true},
- {false, false},
- }
-
- // Run the tests and check disconnection status
- tester := newTester()
- for i, tt := range tests {
- // Register a new peer and ensure it's presence
- id := fmt.Sprintf("test %d", i)
- if err := tester.newPeer(id, protocol, []common.Hash{common.Hash{}}, nil); err != nil {
- t.Fatalf("test %d: failed to register new peer: %v", i, err)
- }
- if _, ok := tester.peerHashes[id]; !ok {
- t.Fatalf("test %d: registered peer not found", i)
- }
- // Assemble a good or bad block, depending of the test
- raw := core.GenerateChain(genesis, testdb, 1, nil)[0]
- if tt.failure {
- parent := types.NewBlock(&types.Header{}, nil, nil, nil)
- raw = core.GenerateChain(parent, testdb, 1, nil)[0]
- }
- block := &Block{OriginPeer: id, RawBlock: raw}
-
- // Simulate block processing and check the result
- tester.downloader.queue.blockCache[0] = block
- tester.downloader.process()
- if _, ok := tester.peerHashes[id]; !ok != tt.drop {
- t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.failure, !ok, tt.drop)
- }
- }
-}
-
// Tests that synchronisation boundaries (origin block number and highest block
// number) is tracked and updated correctly.
-func TestSyncBoundaries61(t *testing.T) { testSyncBoundaries(t, 61) }
-func TestSyncBoundaries62(t *testing.T) { testSyncBoundaries(t, 62) }
-func TestSyncBoundaries63(t *testing.T) { testSyncBoundaries(t, 63) }
-func TestSyncBoundaries64(t *testing.T) { testSyncBoundaries(t, 64) }
-
-func testSyncBoundaries(t *testing.T, protocol int) {
+func TestSyncBoundaries61(t *testing.T) { testSyncBoundaries(t, 61, FullSync) }
+func TestSyncBoundaries62(t *testing.T) { testSyncBoundaries(t, 62, FullSync) }
+func TestSyncBoundaries63Full(t *testing.T) { testSyncBoundaries(t, 63, FullSync) }
+func TestSyncBoundaries63Fast(t *testing.T) { testSyncBoundaries(t, 63, FastSync) }
+func TestSyncBoundaries64Full(t *testing.T) { testSyncBoundaries(t, 64, FullSync) }
+func TestSyncBoundaries64Fast(t *testing.T) { testSyncBoundaries(t, 64, FastSync) }
+func TestSyncBoundaries64Light(t *testing.T) { testSyncBoundaries(t, 64, LightSync) }
+
+func testSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
// Set a sync init hook to catch boundary changes
starting := make(chan struct{})
progress := make(chan struct{})
- tester := newTester()
+ tester := newTester(mode)
tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{}
<-progress
@@ -931,7 +1085,7 @@ func testSyncBoundaries(t *testing.T, protocol int) {
t.Fatalf("Pristine boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, 0)
}
// Synchronise half the blocks and check initial boundaries
- tester.newPeer("peer-half", protocol, hashes[targetBlocks/2:], blocks)
+ tester.newPeer("peer-half", protocol, hashes[targetBlocks/2:], headers, blocks)
pending := new(sync.WaitGroup)
pending.Add(1)
@@ -949,7 +1103,7 @@ func testSyncBoundaries(t *testing.T, protocol int) {
pending.Wait()
// Synchronise all the blocks and check continuation boundaries
- tester.newPeer("peer-full", protocol, hashes, blocks)
+ tester.newPeer("peer-full", protocol, hashes, headers, blocks)
pending.Add(1)
go func() {
@@ -969,21 +1123,24 @@ func testSyncBoundaries(t *testing.T, protocol int) {
// Tests that synchronisation boundaries (origin block number and highest block
// number) is tracked and updated correctly in case of a fork (or manual head
// revertal).
-func TestForkedSyncBoundaries61(t *testing.T) { testForkedSyncBoundaries(t, 61) }
-func TestForkedSyncBoundaries62(t *testing.T) { testForkedSyncBoundaries(t, 62) }
-func TestForkedSyncBoundaries63(t *testing.T) { testForkedSyncBoundaries(t, 63) }
-func TestForkedSyncBoundaries64(t *testing.T) { testForkedSyncBoundaries(t, 64) }
-
-func testForkedSyncBoundaries(t *testing.T, protocol int) {
+func TestForkedSyncBoundaries61(t *testing.T) { testForkedSyncBoundaries(t, 61, FullSync) }
+func TestForkedSyncBoundaries62(t *testing.T) { testForkedSyncBoundaries(t, 62, FullSync) }
+func TestForkedSyncBoundaries63Full(t *testing.T) { testForkedSyncBoundaries(t, 63, FullSync) }
+func TestForkedSyncBoundaries63Fast(t *testing.T) { testForkedSyncBoundaries(t, 63, FastSync) }
+func TestForkedSyncBoundaries64Full(t *testing.T) { testForkedSyncBoundaries(t, 64, FullSync) }
+func TestForkedSyncBoundaries64Fast(t *testing.T) { testForkedSyncBoundaries(t, 64, FastSync) }
+func TestForkedSyncBoundaries64Light(t *testing.T) { testForkedSyncBoundaries(t, 64, LightSync) }
+
+func testForkedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
// Create a forked chain to simulate origin revertal
common, fork := MaxHashFetch, 2*MaxHashFetch
- hashesA, hashesB, blocksA, blocksB := makeChainFork(common+fork, fork, genesis)
+ hashesA, hashesB, headersA, headersB, blocksA, blocksB := makeChainFork(common+fork, fork, genesis)
// Set a sync init hook to catch boundary changes
starting := make(chan struct{})
progress := make(chan struct{})
- tester := newTester()
+ tester := newTester(mode)
tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{}
<-progress
@@ -993,7 +1150,7 @@ func testForkedSyncBoundaries(t *testing.T, protocol int) {
t.Fatalf("Pristine boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, 0)
}
// Synchronise with one of the forks and check boundaries
- tester.newPeer("fork A", protocol, hashesA, blocksA)
+ tester.newPeer("fork A", protocol, hashesA, headersA, blocksA)
pending := new(sync.WaitGroup)
pending.Add(1)
@@ -1014,7 +1171,7 @@ func testForkedSyncBoundaries(t *testing.T, protocol int) {
tester.downloader.syncStatsOrigin = tester.downloader.syncStatsHeight
// Synchronise with the second fork and check boundary resets
- tester.newPeer("fork B", protocol, hashesB, blocksB)
+ tester.newPeer("fork B", protocol, hashesB, headersB, blocksB)
pending.Add(1)
go func() {
@@ -1034,21 +1191,24 @@ func testForkedSyncBoundaries(t *testing.T, protocol int) {
// Tests that if synchronisation is aborted due to some failure, then the boundary
// origin is not updated in the next sync cycle, as it should be considered the
// continuation of the previous sync and not a new instance.
-func TestFailedSyncBoundaries61(t *testing.T) { testFailedSyncBoundaries(t, 61) }
-func TestFailedSyncBoundaries62(t *testing.T) { testFailedSyncBoundaries(t, 62) }
-func TestFailedSyncBoundaries63(t *testing.T) { testFailedSyncBoundaries(t, 63) }
-func TestFailedSyncBoundaries64(t *testing.T) { testFailedSyncBoundaries(t, 64) }
-
-func testFailedSyncBoundaries(t *testing.T, protocol int) {
+func TestFailedSyncBoundaries61(t *testing.T) { testFailedSyncBoundaries(t, 61, FullSync) }
+func TestFailedSyncBoundaries62(t *testing.T) { testFailedSyncBoundaries(t, 62, FullSync) }
+func TestFailedSyncBoundaries63Full(t *testing.T) { testFailedSyncBoundaries(t, 63, FullSync) }
+func TestFailedSyncBoundaries63Fast(t *testing.T) { testFailedSyncBoundaries(t, 63, FastSync) }
+func TestFailedSyncBoundaries64Full(t *testing.T) { testFailedSyncBoundaries(t, 64, FullSync) }
+func TestFailedSyncBoundaries64Fast(t *testing.T) { testFailedSyncBoundaries(t, 64, FastSync) }
+func TestFailedSyncBoundaries64Light(t *testing.T) { testFailedSyncBoundaries(t, 64, LightSync) }
+
+func testFailedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
// Set a sync init hook to catch boundary changes
starting := make(chan struct{})
progress := make(chan struct{})
- tester := newTester()
+ tester := newTester(mode)
tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{}
<-progress
@@ -1058,8 +1218,9 @@ func testFailedSyncBoundaries(t *testing.T, protocol int) {
t.Fatalf("Pristine boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, 0)
}
// Attempt a full sync with a faulty peer
- tester.newPeer("faulty", protocol, hashes, blocks)
+ tester.newPeer("faulty", protocol, hashes, headers, blocks)
missing := targetBlocks / 2
+ delete(tester.peerHeaders["faulty"], hashes[missing])
delete(tester.peerBlocks["faulty"], hashes[missing])
pending := new(sync.WaitGroup)
@@ -1079,7 +1240,7 @@ func testFailedSyncBoundaries(t *testing.T, protocol int) {
pending.Wait()
// Synchronise with a good peer and check that the boundary origin remind the same after a failure
- tester.newPeer("valid", protocol, hashes, blocks)
+ tester.newPeer("valid", protocol, hashes, headers, blocks)
pending.Add(1)
go func() {
@@ -1098,21 +1259,24 @@ func testFailedSyncBoundaries(t *testing.T, protocol int) {
// Tests that if an attacker fakes a chain height, after the attack is detected,
// the boundary height is successfully reduced at the next sync invocation.
-func TestFakedSyncBoundaries61(t *testing.T) { testFakedSyncBoundaries(t, 61) }
-func TestFakedSyncBoundaries62(t *testing.T) { testFakedSyncBoundaries(t, 62) }
-func TestFakedSyncBoundaries63(t *testing.T) { testFakedSyncBoundaries(t, 63) }
-func TestFakedSyncBoundaries64(t *testing.T) { testFakedSyncBoundaries(t, 64) }
-
-func testFakedSyncBoundaries(t *testing.T, protocol int) {
+func TestFakedSyncBoundaries61(t *testing.T) { testFakedSyncBoundaries(t, 61, FullSync) }
+func TestFakedSyncBoundaries62(t *testing.T) { testFakedSyncBoundaries(t, 62, FullSync) }
+func TestFakedSyncBoundaries63Full(t *testing.T) { testFakedSyncBoundaries(t, 63, FullSync) }
+func TestFakedSyncBoundaries63Fast(t *testing.T) { testFakedSyncBoundaries(t, 63, FastSync) }
+func TestFakedSyncBoundaries64Full(t *testing.T) { testFakedSyncBoundaries(t, 64, FullSync) }
+func TestFakedSyncBoundaries64Fast(t *testing.T) { testFakedSyncBoundaries(t, 64, FastSync) }
+func TestFakedSyncBoundaries64Light(t *testing.T) { testFakedSyncBoundaries(t, 64, LightSync) }
+
+func testFakedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
// Create a small block chain
targetBlocks := blockCacheLimit - 15
- hashes, blocks := makeChain(targetBlocks+3, 0, genesis)
+ hashes, headers, blocks := makeChain(targetBlocks+3, 0, genesis)
// Set a sync init hook to catch boundary changes
starting := make(chan struct{})
progress := make(chan struct{})
- tester := newTester()
+ tester := newTester(mode)
tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{}
<-progress
@@ -1122,8 +1286,9 @@ func testFakedSyncBoundaries(t *testing.T, protocol int) {
t.Fatalf("Pristine boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, 0)
}
// Create and sync with an attacker that promises a higher chain than available
- tester.newPeer("attack", protocol, hashes, blocks)
+ tester.newPeer("attack", protocol, hashes, headers, blocks)
for i := 1; i < 3; i++ {
+ delete(tester.peerHeaders["attack"], hashes[i])
delete(tester.peerBlocks["attack"], hashes[i])
}
@@ -1144,7 +1309,7 @@ func testFakedSyncBoundaries(t *testing.T, protocol int) {
pending.Wait()
// Synchronise with a good peer and check that the boundary height has been reduced to the true value
- tester.newPeer("valid", protocol, hashes[3:], blocks)
+ tester.newPeer("valid", protocol, hashes[3:], headers, blocks)
pending.Add(1)
go func() {
diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go
index fd926affd..92acb6ba8 100644
--- a/eth/downloader/metrics.go
+++ b/eth/downloader/metrics.go
@@ -42,4 +42,9 @@ var (
bodyReqTimer = metrics.NewTimer("eth/downloader/bodies/req")
bodyDropMeter = metrics.NewMeter("eth/downloader/bodies/drop")
bodyTimeoutMeter = metrics.NewMeter("eth/downloader/bodies/timeout")
+
+ receiptInMeter = metrics.NewMeter("eth/downloader/receipts/in")
+ receiptReqTimer = metrics.NewTimer("eth/downloader/receipts/req")
+ receiptDropMeter = metrics.NewMeter("eth/downloader/receipts/drop")
+ receiptTimeoutMeter = metrics.NewMeter("eth/downloader/receipts/timeout")
)
diff --git a/eth/downloader/modes.go b/eth/downloader/modes.go
new file mode 100644
index 000000000..8916dbb79
--- /dev/null
+++ b/eth/downloader/modes.go
@@ -0,0 +1,26 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+// SyncMode represents the synchronisation mode of the downloader.
+type SyncMode int
+
+const (
+ FullSync SyncMode = iota // Synchronise the entire block-chain history from full blocks
+ FastSync // Quikcly download the headers, full sync only at the chain head
+ LightSync // Download only the headers and terminate afterwards
+)
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index c1d20ac61..5fc0db587 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -36,10 +36,11 @@ type relativeHashFetcherFn func(common.Hash) error
type absoluteHashFetcherFn func(uint64, int) error
type blockFetcherFn func([]common.Hash) error
-// Block header and body fethers belonging to eth/62 and above
+// Block header and body fetchers belonging to eth/62 and above
type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
type blockBodyFetcherFn func([]common.Hash) error
+type receiptFetcherFn func([]common.Hash) error
var (
errAlreadyFetching = errors.New("already fetching blocks from peer")
@@ -52,11 +53,14 @@ type peer struct {
id string // Unique identifier of the peer
head common.Hash // Hash of the peers latest known block
- idle int32 // Current activity state of the peer (idle = 0, active = 1)
- rep int32 // Simple peer reputation
+ blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
+ receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
+ rep int32 // Simple peer reputation
- capacity int32 // Number of blocks allowed to fetch per request
- started time.Time // Time instance when the last fetch was started
+ blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request
+ receiptCapacity int32 // Number of receipts allowed to fetch per request
+ blockStarted time.Time // Time instance when the last block (body)fetch was started
+ receiptStarted time.Time // Time instance when the last receipt fetch was started
ignored *set.Set // Set of hashes not to request (didn't have previously)
@@ -68,6 +72,8 @@ type peer struct {
getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position
getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies
+ getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts
+
version int // Eth protocol version number to switch strategies
}
@@ -75,12 +81,14 @@ type peer struct {
// mechanisms.
func newPeer(id string, version int, head common.Hash,
getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
- getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn) *peer {
+ getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
+ getReceipts receiptFetcherFn) *peer {
return &peer{
- id: id,
- head: head,
- capacity: 1,
- ignored: set.New(),
+ id: id,
+ head: head,
+ blockCapacity: 1,
+ receiptCapacity: 1,
+ ignored: set.New(),
getRelHashes: getRelHashes,
getAbsHashes: getAbsHashes,
@@ -90,24 +98,28 @@ func newPeer(id string, version int, head common.Hash,
getAbsHeaders: getAbsHeaders,
getBlockBodies: getBlockBodies,
+ getReceipts: getReceipts,
+
version: version,
}
}
// Reset clears the internal state of a peer entity.
func (p *peer) Reset() {
- atomic.StoreInt32(&p.idle, 0)
- atomic.StoreInt32(&p.capacity, 1)
+ atomic.StoreInt32(&p.blockIdle, 0)
+ atomic.StoreInt32(&p.receiptIdle, 0)
+ atomic.StoreInt32(&p.blockCapacity, 1)
+ atomic.StoreInt32(&p.receiptCapacity, 1)
p.ignored.Clear()
}
// Fetch61 sends a block retrieval request to the remote peer.
func (p *peer) Fetch61(request *fetchRequest) error {
// Short circuit if the peer is already fetching
- if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
+ if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) {
return errAlreadyFetching
}
- p.started = time.Now()
+ p.blockStarted = time.Now()
// Convert the hash set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Hashes))
@@ -119,13 +131,13 @@ func (p *peer) Fetch61(request *fetchRequest) error {
return nil
}
-// Fetch sends a block body retrieval request to the remote peer.
-func (p *peer) Fetch(request *fetchRequest) error {
+// FetchBodies sends a block body retrieval request to the remote peer.
+func (p *peer) FetchBodies(request *fetchRequest) error {
// Short circuit if the peer is already fetching
- if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
+ if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) {
return errAlreadyFetching
}
- p.started = time.Now()
+ p.blockStarted = time.Now()
// Convert the header set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Headers))
@@ -137,55 +149,64 @@ func (p *peer) Fetch(request *fetchRequest) error {
return nil
}
-// SetIdle61 sets the peer to idle, allowing it to execute new retrieval requests.
-// Its block retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time or not.
-func (p *peer) SetIdle61() {
- // Update the peer's download allowance based on previous performance
- scale := 2.0
- if time.Since(p.started) > blockSoftTTL {
- scale = 0.5
- if time.Since(p.started) > blockHardTTL {
- scale = 1 / float64(MaxBlockFetch) // reduces capacity to 1
- }
+// FetchReceipts sends a receipt retrieval request to the remote peer.
+func (p *peer) FetchReceipts(request *fetchRequest) error {
+ // Short circuit if the peer is already fetching
+ if !atomic.CompareAndSwapInt32(&p.receiptIdle, 0, 1) {
+ return errAlreadyFetching
}
- for {
- // Calculate the new download bandwidth allowance
- prev := atomic.LoadInt32(&p.capacity)
- next := int32(math.Max(1, math.Min(float64(MaxBlockFetch), float64(prev)*scale)))
+ p.receiptStarted = time.Now()
- // Try to update the old value
- if atomic.CompareAndSwapInt32(&p.capacity, prev, next) {
- // If we're having problems at 1 capacity, try to find better peers
- if next == 1 {
- p.Demote()
- }
- break
- }
+ // Convert the header set to a retrievable slice
+ hashes := make([]common.Hash, 0, len(request.Headers))
+ for _, header := range request.Headers {
+ hashes = append(hashes, header.Hash())
}
- // Set the peer to idle to allow further block requests
- atomic.StoreInt32(&p.idle, 0)
+ go p.getReceipts(hashes)
+
+ return nil
+}
+
+// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// Its block retrieval allowance will also be updated either up- or downwards,
+// depending on whether the previous fetch completed in time or not.
+func (p *peer) SetBlocksIdle() {
+ p.setIdle(p.blockStarted, blockSoftTTL, blockHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle)
}
-// SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// SetBodiesIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its block body retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
-func (p *peer) SetIdle() {
+func (p *peer) SetBodiesIdle() {
+ p.setIdle(p.blockStarted, bodySoftTTL, bodyHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle)
+}
+
+// SetReceiptsIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// Its receipt retrieval allowance will also be updated either up- or downwards,
+// depending on whether the previous fetch completed in time or not.
+func (p *peer) SetReceiptsIdle() {
+ p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle)
+}
+
+// setIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// Its data retrieval allowance will also be updated either up- or downwards,
+// depending on whether the previous fetch completed in time or not.
+func (p *peer) setIdle(started time.Time, softTTL, hardTTL time.Duration, maxFetch int, capacity, idle *int32) {
// Update the peer's download allowance based on previous performance
scale := 2.0
- if time.Since(p.started) > bodySoftTTL {
+ if time.Since(started) > softTTL {
scale = 0.5
- if time.Since(p.started) > bodyHardTTL {
- scale = 1 / float64(MaxBodyFetch) // reduces capacity to 1
+ if time.Since(started) > hardTTL {
+ scale = 1 / float64(maxFetch) // reduces capacity to 1
}
}
for {
// Calculate the new download bandwidth allowance
- prev := atomic.LoadInt32(&p.capacity)
- next := int32(math.Max(1, math.Min(float64(MaxBodyFetch), float64(prev)*scale)))
+ prev := atomic.LoadInt32(capacity)
+ next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale)))
// Try to update the old value
- if atomic.CompareAndSwapInt32(&p.capacity, prev, next) {
+ if atomic.CompareAndSwapInt32(capacity, prev, next) {
// If we're having problems at 1 capacity, try to find better peers
if next == 1 {
p.Demote()
@@ -193,14 +214,20 @@ func (p *peer) SetIdle() {
break
}
}
- // Set the peer to idle to allow further block requests
- atomic.StoreInt32(&p.idle, 0)
+ // Set the peer to idle to allow further fetch requests
+ atomic.StoreInt32(idle, 0)
+}
+
+// BlockCapacity retrieves the peers block download allowance based on its
+// previously discovered bandwidth capacity.
+func (p *peer) BlockCapacity() int {
+ return int(atomic.LoadInt32(&p.blockCapacity))
}
-// Capacity retrieves the peers block download allowance based on its previously
-// discovered bandwidth capacity.
-func (p *peer) Capacity() int {
- return int(atomic.LoadInt32(&p.capacity))
+// ReceiptCapacity retrieves the peers block download allowance based on its
+// previously discovered bandwidth capacity.
+func (p *peer) ReceiptCapacity() int {
+ return int(atomic.LoadInt32(&p.receiptCapacity))
}
// Promote increases the peer's reputation.
@@ -226,7 +253,8 @@ func (p *peer) Demote() {
func (p *peer) String() string {
return fmt.Sprintf("Peer %s [%s]", p.id,
fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+
- fmt.Sprintf("capacity %3d, ", atomic.LoadInt32(&p.capacity))+
+ fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+
+ fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+
fmt.Sprintf("ignored %4d", p.ignored.Size()),
)
}
@@ -310,26 +338,52 @@ func (ps *peerSet) AllPeers() []*peer {
return list
}
-// IdlePeers retrieves a flat list of all the currently idle peers within the
+// BlockIdlePeers retrieves a flat list of all the currently idle peers within the
// active peer set, ordered by their reputation.
-func (ps *peerSet) IdlePeers(version int) []*peer {
+func (ps *peerSet) BlockIdlePeers(version int) ([]*peer, int) {
ps.lock.RLock()
defer ps.lock.RUnlock()
- list := make([]*peer, 0, len(ps.peers))
+ idle, total := make([]*peer, 0, len(ps.peers)), 0
for _, p := range ps.peers {
- if (version == eth61 && p.version == eth61) || (version >= eth62 && p.version >= eth62) {
- if atomic.LoadInt32(&p.idle) == 0 {
- list = append(list, p)
+ if (version == 61 && p.version == 61) || (version >= 62 && p.version >= 62) {
+ if atomic.LoadInt32(&p.blockIdle) == 0 {
+ idle = append(idle, p)
}
+ total++
}
}
- for i := 0; i < len(list); i++ {
- for j := i + 1; j < len(list); j++ {
- if atomic.LoadInt32(&list[i].rep) < atomic.LoadInt32(&list[j].rep) {
- list[i], list[j] = list[j], list[i]
+ for i := 0; i < len(idle); i++ {
+ for j := i + 1; j < len(idle); j++ {
+ if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
+ idle[i], idle[j] = idle[j], idle[i]
}
}
}
- return list
+ return idle, total
+}
+
+// ReceiptIdlePeers retrieves a flat list of all the currently idle peers within the
+// active peer set, ordered by their reputation.
+func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ idle, total := make([]*peer, 0, len(ps.peers)), 0
+ for _, p := range ps.peers {
+ if p.version >= 63 {
+ if atomic.LoadInt32(&p.receiptIdle) == 0 {
+ idle = append(idle, p)
+ }
+ total++
+ }
+ }
+ for i := 0; i < len(idle); i++ {
+ for j := i + 1; j < len(idle); j++ {
+ if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
+ idle[i], idle[j] = idle[j], idle[i]
+ }
+ }
+ }
+ return idle, total
}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 49d1046fb..c53ad939e 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -29,11 +29,12 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/rcrowley/go-metrics"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
var (
- blockCacheLimit = 8 * MaxBlockFetch // Maximum number of blocks to cache before throttling the download
+ blockCacheLimit = 1024 // Maximum number of blocks to cache before throttling the download
)
var (
@@ -41,29 +42,47 @@ var (
errStaleDelivery = errors.New("stale delivery")
)
-// fetchRequest is a currently running block retrieval operation.
+// fetchRequest is a currently running data retrieval operation.
type fetchRequest struct {
Peer *peer // Peer to which the request was sent
- Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority)
+ Hashes map[common.Hash]int // [eth/61] Requested block with their insertion index (priority)
Headers []*types.Header // [eth/62] Requested headers, sorted by request order
Time time.Time // Time when the request was made
}
+// fetchResult is the assembly collecting partial results from potentially more
+// than one fetcher routines, until all outstanding retrievals complete and the
+// result as a whole can be processed.
+type fetchResult struct {
+ Pending int // Number of data fetches still pending
+
+ Header *types.Header
+ Uncles []*types.Header
+ Transactions types.Transactions
+ Receipts types.Receipts
+}
+
// queue represents hashes that are either need fetching or are being fetched
type queue struct {
hashPool map[common.Hash]int // [eth/61] Pending hashes, mapping to their insertion index (priority)
hashQueue *prque.Prque // [eth/61] Priority queue of the block hashes to fetch
hashCounter int // [eth/61] Counter indexing the added hashes to ensure retrieval order
- headerPool map[common.Hash]*types.Header // [eth/62] Pending headers, mapping from their hashes
- headerQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the bodies for
- headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
+ headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
- pendPool map[string]*fetchRequest // Currently pending block retrieval operations
+ blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers
+ blockTaskQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for
+ blockPendPool map[string]*fetchRequest // [eth/62] Currently pending block (body) retrieval operations
+ blockDonePool map[common.Hash]struct{} // [eth/62] Set of the completed block (body) fetches
- blockPool map[common.Hash]uint64 // Hash-set of the downloaded data blocks, mapping to cache indexes
- blockCache []*Block // Downloaded but not yet delivered blocks
- blockOffset uint64 // Offset of the first cached block in the block-chain
+ receiptTaskPool map[common.Hash]*types.Header // [eth/63] Pending receipt retrieval tasks, mapping hashes to headers
+ receiptTaskQueue *prque.Prque // [eth/63] Priority queue of the headers to fetch the receipts for
+ receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations
+ receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches
+
+ resultCache []*fetchResult // Downloaded but not yet delivered fetch results
+ resultOffset uint64 // Offset of the first cached fetch result in the block-chain
+ resultParts int // Number of fetch components required to complete an item
lock sync.RWMutex
}
@@ -71,13 +90,17 @@ type queue struct {
// newQueue creates a new download queue for scheduling block retrieval.
func newQueue() *queue {
return &queue{
- hashPool: make(map[common.Hash]int),
- hashQueue: prque.New(),
- headerPool: make(map[common.Hash]*types.Header),
- headerQueue: prque.New(),
- pendPool: make(map[string]*fetchRequest),
- blockPool: make(map[common.Hash]uint64),
- blockCache: make([]*Block, blockCacheLimit),
+ hashPool: make(map[common.Hash]int),
+ hashQueue: prque.New(),
+ blockTaskPool: make(map[common.Hash]*types.Header),
+ blockTaskQueue: prque.New(),
+ blockPendPool: make(map[string]*fetchRequest),
+ blockDonePool: make(map[common.Hash]struct{}),
+ receiptTaskPool: make(map[common.Hash]*types.Header),
+ receiptTaskQueue: prque.New(),
+ receiptPendPool: make(map[string]*fetchRequest),
+ receiptDonePool: make(map[common.Hash]struct{}),
+ resultCache: make([]*fetchResult, blockCacheLimit),
}
}
@@ -90,32 +113,37 @@ func (q *queue) Reset() {
q.hashQueue.Reset()
q.hashCounter = 0
- q.headerPool = make(map[common.Hash]*types.Header)
- q.headerQueue.Reset()
q.headerHead = common.Hash{}
- q.pendPool = make(map[string]*fetchRequest)
+ q.blockTaskPool = make(map[common.Hash]*types.Header)
+ q.blockTaskQueue.Reset()
+ q.blockPendPool = make(map[string]*fetchRequest)
+ q.blockDonePool = make(map[common.Hash]struct{})
+
+ q.receiptTaskPool = make(map[common.Hash]*types.Header)
+ q.receiptTaskQueue.Reset()
+ q.receiptPendPool = make(map[string]*fetchRequest)
+ q.receiptDonePool = make(map[common.Hash]struct{})
- q.blockPool = make(map[common.Hash]uint64)
- q.blockOffset = 0
- q.blockCache = make([]*Block, blockCacheLimit)
+ q.resultCache = make([]*fetchResult, blockCacheLimit)
+ q.resultOffset = 0
+ q.resultParts = 0
}
-// Size retrieves the number of blocks in the queue, returning separately for
-// pending and already downloaded.
-func (q *queue) Size() (int, int) {
+// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
+func (q *queue) PendingBlocks() int {
q.lock.RLock()
defer q.lock.RUnlock()
- return len(q.hashPool) + len(q.headerPool), len(q.blockPool)
+ return q.hashQueue.Size() + q.blockTaskQueue.Size()
}
-// Pending retrieves the number of blocks pending for retrieval.
-func (q *queue) Pending() int {
+// PendingReceipts retrieves the number of block receipts pending for retrieval.
+func (q *queue) PendingReceipts() int {
q.lock.RLock()
defer q.lock.RUnlock()
- return q.hashQueue.Size() + q.headerQueue.Size()
+ return q.receiptTaskQueue.Size()
}
// InFlight retrieves the number of fetch requests currently in flight.
@@ -123,44 +151,55 @@ func (q *queue) InFlight() int {
q.lock.RLock()
defer q.lock.RUnlock()
- return len(q.pendPool)
+ return len(q.blockPendPool) + len(q.receiptPendPool)
+}
+
+// Idle returns if the queue is fully idle or has some data still inside. This
+// method is used by the tester to detect termination events.
+func (q *queue) Idle() bool {
+ q.lock.RLock()
+ defer q.lock.RUnlock()
+
+ queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
+ pending := len(q.blockPendPool) + len(q.receiptPendPool)
+ cached := len(q.blockDonePool) + len(q.receiptDonePool)
+
+ return (queued + pending + cached) == 0
}
-// Throttle checks if the download should be throttled (active block fetches
-// exceed block cache).
-func (q *queue) Throttle() bool {
+// ThrottleBlocks checks if the download should be throttled (active block (body)
+// fetches exceed block cache).
+func (q *queue) ThrottleBlocks() bool {
q.lock.RLock()
defer q.lock.RUnlock()
- // Calculate the currently in-flight block requests
+ // Calculate the currently in-flight block (body) requests
pending := 0
- for _, request := range q.pendPool {
+ for _, request := range q.blockPendPool {
pending += len(request.Hashes) + len(request.Headers)
}
- // Throttle if more blocks are in-flight than free space in the cache
- return pending >= len(q.blockCache)-len(q.blockPool)
+ // Throttle if more blocks (bodies) are in-flight than free space in the cache
+ return pending >= len(q.resultCache)-len(q.blockDonePool)
}
-// Has checks if a hash is within the download queue or not.
-func (q *queue) Has(hash common.Hash) bool {
+// ThrottleReceipts checks if the download should be throttled (active receipt
+// fetches exceed block cache).
+func (q *queue) ThrottleReceipts() bool {
q.lock.RLock()
defer q.lock.RUnlock()
- if _, ok := q.hashPool[hash]; ok {
- return true
- }
- if _, ok := q.headerPool[hash]; ok {
- return true
- }
- if _, ok := q.blockPool[hash]; ok {
- return true
+ // Calculate the currently in-flight receipt requests
+ pending := 0
+ for _, request := range q.receiptPendPool {
+ pending += len(request.Headers)
}
- return false
+ // Throttle if more receipts are in-flight than free space in the cache
+ return pending >= len(q.resultCache)-len(q.receiptDonePool)
}
-// Insert61 adds a set of hashes for the download queue for scheduling, returning
+// Schedule61 adds a set of hashes for the download queue for scheduling, returning
// the new hashes encountered.
-func (q *queue) Insert61(hashes []common.Hash, fifo bool) []common.Hash {
+func (q *queue) Schedule61(hashes []common.Hash, fifo bool) []common.Hash {
q.lock.Lock()
defer q.lock.Unlock()
@@ -186,22 +225,17 @@ func (q *queue) Insert61(hashes []common.Hash, fifo bool) []common.Hash {
return inserts
}
-// Insert adds a set of headers for the download queue for scheduling, returning
+// Schedule adds a set of headers for the download queue for scheduling, returning
// the new headers encountered.
-func (q *queue) Insert(headers []*types.Header, from uint64) []*types.Header {
+func (q *queue) Schedule(headers []*types.Header, from uint64, receipts bool) []*types.Header {
q.lock.Lock()
defer q.lock.Unlock()
// Insert all the headers prioritized by the contained block number
inserts := make([]*types.Header, 0, len(headers))
for _, header := range headers {
- // Make sure no duplicate requests are executed
- hash := header.Hash()
- if _, ok := q.headerPool[hash]; ok {
- glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled", header.Number.Uint64(), hash[:4])
- continue
- }
// Make sure chain order is honored and preserved throughout
+ hash := header.Hash()
if header.Number == nil || header.Number.Uint64() != from {
glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ordering, expected %d", header.Number, hash[:4], from)
break
@@ -210,69 +244,72 @@ func (q *queue) Insert(headers []*types.Header, from uint64) []*types.Header {
glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ancestry", header.Number, hash[:4])
break
}
- // Queue the header for body retrieval
+ // Make sure no duplicate requests are executed
+ if _, ok := q.blockTaskPool[hash]; ok {
+ glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for block fetch", header.Number.Uint64(), hash[:4])
+ continue
+ }
+ if _, ok := q.receiptTaskPool[hash]; ok {
+ glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for receipt fetch", header.Number.Uint64(), hash[:4])
+ continue
+ }
+ // Queue the header for content retrieval
+ q.blockTaskPool[hash] = header
+ q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
+ if receipts {
+ q.receiptTaskPool[hash] = header
+ q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
+ }
inserts = append(inserts, header)
- q.headerPool[hash] = header
- q.headerQueue.Push(header, -float32(header.Number.Uint64()))
q.headerHead = hash
from++
}
return inserts
}
-// GetHeadBlock retrieves the first block from the cache, or nil if it hasn't
+// GetHeadResult retrieves the first fetch result from the cache, or nil if it hasn't
// been downloaded yet (or simply non existent).
-func (q *queue) GetHeadBlock() *Block {
+func (q *queue) GetHeadResult() *fetchResult {
q.lock.RLock()
defer q.lock.RUnlock()
- if len(q.blockCache) == 0 {
+ if len(q.resultCache) == 0 || q.resultCache[0] == nil {
return nil
}
- return q.blockCache[0]
-}
-
-// GetBlock retrieves a downloaded block, or nil if non-existent.
-func (q *queue) GetBlock(hash common.Hash) *Block {
- q.lock.RLock()
- defer q.lock.RUnlock()
-
- // Short circuit if the block hasn't been downloaded yet
- index, ok := q.blockPool[hash]
- if !ok {
+ if q.resultCache[0].Pending > 0 {
return nil
}
- // Return the block if it's still available in the cache
- if q.blockOffset <= index && index < q.blockOffset+uint64(len(q.blockCache)) {
- return q.blockCache[index-q.blockOffset]
- }
- return nil
+ return q.resultCache[0]
}
-// TakeBlocks retrieves and permanently removes a batch of blocks from the cache.
-func (q *queue) TakeBlocks() []*Block {
+// TakeResults retrieves and permanently removes a batch of fetch results from
+// the cache.
+func (q *queue) TakeResults() []*fetchResult {
q.lock.Lock()
defer q.lock.Unlock()
- // Accumulate all available blocks
- blocks := []*Block{}
- for _, block := range q.blockCache {
- if block == nil {
+ // Accumulate all available results
+ results := []*fetchResult{}
+ for _, result := range q.resultCache {
+ if result == nil || result.Pending > 0 {
break
}
- blocks = append(blocks, block)
- delete(q.blockPool, block.RawBlock.Hash())
+ results = append(results, result)
+
+ hash := result.Header.Hash()
+ delete(q.blockDonePool, hash)
+ delete(q.receiptDonePool, hash)
}
- // Delete the blocks from the slice and let them be garbage collected
- // without this slice trick the blocks would stay in memory until nil
- // would be assigned to q.blocks
- copy(q.blockCache, q.blockCache[len(blocks):])
- for k, n := len(q.blockCache)-len(blocks), len(q.blockCache); k < n; k++ {
- q.blockCache[k] = nil
+ // Delete the results from the slice and let them be garbage collected
+ // without this slice trick the results would stay in memory until nil
+ // would be assigned to them.
+ copy(q.resultCache, q.resultCache[len(results):])
+ for k, n := len(q.resultCache)-len(results), len(q.resultCache); k < n; k++ {
+ q.resultCache[k] = nil
}
- q.blockOffset += uint64(len(blocks))
+ q.resultOffset += uint64(len(results))
- return blocks
+ return results
}
// Reserve61 reserves a set of hashes for the given peer, skipping any previously
@@ -286,12 +323,12 @@ func (q *queue) Reserve61(p *peer, count int) *fetchRequest {
if q.hashQueue.Empty() {
return nil
}
- if _, ok := q.pendPool[p.id]; ok {
+ if _, ok := q.blockPendPool[p.id]; ok {
return nil
}
// Calculate an upper limit on the hashes we might fetch (i.e. throttling)
- space := len(q.blockCache) - len(q.blockPool)
- for _, request := range q.pendPool {
+ space := len(q.resultCache) - len(q.blockDonePool)
+ for _, request := range q.blockPendPool {
space -= len(request.Hashes)
}
// Retrieve a batch of hashes, skipping previously failed ones
@@ -319,49 +356,82 @@ func (q *queue) Reserve61(p *peer, count int) *fetchRequest {
Hashes: send,
Time: time.Now(),
}
- q.pendPool[p.id] = request
+ q.blockPendPool[p.id] = request
return request
}
-// Reserve reserves a set of headers for the given peer, skipping any previously
-// failed download. Beside the next batch of needed fetches, it also returns a
-// flag whether empty blocks were queued requiring processing.
-func (q *queue) Reserve(p *peer, count int) (*fetchRequest, bool, error) {
+// ReserveBlocks reserves a set of body fetches for the given peer, skipping any
+// previously failed downloads. Beside the next batch of needed fetches, it also
+// returns a flag whether empty blocks were queued requiring processing.
+func (q *queue) ReserveBlocks(p *peer, count int) (*fetchRequest, bool, error) {
+ noop := func(header *types.Header) bool {
+ return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
+ }
+ return q.reserveFetch(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, noop)
+}
+
+// ReserveReceipts reserves a set of receipt fetches for the given peer, skipping
+// any previously failed downloads. Beside the next batch of needed fetches, it
+// also returns a flag whether empty receipts were queued requiring importing.
+func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error) {
+ noop := func(header *types.Header) bool {
+ return header.ReceiptHash == types.EmptyRootHash
+ }
+ return q.reserveFetch(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, noop)
+}
+
+// reserveFetch reserves a set of data download operations for a given peer,
+// skipping any previously failed ones. This method is a generic version used
+// by the individual special reservation functions.
+func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
+ pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, noop func(*types.Header) bool) (*fetchRequest, bool, error) {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the pool has been depleted, or if the peer's already
// downloading something (sanity check not to corrupt state)
- if q.headerQueue.Empty() {
+ if taskQueue.Empty() {
return nil, false, nil
}
- if _, ok := q.pendPool[p.id]; ok {
+ if _, ok := pendPool[p.id]; ok {
return nil, false, nil
}
- // Calculate an upper limit on the bodies we might fetch (i.e. throttling)
- space := len(q.blockCache) - len(q.blockPool)
- for _, request := range q.pendPool {
+ // Calculate an upper limit on the items we might fetch (i.e. throttling)
+ space := len(q.resultCache) - len(donePool)
+ for _, request := range pendPool {
space -= len(request.Headers)
}
- // Retrieve a batch of headers, skipping previously failed ones
+ // Retrieve a batch of tasks, skipping previously failed ones
send := make([]*types.Header, 0, count)
skip := make([]*types.Header, 0)
- process := false
- for proc := 0; proc < space && len(send) < count && !q.headerQueue.Empty(); proc++ {
- header := q.headerQueue.PopItem().(*types.Header)
+ progress := false
+ for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ {
+ header := taskQueue.PopItem().(*types.Header)
- // If the header defines an empty block, deliver straight
- if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
- if err := q.enqueue("", types.NewBlockWithHeader(header)); err != nil {
- return nil, false, errInvalidChain
+ // If we're the first to request this task, initialize the result container
+ index := int(header.Number.Int64() - int64(q.resultOffset))
+ if index >= len(q.resultCache) || index < 0 {
+ return nil, false, errInvalidChain
+ }
+ if q.resultCache[index] == nil {
+ q.resultCache[index] = &fetchResult{
+ Pending: q.resultParts,
+ Header: header,
}
- delete(q.headerPool, header.Hash())
- process, space, proc = true, space-1, proc-1
+ }
+ // If this fetch task is a noop, skip this fetch operation
+ if noop(header) {
+ donePool[header.Hash()] = struct{}{}
+ delete(taskPool, header.Hash())
+
+ space, proc = space-1, proc-1
+ q.resultCache[index].Pending--
+ progress = true
continue
}
- // If it's a content block, add to the body fetch request
+ // Otherwise if not a known unknown block, add to the retrieve list
if p.ignored.Has(header.Hash()) {
skip = append(skip, header)
} else {
@@ -370,24 +440,41 @@ func (q *queue) Reserve(p *peer, count int) (*fetchRequest, bool, error) {
}
// Merge all the skipped headers back
for _, header := range skip {
- q.headerQueue.Push(header, -float32(header.Number.Uint64()))
+ taskQueue.Push(header, -float32(header.Number.Uint64()))
}
// Assemble and return the block download request
if len(send) == 0 {
- return nil, process, nil
+ return nil, progress, nil
}
request := &fetchRequest{
Peer: p,
Headers: send,
Time: time.Now(),
}
- q.pendPool[p.id] = request
+ pendPool[p.id] = request
+
+ return request, progress, nil
+}
+
+// Cancel61 aborts a fetch request, returning all pending hashes to the queue.
+func (q *queue) Cancel61(request *fetchRequest) {
+ q.cancel(request, nil, q.blockPendPool)
+}
+
+// CancelBlocks aborts a body fetch request, returning all pending hashes to the
+// task queue.
+func (q *queue) CancelBlocks(request *fetchRequest) {
+ q.cancel(request, q.blockTaskQueue, q.blockPendPool)
+}
- return request, process, nil
+// CancelReceipts aborts a body fetch request, returning all pending hashes to
+// the task queue.
+func (q *queue) CancelReceipts(request *fetchRequest) {
+ q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
}
-// Cancel aborts a fetch request, returning all pending hashes to the queue.
-func (q *queue) Cancel(request *fetchRequest) {
+// Cancel aborts a fetch request, returning all pending hashes to the task queue.
+func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -395,20 +482,62 @@ func (q *queue) Cancel(request *fetchRequest) {
q.hashQueue.Push(hash, float32(index))
}
for _, header := range request.Headers {
- q.headerQueue.Push(header, -float32(header.Number.Uint64()))
+ taskQueue.Push(header, -float32(header.Number.Uint64()))
}
- delete(q.pendPool, request.Peer.id)
+ delete(pendPool, request.Peer.id)
}
-// Expire checks for in flight requests that exceeded a timeout allowance,
+// Revoke cancels all pending requests belonging to a given peer. This method is
+// meant to be called during a peer drop to quickly reassign owned data fetches
+// to remaining nodes.
+func (q *queue) Revoke(peerId string) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ if request, ok := q.blockPendPool[peerId]; ok {
+ for hash, index := range request.Hashes {
+ q.hashQueue.Push(hash, float32(index))
+ }
+ for _, header := range request.Headers {
+ q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
+ }
+ delete(q.blockPendPool, peerId)
+ }
+ if request, ok := q.receiptPendPool[peerId]; ok {
+ for _, header := range request.Headers {
+ q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
+ }
+ delete(q.receiptPendPool, peerId)
+ }
+}
+
+// Expire61 checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalization.
-func (q *queue) Expire(timeout time.Duration) []string {
+func (q *queue) Expire61(timeout time.Duration) []string {
+ return q.expire(timeout, q.blockPendPool, nil)
+}
+
+// ExpireBlocks checks for in flight block body requests that exceeded a timeout
+// allowance, canceling them and returning the responsible peers for penalization.
+func (q *queue) ExpireBlocks(timeout time.Duration) []string {
+ return q.expire(timeout, q.blockPendPool, q.blockTaskQueue)
+}
+
+// ExpireReceipts checks for in flight receipt requests that exceeded a timeout
+// allowance, canceling them and returning the responsible peers for penalization.
+func (q *queue) ExpireReceipts(timeout time.Duration) []string {
+ return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue)
+}
+
+// expire is the generic check that move expired tasks from a pending pool back
+// into a task pool, returning all entities caught with expired tasks.
+func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque) []string {
q.lock.Lock()
defer q.lock.Unlock()
// Iterate over the expired requests and return each to the queue
peers := []string{}
- for id, request := range q.pendPool {
+ for id, request := range pendPool {
if time.Since(request.Time) > timeout {
// Update the metrics with the timeout
if len(request.Hashes) > 0 {
@@ -421,14 +550,14 @@ func (q *queue) Expire(timeout time.Duration) []string {
q.hashQueue.Push(hash, float32(index))
}
for _, header := range request.Headers {
- q.headerQueue.Push(header, -float32(header.Number.Uint64()))
+ taskQueue.Push(header, -float32(header.Number.Uint64()))
}
peers = append(peers, id)
}
}
// Remove the expired requests from the pending pool
for _, id := range peers {
- delete(q.pendPool, id)
+ delete(pendPool, id)
}
return peers
}
@@ -439,12 +568,12 @@ func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) {
defer q.lock.Unlock()
// Short circuit if the blocks were never requested
- request := q.pendPool[id]
+ request := q.blockPendPool[id]
if request == nil {
return errNoFetchesPending
}
blockReqTimer.UpdateSince(request.Time)
- delete(q.pendPool, id)
+ delete(q.blockPendPool, id)
// If no blocks were retrieved, mark them as unavailable for the origin peer
if len(blocks) == 0 {
@@ -461,10 +590,19 @@ func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) {
errs = append(errs, fmt.Errorf("non-requested block %x", hash))
continue
}
- // Queue the block up for processing
- if err := q.enqueue(id, block); err != nil {
- return err
+ // Reconstruct the next result if contents match up
+ index := int(block.Number().Int64() - int64(q.resultOffset))
+ if index >= len(q.resultCache) || index < 0 {
+ errs = []error{errInvalidChain}
+ break
+ }
+ q.resultCache[index] = &fetchResult{
+ Header: block.Header(),
+ Transactions: block.Transactions(),
+ Uncles: block.Uncles(),
}
+ q.blockDonePool[block.Hash()] = struct{}{}
+
delete(request.Hashes, hash)
delete(q.hashPool, hash)
}
@@ -473,60 +611,94 @@ func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) {
q.hashQueue.Push(hash, float32(index))
}
// If none of the blocks were good, it's a stale delivery
- if len(errs) != 0 {
- if len(errs) == len(blocks) {
- return errStaleDelivery
- }
+ switch {
+ case len(errs) == 0:
+ return nil
+
+ case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
+ return errs[0]
+
+ case len(errs) == len(request.Headers):
+ return errStaleDelivery
+
+ default:
return fmt.Errorf("multiple failures: %v", errs)
}
- return nil
}
-// Deliver injects a block body retrieval response into the download queue.
-func (q *queue) Deliver(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error {
+// DeliverBlocks injects a block (body) retrieval response into the results queue.
+func (q *queue) DeliverBlocks(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error {
+ reconstruct := func(header *types.Header, index int, result *fetchResult) error {
+ if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
+ return errInvalidBody
+ }
+ result.Transactions = txLists[index]
+ result.Uncles = uncleLists[index]
+ return nil
+ }
+ return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), reconstruct)
+}
+
+// DeliverReceipts injects a receipt retrieval response into the results queue.
+func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error {
+ reconstruct := func(header *types.Header, index int, result *fetchResult) error {
+ if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash {
+ return errInvalidReceipt
+ }
+ result.Receipts = receiptList[index]
+ return nil
+ }
+ return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), reconstruct)
+}
+
+// deliver injects a data retrieval response into the results queue.
+func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest,
+ donePool map[common.Hash]struct{}, reqTimer metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) error {
q.lock.Lock()
defer q.lock.Unlock()
- // Short circuit if the block bodies were never requested
- request := q.pendPool[id]
+ // Short circuit if the data was never requested
+ request := pendPool[id]
if request == nil {
return errNoFetchesPending
}
- bodyReqTimer.UpdateSince(request.Time)
- delete(q.pendPool, id)
+ reqTimer.UpdateSince(request.Time)
+ delete(pendPool, id)
- // If no block bodies were retrieved, mark them as unavailable for the origin peer
- if len(txLists) == 0 || len(uncleLists) == 0 {
+ // If no data items were retrieved, mark them as unavailable for the origin peer
+ if results == 0 {
for hash, _ := range request.Headers {
request.Peer.ignored.Add(hash)
}
}
- // Assemble each of the block bodies with their headers and queue for processing
+ // Assemble each of the results with their headers and retrieved data parts
errs := make([]error, 0)
for i, header := range request.Headers {
- // Short circuit block assembly if no more bodies are found
- if i >= len(txLists) || i >= len(uncleLists) {
+ // Short circuit assembly if no more fetch results are found
+ if i >= results {
break
}
- // Reconstruct the next block if contents match up
- if types.DeriveSha(types.Transactions(txLists[i])) != header.TxHash || types.CalcUncleHash(uncleLists[i]) != header.UncleHash {
- errs = []error{errInvalidBody}
+ // Reconstruct the next result if contents match up
+ index := int(header.Number.Int64() - int64(q.resultOffset))
+ if index >= len(q.resultCache) || index < 0 || q.resultCache[index] == nil {
+ errs = []error{errInvalidChain}
break
}
- block := types.NewBlockWithHeader(header).WithBody(txLists[i], uncleLists[i])
-
- // Queue the block up for processing
- if err := q.enqueue(id, block); err != nil {
+ if err := reconstruct(header, i, q.resultCache[index]); err != nil {
errs = []error{err}
break
}
+ donePool[header.Hash()] = struct{}{}
+ q.resultCache[index].Pending--
+
+ // Clean up a successful fetch
request.Headers[i] = nil
- delete(q.headerPool, header.Hash())
+ delete(taskPool, header.Hash())
}
// Return all failed or missing fetches to the queue
for _, header := range request.Headers {
if header != nil {
- q.headerQueue.Push(header, -float32(header.Number.Uint64()))
+ taskQueue.Push(header, -float32(header.Number.Uint64()))
}
}
// If none of the blocks were good, it's a stale delivery
@@ -534,11 +706,8 @@ func (q *queue) Deliver(id string, txLists [][]*types.Transaction, uncleLists []
case len(errs) == 0:
return nil
- case len(errs) == 1 && errs[0] == errInvalidBody:
- return errInvalidBody
-
- case len(errs) == 1 && errs[0] == errInvalidChain:
- return errInvalidChain
+ case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBody || errs[0] == errInvalidReceipt):
+ return errs[0]
case len(errs) == len(request.Headers):
return errStaleDelivery
@@ -548,29 +717,14 @@ func (q *queue) Deliver(id string, txLists [][]*types.Transaction, uncleLists []
}
}
-// enqueue inserts a new block into the final delivery queue, waiting for pickup
-// by the processor.
-func (q *queue) enqueue(origin string, block *types.Block) error {
- // If a requested block falls out of the range, the hash chain is invalid
- index := int(int64(block.NumberU64()) - int64(q.blockOffset))
- if index >= len(q.blockCache) || index < 0 {
- return errInvalidChain
- }
- // Otherwise merge the block and mark the hash done
- q.blockCache[index] = &Block{
- RawBlock: block,
- OriginPeer: origin,
- }
- q.blockPool[block.Header().Hash()] = block.NumberU64()
- return nil
-}
-
-// Prepare configures the block cache offset to allow accepting inbound blocks.
-func (q *queue) Prepare(offset uint64) {
+// Prepare configures the result cache to allow accepting and caching inbound
+// fetch results.
+func (q *queue) Prepare(offset uint64, parts int) {
q.lock.Lock()
defer q.lock.Unlock()
- if q.blockOffset < offset {
- q.blockOffset = offset
+ if q.resultOffset < offset {
+ q.resultOffset = offset
}
+ q.resultParts = parts
}