aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/downloader.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r--eth/downloader/downloader.go568
1 files changed, 234 insertions, 334 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 24ba3da17..96177ae8a 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -19,7 +19,6 @@ package downloader
import (
"errors"
- "fmt"
"math"
"math/big"
"strings"
@@ -29,9 +28,11 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/rcrowley/go-metrics"
)
var (
@@ -39,8 +40,8 @@ var (
MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request
- MaxStateFetch = 384 // Amount of node state values to allow fetching per request
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
+ MaxStateFetch = 384 // Amount of node state values to allow fetching per request
hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out
blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth
@@ -49,10 +50,13 @@ var (
bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth
bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired
receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
- receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a block body request is considered expired
+ receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired
+ stateSoftTTL = 2 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
+ stateHardTTL = 3 * stateSoftTTL // [eth/63] Maximum time allowance before a node data request is considered expired
maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
+ maxQueuedStates = 256 * 1024 // [eth/63] Maximum number of state requests to queue (DOS protection)
maxResultsProcess = 256 // Number of download results to import at once into the chain
headerCheckFrequency = 64 // Verification frequency of the downloaded headers during fast sync
@@ -84,98 +88,6 @@ var (
errNoSyncActive = errors.New("no sync active")
)
-// headerCheckFn is a callback type for verifying a header's presence in the local chain.
-type headerCheckFn func(common.Hash) bool
-
-// blockCheckFn is a callback type for verifying a block's presence in the local chain.
-type blockCheckFn func(common.Hash) bool
-
-// headerRetrievalFn is a callback type for retrieving a header from the local chain.
-type headerRetrievalFn func(common.Hash) *types.Header
-
-// blockRetrievalFn is a callback type for retrieving a block from the local chain.
-type blockRetrievalFn func(common.Hash) *types.Block
-
-// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain.
-type headHeaderRetrievalFn func() *types.Header
-
-// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain.
-type headBlockRetrievalFn func() *types.Block
-
-// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain.
-type headFastBlockRetrievalFn func() *types.Block
-
-// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block.
-type tdRetrievalFn func(common.Hash) *big.Int
-
-// headerChainInsertFn is a callback type to insert a batch of headers into the local chain.
-type headerChainInsertFn func([]*types.Header, bool) (int, error)
-
-// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain.
-type blockChainInsertFn func(types.Blocks) (int, error)
-
-// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain.
-type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error)
-
-// peerDropFn is a callback type for dropping a peer detected as malicious.
-type peerDropFn func(id string)
-
-// dataPack is a data message returned by a peer for some query.
-type dataPack interface {
- PeerId() string
- Empty() bool
- Stats() string
-}
-
-// hashPack is a batch of block hashes returned by a peer (eth/61).
-type hashPack struct {
- peerId string
- hashes []common.Hash
-}
-
-// blockPack is a batch of blocks returned by a peer (eth/61).
-type blockPack struct {
- peerId string
- blocks []*types.Block
-}
-
-// headerPack is a batch of block headers returned by a peer.
-type headerPack struct {
- peerId string
- headers []*types.Header
-}
-
-// bodyPack is a batch of block bodies returned by a peer.
-type bodyPack struct {
- peerId string
- transactions [][]*types.Transaction
- uncles [][]*types.Header
-}
-
-// PeerId retrieves the origin peer who sent this block body packet.
-func (p *bodyPack) PeerId() string { return p.peerId }
-
-// Empty returns whether the no block bodies were delivered.
-func (p *bodyPack) Empty() bool { return len(p.transactions) == 0 || len(p.uncles) == 0 }
-
-// Stats creates a textual stats report for logging purposes.
-func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) }
-
-// receiptPack is a batch of receipts returned by a peer.
-type receiptPack struct {
- peerId string
- receipts [][]*types.Receipt
-}
-
-// PeerId retrieves the origin peer who sent this receipt packet.
-func (p *receiptPack) PeerId() string { return p.peerId }
-
-// Empty returns whether the no receipts were delivered.
-func (p *receiptPack) Empty() bool { return len(p.receipts) == 0 }
-
-// Stats creates a textual stats report for logging purposes.
-func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) }
-
type Downloader struct {
mode SyncMode // Synchronisation mode defining the strategies used
mux *event.TypeMux // Event multiplexer to announce sync operation events
@@ -186,23 +98,26 @@ type Downloader struct {
interrupt int32 // Atomic boolean to signal termination
// Statistics
- syncStatsOrigin uint64 // Origin block number where syncing started at
- syncStatsHeight uint64 // Highest block number known when syncing started
- syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
+ syncStatsChainOrigin uint64 // Origin block number where syncing started at
+ syncStatsChainHeight uint64 // Highest block number known when syncing started
+ syncStatsStateTotal uint64 // Total number of node state entries known so far
+ syncStatsStateDone uint64 // Number of state trie entries already pulled
+ syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
// Callbacks
- hasHeader headerCheckFn // Checks if a header is present in the chain
- hasBlock blockCheckFn // Checks if a block is present in the chain
- getHeader headerRetrievalFn // Retrieves a header from the chain
- getBlock blockRetrievalFn // Retrieves a block from the chain
- headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
- headBlock headBlockRetrievalFn // Retrieves the head block from the chain
- headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
- getTd tdRetrievalFn // Retrieves the TD of a block from the chain
- insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
- insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
- insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
- dropPeer peerDropFn // Drops a peer for misbehaving
+ hasHeader headerCheckFn // Checks if a header is present in the chain
+ hasBlock blockCheckFn // Checks if a block is present in the chain
+ getHeader headerRetrievalFn // Retrieves a header from the chain
+ getBlock blockRetrievalFn // Retrieves a block from the chain
+ headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
+ headBlock headBlockRetrievalFn // Retrieves the head block from the chain
+ headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
+ commitHeadBlock headBlockCommitterFn // Commits a manually assembled block as the chain head
+ getTd tdRetrievalFn // Retrieves the TD of a block from the chain
+ insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
+ insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
+ insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
+ dropPeer peerDropFn // Drops a peer for misbehaving
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
@@ -212,14 +127,16 @@ type Downloader struct {
// Channels
newPeerCh chan *peer
- hashCh chan hashPack // [eth/61] Channel receiving inbound hashes
- blockCh chan blockPack // [eth/61] Channel receiving inbound blocks
- headerCh chan headerPack // [eth/62] Channel receiving inbound block headers
- bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
- receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
- blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks
- bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
- receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
+ hashCh chan dataPack // [eth/61] Channel receiving inbound hashes
+ blockCh chan dataPack // [eth/61] Channel receiving inbound blocks
+ headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
+ bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
+ receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
+ stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
+ blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks
+ bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
+ receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
+ stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
@@ -232,36 +149,40 @@ type Downloader struct {
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
-func New(mode SyncMode, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn, getBlock blockRetrievalFn,
- headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn, getTd tdRetrievalFn,
- insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader {
+func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn,
+ getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn,
+ commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn,
+ insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader {
return &Downloader{
- mode: mode,
- mux: mux,
- queue: newQueue(),
- peers: newPeerSet(),
- hasHeader: hasHeader,
- hasBlock: hasBlock,
- getHeader: getHeader,
- getBlock: getBlock,
- headHeader: headHeader,
- headBlock: headBlock,
- headFastBlock: headFastBlock,
- getTd: getTd,
- insertHeaders: insertHeaders,
- insertBlocks: insertBlocks,
- insertReceipts: insertReceipts,
- dropPeer: dropPeer,
- newPeerCh: make(chan *peer, 1),
- hashCh: make(chan hashPack, 1),
- blockCh: make(chan blockPack, 1),
- headerCh: make(chan headerPack, 1),
- bodyCh: make(chan dataPack, 1),
- receiptCh: make(chan dataPack, 1),
- blockWakeCh: make(chan bool, 1),
- bodyWakeCh: make(chan bool, 1),
- receiptWakeCh: make(chan bool, 1),
+ mode: mode,
+ mux: mux,
+ queue: newQueue(stateDb),
+ peers: newPeerSet(),
+ hasHeader: hasHeader,
+ hasBlock: hasBlock,
+ getHeader: getHeader,
+ getBlock: getBlock,
+ headHeader: headHeader,
+ headBlock: headBlock,
+ headFastBlock: headFastBlock,
+ commitHeadBlock: commitHeadBlock,
+ getTd: getTd,
+ insertHeaders: insertHeaders,
+ insertBlocks: insertBlocks,
+ insertReceipts: insertReceipts,
+ dropPeer: dropPeer,
+ newPeerCh: make(chan *peer, 1),
+ hashCh: make(chan dataPack, 1),
+ blockCh: make(chan dataPack, 1),
+ headerCh: make(chan dataPack, 1),
+ bodyCh: make(chan dataPack, 1),
+ receiptCh: make(chan dataPack, 1),
+ stateCh: make(chan dataPack, 1),
+ blockWakeCh: make(chan bool, 1),
+ bodyWakeCh: make(chan bool, 1),
+ receiptWakeCh: make(chan bool, 1),
+ stateWakeCh: make(chan bool, 1),
}
}
@@ -272,7 +193,7 @@ func (d *Downloader) Boundaries() (uint64, uint64) {
d.syncStatsLock.RLock()
defer d.syncStatsLock.RUnlock()
- return d.syncStatsOrigin, d.syncStatsHeight
+ return d.syncStatsChainOrigin, d.syncStatsChainHeight
}
// Synchronising returns whether the downloader is currently retrieving blocks.
@@ -284,10 +205,11 @@ func (d *Downloader) Synchronising() bool {
// used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
- getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getReceipts receiptFetcherFn) error {
+ getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
+ getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
glog.V(logger.Detail).Infoln("Registering peer", id)
- if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts)); err != nil {
+ if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
glog.V(logger.Error).Infoln("Register failed:", err)
return err
}
@@ -357,12 +279,18 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error
d.queue.Reset()
d.peers.Reset()
- for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh} {
+ for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
select {
case <-ch:
default:
}
}
+ // Reset and ephemeral sync statistics
+ d.syncStatsLock.Lock()
+ d.syncStatsStateTotal = 0
+ d.syncStatsStateDone = 0
+ d.syncStatsLock.Unlock()
+
// Create cancel channel for aborting mid-flight
d.cancelLock.Lock()
d.cancelCh = make(chan struct{})
@@ -414,17 +342,17 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
return err
}
d.syncStatsLock.Lock()
- if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin {
- d.syncStatsOrigin = origin
+ if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
+ d.syncStatsChainOrigin = origin
}
- d.syncStatsHeight = latest
+ d.syncStatsChainHeight = latest
d.syncStatsLock.Unlock()
// Initiate the sync using a concurrent hash and block retrieval algorithm
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- d.queue.Prepare(origin+1, 1)
+ d.queue.Prepare(origin+1, d.mode, 0)
errc := make(chan error, 2)
go func() { errc <- d.fetchHashes61(p, td, origin+1) }()
@@ -449,26 +377,27 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
return err
}
d.syncStatsLock.Lock()
- if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin {
- d.syncStatsOrigin = origin
+ if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
+ d.syncStatsChainOrigin = origin
}
- d.syncStatsHeight = latest
+ d.syncStatsChainHeight = latest
d.syncStatsLock.Unlock()
// Initiate the sync using a concurrent header and content retrieval algorithm
- parts := 1
- if d.mode == FastSync {
- parts = 2 // receipts are fetched too
+ pivot := uint64(0)
+ if latest > uint64(minFullBlocks) {
+ pivot = latest - uint64(minFullBlocks)
}
- d.queue.Prepare(origin+1, parts)
+ d.queue.Prepare(origin+1, d.mode, pivot)
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- errc := make(chan error, 3)
+ errc := make(chan error, 4)
go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved
+ go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync
go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync
- go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal sync
+ go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync
// If any fetcher fails, cancel the others
var fail error
@@ -538,14 +467,14 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
case <-d.hashCh:
// Out of bounds hashes received, ignore them
- case blockPack := <-d.blockCh:
+ case packet := <-d.blockCh:
// Discard anything not from the origin peer
- if blockPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", blockPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
- blocks := blockPack.blocks
+ blocks := packet.(*blockPack).blocks
if len(blocks) != 1 {
glog.V(logger.Debug).Infof("%v: invalid number of head blocks: %d != 1", p, len(blocks))
return 0, errBadPeer
@@ -584,14 +513,14 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
case <-d.cancelCh:
return 0, errCancelHashFetch
- case hashPack := <-d.hashCh:
+ case packet := <-d.hashCh:
// Discard anything not from the origin peer
- if hashPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
- hashes := hashPack.hashes
+ hashes := packet.(*hashPack).hashes
if len(hashes) == 0 {
glog.V(logger.Debug).Infof("%v: empty head hash set", p)
return 0, errEmptyHashSet
@@ -639,14 +568,14 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
case <-d.cancelCh:
return 0, errCancelHashFetch
- case hashPack := <-d.hashCh:
+ case packet := <-d.hashCh:
// Discard anything not from the origin peer
- if hashPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
- hashes := hashPack.hashes
+ hashes := packet.(*hashPack).hashes
if len(hashes) != 1 {
glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes))
return 0, errBadPeer
@@ -716,17 +645,17 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them
- case hashPack := <-d.hashCh:
+ case packet := <-d.hashCh:
// Make sure the active peer is giving us the hashes
- if hashPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
break
}
hashReqTimer.UpdateSince(request)
timeout.Stop()
// If no more hashes are inbound, notify the block fetcher and return
- if len(hashPack.hashes) == 0 {
+ if packet.Items() == 0 {
glog.V(logger.Debug).Infof("%v: no available hashes", p)
select {
@@ -751,12 +680,13 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
return nil
}
gotHashes = true
+ hashes := packet.(*hashPack).hashes
// Otherwise insert all the new hashes, aborting in case of junk
- glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashPack.hashes), from)
+ glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashes), from)
- inserts := d.queue.Schedule61(hashPack.hashes, true)
- if len(inserts) != len(hashPack.hashes) {
+ inserts := d.queue.Schedule61(hashes, true)
+ if len(inserts) != len(hashes) {
glog.V(logger.Debug).Infof("%v: stale hashes", p)
return errBadPeer
}
@@ -776,7 +706,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
return nil
}
// Queue not yet full, fetch the next batch
- from += uint64(len(hashPack.hashes))
+ from += uint64(len(hashes))
getHashes(from)
case <-timeout.C:
@@ -813,16 +743,17 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them
- case blockPack := <-d.blockCh:
+ case packet := <-d.blockCh:
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
- if peer := d.peers.Peer(blockPack.peerId); peer != nil {
+ if peer := d.peers.Peer(packet.PeerId()); peer != nil {
// Deliver the received chunk of blocks, and demote in case of errors
- err := d.queue.Deliver61(blockPack.peerId, blockPack.blocks)
+ blocks := packet.(*blockPack).blocks
+ err := d.queue.DeliverBlocks(peer.id, blocks)
switch err {
case nil:
// If no blocks were delivered, demote the peer (need the delivery above)
- if len(blockPack.blocks) == 0 {
+ if len(blocks) == 0 {
peer.Demote()
peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
@@ -831,7 +762,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// All was successful, promote the peer and potentially start processing
peer.Promote()
peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
+ glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
go d.process()
case errInvalidChain:
@@ -891,7 +822,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
return errNoPeers
}
// Check for block request timeouts and demote the responsible peers
- for _, pid := range d.queue.Expire61(blockHardTTL) {
+ for _, pid := range d.queue.ExpireBlocks(blockHardTTL) {
if peer := d.peers.Peer(pid); peer != nil {
peer.Demote()
glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
@@ -907,7 +838,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
}
// Send a download request to all idle peers, until throttled
throttled := false
- idles, total := d.peers.BlockIdlePeers(61)
+ idles, total := d.peers.BlockIdlePeers()
for _, peer := range idles {
// Short circuit if throttling activated
@@ -918,7 +849,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// Reserve a chunk of hashes for a peer. A nil can mean either that
// no more hashes are available, or that the peer is known not to
// have them.
- request := d.queue.Reserve61(peer, peer.BlockCapacity())
+ request := d.queue.ReserveBlocks(peer, peer.BlockCapacity())
if request == nil {
continue
}
@@ -928,7 +859,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// Fetch the chunk and make sure any errors return the hashes to the queue
if err := peer.Fetch61(request); err != nil {
glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer)
- d.queue.Cancel61(request)
+ d.queue.CancelBlocks(request)
}
}
// Make sure that we have peers available for fetching. If all peers have been tried
@@ -954,14 +885,14 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
case <-d.cancelCh:
return 0, errCancelBlockFetch
- case headerPack := <-d.headerCh:
+ case packet := <-d.headerCh:
// Discard anything not from the origin peer
- if headerPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
- headers := headerPack.headers
+ headers := packet.(*headerPack).headers
if len(headers) != 1 {
glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
return 0, errBadPeer
@@ -1014,14 +945,14 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
case <-d.cancelCh:
return 0, errCancelHashFetch
- case headerPack := <-d.headerCh:
+ case packet := <-d.headerCh:
// Discard anything not from the origin peer
- if headerPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
- headers := headerPack.headers
+ headers := packet.(*headerPack).headers
if len(headers) == 0 {
glog.V(logger.Debug).Infof("%v: empty head header set", p)
return 0, errEmptyHeaderSet
@@ -1069,14 +1000,14 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
case <-d.cancelCh:
return 0, errCancelHashFetch
- case headerPack := <-d.headerCh:
+ case packer := <-d.headerCh:
// Discard anything not from the origin peer
- if headerPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId)
+ if packer.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packer.PeerId())
break
}
// Make sure the peer actually gave something valid
- headers := headerPack.headers
+ headers := packer.(*headerPack).headers
if len(headers) != 1 {
glog.V(logger.Debug).Infof("%v: invalid search header set (%d)", p, len(headers))
return 0, errBadPeer
@@ -1150,20 +1081,20 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
case <-d.blockCh:
// Out of bounds eth/61 blocks received, ignore them
- case headerPack := <-d.headerCh:
+ case packet := <-d.headerCh:
// Make sure the active peer is giving us the headers
- if headerPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", headerPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", packet.PeerId())
break
}
headerReqTimer.UpdateSince(request)
timeout.Stop()
// If no more headers are inbound, notify the content fetchers and return
- if len(headerPack.headers) == 0 {
+ if packet.Items() == 0 {
glog.V(logger.Debug).Infof("%v: no available headers", p)
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@@ -1187,26 +1118,27 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
return nil
}
gotHeaders = true
+ headers := packet.(*headerPack).headers
// Otherwise insert all the new headers, aborting in case of junk
- glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headerPack.headers), from)
+ glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
if d.mode == FastSync || d.mode == LightSync {
- if n, err := d.insertHeaders(headerPack.headers, false); err != nil {
- glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headerPack.headers[n].Number, headerPack.headers[n].Hash().Bytes()[:4], err)
+ if n, err := d.insertHeaders(headers, false); err != nil {
+ glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err)
return errInvalidChain
}
}
if d.mode == FullSync || d.mode == FastSync {
- inserts := d.queue.Schedule(headerPack.headers, from, d.mode == FastSync)
- if len(inserts) != len(headerPack.headers) {
+ inserts := d.queue.Schedule(headers, from)
+ if len(inserts) != len(headers) {
glog.V(logger.Debug).Infof("%v: stale headers", p)
return errBadPeer
}
}
// Notify the content fetchers of new headers, but stop if queue is full
cont := d.queue.PendingBlocks() < maxQueuedHeaders || d.queue.PendingReceipts() < maxQueuedHeaders
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
if cont {
// We still have headers to fetch, send continuation wake signal (potential)
select {
@@ -1223,7 +1155,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
}
}
// Queue not yet full, fetch the next batch
- from += uint64(len(headerPack.headers))
+ from += uint64(len(headers))
getHeaders(from)
case <-timeout.C:
@@ -1233,7 +1165,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
d.dropPeer(p.id)
// Finish the sync gracefully instead of dumping the gathered data though
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@@ -1251,19 +1183,19 @@ func (d *Downloader) fetchBodies(from uint64) error {
glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
var (
- deliver = func(packet interface{}) error {
+ deliver = func(packet dataPack) error {
pack := packet.(*bodyPack)
- return d.queue.DeliverBlocks(pack.peerId, pack.transactions, pack.uncles)
+ return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
}
- expire = func() []string { return d.queue.ExpireBlocks(bodyHardTTL) }
+ expire = func() []string { return d.queue.ExpireBodies(bodyHardTTL) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
capacity = func(p *peer) int { return p.BlockCapacity() }
- getIdles = func() ([]*peer, int) { return d.peers.BlockIdlePeers(62) }
+ getIdles = func() ([]*peer, int) { return d.peers.BodyIdlePeers() }
setIdle = func(p *peer) { p.SetBlocksIdle() }
)
- err := d.fetchParts(from, errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
- d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBlocks, d.bodyFetchHook,
- fetch, d.queue.CancelBlocks, capacity, getIdles, setIdle, "Body")
+ err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
+ d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBodies, d.bodyFetchHook,
+ fetch, d.queue.CancelBodies, capacity, getIdles, setIdle, "Body")
glog.V(logger.Debug).Infof("Block body download terminated: %v", err)
return err
@@ -1276,7 +1208,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
var (
- deliver = func(packet interface{}) error {
+ deliver = func(packet dataPack) error {
pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
}
@@ -1285,7 +1217,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
capacity = func(p *peer) int { return p.ReceiptCapacity() }
setIdle = func(p *peer) { p.SetReceiptsIdle() }
)
- err := d.fetchParts(from, errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
+ err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.ThrottleReceipts, d.queue.ReserveReceipts, d.receiptFetchHook,
fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt")
@@ -1293,10 +1225,46 @@ func (d *Downloader) fetchReceipts(from uint64) error {
return err
}
+// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
+// available peers, reserving a chunk of nodes for each, waiting for delivery and
+// also periodically checking for timeouts.
+func (d *Downloader) fetchNodeData() error {
+ glog.V(logger.Debug).Infof("Downloading node state data")
+
+ var (
+ deliver = func(packet dataPack) error {
+ start := time.Now()
+ done, found, err := d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states)
+
+ d.syncStatsLock.Lock()
+ totalDone, totalKnown := d.syncStatsStateDone+uint64(done), d.syncStatsStateTotal+uint64(found)
+ d.syncStatsStateDone, d.syncStatsStateTotal = totalDone, totalKnown
+ d.syncStatsLock.Unlock()
+
+ glog.V(logger.Info).Infof("imported %d [%d / %d] state entries in %v.", done, totalDone, totalKnown, time.Since(start))
+ return err
+ }
+ expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) }
+ throttle = func() bool { return false }
+ reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
+ return d.queue.ReserveNodeData(p, count), false, nil
+ }
+ fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
+ capacity = func(p *peer) int { return p.NodeDataCapacity() }
+ setIdle = func(p *peer) { p.SetNodeDataIdle() }
+ )
+ err := d.fetchParts(errCancelReceiptFetch, d.stateCh, deliver, d.stateWakeCh, expire,
+ d.queue.PendingNodeData, throttle, reserve, nil, fetch, d.queue.CancelNodeData,
+ capacity, d.peers.ReceiptIdlePeers, setIdle, "State")
+
+ glog.V(logger.Debug).Infof("Node state data download terminated: %v", err)
+ return err
+}
+
// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
-func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan dataPack, deliver func(packet interface{}) error, wakeCh chan bool,
+func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool,
expire func() []string, pending func() int, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header),
fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, idle func() ([]*peer, int), setIdle func(*peer), kind string) error {
@@ -1327,7 +1295,7 @@ func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan da
switch err := deliver(packet); err {
case nil:
// If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!)
- if packet.Empty() {
+ if packet.Items() == 0 {
peer.Demote()
setIdle(peer)
glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
@@ -1441,7 +1409,11 @@ func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan da
continue
}
if glog.V(logger.Detail) {
- glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
+ if len(request.Headers) > 0 {
+ glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
+ } else {
+ glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind))
+ }
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {
@@ -1528,7 +1500,9 @@ func (d *Downloader) process() {
blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
case d.mode == FastSync:
blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
- receipts = append(receipts, result.Receipts)
+ if result.Header.Number.Uint64() <= d.queue.fastSyncPivot {
+ receipts = append(receipts, result.Receipts)
+ }
case d.mode == LightSync:
headers = append(headers, result.Header)
}
@@ -1539,12 +1513,16 @@ func (d *Downloader) process() {
index int
)
switch {
- case d.mode == FullSync:
- index, err = d.insertBlocks(blocks)
- case d.mode == FastSync:
- index, err = d.insertReceipts(blocks, receipts)
- case d.mode == LightSync:
+ case len(headers) > 0:
index, err = d.insertHeaders(headers, true)
+
+ case len(receipts) > 0:
+ index, err = d.insertReceipts(blocks, receipts)
+ if err == nil && blocks[len(blocks)-1].NumberU64() == d.queue.fastSyncPivot {
+ err = d.commitHeadBlock(blocks[len(blocks)-1].Hash())
+ }
+ default:
+ index, err = d.insertBlocks(blocks)
}
if err != nil {
glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
@@ -1557,125 +1535,47 @@ func (d *Downloader) process() {
}
}
-// DeliverHashes61 injects a new batch of hashes received from a remote node into
+// DeliverHashes injects a new batch of hashes received from a remote node into
// the download schedule. This is usually invoked through the BlockHashesMsg by
// the protocol handler.
-func (d *Downloader) DeliverHashes61(id string, hashes []common.Hash) (err error) {
- // Update the delivery metrics for both good and failed deliveries
- hashInMeter.Mark(int64(len(hashes)))
- defer func() {
- if err != nil {
- hashDropMeter.Mark(int64(len(hashes)))
- }
- }()
- // Make sure the downloader is active
- if atomic.LoadInt32(&d.synchronising) == 0 {
- return errNoSyncActive
- }
- // Deliver or abort if the sync is canceled while queuing
- d.cancelLock.RLock()
- cancel := d.cancelCh
- d.cancelLock.RUnlock()
-
- select {
- case d.hashCh <- hashPack{id, hashes}:
- return nil
-
- case <-cancel:
- return errNoSyncActive
- }
+func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) (err error) {
+ return d.deliver(id, d.hashCh, &hashPack{id, hashes}, hashInMeter, hashDropMeter)
}
-// DeliverBlocks61 injects a new batch of blocks received from a remote node.
+// DeliverBlocks injects a new batch of blocks received from a remote node.
// This is usually invoked through the BlocksMsg by the protocol handler.
-func (d *Downloader) DeliverBlocks61(id string, blocks []*types.Block) (err error) {
- // Update the delivery metrics for both good and failed deliveries
- blockInMeter.Mark(int64(len(blocks)))
- defer func() {
- if err != nil {
- blockDropMeter.Mark(int64(len(blocks)))
- }
- }()
- // Make sure the downloader is active
- if atomic.LoadInt32(&d.synchronising) == 0 {
- return errNoSyncActive
- }
- // Deliver or abort if the sync is canceled while queuing
- d.cancelLock.RLock()
- cancel := d.cancelCh
- d.cancelLock.RUnlock()
-
- select {
- case d.blockCh <- blockPack{id, blocks}:
- return nil
-
- case <-cancel:
- return errNoSyncActive
- }
+func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) (err error) {
+ return d.deliver(id, d.blockCh, &blockPack{id, blocks}, blockInMeter, blockDropMeter)
}
// DeliverHeaders injects a new batch of blck headers received from a remote
// node into the download schedule.
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
- // Update the delivery metrics for both good and failed deliveries
- headerInMeter.Mark(int64(len(headers)))
- defer func() {
- if err != nil {
- headerDropMeter.Mark(int64(len(headers)))
- }
- }()
- // Make sure the downloader is active
- if atomic.LoadInt32(&d.synchronising) == 0 {
- return errNoSyncActive
- }
- // Deliver or abort if the sync is canceled while queuing
- d.cancelLock.RLock()
- cancel := d.cancelCh
- d.cancelLock.RUnlock()
-
- select {
- case d.headerCh <- headerPack{id, headers}:
- return nil
-
- case <-cancel:
- return errNoSyncActive
- }
+ return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
}
// DeliverBodies injects a new batch of block bodies received from a remote node.
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
- // Update the delivery metrics for both good and failed deliveries
- bodyInMeter.Mark(int64(len(transactions)))
- defer func() {
- if err != nil {
- bodyDropMeter.Mark(int64(len(transactions)))
- }
- }()
- // Make sure the downloader is active
- if atomic.LoadInt32(&d.synchronising) == 0 {
- return errNoSyncActive
- }
- // Deliver or abort if the sync is canceled while queuing
- d.cancelLock.RLock()
- cancel := d.cancelCh
- d.cancelLock.RUnlock()
-
- select {
- case d.bodyCh <- &bodyPack{id, transactions, uncles}:
- return nil
-
- case <-cancel:
- return errNoSyncActive
- }
+ return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
}
// DeliverReceipts injects a new batch of receipts received from a remote node.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
+ return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
+}
+
+// DeliverNodeData injects a new batch of node state data received from a remote node.
+func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
+ return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
+}
+
+// deliver injects a new batch of data received from a remote node.
+func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
// Update the delivery metrics for both good and failed deliveries
- receiptInMeter.Mark(int64(len(receipts)))
+ inMeter.Mark(int64(packet.Items()))
defer func() {
if err != nil {
- receiptDropMeter.Mark(int64(len(receipts)))
+ dropMeter.Mark(int64(packet.Items()))
}
}()
// Make sure the downloader is active
@@ -1688,7 +1588,7 @@ func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (er
d.cancelLock.RUnlock()
select {
- case d.receiptCh <- &receiptPack{id, receipts}:
+ case destCh <- packet:
return nil
case <-cancel: