aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/downloader.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r--eth/downloader/downloader.go560
1 files changed, 305 insertions, 255 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 839969f03..cca4fe7a9 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -34,7 +34,6 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
- "github.com/ethereum/go-ethereum/trie"
"github.com/rcrowley/go-metrics"
)
@@ -99,8 +98,9 @@ type Downloader struct {
mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
mux *event.TypeMux // Event multiplexer to announce sync operation events
- queue *queue // Scheduler for selecting the hashes to download
- peers *peerSet // Set of active peers from which download can proceed
+ queue *queue // Scheduler for selecting the hashes to download
+ peers *peerSet // Set of active peers from which download can proceed
+ stateDB ethdb.Database
fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section
@@ -109,26 +109,16 @@ type Downloader struct {
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
// Statistics
- syncStatsChainOrigin uint64 // Origin block number where syncing started at
- syncStatsChainHeight uint64 // Highest block number known when syncing started
- syncStatsStateDone uint64 // Number of state trie entries already pulled
+ syncStatsChainOrigin uint64 // Origin block number where syncing started at
+ syncStatsChainHeight uint64 // Highest block number known when syncing started
+ syncStatsState stateSyncStats
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
+ lightchain LightChain
+ blockchain BlockChain
+
// Callbacks
- hasHeader headerCheckFn // Checks if a header is present in the chain
- hasBlockAndState blockAndStateCheckFn // Checks if a block and associated state is present in the chain
- getHeader headerRetrievalFn // Retrieves a header from the chain
- getBlock blockRetrievalFn // Retrieves a block from the chain
- headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
- headBlock headBlockRetrievalFn // Retrieves the head block from the chain
- headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
- commitHeadBlock headBlockCommitterFn // Commits a manually assembled block as the chain head
- getTd tdRetrievalFn // Retrieves the TD of a block from the chain
- insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
- insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
- insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
- rollback chainRollbackFn // Removes a batch of recently added chain links
- dropPeer peerDropFn // Drops a peer for misbehaving
+ dropPeer peerDropFn // Drops a peer for misbehaving
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
@@ -136,16 +126,18 @@ type Downloader struct {
notified int32
// Channels
- newPeerCh chan *peer
headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
- stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
- stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
+ // for stateFetcher
+ stateSyncStart chan *stateSync
+ trackStateReq chan *stateReq
+ stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
+
// Cancellation and termination
cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
cancelCh chan struct{} // Channel to cancel mid-flight syncs
@@ -161,45 +153,83 @@ type Downloader struct {
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}
+// LightChain encapsulates functions required to synchronise a light chain.
+type LightChain interface {
+ // HasHeader verifies a header's presence in the local chain.
+ HasHeader(h common.Hash, number uint64) bool
+
+ // GetHeaderByHash retrieves a header from the local chain.
+ GetHeaderByHash(common.Hash) *types.Header
+
+ // CurrentHeader retrieves the head header from the local chain.
+ CurrentHeader() *types.Header
+
+ // GetTdByHash returns the total difficulty of a local block.
+ GetTdByHash(common.Hash) *big.Int
+
+ // InsertHeaderChain inserts a batch of headers into the local chain.
+ InsertHeaderChain([]*types.Header, int) (int, error)
+
+ // Rollback removes a few recently added elements from the local chain.
+ Rollback([]common.Hash)
+}
+
+// BlockChain encapsulates functions required to sync a (full or fast) blockchain.
+type BlockChain interface {
+ LightChain
+
+ // HasBlockAndState verifies block and associated states' presence in the local chain.
+ HasBlockAndState(common.Hash) bool
+
+ // GetBlockByHash retrieves a block from the local chain.
+ GetBlockByHash(common.Hash) *types.Block
+
+ // CurrentBlock retrieves the head block from the local chain.
+ CurrentBlock() *types.Block
+
+ // CurrentFastBlock retrieves the head fast block from the local chain.
+ CurrentFastBlock() *types.Block
+
+ // FastSyncCommitHead directly commits the head block to a certain entity.
+ FastSyncCommitHead(common.Hash) error
+
+ // InsertChain inserts a batch of blocks into the local chain.
+ InsertChain(types.Blocks) (int, error)
+
+ // InsertReceiptChain inserts a batch of receipts into the local chain.
+ InsertReceiptChain(types.Blocks, []types.Receipts) (int, error)
+}
+
// New creates a new downloader to fetch hashes and blocks from remote peers.
-func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlockAndState blockAndStateCheckFn,
- getHeader headerRetrievalFn, getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn,
- headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
- insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {
+func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
+ if lightchain == nil {
+ lightchain = chain
+ }
dl := &Downloader{
- mode: mode,
- mux: mux,
- queue: newQueue(stateDb),
- peers: newPeerSet(),
- rttEstimate: uint64(rttMaxEstimate),
- rttConfidence: uint64(1000000),
- hasHeader: hasHeader,
- hasBlockAndState: hasBlockAndState,
- getHeader: getHeader,
- getBlock: getBlock,
- headHeader: headHeader,
- headBlock: headBlock,
- headFastBlock: headFastBlock,
- commitHeadBlock: commitHeadBlock,
- getTd: getTd,
- insertHeaders: insertHeaders,
- insertBlocks: insertBlocks,
- insertReceipts: insertReceipts,
- rollback: rollback,
- dropPeer: dropPeer,
- newPeerCh: make(chan *peer, 1),
- headerCh: make(chan dataPack, 1),
- bodyCh: make(chan dataPack, 1),
- receiptCh: make(chan dataPack, 1),
- stateCh: make(chan dataPack, 1),
- bodyWakeCh: make(chan bool, 1),
- receiptWakeCh: make(chan bool, 1),
- stateWakeCh: make(chan bool, 1),
- headerProcCh: make(chan []*types.Header, 1),
- quitCh: make(chan struct{}),
+ mode: mode,
+ stateDB: stateDb,
+ mux: mux,
+ queue: newQueue(),
+ peers: newPeerSet(),
+ rttEstimate: uint64(rttMaxEstimate),
+ rttConfidence: uint64(1000000),
+ blockchain: chain,
+ lightchain: lightchain,
+ dropPeer: dropPeer,
+ headerCh: make(chan dataPack, 1),
+ bodyCh: make(chan dataPack, 1),
+ receiptCh: make(chan dataPack, 1),
+ bodyWakeCh: make(chan bool, 1),
+ receiptWakeCh: make(chan bool, 1),
+ headerProcCh: make(chan []*types.Header, 1),
+ quitCh: make(chan struct{}),
+ stateCh: make(chan dataPack),
+ stateSyncStart: make(chan *stateSync),
+ trackStateReq: make(chan *stateReq),
}
go dl.qosTuner()
+ go dl.stateFetcher()
return dl
}
@@ -211,9 +241,6 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he
// of processed and the total number of known states are also returned. Otherwise
// these are zero.
func (d *Downloader) Progress() ethereum.SyncProgress {
- // Fetch the pending state count outside of the lock to prevent unforeseen deadlocks
- pendingStates := uint64(d.queue.PendingNodeData())
-
// Lock the current stats and return the progress
d.syncStatsLock.RLock()
defer d.syncStatsLock.RUnlock()
@@ -221,18 +248,18 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
current := uint64(0)
switch d.mode {
case FullSync:
- current = d.headBlock().NumberU64()
+ current = d.blockchain.CurrentBlock().NumberU64()
case FastSync:
- current = d.headFastBlock().NumberU64()
+ current = d.blockchain.CurrentFastBlock().NumberU64()
case LightSync:
- current = d.headHeader().Number.Uint64()
+ current = d.lightchain.CurrentHeader().Number.Uint64()
}
return ethereum.SyncProgress{
StartingBlock: d.syncStatsChainOrigin,
CurrentBlock: current,
HighestBlock: d.syncStatsChainHeight,
- PulledStates: d.syncStatsStateDone,
- KnownStates: d.syncStatsStateDone + pendingStates,
+ PulledStates: d.syncStatsState.processed,
+ KnownStates: d.syncStatsState.processed + d.syncStatsState.pending,
}
}
@@ -243,13 +270,11 @@ func (d *Downloader) Synchronising() bool {
// RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from.
-func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHeadRetrievalFn,
- getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
- getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
+func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error {
logger := log.New("peer", id)
logger.Trace("Registering sync peer")
- if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData, logger)); err != nil {
+ if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
logger.Error("Failed to register sync peer", "err", err)
return err
}
@@ -258,6 +283,11 @@ func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHea
return nil
}
+// RegisterLightPeer injects a light client peer, wrapping it so it appears as a regular peer.
+func (d *Downloader) RegisterLightPeer(id string, version int, peer LightPeer) error {
+ return d.RegisterPeer(id, version, &lightPeerWrapper{peer})
+}
+
// UnregisterPeer remove a peer from the known list, preventing any action from
// the specified peer. An effort is also made to return any pending fetches into
// the queue.
@@ -324,13 +354,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
d.queue.Reset()
d.peers.Reset()
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case <-ch:
default:
}
}
- for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
+ for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
for empty := false; !empty; {
select {
case <-ch:
@@ -369,7 +399,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
-func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) {
+func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
d.mux.Post(StartEvent{})
defer func() {
// reset on error
@@ -439,30 +469,40 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
}
- return d.spawnSync(origin+1,
- func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
- func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
- func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
- func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
- func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
- )
+
+ fetchers := []func() error{
+ func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
+ func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
+ func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
+ func() error { return d.processHeaders(origin+1, td) },
+ }
+ if d.mode == FastSync {
+ fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
+ } else if d.mode == FullSync {
+ fetchers = append(fetchers, d.processFullSyncContent)
+ }
+ err = d.spawnSync(fetchers)
+ if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
+ // If sync failed in the critical section, bump the fail counter.
+ atomic.AddUint32(&d.fsPivotFails, 1)
+ }
+ return err
}
// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
-func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
+func (d *Downloader) spawnSync(fetchers []func() error) error {
var wg sync.WaitGroup
- errc := make(chan error, len(fetchers)+1)
- wg.Add(len(fetchers) + 1)
- go func() { defer wg.Done(); errc <- d.processContent() }()
+ errc := make(chan error, len(fetchers))
+ wg.Add(len(fetchers))
for _, fn := range fetchers {
fn := fn
go func() { defer wg.Done(); errc <- fn() }()
}
// Wait for the first error, then terminate the others.
var err error
- for i := 0; i < len(fetchers)+1; i++ {
- if i == len(fetchers) {
+ for i := 0; i < len(fetchers); i++ {
+ if i == len(fetchers)-1 {
// Close the queue when all fetchers have exited.
// This will cause the block processor to end when
// it has processed the queue.
@@ -475,11 +515,6 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
d.queue.Close()
d.Cancel()
wg.Wait()
-
- // If sync failed in the critical section, bump the fail counter
- if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
- atomic.AddUint32(&d.fsPivotFails, 1)
- }
return err
}
@@ -517,12 +552,12 @@ func (d *Downloader) Terminate() {
// fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
-func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
+func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
p.log.Debug("Retrieving remote chain height")
// Request the advertised remote head block and wait for the response
- head, _ := p.currentHead()
- go p.getRelHeaders(head, 1, 0, false)
+ head, _ := p.peer.Head()
+ go p.peer.RequestHeadersByHash(head, 1, 0, false)
ttl := d.requestTTL()
timeout := time.After(ttl)
@@ -552,7 +587,6 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
return nil, errTimeout
case <-d.bodyCh:
- case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
@@ -564,15 +598,15 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
// on the correct chain, checking the top N links should already get us a match.
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
// the head links match), we do a binary search to find the common ancestor.
-func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
+func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) {
// Figure out the valid ancestor range to prevent rewrite attacks
- floor, ceil := int64(-1), d.headHeader().Number.Uint64()
+ floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64()
p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height)
if d.mode == FullSync {
- ceil = d.headBlock().NumberU64()
+ ceil = d.blockchain.CurrentBlock().NumberU64()
} else if d.mode == FastSync {
- ceil = d.headFastBlock().NumberU64()
+ ceil = d.blockchain.CurrentFastBlock().NumberU64()
}
if ceil >= MaxForkAncestry {
floor = int64(ceil - MaxForkAncestry)
@@ -592,7 +626,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
if count > limit {
count = limit
}
- go p.getAbsHeaders(uint64(from), count, 15, false)
+ go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false)
// Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{}
@@ -632,7 +666,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
continue
}
// Otherwise check if we already know the header or not
- if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
+ if (d.mode == FullSync && d.blockchain.HasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) {
number, hash = headers[i].Number.Uint64(), headers[i].Hash()
// If every header is known, even future ones, the peer straight out lied about its head
@@ -649,7 +683,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
return 0, errTimeout
case <-d.bodyCh:
- case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
@@ -675,7 +708,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
ttl := d.requestTTL()
timeout := time.After(ttl)
- go p.getAbsHeaders(uint64(check), 1, 0, false)
+ go p.peer.RequestHeadersByNumber(check, 1, 0, false)
// Wait until a reply arrives to this request
for arrived := false; !arrived; {
@@ -698,11 +731,11 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
arrived = true
// Modify the search interval based on the response
- if (d.mode == FullSync && !d.hasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) {
+ if (d.mode == FullSync && !d.blockchain.HasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) {
end = check
break
}
- header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists
+ header := d.lightchain.GetHeaderByHash(headers[0].Hash()) // Independent of sync mode, header surely exists
if header.Number.Uint64() != check {
p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
return 0, errBadPeer
@@ -714,7 +747,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
return 0, errTimeout
case <-d.bodyCh:
- case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
@@ -737,7 +769,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// other peers are only accepted if they map cleanly to the skeleton. If no one
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
-func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
+func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
p.log.Debug("Directing header downloads", "origin", from)
defer p.log.Debug("Header download terminated")
@@ -757,10 +789,10 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
if skeleton {
p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
- go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
+ go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
} else {
p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
- go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
+ go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
}
}
// Start pulling the header chain skeleton until all is done
@@ -827,7 +859,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
d.dropPeer(p.id)
// Finish the sync gracefully instead of dumping the gathered data though
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@@ -862,12 +894,12 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
}
expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
throttle = func() bool { return false }
- reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
+ reserve = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveHeaders(p, count), false, nil
}
- fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
- capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) }
- setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
+ fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
+ capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
+ setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
)
err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
@@ -891,9 +923,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
}
expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
- fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
- capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) }
- setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
+ fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
+ capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
+ setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
)
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
@@ -915,9 +947,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
}
expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
- fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
- capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) }
- setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
+ fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
+ capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
+ setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
)
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
@@ -927,68 +959,6 @@ func (d *Downloader) fetchReceipts(from uint64) error {
return err
}
-// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
-// available peers, reserving a chunk of nodes for each, waiting for delivery and
-// also periodically checking for timeouts.
-func (d *Downloader) fetchNodeData() error {
- log.Debug("Downloading node state data")
-
- var (
- deliver = func(packet dataPack) (int, error) {
- start := time.Now()
- return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) {
- // If the peer returned old-requested data, forgive
- if err == trie.ErrNotRequested {
- log.Debug("Forgiving reply to stale state request", "peer", packet.PeerId())
- return
- }
- if err != nil {
- // If the node data processing failed, the root hash is very wrong, abort
- log.Error("State processing failed", "peer", packet.PeerId(), "err", err)
- d.Cancel()
- return
- }
- // Processing succeeded, notify state fetcher of continuation
- pending := d.queue.PendingNodeData()
- if pending > 0 {
- select {
- case d.stateWakeCh <- true:
- default:
- }
- }
- d.syncStatsLock.Lock()
- d.syncStatsStateDone += uint64(delivered)
- syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below
- d.syncStatsLock.Unlock()
-
- // If real database progress was made, reset any fast-sync pivot failure
- if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 {
- log.Debug("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&d.fsPivotFails))
- atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
- }
- // Log a message to the user and return
- if delivered > 0 {
- log.Info("Imported new state entries", "count", delivered, "elapsed", common.PrettyDuration(time.Since(start)), "processed", syncStatsStateDone, "pending", pending)
- }
- })
- }
- expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
- throttle = func() bool { return false }
- reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
- return d.queue.ReserveNodeData(p, count), false, nil
- }
- fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
- capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
- setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
- )
- err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
- d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
- d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "states")
-
- log.Debug("Node state data download terminated", "err", err)
- return err
-}
-
// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
@@ -1015,9 +985,9 @@ func (d *Downloader) fetchNodeData() error {
// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
// - kind: textual label of the type being downloaded to display in log mesages
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
- expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
- fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
- idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
+ expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
+ fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
+ idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
// Create a ticker to detect expired retrieval tasks
ticker := time.NewTicker(100 * time.Millisecond)
@@ -1123,6 +1093,10 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
throttled = true
break
}
+ // Short circuit if there is no more available task.
+ if pending() == 0 {
+ break
+ }
// Reserve a chunk of fetches for a peer. A nil can mean either that
// no more headers are available, or that the peer is known not to
// have them.
@@ -1182,29 +1156,25 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
for i, header := range rollback {
hashes[i] = header.Hash()
}
- lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, common.Big0, common.Big0
- if d.headFastBlock != nil {
- lastFastBlock = d.headFastBlock().Number()
- }
- if d.headBlock != nil {
- lastBlock = d.headBlock().Number()
+ lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
+ if d.mode != LightSync {
+ lastFastBlock = d.blockchain.CurrentFastBlock().Number()
+ lastBlock = d.blockchain.CurrentBlock().Number()
}
- d.rollback(hashes)
+ d.lightchain.Rollback(hashes)
curFastBlock, curBlock := common.Big0, common.Big0
- if d.headFastBlock != nil {
- curFastBlock = d.headFastBlock().Number()
- }
- if d.headBlock != nil {
- curBlock = d.headBlock().Number()
+ if d.mode != LightSync {
+ curFastBlock = d.blockchain.CurrentFastBlock().Number()
+ curBlock = d.blockchain.CurrentBlock().Number()
}
log.Warn("Rolled back headers", "count", len(hashes),
- "header", fmt.Sprintf("%d->%d", lastHeader, d.headHeader().Number),
+ "header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
// If we're already past the pivot point, this could be an attack, thread carefully
if rollback[len(rollback)-1].Number.Uint64() > pivot {
- // If we didn't ever fail, lock in te pivot header (must! not! change!)
+ // If we didn't ever fail, lock in the pivot header (must! not! change!)
if atomic.LoadUint32(&d.fsPivotFails) == 0 {
for _, header := range rollback {
if header.Number.Uint64() == pivot {
@@ -1229,7 +1199,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Terminate header processing if we synced up
if len(headers) == 0 {
// Notify everyone that headers are fully processed
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@@ -1248,7 +1218,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// L: Request new headers up from 11 (R's TD was higher, it must have something)
// R: Nothing to give
if d.mode != LightSync {
- if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
+ if !gotHeaders && td.Cmp(d.blockchain.GetTdByHash(d.blockchain.CurrentBlock().Hash())) > 0 {
return errStallingPeer
}
}
@@ -1260,7 +1230,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// queued for processing when the header download completes. However, as long as the
// peer gave us something useful, we're already happy/progressed (above check).
if d.mode == FastSync || d.mode == LightSync {
- if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
+ if td.Cmp(d.lightchain.GetTdByHash(d.lightchain.CurrentHeader().Hash())) > 0 {
return errStallingPeer
}
}
@@ -1290,7 +1260,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Collect the yet unknown headers to mark them as uncertain
unknown := make([]*types.Header, 0, len(headers))
for _, header := range chunk {
- if !d.hasHeader(header.Hash()) {
+ if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
unknown = append(unknown, header)
}
}
@@ -1299,7 +1269,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
frequency = 1
}
- if n, err := d.insertHeaders(chunk, frequency); err != nil {
+ if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
// If some headers were inserted, add them too to the rollback list
if n > 0 {
rollback = append(rollback, chunk[:n]...)
@@ -1341,7 +1311,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
origin += uint64(limit)
}
// Signal the content downloaders of the availablility of new tasks
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- true:
default:
@@ -1351,71 +1321,151 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
}
}
-// processContent takes fetch results from the queue and tries to import them
-// into the chain. The type of import operation will depend on the result contents.
-func (d *Downloader) processContent() error {
- pivot := d.queue.FastSyncPivot()
+// processFullSyncContent takes fetch results from the queue and imports them into the chain.
+func (d *Downloader) processFullSyncContent() error {
for {
results := d.queue.WaitResults()
if len(results) == 0 {
- return nil // queue empty
+ return nil
}
if d.chainInsertHook != nil {
d.chainInsertHook(results)
}
- // Actually import the blocks
- first, last := results[0].Header, results[len(results)-1].Header
+ if err := d.importBlockResults(results); err != nil {
+ return err
+ }
+ }
+}
+
+func (d *Downloader) importBlockResults(results []*fetchResult) error {
+ for len(results) != 0 {
+ // Check for any termination requests. This makes clean shutdown faster.
+ select {
+ case <-d.quitCh:
+ return errCancelContentProcessing
+ default:
+ }
+ // Retrieve the a batch of results to import
+ items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
+ first, last := results[0].Header, results[items-1].Header
log.Debug("Inserting downloaded chain", "items", len(results),
"firstnum", first.Number, "firsthash", first.Hash(),
"lastnum", last.Number, "lasthash", last.Hash(),
)
- for len(results) != 0 {
- // Check for any termination requests
- select {
- case <-d.quitCh:
- return errCancelContentProcessing
- default:
- }
- // Retrieve the a batch of results to import
- var (
- blocks = make([]*types.Block, 0, maxResultsProcess)
- receipts = make([]types.Receipts, 0, maxResultsProcess)
- )
- items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
- for _, result := range results[:items] {
- switch {
- case d.mode == FullSync:
- blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
- case d.mode == FastSync:
- blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
- if result.Header.Number.Uint64() <= pivot {
- receipts = append(receipts, result.Receipts)
- }
- }
- }
- // Try to process the results, aborting if there's an error
- var (
- err error
- index int
- )
- switch {
- case len(receipts) > 0:
- index, err = d.insertReceipts(blocks, receipts)
- if err == nil && blocks[len(blocks)-1].NumberU64() == pivot {
- log.Debug("Committing block as new head", "number", blocks[len(blocks)-1].Number(), "hash", blocks[len(blocks)-1].Hash())
- index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash())
- }
- default:
- index, err = d.insertBlocks(blocks)
+ blocks := make([]*types.Block, items)
+ for i, result := range results[:items] {
+ blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ }
+ if index, err := d.blockchain.InsertChain(blocks); err != nil {
+ log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+ return errInvalidChain
+ }
+ // Shift the results to the next batch
+ results = results[items:]
+ }
+ return nil
+}
+
+// processFastSyncContent takes fetch results from the queue and writes them to the
+// database. It also controls the synchronisation of state nodes of the pivot block.
+func (d *Downloader) processFastSyncContent(latest *types.Header) error {
+ // Start syncing state of the reported head block.
+ // This should get us most of the state of the pivot block.
+ stateSync := d.syncState(latest.Root)
+ defer stateSync.Cancel()
+ go func() {
+ if err := stateSync.Wait(); err != nil {
+ d.queue.Close() // wake up WaitResults
+ }
+ }()
+
+ pivot := d.queue.FastSyncPivot()
+ for {
+ results := d.queue.WaitResults()
+ if len(results) == 0 {
+ return stateSync.Cancel()
+ }
+ if d.chainInsertHook != nil {
+ d.chainInsertHook(results)
+ }
+ P, beforeP, afterP := splitAroundPivot(pivot, results)
+ if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
+ return err
+ }
+ if P != nil {
+ stateSync.Cancel()
+ if err := d.commitPivotBlock(P); err != nil {
+ return err
}
- if err != nil {
- log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
- return errInvalidChain
+ }
+ if err := d.importBlockResults(afterP); err != nil {
+ return err
+ }
+ }
+}
+
+func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
+ for _, result := range results {
+ num := result.Header.Number.Uint64()
+ switch {
+ case num < pivot:
+ before = append(before, result)
+ case num == pivot:
+ p = result
+ default:
+ after = append(after, result)
+ }
+ }
+ return p, before, after
+}
+
+func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
+ for len(results) != 0 {
+ // Check for any termination requests.
+ select {
+ case <-d.quitCh:
+ return errCancelContentProcessing
+ case <-stateSync.done:
+ if err := stateSync.Wait(); err != nil {
+ return err
}
- // Shift the results to the next batch
- results = results[items:]
+ default:
+ }
+ // Retrieve the a batch of results to import
+ items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
+ first, last := results[0].Header, results[items-1].Header
+ log.Debug("Inserting fast-sync blocks", "items", len(results),
+ "firstnum", first.Number, "firsthash", first.Hash(),
+ "lastnumn", last.Number, "lasthash", last.Hash(),
+ )
+ blocks := make([]*types.Block, items)
+ receipts := make([]types.Receipts, items)
+ for i, result := range results[:items] {
+ blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ receipts[i] = result.Receipts
+ }
+ if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil {
+ log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+ return errInvalidChain
}
+ // Shift the results to the next batch
+ results = results[items:]
+ }
+ return nil
+}
+
+func (d *Downloader) commitPivotBlock(result *fetchResult) error {
+ b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ // Sync the pivot block state. This should complete reasonably quickly because
+ // we've already synced up to the reported head block state earlier.
+ if err := d.syncState(b.Root()).Wait(); err != nil {
+ return err
+ }
+ log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash())
+ if _, err := d.blockchain.InsertReceiptChain([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil {
+ return err
}
+ return d.blockchain.FastSyncCommitHead(b.Hash())
}
// DeliverHeaders injects a new batch of block headers received from a remote
@@ -1468,7 +1518,7 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
func (d *Downloader) qosTuner() {
for {
// Retrieve the current median RTT and integrate into the previoust target RTT
- rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
+ rtt := time.Duration((1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
atomic.StoreUint64(&d.rttEstimate, uint64(rtt))
// A new RTT cycle passed, increase our confidence in the estimated RTT