aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
Diffstat (limited to 'eth')
-rw-r--r--eth/downloader/downloader.go568
-rw-r--r--eth/downloader/downloader_test.go123
-rw-r--r--eth/downloader/metrics.go5
-rw-r--r--eth/downloader/peer.go107
-rw-r--r--eth/downloader/queue.go271
-rw-r--r--eth/downloader/types.go137
-rw-r--r--eth/handler.go25
-rw-r--r--eth/peer.go2
8 files changed, 779 insertions, 459 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 24ba3da17..96177ae8a 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -19,7 +19,6 @@ package downloader
import (
"errors"
- "fmt"
"math"
"math/big"
"strings"
@@ -29,9 +28,11 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/rcrowley/go-metrics"
)
var (
@@ -39,8 +40,8 @@ var (
MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request
- MaxStateFetch = 384 // Amount of node state values to allow fetching per request
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
+ MaxStateFetch = 384 // Amount of node state values to allow fetching per request
hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out
blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth
@@ -49,10 +50,13 @@ var (
bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth
bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired
receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
- receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a block body request is considered expired
+ receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired
+ stateSoftTTL = 2 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
+ stateHardTTL = 3 * stateSoftTTL // [eth/63] Maximum time allowance before a node data request is considered expired
maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
+ maxQueuedStates = 256 * 1024 // [eth/63] Maximum number of state requests to queue (DOS protection)
maxResultsProcess = 256 // Number of download results to import at once into the chain
headerCheckFrequency = 64 // Verification frequency of the downloaded headers during fast sync
@@ -84,98 +88,6 @@ var (
errNoSyncActive = errors.New("no sync active")
)
-// headerCheckFn is a callback type for verifying a header's presence in the local chain.
-type headerCheckFn func(common.Hash) bool
-
-// blockCheckFn is a callback type for verifying a block's presence in the local chain.
-type blockCheckFn func(common.Hash) bool
-
-// headerRetrievalFn is a callback type for retrieving a header from the local chain.
-type headerRetrievalFn func(common.Hash) *types.Header
-
-// blockRetrievalFn is a callback type for retrieving a block from the local chain.
-type blockRetrievalFn func(common.Hash) *types.Block
-
-// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain.
-type headHeaderRetrievalFn func() *types.Header
-
-// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain.
-type headBlockRetrievalFn func() *types.Block
-
-// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain.
-type headFastBlockRetrievalFn func() *types.Block
-
-// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block.
-type tdRetrievalFn func(common.Hash) *big.Int
-
-// headerChainInsertFn is a callback type to insert a batch of headers into the local chain.
-type headerChainInsertFn func([]*types.Header, bool) (int, error)
-
-// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain.
-type blockChainInsertFn func(types.Blocks) (int, error)
-
-// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain.
-type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error)
-
-// peerDropFn is a callback type for dropping a peer detected as malicious.
-type peerDropFn func(id string)
-
-// dataPack is a data message returned by a peer for some query.
-type dataPack interface {
- PeerId() string
- Empty() bool
- Stats() string
-}
-
-// hashPack is a batch of block hashes returned by a peer (eth/61).
-type hashPack struct {
- peerId string
- hashes []common.Hash
-}
-
-// blockPack is a batch of blocks returned by a peer (eth/61).
-type blockPack struct {
- peerId string
- blocks []*types.Block
-}
-
-// headerPack is a batch of block headers returned by a peer.
-type headerPack struct {
- peerId string
- headers []*types.Header
-}
-
-// bodyPack is a batch of block bodies returned by a peer.
-type bodyPack struct {
- peerId string
- transactions [][]*types.Transaction
- uncles [][]*types.Header
-}
-
-// PeerId retrieves the origin peer who sent this block body packet.
-func (p *bodyPack) PeerId() string { return p.peerId }
-
-// Empty returns whether the no block bodies were delivered.
-func (p *bodyPack) Empty() bool { return len(p.transactions) == 0 || len(p.uncles) == 0 }
-
-// Stats creates a textual stats report for logging purposes.
-func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) }
-
-// receiptPack is a batch of receipts returned by a peer.
-type receiptPack struct {
- peerId string
- receipts [][]*types.Receipt
-}
-
-// PeerId retrieves the origin peer who sent this receipt packet.
-func (p *receiptPack) PeerId() string { return p.peerId }
-
-// Empty returns whether the no receipts were delivered.
-func (p *receiptPack) Empty() bool { return len(p.receipts) == 0 }
-
-// Stats creates a textual stats report for logging purposes.
-func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) }
-
type Downloader struct {
mode SyncMode // Synchronisation mode defining the strategies used
mux *event.TypeMux // Event multiplexer to announce sync operation events
@@ -186,23 +98,26 @@ type Downloader struct {
interrupt int32 // Atomic boolean to signal termination
// Statistics
- syncStatsOrigin uint64 // Origin block number where syncing started at
- syncStatsHeight uint64 // Highest block number known when syncing started
- syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
+ syncStatsChainOrigin uint64 // Origin block number where syncing started at
+ syncStatsChainHeight uint64 // Highest block number known when syncing started
+ syncStatsStateTotal uint64 // Total number of node state entries known so far
+ syncStatsStateDone uint64 // Number of state trie entries already pulled
+ syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
// Callbacks
- hasHeader headerCheckFn // Checks if a header is present in the chain
- hasBlock blockCheckFn // Checks if a block is present in the chain
- getHeader headerRetrievalFn // Retrieves a header from the chain
- getBlock blockRetrievalFn // Retrieves a block from the chain
- headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
- headBlock headBlockRetrievalFn // Retrieves the head block from the chain
- headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
- getTd tdRetrievalFn // Retrieves the TD of a block from the chain
- insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
- insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
- insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
- dropPeer peerDropFn // Drops a peer for misbehaving
+ hasHeader headerCheckFn // Checks if a header is present in the chain
+ hasBlock blockCheckFn // Checks if a block is present in the chain
+ getHeader headerRetrievalFn // Retrieves a header from the chain
+ getBlock blockRetrievalFn // Retrieves a block from the chain
+ headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
+ headBlock headBlockRetrievalFn // Retrieves the head block from the chain
+ headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
+ commitHeadBlock headBlockCommitterFn // Commits a manually assembled block as the chain head
+ getTd tdRetrievalFn // Retrieves the TD of a block from the chain
+ insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
+ insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
+ insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
+ dropPeer peerDropFn // Drops a peer for misbehaving
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
@@ -212,14 +127,16 @@ type Downloader struct {
// Channels
newPeerCh chan *peer
- hashCh chan hashPack // [eth/61] Channel receiving inbound hashes
- blockCh chan blockPack // [eth/61] Channel receiving inbound blocks
- headerCh chan headerPack // [eth/62] Channel receiving inbound block headers
- bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
- receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
- blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks
- bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
- receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
+ hashCh chan dataPack // [eth/61] Channel receiving inbound hashes
+ blockCh chan dataPack // [eth/61] Channel receiving inbound blocks
+ headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
+ bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
+ receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
+ stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
+ blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks
+ bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
+ receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
+ stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
@@ -232,36 +149,40 @@ type Downloader struct {
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
-func New(mode SyncMode, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn, getBlock blockRetrievalFn,
- headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn, getTd tdRetrievalFn,
- insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader {
+func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn,
+ getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn,
+ commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn,
+ insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader {
return &Downloader{
- mode: mode,
- mux: mux,
- queue: newQueue(),
- peers: newPeerSet(),
- hasHeader: hasHeader,
- hasBlock: hasBlock,
- getHeader: getHeader,
- getBlock: getBlock,
- headHeader: headHeader,
- headBlock: headBlock,
- headFastBlock: headFastBlock,
- getTd: getTd,
- insertHeaders: insertHeaders,
- insertBlocks: insertBlocks,
- insertReceipts: insertReceipts,
- dropPeer: dropPeer,
- newPeerCh: make(chan *peer, 1),
- hashCh: make(chan hashPack, 1),
- blockCh: make(chan blockPack, 1),
- headerCh: make(chan headerPack, 1),
- bodyCh: make(chan dataPack, 1),
- receiptCh: make(chan dataPack, 1),
- blockWakeCh: make(chan bool, 1),
- bodyWakeCh: make(chan bool, 1),
- receiptWakeCh: make(chan bool, 1),
+ mode: mode,
+ mux: mux,
+ queue: newQueue(stateDb),
+ peers: newPeerSet(),
+ hasHeader: hasHeader,
+ hasBlock: hasBlock,
+ getHeader: getHeader,
+ getBlock: getBlock,
+ headHeader: headHeader,
+ headBlock: headBlock,
+ headFastBlock: headFastBlock,
+ commitHeadBlock: commitHeadBlock,
+ getTd: getTd,
+ insertHeaders: insertHeaders,
+ insertBlocks: insertBlocks,
+ insertReceipts: insertReceipts,
+ dropPeer: dropPeer,
+ newPeerCh: make(chan *peer, 1),
+ hashCh: make(chan dataPack, 1),
+ blockCh: make(chan dataPack, 1),
+ headerCh: make(chan dataPack, 1),
+ bodyCh: make(chan dataPack, 1),
+ receiptCh: make(chan dataPack, 1),
+ stateCh: make(chan dataPack, 1),
+ blockWakeCh: make(chan bool, 1),
+ bodyWakeCh: make(chan bool, 1),
+ receiptWakeCh: make(chan bool, 1),
+ stateWakeCh: make(chan bool, 1),
}
}
@@ -272,7 +193,7 @@ func (d *Downloader) Boundaries() (uint64, uint64) {
d.syncStatsLock.RLock()
defer d.syncStatsLock.RUnlock()
- return d.syncStatsOrigin, d.syncStatsHeight
+ return d.syncStatsChainOrigin, d.syncStatsChainHeight
}
// Synchronising returns whether the downloader is currently retrieving blocks.
@@ -284,10 +205,11 @@ func (d *Downloader) Synchronising() bool {
// used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
- getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getReceipts receiptFetcherFn) error {
+ getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
+ getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
glog.V(logger.Detail).Infoln("Registering peer", id)
- if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts)); err != nil {
+ if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
glog.V(logger.Error).Infoln("Register failed:", err)
return err
}
@@ -357,12 +279,18 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error
d.queue.Reset()
d.peers.Reset()
- for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh} {
+ for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
select {
case <-ch:
default:
}
}
+ // Reset and ephemeral sync statistics
+ d.syncStatsLock.Lock()
+ d.syncStatsStateTotal = 0
+ d.syncStatsStateDone = 0
+ d.syncStatsLock.Unlock()
+
// Create cancel channel for aborting mid-flight
d.cancelLock.Lock()
d.cancelCh = make(chan struct{})
@@ -414,17 +342,17 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
return err
}
d.syncStatsLock.Lock()
- if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin {
- d.syncStatsOrigin = origin
+ if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
+ d.syncStatsChainOrigin = origin
}
- d.syncStatsHeight = latest
+ d.syncStatsChainHeight = latest
d.syncStatsLock.Unlock()
// Initiate the sync using a concurrent hash and block retrieval algorithm
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- d.queue.Prepare(origin+1, 1)
+ d.queue.Prepare(origin+1, d.mode, 0)
errc := make(chan error, 2)
go func() { errc <- d.fetchHashes61(p, td, origin+1) }()
@@ -449,26 +377,27 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
return err
}
d.syncStatsLock.Lock()
- if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin {
- d.syncStatsOrigin = origin
+ if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
+ d.syncStatsChainOrigin = origin
}
- d.syncStatsHeight = latest
+ d.syncStatsChainHeight = latest
d.syncStatsLock.Unlock()
// Initiate the sync using a concurrent header and content retrieval algorithm
- parts := 1
- if d.mode == FastSync {
- parts = 2 // receipts are fetched too
+ pivot := uint64(0)
+ if latest > uint64(minFullBlocks) {
+ pivot = latest - uint64(minFullBlocks)
}
- d.queue.Prepare(origin+1, parts)
+ d.queue.Prepare(origin+1, d.mode, pivot)
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- errc := make(chan error, 3)
+ errc := make(chan error, 4)
go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved
+ go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync
go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync
- go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal sync
+ go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync
// If any fetcher fails, cancel the others
var fail error
@@ -538,14 +467,14 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
case <-d.hashCh:
// Out of bounds hashes received, ignore them
- case blockPack := <-d.blockCh:
+ case packet := <-d.blockCh:
// Discard anything not from the origin peer
- if blockPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", blockPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
- blocks := blockPack.blocks
+ blocks := packet.(*blockPack).blocks
if len(blocks) != 1 {
glog.V(logger.Debug).Infof("%v: invalid number of head blocks: %d != 1", p, len(blocks))
return 0, errBadPeer
@@ -584,14 +513,14 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
case <-d.cancelCh:
return 0, errCancelHashFetch
- case hashPack := <-d.hashCh:
+ case packet := <-d.hashCh:
// Discard anything not from the origin peer
- if hashPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
- hashes := hashPack.hashes
+ hashes := packet.(*hashPack).hashes
if len(hashes) == 0 {
glog.V(logger.Debug).Infof("%v: empty head hash set", p)
return 0, errEmptyHashSet
@@ -639,14 +568,14 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
case <-d.cancelCh:
return 0, errCancelHashFetch
- case hashPack := <-d.hashCh:
+ case packet := <-d.hashCh:
// Discard anything not from the origin peer
- if hashPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
- hashes := hashPack.hashes
+ hashes := packet.(*hashPack).hashes
if len(hashes) != 1 {
glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes))
return 0, errBadPeer
@@ -716,17 +645,17 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them
- case hashPack := <-d.hashCh:
+ case packet := <-d.hashCh:
// Make sure the active peer is giving us the hashes
- if hashPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
break
}
hashReqTimer.UpdateSince(request)
timeout.Stop()
// If no more hashes are inbound, notify the block fetcher and return
- if len(hashPack.hashes) == 0 {
+ if packet.Items() == 0 {
glog.V(logger.Debug).Infof("%v: no available hashes", p)
select {
@@ -751,12 +680,13 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
return nil
}
gotHashes = true
+ hashes := packet.(*hashPack).hashes
// Otherwise insert all the new hashes, aborting in case of junk
- glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashPack.hashes), from)
+ glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashes), from)
- inserts := d.queue.Schedule61(hashPack.hashes, true)
- if len(inserts) != len(hashPack.hashes) {
+ inserts := d.queue.Schedule61(hashes, true)
+ if len(inserts) != len(hashes) {
glog.V(logger.Debug).Infof("%v: stale hashes", p)
return errBadPeer
}
@@ -776,7 +706,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
return nil
}
// Queue not yet full, fetch the next batch
- from += uint64(len(hashPack.hashes))
+ from += uint64(len(hashes))
getHashes(from)
case <-timeout.C:
@@ -813,16 +743,17 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them
- case blockPack := <-d.blockCh:
+ case packet := <-d.blockCh:
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
- if peer := d.peers.Peer(blockPack.peerId); peer != nil {
+ if peer := d.peers.Peer(packet.PeerId()); peer != nil {
// Deliver the received chunk of blocks, and demote in case of errors
- err := d.queue.Deliver61(blockPack.peerId, blockPack.blocks)
+ blocks := packet.(*blockPack).blocks
+ err := d.queue.DeliverBlocks(peer.id, blocks)
switch err {
case nil:
// If no blocks were delivered, demote the peer (need the delivery above)
- if len(blockPack.blocks) == 0 {
+ if len(blocks) == 0 {
peer.Demote()
peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
@@ -831,7 +762,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// All was successful, promote the peer and potentially start processing
peer.Promote()
peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
+ glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
go d.process()
case errInvalidChain:
@@ -891,7 +822,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
return errNoPeers
}
// Check for block request timeouts and demote the responsible peers
- for _, pid := range d.queue.Expire61(blockHardTTL) {
+ for _, pid := range d.queue.ExpireBlocks(blockHardTTL) {
if peer := d.peers.Peer(pid); peer != nil {
peer.Demote()
glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
@@ -907,7 +838,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
}
// Send a download request to all idle peers, until throttled
throttled := false
- idles, total := d.peers.BlockIdlePeers(61)
+ idles, total := d.peers.BlockIdlePeers()
for _, peer := range idles {
// Short circuit if throttling activated
@@ -918,7 +849,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// Reserve a chunk of hashes for a peer. A nil can mean either that
// no more hashes are available, or that the peer is known not to
// have them.
- request := d.queue.Reserve61(peer, peer.BlockCapacity())
+ request := d.queue.ReserveBlocks(peer, peer.BlockCapacity())
if request == nil {
continue
}
@@ -928,7 +859,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// Fetch the chunk and make sure any errors return the hashes to the queue
if err := peer.Fetch61(request); err != nil {
glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer)
- d.queue.Cancel61(request)
+ d.queue.CancelBlocks(request)
}
}
// Make sure that we have peers available for fetching. If all peers have been tried
@@ -954,14 +885,14 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
case <-d.cancelCh:
return 0, errCancelBlockFetch
- case headerPack := <-d.headerCh:
+ case packet := <-d.headerCh:
// Discard anything not from the origin peer
- if headerPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
- headers := headerPack.headers
+ headers := packet.(*headerPack).headers
if len(headers) != 1 {
glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
return 0, errBadPeer
@@ -1014,14 +945,14 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
case <-d.cancelCh:
return 0, errCancelHashFetch
- case headerPack := <-d.headerCh:
+ case packet := <-d.headerCh:
// Discard anything not from the origin peer
- if headerPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
- headers := headerPack.headers
+ headers := packet.(*headerPack).headers
if len(headers) == 0 {
glog.V(logger.Debug).Infof("%v: empty head header set", p)
return 0, errEmptyHeaderSet
@@ -1069,14 +1000,14 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
case <-d.cancelCh:
return 0, errCancelHashFetch
- case headerPack := <-d.headerCh:
+ case packer := <-d.headerCh:
// Discard anything not from the origin peer
- if headerPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId)
+ if packer.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packer.PeerId())
break
}
// Make sure the peer actually gave something valid
- headers := headerPack.headers
+ headers := packer.(*headerPack).headers
if len(headers) != 1 {
glog.V(logger.Debug).Infof("%v: invalid search header set (%d)", p, len(headers))
return 0, errBadPeer
@@ -1150,20 +1081,20 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
case <-d.blockCh:
// Out of bounds eth/61 blocks received, ignore them
- case headerPack := <-d.headerCh:
+ case packet := <-d.headerCh:
// Make sure the active peer is giving us the headers
- if headerPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", headerPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", packet.PeerId())
break
}
headerReqTimer.UpdateSince(request)
timeout.Stop()
// If no more headers are inbound, notify the content fetchers and return
- if len(headerPack.headers) == 0 {
+ if packet.Items() == 0 {
glog.V(logger.Debug).Infof("%v: no available headers", p)
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@@ -1187,26 +1118,27 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
return nil
}
gotHeaders = true
+ headers := packet.(*headerPack).headers
// Otherwise insert all the new headers, aborting in case of junk
- glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headerPack.headers), from)
+ glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
if d.mode == FastSync || d.mode == LightSync {
- if n, err := d.insertHeaders(headerPack.headers, false); err != nil {
- glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headerPack.headers[n].Number, headerPack.headers[n].Hash().Bytes()[:4], err)
+ if n, err := d.insertHeaders(headers, false); err != nil {
+ glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err)
return errInvalidChain
}
}
if d.mode == FullSync || d.mode == FastSync {
- inserts := d.queue.Schedule(headerPack.headers, from, d.mode == FastSync)
- if len(inserts) != len(headerPack.headers) {
+ inserts := d.queue.Schedule(headers, from)
+ if len(inserts) != len(headers) {
glog.V(logger.Debug).Infof("%v: stale headers", p)
return errBadPeer
}
}
// Notify the content fetchers of new headers, but stop if queue is full
cont := d.queue.PendingBlocks() < maxQueuedHeaders || d.queue.PendingReceipts() < maxQueuedHeaders
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
if cont {
// We still have headers to fetch, send continuation wake signal (potential)
select {
@@ -1223,7 +1155,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
}
}
// Queue not yet full, fetch the next batch
- from += uint64(len(headerPack.headers))
+ from += uint64(len(headers))
getHeaders(from)
case <-timeout.C:
@@ -1233,7 +1165,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
d.dropPeer(p.id)
// Finish the sync gracefully instead of dumping the gathered data though
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@@ -1251,19 +1183,19 @@ func (d *Downloader) fetchBodies(from uint64) error {
glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
var (
- deliver = func(packet interface{}) error {
+ deliver = func(packet dataPack) error {
pack := packet.(*bodyPack)
- return d.queue.DeliverBlocks(pack.peerId, pack.transactions, pack.uncles)
+ return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
}
- expire = func() []string { return d.queue.ExpireBlocks(bodyHardTTL) }
+ expire = func() []string { return d.queue.ExpireBodies(bodyHardTTL) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
capacity = func(p *peer) int { return p.BlockCapacity() }
- getIdles = func() ([]*peer, int) { return d.peers.BlockIdlePeers(62) }
+ getIdles = func() ([]*peer, int) { return d.peers.BodyIdlePeers() }
setIdle = func(p *peer) { p.SetBlocksIdle() }
)
- err := d.fetchParts(from, errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
- d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBlocks, d.bodyFetchHook,
- fetch, d.queue.CancelBlocks, capacity, getIdles, setIdle, "Body")
+ err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
+ d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBodies, d.bodyFetchHook,
+ fetch, d.queue.CancelBodies, capacity, getIdles, setIdle, "Body")
glog.V(logger.Debug).Infof("Block body download terminated: %v", err)
return err
@@ -1276,7 +1208,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
var (
- deliver = func(packet interface{}) error {
+ deliver = func(packet dataPack) error {
pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
}
@@ -1285,7 +1217,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
capacity = func(p *peer) int { return p.ReceiptCapacity() }
setIdle = func(p *peer) { p.SetReceiptsIdle() }
)
- err := d.fetchParts(from, errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
+ err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.ThrottleReceipts, d.queue.ReserveReceipts, d.receiptFetchHook,
fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt")
@@ -1293,10 +1225,46 @@ func (d *Downloader) fetchReceipts(from uint64) error {
return err
}
+// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
+// available peers, reserving a chunk of nodes for each, waiting for delivery and
+// also periodically checking for timeouts.
+func (d *Downloader) fetchNodeData() error {
+ glog.V(logger.Debug).Infof("Downloading node state data")
+
+ var (
+ deliver = func(packet dataPack) error {
+ start := time.Now()
+ done, found, err := d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states)
+
+ d.syncStatsLock.Lock()
+ totalDone, totalKnown := d.syncStatsStateDone+uint64(done), d.syncStatsStateTotal+uint64(found)
+ d.syncStatsStateDone, d.syncStatsStateTotal = totalDone, totalKnown
+ d.syncStatsLock.Unlock()
+
+ glog.V(logger.Info).Infof("imported %d [%d / %d] state entries in %v.", done, totalDone, totalKnown, time.Since(start))
+ return err
+ }
+ expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) }
+ throttle = func() bool { return false }
+ reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
+ return d.queue.ReserveNodeData(p, count), false, nil
+ }
+ fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
+ capacity = func(p *peer) int { return p.NodeDataCapacity() }
+ setIdle = func(p *peer) { p.SetNodeDataIdle() }
+ )
+ err := d.fetchParts(errCancelReceiptFetch, d.stateCh, deliver, d.stateWakeCh, expire,
+ d.queue.PendingNodeData, throttle, reserve, nil, fetch, d.queue.CancelNodeData,
+ capacity, d.peers.ReceiptIdlePeers, setIdle, "State")
+
+ glog.V(logger.Debug).Infof("Node state data download terminated: %v", err)
+ return err
+}
+
// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
-func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan dataPack, deliver func(packet interface{}) error, wakeCh chan bool,
+func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool,
expire func() []string, pending func() int, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header),
fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, idle func() ([]*peer, int), setIdle func(*peer), kind string) error {
@@ -1327,7 +1295,7 @@ func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan da
switch err := deliver(packet); err {
case nil:
// If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!)
- if packet.Empty() {
+ if packet.Items() == 0 {
peer.Demote()
setIdle(peer)
glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
@@ -1441,7 +1409,11 @@ func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan da
continue
}
if glog.V(logger.Detail) {
- glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
+ if len(request.Headers) > 0 {
+ glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
+ } else {
+ glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind))
+ }
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {
@@ -1528,7 +1500,9 @@ func (d *Downloader) process() {
blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
case d.mode == FastSync:
blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
- receipts = append(receipts, result.Receipts)
+ if result.Header.Number.Uint64() <= d.queue.fastSyncPivot {
+ receipts = append(receipts, result.Receipts)
+ }
case d.mode == LightSync:
headers = append(headers, result.Header)
}
@@ -1539,12 +1513,16 @@ func (d *Downloader) process() {
index int
)
switch {
- case d.mode == FullSync:
- index, err = d.insertBlocks(blocks)
- case d.mode == FastSync:
- index, err = d.insertReceipts(blocks, receipts)
- case d.mode == LightSync:
+ case len(headers) > 0:
index, err = d.insertHeaders(headers, true)
+
+ case len(receipts) > 0:
+ index, err = d.insertReceipts(blocks, receipts)
+ if err == nil && blocks[len(blocks)-1].NumberU64() == d.queue.fastSyncPivot {
+ err = d.commitHeadBlock(blocks[len(blocks)-1].Hash())
+ }
+ default:
+ index, err = d.insertBlocks(blocks)
}
if err != nil {
glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
@@ -1557,125 +1535,47 @@ func (d *Downloader) process() {
}
}
-// DeliverHashes61 injects a new batch of hashes received from a remote node into
+// DeliverHashes injects a new batch of hashes received from a remote node into
// the download schedule. This is usually invoked through the BlockHashesMsg by
// the protocol handler.
-func (d *Downloader) DeliverHashes61(id string, hashes []common.Hash) (err error) {
- // Update the delivery metrics for both good and failed deliveries
- hashInMeter.Mark(int64(len(hashes)))
- defer func() {
- if err != nil {
- hashDropMeter.Mark(int64(len(hashes)))
- }
- }()
- // Make sure the downloader is active
- if atomic.LoadInt32(&d.synchronising) == 0 {
- return errNoSyncActive
- }
- // Deliver or abort if the sync is canceled while queuing
- d.cancelLock.RLock()
- cancel := d.cancelCh
- d.cancelLock.RUnlock()
-
- select {
- case d.hashCh <- hashPack{id, hashes}:
- return nil
-
- case <-cancel:
- return errNoSyncActive
- }
+func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) (err error) {
+ return d.deliver(id, d.hashCh, &hashPack{id, hashes}, hashInMeter, hashDropMeter)
}
-// DeliverBlocks61 injects a new batch of blocks received from a remote node.
+// DeliverBlocks injects a new batch of blocks received from a remote node.
// This is usually invoked through the BlocksMsg by the protocol handler.
-func (d *Downloader) DeliverBlocks61(id string, blocks []*types.Block) (err error) {
- // Update the delivery metrics for both good and failed deliveries
- blockInMeter.Mark(int64(len(blocks)))
- defer func() {
- if err != nil {
- blockDropMeter.Mark(int64(len(blocks)))
- }
- }()
- // Make sure the downloader is active
- if atomic.LoadInt32(&d.synchronising) == 0 {
- return errNoSyncActive
- }
- // Deliver or abort if the sync is canceled while queuing
- d.cancelLock.RLock()
- cancel := d.cancelCh
- d.cancelLock.RUnlock()
-
- select {
- case d.blockCh <- blockPack{id, blocks}:
- return nil
-
- case <-cancel:
- return errNoSyncActive
- }
+func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) (err error) {
+ return d.deliver(id, d.blockCh, &blockPack{id, blocks}, blockInMeter, blockDropMeter)
}
// DeliverHeaders injects a new batch of blck headers received from a remote
// node into the download schedule.
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
- // Update the delivery metrics for both good and failed deliveries
- headerInMeter.Mark(int64(len(headers)))
- defer func() {
- if err != nil {
- headerDropMeter.Mark(int64(len(headers)))
- }
- }()
- // Make sure the downloader is active
- if atomic.LoadInt32(&d.synchronising) == 0 {
- return errNoSyncActive
- }
- // Deliver or abort if the sync is canceled while queuing
- d.cancelLock.RLock()
- cancel := d.cancelCh
- d.cancelLock.RUnlock()
-
- select {
- case d.headerCh <- headerPack{id, headers}:
- return nil
-
- case <-cancel:
- return errNoSyncActive
- }
+ return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
}
// DeliverBodies injects a new batch of block bodies received from a remote node.
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
- // Update the delivery metrics for both good and failed deliveries
- bodyInMeter.Mark(int64(len(transactions)))
- defer func() {
- if err != nil {
- bodyDropMeter.Mark(int64(len(transactions)))
- }
- }()
- // Make sure the downloader is active
- if atomic.LoadInt32(&d.synchronising) == 0 {
- return errNoSyncActive
- }
- // Deliver or abort if the sync is canceled while queuing
- d.cancelLock.RLock()
- cancel := d.cancelCh
- d.cancelLock.RUnlock()
-
- select {
- case d.bodyCh <- &bodyPack{id, transactions, uncles}:
- return nil
-
- case <-cancel:
- return errNoSyncActive
- }
+ return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
}
// DeliverReceipts injects a new batch of receipts received from a remote node.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
+ return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
+}
+
+// DeliverNodeData injects a new batch of node state data received from a remote node.
+func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
+ return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
+}
+
+// deliver injects a new batch of data received from a remote node.
+func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
// Update the delivery metrics for both good and failed deliveries
- receiptInMeter.Mark(int64(len(receipts)))
+ inMeter.Mark(int64(packet.Items()))
defer func() {
if err != nil {
- receiptDropMeter.Mark(int64(len(receipts)))
+ dropMeter.Mark(int64(packet.Items()))
}
}()
// Make sure the downloader is active
@@ -1688,7 +1588,7 @@ func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (er
d.cancelLock.RUnlock()
select {
- case d.receiptCh <- &receiptPack{id, receipts}:
+ case destCh <- packet:
return nil
case <-cancel:
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 68c4ca26e..8944ae4b0 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -27,11 +27,13 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
+ "github.com/ethereum/go-ethereum/trie"
)
var (
@@ -115,6 +117,7 @@ func makeChainFork(n, f int, parent *types.Block, parentReceipts types.Receipts)
// downloadTester is a test simulator for mocking out local block chain.
type downloadTester struct {
+ stateDb ethdb.Database
downloader *Downloader
ownHashes []common.Hash // Hash chain belonging to the tester
@@ -146,8 +149,10 @@ func newTester(mode SyncMode) *downloadTester {
peerReceipts: make(map[string]map[common.Hash]types.Receipts),
peerChainTds: make(map[string]map[common.Hash]*big.Int),
}
- tester.downloader = New(mode, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader, tester.getBlock,
- tester.headHeader, tester.headBlock, tester.headFastBlock, tester.getTd, tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.dropPeer)
+ tester.stateDb, _ = ethdb.NewMemDatabase()
+ tester.downloader = New(mode, tester.stateDb, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader,
+ tester.getBlock, tester.headHeader, tester.headBlock, tester.headFastBlock, tester.commitHeadBlock, tester.getTd,
+ tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.dropPeer)
return tester
}
@@ -213,7 +218,7 @@ func (dl *downloadTester) headHeader() *types.Header {
return header
}
}
- return nil
+ return genesis.Header()
}
// headBlock retrieves the current head block from the canonical chain.
@@ -223,10 +228,12 @@ func (dl *downloadTester) headBlock() *types.Block {
for i := len(dl.ownHashes) - 1; i >= 0; i-- {
if block := dl.getBlock(dl.ownHashes[i]); block != nil {
- return block
+ if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil {
+ return block
+ }
}
}
- return nil
+ return genesis
}
// headFastBlock retrieves the current head fast-sync block from the canonical chain.
@@ -236,12 +243,20 @@ func (dl *downloadTester) headFastBlock() *types.Block {
for i := len(dl.ownHashes) - 1; i >= 0; i-- {
if block := dl.getBlock(dl.ownHashes[i]); block != nil {
- if _, ok := dl.ownReceipts[block.Hash()]; ok {
- return block
- }
+ return block
}
}
- return nil
+ return genesis
+}
+
+// commitHeadBlock manually sets the head block to a given hash.
+func (dl *downloadTester) commitHeadBlock(hash common.Hash) error {
+ // For now only check that the state trie is correct
+ if block := dl.getBlock(hash); block != nil {
+ _, err := trie.NewSecure(block.Root(), dl.stateDb)
+ return err
+ }
+ return fmt.Errorf("non existent block: %x", hash[:4])
}
// getTd retrieves the block's total difficulty from the canonical chain.
@@ -283,6 +298,7 @@ func (dl *downloadTester) insertBlocks(blocks types.Blocks) (int, error) {
dl.ownHashes = append(dl.ownHashes, block.Hash())
dl.ownHeaders[block.Hash()] = block.Header()
dl.ownBlocks[block.Hash()] = block
+ dl.stateDb.Put(block.Root().Bytes(), []byte{})
dl.ownChainTd[block.Hash()] = dl.ownChainTd[block.ParentHash()]
}
return len(blocks), nil
@@ -321,13 +337,13 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
var err error
switch version {
case 61:
- err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil, nil)
+ err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil, nil, nil)
case 62:
- err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil)
+ err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil)
case 63:
- err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay))
+ err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
case 64:
- err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay))
+ err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
}
if err == nil {
// Assign the owned hashes, headers and blocks to the peer (deep copy)
@@ -399,7 +415,7 @@ func (dl *downloadTester) peerGetRelHashesFn(id string, delay time.Duration) fun
// Delay delivery a bit to allow attacks to unfold
go func() {
time.Sleep(time.Millisecond)
- dl.downloader.DeliverHashes61(id, result)
+ dl.downloader.DeliverHashes(id, result)
}()
return nil
}
@@ -424,7 +440,7 @@ func (dl *downloadTester) peerGetAbsHashesFn(id string, delay time.Duration) fun
// Delay delivery a bit to allow attacks to unfold
go func() {
time.Sleep(time.Millisecond)
- dl.downloader.DeliverHashes61(id, result)
+ dl.downloader.DeliverHashes(id, result)
}()
return nil
}
@@ -447,7 +463,7 @@ func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([
result = append(result, block)
}
}
- go dl.downloader.DeliverBlocks61(id, result)
+ go dl.downloader.DeliverBlocks(id, result)
return nil
}
@@ -553,17 +569,54 @@ func (dl *downloadTester) peerGetReceiptsFn(id string, delay time.Duration) func
}
}
+// peerGetNodeDataFn constructs a getNodeData method associated with a particular
+// peer in the download tester. The returned function can be used to retrieve
+// batches of node state data from the particularly requested peer.
+func (dl *downloadTester) peerGetNodeDataFn(id string, delay time.Duration) func([]common.Hash) error {
+ return func(hashes []common.Hash) error {
+ time.Sleep(delay)
+
+ dl.lock.RLock()
+ defer dl.lock.RUnlock()
+
+ results := make([][]byte, 0, len(hashes))
+ for _, hash := range hashes {
+ if data, err := testdb.Get(hash.Bytes()); err == nil {
+ results = append(results, data)
+ }
+ }
+ go dl.downloader.DeliverNodeData(id, results)
+
+ return nil
+ }
+}
+
// assertOwnChain checks if the local chain contains the correct number of items
// of the various chain components.
func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
- headers, blocks, receipts := length, length, length
+ assertOwnForkedChain(t, tester, 1, []int{length})
+}
+
+// assertOwnForkedChain checks if the local forked chain contains the correct
+// number of items of the various chain components.
+func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) {
+ // Initialize the counters for the first fork
+ headers, blocks, receipts := lengths[0], lengths[0], lengths[0]-minFullBlocks
+ if receipts < 0 {
+ receipts = 1
+ }
+ // Update the counters for each subsequent fork
+ for _, length := range lengths[1:] {
+ headers += length - common
+ blocks += length - common
+ receipts += length - common - minFullBlocks
+ }
switch tester.downloader.mode {
case FullSync:
receipts = 1
case LightSync:
blocks, receipts = 1, 1
}
-
if hs := len(tester.ownHeaders); hs != headers {
t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers)
}
@@ -573,6 +626,14 @@ func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
if rs := len(tester.ownReceipts); rs != receipts {
t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts)
}
+ // Verify the state trie too for fast syncs
+ if tester.downloader.mode == FastSync {
+ if index := lengths[len(lengths)-1] - minFullBlocks - 1; index > 0 {
+ if statedb := state.New(tester.ownHeaders[tester.ownHashes[index]].Root, tester.stateDb); statedb == nil {
+ t.Fatalf("state reconstruction failed")
+ }
+ }
+ }
}
// Tests that simple synchronization against a canonical chain works correctly.
@@ -647,7 +708,9 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
cached = len(tester.downloader.queue.blockDonePool)
if mode == FastSync {
if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
- cached = receipts
+ if tester.downloader.queue.resultCache[receipts].Header.Number.Uint64() < tester.downloader.queue.fastSyncPivot {
+ cached = receipts
+ }
}
}
tester.downloader.queue.lock.RUnlock()
@@ -704,7 +767,7 @@ func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) {
if err := tester.sync("fork B", nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- assertOwnChain(t, tester, common+2*fork+1)
+ assertOwnForkedChain(t, tester, common+1, []int{common + fork + 1, common + fork + 1})
}
// Tests that an inactive downloader will not accept incoming hashes and blocks.
@@ -712,10 +775,10 @@ func TestInactiveDownloader61(t *testing.T) {
tester := newTester(FullSync)
// Check that neither hashes nor blocks are accepted
- if err := tester.downloader.DeliverHashes61("bad peer", []common.Hash{}); err != errNoSyncActive {
+ if err := tester.downloader.DeliverHashes("bad peer", []common.Hash{}); err != errNoSyncActive {
t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
}
- if err := tester.downloader.DeliverBlocks61("bad peer", []*types.Block{}); err != errNoSyncActive {
+ if err := tester.downloader.DeliverBlocks("bad peer", []*types.Block{}); err != errNoSyncActive {
t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
}
}
@@ -809,14 +872,6 @@ func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
id := fmt.Sprintf("peer #%d", i)
tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], headers, blocks, receipts)
}
- // Synchronise with the middle peer and make sure half of the blocks were retrieved
- id := fmt.Sprintf("peer #%d", targetPeers/2)
- if err := tester.sync(id, nil); err != nil {
- t.Fatalf("failed to synchronise blocks: %v", err)
- }
- assertOwnChain(t, tester, len(tester.peerHashes[id]))
-
- // Synchronise with the best peer and make sure everything is retrieved
if err := tester.sync("peer #0", nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
@@ -870,8 +925,8 @@ func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, F
func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) }
func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
- // Create a small enough block chain to download
- targetBlocks := blockCacheLimit - 15
+ // Create a block chain to download
+ targetBlocks := 2*blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
tester := newTester(mode)
@@ -898,8 +953,8 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
bodiesNeeded++
}
}
- for _, receipt := range receipts {
- if mode == FastSync && len(receipt) > 0 {
+ for hash, receipt := range receipts {
+ if mode == FastSync && len(receipt) > 0 && headers[hash].Number.Uint64() <= uint64(targetBlocks-minFullBlocks) {
receiptsNeeded++
}
}
diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go
index 92acb6ba8..d6fcfa25c 100644
--- a/eth/downloader/metrics.go
+++ b/eth/downloader/metrics.go
@@ -47,4 +47,9 @@ var (
receiptReqTimer = metrics.NewTimer("eth/downloader/receipts/req")
receiptDropMeter = metrics.NewMeter("eth/downloader/receipts/drop")
receiptTimeoutMeter = metrics.NewMeter("eth/downloader/receipts/timeout")
+
+ stateInMeter = metrics.NewMeter("eth/downloader/states/in")
+ stateReqTimer = metrics.NewTimer("eth/downloader/states/req")
+ stateDropMeter = metrics.NewMeter("eth/downloader/states/drop")
+ stateTimeoutMeter = metrics.NewMeter("eth/downloader/states/timeout")
)
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 5fc0db587..5011d5d46 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -41,6 +41,7 @@ type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
type blockBodyFetcherFn func([]common.Hash) error
type receiptFetcherFn func([]common.Hash) error
+type stateFetcherFn func([]common.Hash) error
var (
errAlreadyFetching = errors.New("already fetching blocks from peer")
@@ -55,12 +56,16 @@ type peer struct {
blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
+ stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
rep int32 // Simple peer reputation
- blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request
- receiptCapacity int32 // Number of receipts allowed to fetch per request
- blockStarted time.Time // Time instance when the last block (body)fetch was started
- receiptStarted time.Time // Time instance when the last receipt fetch was started
+ blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request
+ receiptCapacity int32 // Number of receipts allowed to fetch per request
+ stateCapacity int32 // Number of node data pieces allowed to fetch per request
+
+ blockStarted time.Time // Time instance when the last block (body)fetch was started
+ receiptStarted time.Time // Time instance when the last receipt fetch was started
+ stateStarted time.Time // Time instance when the last node data fetch was started
ignored *set.Set // Set of hashes not to request (didn't have previously)
@@ -73,6 +78,7 @@ type peer struct {
getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies
getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts
+ getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data
version int // Eth protocol version number to switch strategies
}
@@ -82,12 +88,13 @@ type peer struct {
func newPeer(id string, version int, head common.Hash,
getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
- getReceipts receiptFetcherFn) *peer {
+ getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer {
return &peer{
id: id,
head: head,
blockCapacity: 1,
receiptCapacity: 1,
+ stateCapacity: 1,
ignored: set.New(),
getRelHashes: getRelHashes,
@@ -99,6 +106,7 @@ func newPeer(id string, version int, head common.Hash,
getBlockBodies: getBlockBodies,
getReceipts: getReceipts,
+ getNodeData: getNodeData,
version: version,
}
@@ -110,6 +118,7 @@ func (p *peer) Reset() {
atomic.StoreInt32(&p.receiptIdle, 0)
atomic.StoreInt32(&p.blockCapacity, 1)
atomic.StoreInt32(&p.receiptCapacity, 1)
+ atomic.StoreInt32(&p.stateCapacity, 1)
p.ignored.Clear()
}
@@ -167,6 +176,24 @@ func (p *peer) FetchReceipts(request *fetchRequest) error {
return nil
}
+// FetchNodeData sends a node state data retrieval request to the remote peer.
+func (p *peer) FetchNodeData(request *fetchRequest) error {
+ // Short circuit if the peer is already fetching
+ if !atomic.CompareAndSwapInt32(&p.stateIdle, 0, 1) {
+ return errAlreadyFetching
+ }
+ p.stateStarted = time.Now()
+
+ // Convert the hash set to a retrievable slice
+ hashes := make([]common.Hash, 0, len(request.Hashes))
+ for hash, _ := range request.Hashes {
+ hashes = append(hashes, hash)
+ }
+ go p.getNodeData(hashes)
+
+ return nil
+}
+
// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its block retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
@@ -188,6 +215,13 @@ func (p *peer) SetReceiptsIdle() {
p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle)
}
+// SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval
+// requests. Its node data retrieval allowance will also be updated either up- or
+// downwards, depending on whether the previous fetch completed in time or not.
+func (p *peer) SetNodeDataIdle() {
+ p.setIdle(p.stateStarted, stateSoftTTL, stateSoftTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle)
+}
+
// setIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its data retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
@@ -230,6 +264,12 @@ func (p *peer) ReceiptCapacity() int {
return int(atomic.LoadInt32(&p.receiptCapacity))
}
+// NodeDataCapacity retrieves the peers block download allowance based on its
+// previously discovered bandwidth capacity.
+func (p *peer) NodeDataCapacity() int {
+ return int(atomic.LoadInt32(&p.stateCapacity))
+}
+
// Promote increases the peer's reputation.
func (p *peer) Promote() {
atomic.AddInt32(&p.rep, 1)
@@ -340,39 +380,50 @@ func (ps *peerSet) AllPeers() []*peer {
// BlockIdlePeers retrieves a flat list of all the currently idle peers within the
// active peer set, ordered by their reputation.
-func (ps *peerSet) BlockIdlePeers(version int) ([]*peer, int) {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
-
- idle, total := make([]*peer, 0, len(ps.peers)), 0
- for _, p := range ps.peers {
- if (version == 61 && p.version == 61) || (version >= 62 && p.version >= 62) {
- if atomic.LoadInt32(&p.blockIdle) == 0 {
- idle = append(idle, p)
- }
- total++
- }
+func (ps *peerSet) BlockIdlePeers() ([]*peer, int) {
+ idle := func(p *peer) bool {
+ return atomic.LoadInt32(&p.blockIdle) == 0
}
- for i := 0; i < len(idle); i++ {
- for j := i + 1; j < len(idle); j++ {
- if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
- idle[i], idle[j] = idle[j], idle[i]
- }
- }
+ return ps.idlePeers(61, 61, idle)
+}
+
+// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
+// the active peer set, ordered by their reputation.
+func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {
+ idle := func(p *peer) bool {
+ return atomic.LoadInt32(&p.blockIdle) == 0
}
- return idle, total
+ return ps.idlePeers(62, 64, idle)
}
-// ReceiptIdlePeers retrieves a flat list of all the currently idle peers within the
-// active peer set, ordered by their reputation.
+// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
+// within the active peer set, ordered by their reputation.
func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
+ idle := func(p *peer) bool {
+ return atomic.LoadInt32(&p.receiptIdle) == 0
+ }
+ return ps.idlePeers(63, 64, idle)
+}
+
+// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
+// peers within the active peer set, ordered by their reputation.
+func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) {
+ idle := func(p *peer) bool {
+ return atomic.LoadInt32(&p.stateIdle) == 0
+ }
+ return ps.idlePeers(63, 64, idle)
+}
+
+// idlePeers retrieves a flat list of all currently idle peers satisfying the
+// protocol version constraints, using the provided function to check idleness.
+func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) {
ps.lock.RLock()
defer ps.lock.RUnlock()
idle, total := make([]*peer, 0, len(ps.peers)), 0
for _, p := range ps.peers {
- if p.version >= 63 {
- if atomic.LoadInt32(&p.receiptIdle) == 0 {
+ if p.version >= minProtocol && p.version <= maxProtocol {
+ if idleCheck(p) {
idle = append(idle, p)
}
total++
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index c53ad939e..942ed0d63 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -26,9 +26,13 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/trie"
"github.com/rcrowley/go-metrics"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
@@ -39,13 +43,14 @@ var (
var (
errNoFetchesPending = errors.New("no fetches pending")
+ errStateSyncPending = errors.New("state trie sync already scheduled")
errStaleDelivery = errors.New("stale delivery")
)
// fetchRequest is a currently running data retrieval operation.
type fetchRequest struct {
Peer *peer // Peer to which the request was sent
- Hashes map[common.Hash]int // [eth/61] Requested block with their insertion index (priority)
+ Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority)
Headers []*types.Header // [eth/62] Requested headers, sorted by request order
Time time.Time // Time when the request was made
}
@@ -64,6 +69,9 @@ type fetchResult struct {
// queue represents hashes that are either need fetching or are being fetched
type queue struct {
+ mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
+ fastSyncPivot uint64 // Block number where the fast sync pivots into archive synchronisation mode
+
hashPool map[common.Hash]int // [eth/61] Pending hashes, mapping to their insertion index (priority)
hashQueue *prque.Prque // [eth/61] Priority queue of the block hashes to fetch
hashCounter int // [eth/61] Counter indexing the added hashes to ensure retrieval order
@@ -80,15 +88,22 @@ type queue struct {
receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations
receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches
+ stateTaskIndex int // [eth/63] Counter indexing the added hashes to ensure prioritized retrieval order
+ stateTaskPool map[common.Hash]int // [eth/63] Pending node data retrieval tasks, mapping to their priority
+ stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for
+ statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations
+
+ stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly
+ stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator
+
resultCache []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block-chain
- resultParts int // Number of fetch components required to complete an item
lock sync.RWMutex
}
// newQueue creates a new download queue for scheduling block retrieval.
-func newQueue() *queue {
+func newQueue(stateDb ethdb.Database) *queue {
return &queue{
hashPool: make(map[common.Hash]int),
hashQueue: prque.New(),
@@ -100,6 +115,10 @@ func newQueue() *queue {
receiptTaskQueue: prque.New(),
receiptPendPool: make(map[string]*fetchRequest),
receiptDonePool: make(map[common.Hash]struct{}),
+ stateTaskPool: make(map[common.Hash]int),
+ stateTaskQueue: prque.New(),
+ statePendPool: make(map[string]*fetchRequest),
+ stateDatabase: stateDb,
resultCache: make([]*fetchResult, blockCacheLimit),
}
}
@@ -109,6 +128,9 @@ func (q *queue) Reset() {
q.lock.Lock()
defer q.lock.Unlock()
+ q.mode = FullSync
+ q.fastSyncPivot = 0
+
q.hashPool = make(map[common.Hash]int)
q.hashQueue.Reset()
q.hashCounter = 0
@@ -125,9 +147,14 @@ func (q *queue) Reset() {
q.receiptPendPool = make(map[string]*fetchRequest)
q.receiptDonePool = make(map[common.Hash]struct{})
+ q.stateTaskIndex = 0
+ q.stateTaskPool = make(map[common.Hash]int)
+ q.stateTaskQueue.Reset()
+ q.statePendPool = make(map[string]*fetchRequest)
+ q.stateScheduler = nil
+
q.resultCache = make([]*fetchResult, blockCacheLimit)
q.resultOffset = 0
- q.resultParts = 0
}
// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
@@ -146,12 +173,20 @@ func (q *queue) PendingReceipts() int {
return q.receiptTaskQueue.Size()
}
+// PendingNodeData retrieves the number of node data entries pending for retrieval.
+func (q *queue) PendingNodeData() int {
+ q.lock.RLock()
+ defer q.lock.RUnlock()
+
+ return q.stateTaskQueue.Size()
+}
+
// InFlight retrieves the number of fetch requests currently in flight.
func (q *queue) InFlight() int {
q.lock.RLock()
defer q.lock.RUnlock()
- return len(q.blockPendPool) + len(q.receiptPendPool)
+ return len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
}
// Idle returns if the queue is fully idle or has some data still inside. This
@@ -160,8 +195,8 @@ func (q *queue) Idle() bool {
q.lock.RLock()
defer q.lock.RUnlock()
- queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
- pending := len(q.blockPendPool) + len(q.receiptPendPool)
+ queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
+ pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
cached := len(q.blockDonePool) + len(q.receiptDonePool)
return (queued + pending + cached) == 0
@@ -227,7 +262,7 @@ func (q *queue) Schedule61(hashes []common.Hash, fifo bool) []common.Hash {
// Schedule adds a set of headers for the download queue for scheduling, returning
// the new headers encountered.
-func (q *queue) Schedule(headers []*types.Header, from uint64, receipts bool) []*types.Header {
+func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
q.lock.Lock()
defer q.lock.Unlock()
@@ -256,10 +291,21 @@ func (q *queue) Schedule(headers []*types.Header, from uint64, receipts bool) []
// Queue the header for content retrieval
q.blockTaskPool[hash] = header
q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
- if receipts {
+
+ if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot {
+ // Fast phase of the fast sync, retrieve receipts too
q.receiptTaskPool[hash] = header
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
}
+ if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot {
+ // Pivoting point of the fast sync, retrieve the state tries
+ q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase)
+ for _, hash := range q.stateScheduler.Missing(0) {
+ q.stateTaskPool[hash] = q.stateTaskIndex
+ q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex))
+ q.stateTaskIndex++
+ }
+ }
inserts = append(inserts, header)
q.headerHead = hash
from++
@@ -279,6 +325,9 @@ func (q *queue) GetHeadResult() *fetchResult {
if q.resultCache[0].Pending > 0 {
return nil
}
+ if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot && len(q.stateTaskPool) > 0 {
+ return nil
+ }
return q.resultCache[0]
}
@@ -291,9 +340,18 @@ func (q *queue) TakeResults() []*fetchResult {
// Accumulate all available results
results := []*fetchResult{}
for _, result := range q.resultCache {
+ // Stop if no more results are ready
if result == nil || result.Pending > 0 {
break
}
+ // The fast sync pivot block may only be processed after state fetch completes
+ if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot && len(q.stateTaskPool) > 0 {
+ break
+ }
+ // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion
+ if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 {
+ break
+ }
results = append(results, result)
hash := result.Header.Hash()
@@ -312,31 +370,45 @@ func (q *queue) TakeResults() []*fetchResult {
return results
}
-// Reserve61 reserves a set of hashes for the given peer, skipping any previously
-// failed download.
-func (q *queue) Reserve61(p *peer, count int) *fetchRequest {
+// ReserveBlocks reserves a set of block hashes for the given peer, skipping any
+// previously failed download.
+func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest {
+ return q.reserveHashes(p, count, q.hashQueue, q.blockPendPool, len(q.resultCache)-len(q.blockDonePool))
+}
+
+// ReserveNodeData reserves a set of node data hashes for the given peer, skipping
+// any previously failed download.
+func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest {
+ return q.reserveHashes(p, count, q.stateTaskQueue, q.statePendPool, 0)
+}
+
+// reserveHashes reserves a set of hashes for the given peer, skipping previously
+// failed ones.
+func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, maxPending int) *fetchRequest {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the pool has been depleted, or if the peer's already
// downloading something (sanity check not to corrupt state)
- if q.hashQueue.Empty() {
+ if taskQueue.Empty() {
return nil
}
- if _, ok := q.blockPendPool[p.id]; ok {
+ if _, ok := pendPool[p.id]; ok {
return nil
}
// Calculate an upper limit on the hashes we might fetch (i.e. throttling)
- space := len(q.resultCache) - len(q.blockDonePool)
- for _, request := range q.blockPendPool {
- space -= len(request.Hashes)
+ allowance := maxPending
+ if allowance > 0 {
+ for _, request := range pendPool {
+ allowance -= len(request.Hashes)
+ }
}
// Retrieve a batch of hashes, skipping previously failed ones
send := make(map[common.Hash]int)
skip := make(map[common.Hash]int)
- for proc := 0; proc < space && len(send) < count && !q.hashQueue.Empty(); proc++ {
- hash, priority := q.hashQueue.Pop()
+ for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ {
+ hash, priority := taskQueue.Pop()
if p.ignored.Has(hash) {
skip[hash.(common.Hash)] = int(priority)
} else {
@@ -345,7 +417,7 @@ func (q *queue) Reserve61(p *peer, count int) *fetchRequest {
}
// Merge all the skipped hashes back
for hash, index := range skip {
- q.hashQueue.Push(hash, float32(index))
+ taskQueue.Push(hash, float32(index))
}
// Assemble and return the block download request
if len(send) == 0 {
@@ -356,19 +428,19 @@ func (q *queue) Reserve61(p *peer, count int) *fetchRequest {
Hashes: send,
Time: time.Now(),
}
- q.blockPendPool[p.id] = request
+ pendPool[p.id] = request
return request
}
-// ReserveBlocks reserves a set of body fetches for the given peer, skipping any
+// ReserveBodies reserves a set of body fetches for the given peer, skipping any
// previously failed downloads. Beside the next batch of needed fetches, it also
// returns a flag whether empty blocks were queued requiring processing.
-func (q *queue) ReserveBlocks(p *peer, count int) (*fetchRequest, bool, error) {
+func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) {
noop := func(header *types.Header) bool {
return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
}
- return q.reserveFetch(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, noop)
+ return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, noop)
}
// ReserveReceipts reserves a set of receipt fetches for the given peer, skipping
@@ -378,13 +450,13 @@ func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error)
noop := func(header *types.Header) bool {
return header.ReceiptHash == types.EmptyRootHash
}
- return q.reserveFetch(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, noop)
+ return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, noop)
}
-// reserveFetch reserves a set of data download operations for a given peer,
+// reserveHeaders reserves a set of data download operations for a given peer,
// skipping any previously failed ones. This method is a generic version used
// by the individual special reservation functions.
-func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
+func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, noop func(*types.Header) bool) (*fetchRequest, bool, error) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -416,8 +488,12 @@ func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types
return nil, false, errInvalidChain
}
if q.resultCache[index] == nil {
+ components := 1
+ if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot {
+ components = 2
+ }
q.resultCache[index] = &fetchResult{
- Pending: q.resultParts,
+ Pending: components,
Header: header,
}
}
@@ -456,30 +532,36 @@ func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types
return request, progress, nil
}
-// Cancel61 aborts a fetch request, returning all pending hashes to the queue.
-func (q *queue) Cancel61(request *fetchRequest) {
- q.cancel(request, nil, q.blockPendPool)
+// CancelBlocks aborts a fetch request, returning all pending hashes to the queue.
+func (q *queue) CancelBlocks(request *fetchRequest) {
+ q.cancel(request, q.hashQueue, q.blockPendPool)
}
-// CancelBlocks aborts a body fetch request, returning all pending hashes to the
+// CancelBodies aborts a body fetch request, returning all pending headers to the
// task queue.
-func (q *queue) CancelBlocks(request *fetchRequest) {
+func (q *queue) CancelBodies(request *fetchRequest) {
q.cancel(request, q.blockTaskQueue, q.blockPendPool)
}
-// CancelReceipts aborts a body fetch request, returning all pending hashes to
+// CancelReceipts aborts a body fetch request, returning all pending headers to
// the task queue.
func (q *queue) CancelReceipts(request *fetchRequest) {
q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
}
+// CancelNodeData aborts a node state data fetch request, returning all pending
+// hashes to the task queue.
+func (q *queue) CancelNodeData(request *fetchRequest) {
+ q.cancel(request, q.stateTaskQueue, q.statePendPool)
+}
+
// Cancel aborts a fetch request, returning all pending hashes to the task queue.
func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
q.lock.Lock()
defer q.lock.Unlock()
for hash, index := range request.Hashes {
- q.hashQueue.Push(hash, float32(index))
+ taskQueue.Push(hash, float32(index))
}
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
@@ -509,29 +591,41 @@ func (q *queue) Revoke(peerId string) {
}
delete(q.receiptPendPool, peerId)
}
+ if request, ok := q.statePendPool[peerId]; ok {
+ for hash, index := range request.Hashes {
+ q.stateTaskQueue.Push(hash, float32(index))
+ }
+ delete(q.statePendPool, peerId)
+ }
}
-// Expire61 checks for in flight requests that exceeded a timeout allowance,
+// ExpireBlocks checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalization.
-func (q *queue) Expire61(timeout time.Duration) []string {
- return q.expire(timeout, q.blockPendPool, nil)
+func (q *queue) ExpireBlocks(timeout time.Duration) []string {
+ return q.expire(timeout, q.blockPendPool, q.hashQueue, blockTimeoutMeter)
}
-// ExpireBlocks checks for in flight block body requests that exceeded a timeout
+// ExpireBodies checks for in flight block body requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalization.
-func (q *queue) ExpireBlocks(timeout time.Duration) []string {
- return q.expire(timeout, q.blockPendPool, q.blockTaskQueue)
+func (q *queue) ExpireBodies(timeout time.Duration) []string {
+ return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter)
}
// ExpireReceipts checks for in flight receipt requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalization.
func (q *queue) ExpireReceipts(timeout time.Duration) []string {
- return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue)
+ return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter)
+}
+
+// ExpireNodeData checks for in flight node data requests that exceeded a timeout
+// allowance, canceling them and returning the responsible peers for penalization.
+func (q *queue) ExpireNodeData(timeout time.Duration) []string {
+ return q.expire(timeout, q.statePendPool, q.stateTaskQueue, stateTimeoutMeter)
}
// expire is the generic check that move expired tasks from a pending pool back
// into a task pool, returning all entities caught with expired tasks.
-func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque) []string {
+func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string {
q.lock.Lock()
defer q.lock.Unlock()
@@ -540,14 +634,11 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
for id, request := range pendPool {
if time.Since(request.Time) > timeout {
// Update the metrics with the timeout
- if len(request.Hashes) > 0 {
- blockTimeoutMeter.Mark(1)
- } else {
- bodyTimeoutMeter.Mark(1)
- }
+ timeoutMeter.Mark(1)
+
// Return any non satisfied requests to the pool
for hash, index := range request.Hashes {
- q.hashQueue.Push(hash, float32(index))
+ taskQueue.Push(hash, float32(index))
}
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
@@ -562,8 +653,8 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
return peers
}
-// Deliver61 injects a block retrieval response into the download queue.
-func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) {
+// DeliverBlocks injects a block retrieval response into the download queue.
+func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
q.lock.Lock()
defer q.lock.Unlock()
@@ -626,8 +717,8 @@ func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) {
}
}
-// DeliverBlocks injects a block (body) retrieval response into the results queue.
-func (q *queue) DeliverBlocks(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error {
+// DeliverBodies injects a block body retrieval response into the results queue.
+func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error {
reconstruct := func(header *types.Header, index int, result *fetchResult) error {
if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
return errInvalidBody
@@ -717,14 +808,84 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
}
}
+// DeliverNodeData injects a node state data retrieval response into the queue.
+func (q *queue) DeliverNodeData(id string, data [][]byte) (int, int, error) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ // Short circuit if the data was never requested
+ request := q.statePendPool[id]
+ if request == nil {
+ return 0, 0, errNoFetchesPending
+ }
+ stateReqTimer.UpdateSince(request.Time)
+ delete(q.statePendPool, id)
+
+ // If no data was retrieved, mark them as unavailable for the origin peer
+ if len(data) == 0 {
+ for hash, _ := range request.Hashes {
+ request.Peer.ignored.Add(hash)
+ }
+ }
+ // Iterate over the downloaded data and verify each of them
+ errs := make([]error, 0)
+ processed := 0
+ for _, blob := range data {
+ // Skip any blocks that were not requested
+ hash := common.BytesToHash(crypto.Sha3(blob))
+ if _, ok := request.Hashes[hash]; !ok {
+ errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
+ continue
+ }
+ // Inject the next state trie item into the database
+ if err := q.stateScheduler.Process([]trie.SyncResult{{hash, blob}}); err != nil {
+ errs = []error{err}
+ break
+ }
+ processed++
+
+ delete(request.Hashes, hash)
+ delete(q.stateTaskPool, hash)
+ }
+ // Return all failed or missing fetches to the queue
+ for hash, index := range request.Hashes {
+ q.stateTaskQueue.Push(hash, float32(index))
+ }
+ // Also enqueue any newly required state trie nodes
+ discovered := 0
+ if len(q.stateTaskPool) < maxQueuedStates {
+ for _, hash := range q.stateScheduler.Missing(4 * MaxStateFetch) {
+ q.stateTaskPool[hash] = q.stateTaskIndex
+ q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex))
+ q.stateTaskIndex++
+ discovered++
+ }
+ }
+ // If none of the data items were good, it's a stale delivery
+ switch {
+ case len(errs) == 0:
+ return processed, discovered, nil
+
+ case len(errs) == len(request.Hashes):
+ return processed, discovered, errStaleDelivery
+
+ default:
+ return processed, discovered, fmt.Errorf("multiple failures: %v", errs)
+ }
+}
+
// Prepare configures the result cache to allow accepting and caching inbound
// fetch results.
-func (q *queue) Prepare(offset uint64, parts int) {
+func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64) {
q.lock.Lock()
defer q.lock.Unlock()
if q.resultOffset < offset {
q.resultOffset = offset
}
- q.resultParts = parts
+ q.fastSyncPivot = 0
+ if mode == FastSync {
+ q.fastSyncPivot = pivot
+ }
+ q.mode = mode
}
diff --git a/eth/downloader/types.go b/eth/downloader/types.go
new file mode 100644
index 000000000..221ef38f6
--- /dev/null
+++ b/eth/downloader/types.go
@@ -0,0 +1,137 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+ "fmt"
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+)
+
+// headerCheckFn is a callback type for verifying a header's presence in the local chain.
+type headerCheckFn func(common.Hash) bool
+
+// blockCheckFn is a callback type for verifying a block's presence in the local chain.
+type blockCheckFn func(common.Hash) bool
+
+// headerRetrievalFn is a callback type for retrieving a header from the local chain.
+type headerRetrievalFn func(common.Hash) *types.Header
+
+// blockRetrievalFn is a callback type for retrieving a block from the local chain.
+type blockRetrievalFn func(common.Hash) *types.Block
+
+// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain.
+type headHeaderRetrievalFn func() *types.Header
+
+// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain.
+type headBlockRetrievalFn func() *types.Block
+
+// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain.
+type headFastBlockRetrievalFn func() *types.Block
+
+// headBlockCommitterFn is a callback for directly committing the head block to a certain entity.
+type headBlockCommitterFn func(common.Hash) error
+
+// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block.
+type tdRetrievalFn func(common.Hash) *big.Int
+
+// headerChainInsertFn is a callback type to insert a batch of headers into the local chain.
+type headerChainInsertFn func([]*types.Header, bool) (int, error)
+
+// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain.
+type blockChainInsertFn func(types.Blocks) (int, error)
+
+// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain.
+type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error)
+
+// peerDropFn is a callback type for dropping a peer detected as malicious.
+type peerDropFn func(id string)
+
+// dataPack is a data message returned by a peer for some query.
+type dataPack interface {
+ PeerId() string
+ Items() int
+ Stats() string
+}
+
+// hashPack is a batch of block hashes returned by a peer (eth/61).
+type hashPack struct {
+ peerId string
+ hashes []common.Hash
+}
+
+func (p *hashPack) PeerId() string { return p.peerId }
+func (p *hashPack) Items() int { return len(p.hashes) }
+func (p *hashPack) Stats() string { return fmt.Sprintf("%d", len(p.hashes)) }
+
+// blockPack is a batch of blocks returned by a peer (eth/61).
+type blockPack struct {
+ peerId string
+ blocks []*types.Block
+}
+
+func (p *blockPack) PeerId() string { return p.peerId }
+func (p *blockPack) Items() int { return len(p.blocks) }
+func (p *blockPack) Stats() string { return fmt.Sprintf("%d", len(p.blocks)) }
+
+// headerPack is a batch of block headers returned by a peer.
+type headerPack struct {
+ peerId string
+ headers []*types.Header
+}
+
+func (p *headerPack) PeerId() string { return p.peerId }
+func (p *headerPack) Items() int { return len(p.headers) }
+func (p *headerPack) Stats() string { return fmt.Sprintf("%d", len(p.headers)) }
+
+// bodyPack is a batch of block bodies returned by a peer.
+type bodyPack struct {
+ peerId string
+ transactions [][]*types.Transaction
+ uncles [][]*types.Header
+}
+
+func (p *bodyPack) PeerId() string { return p.peerId }
+func (p *bodyPack) Items() int {
+ if len(p.transactions) <= len(p.uncles) {
+ return len(p.transactions)
+ }
+ return len(p.uncles)
+}
+func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) }
+
+// receiptPack is a batch of receipts returned by a peer.
+type receiptPack struct {
+ peerId string
+ receipts [][]*types.Receipt
+}
+
+func (p *receiptPack) PeerId() string { return p.peerId }
+func (p *receiptPack) Items() int { return len(p.receipts) }
+func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) }
+
+// statePack is a batch of states returned by a peer.
+type statePack struct {
+ peerId string
+ states [][]byte
+}
+
+func (p *statePack) PeerId() string { return p.peerId }
+func (p *statePack) Items() int { return len(p.states) }
+func (p *statePack) Stats() string { return fmt.Sprintf("%d", len(p.states)) }
diff --git a/eth/handler.go b/eth/handler.go
index 1117cb1b7..b0916d50b 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -129,9 +129,9 @@ func NewProtocolManager(mode Mode, networkId int, mux *event.TypeMux, txpool txP
case LightMode:
syncMode = downloader.LightSync
}
- manager.downloader = downloader.New(syncMode, manager.eventMux, blockchain.HasHeader, blockchain.HasBlock, blockchain.GetHeader,
- blockchain.GetBlock, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.GetTd,
- blockchain.InsertHeaderChain, blockchain.InsertChain, blockchain.InsertReceiptChain, manager.removePeer)
+ manager.downloader = downloader.New(syncMode, chaindb, manager.eventMux, blockchain.HasHeader, blockchain.HasBlock, blockchain.GetHeader,
+ blockchain.GetBlock, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.FastSyncCommitHead,
+ blockchain.GetTd, blockchain.InsertHeaderChain, blockchain.InsertChain, blockchain.InsertReceiptChain, manager.removePeer)
validator := func(block *types.Block, parent *types.Block) error {
return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false)
@@ -220,8 +220,8 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(),
- p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks,
- p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts); err != nil {
+ p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, p.RequestHeadersByHash,
+ p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil {
return err
}
// Propagate existing transactions. new transactions appearing
@@ -307,7 +307,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
break
}
// Deliver them all to the downloader for queuing
- err := pm.downloader.DeliverHashes61(p.id, hashes)
+ err := pm.downloader.DeliverHashes(p.id, hashes)
if err != nil {
glog.V(logger.Debug).Infoln(err)
}
@@ -353,7 +353,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
// Filter out any explicitly requested blocks, deliver the rest to the downloader
if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 {
- pm.downloader.DeliverBlocks61(p.id, blocks)
+ pm.downloader.DeliverBlocks(p.id, blocks)
}
// Block header query, collect the requested headers and reply
@@ -515,6 +515,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendNodeData(data)
+ case p.version >= eth63 && msg.Code == NodeDataMsg:
+ // A batch of node state data arrived to one of our previous requests
+ var data [][]byte
+ if err := msg.Decode(&data); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Deliver all to the downloader
+ if err := pm.downloader.DeliverNodeData(p.id, data); err != nil {
+ glog.V(logger.Debug).Infof("failed to deliver node state data: %v", err)
+ }
+
case p.version >= eth63 && msg.Code == GetReceiptsMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
diff --git a/eth/peer.go b/eth/peer.go
index e24be97f1..68ce903a6 100644
--- a/eth/peer.go
+++ b/eth/peer.go
@@ -191,7 +191,7 @@ func (p *peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error {
return p2p.Send(p.rw, BlockBodiesMsg, bodies)
}
-// SendNodeData sends a batch of arbitrary internal data, corresponding to the
+// SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the
// hashes requested.
func (p *peer) SendNodeData(data [][]byte) error {
return p2p.Send(p.rw, NodeDataMsg, data)