From 4f1d92b3329572d75a20b9f9e1cccdf74aa7c79f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= <peterke@gmail.com>
Date: Fri, 27 May 2016 14:26:00 +0300
Subject: eth/downloader, trie: pull head state concurrently with chain

---
 eth/downloader/downloader.go | 56 ++++++++++++++++++++++----------------------
 eth/downloader/queue.go      |  8 ++++++-
 2 files changed, 35 insertions(+), 29 deletions(-)

(limited to 'eth/downloader')

diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 74bff2b66..8cb0d21f7 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -35,6 +35,7 @@ import (
 	"github.com/ethereum/go-ethereum/logger"
 	"github.com/ethereum/go-ethereum/logger/glog"
 	"github.com/ethereum/go-ethereum/params"
+	"github.com/ethereum/go-ethereum/trie"
 	"github.com/rcrowley/go-metrics"
 )
 
@@ -114,7 +115,6 @@ type Downloader struct {
 	// Statistics
 	syncStatsChainOrigin uint64       // Origin block number where syncing started at
 	syncStatsChainHeight uint64       // Highest block number known when syncing started
-	syncStatsStateTotal  uint64       // Total number of node state entries known so far
 	syncStatsStateDone   uint64       // Number of state trie entries already pulled
 	syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields
 
@@ -321,12 +321,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 			empty = true
 		}
 	}
-	// Reset any ephemeral sync statistics
-	d.syncStatsLock.Lock()
-	d.syncStatsStateTotal = 0
-	d.syncStatsStateDone = 0
-	d.syncStatsLock.Unlock()
-
 	// Create cancel channel for aborting mid-flight
 	d.cancelLock.Lock()
 	d.cancelCh = make(chan struct{})
@@ -382,7 +376,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 		d.syncStatsLock.Unlock()
 
 		// Initiate the sync using a concurrent hash and block retrieval algorithm
-		d.queue.Prepare(origin+1, d.mode, 0)
+		d.queue.Prepare(origin+1, d.mode, 0, nil)
 		if d.syncInitHook != nil {
 			d.syncInitHook(origin, latest)
 		}
@@ -397,7 +391,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 		if err != nil {
 			return err
 		}
-		origin, err := d.findAncestor(p, latest)
+		height := latest.Number.Uint64()
+
+		origin, err := d.findAncestor(p, height)
 		if err != nil {
 			return err
 		}
@@ -405,22 +401,22 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 		if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
 			d.syncStatsChainOrigin = origin
 		}
-		d.syncStatsChainHeight = latest
+		d.syncStatsChainHeight = height
 		d.syncStatsLock.Unlock()
 
 		// Initiate the sync using a concurrent header and content retrieval algorithm
 		pivot := uint64(0)
 		switch d.mode {
 		case LightSync:
-			pivot = latest
+			pivot = height
 		case FastSync:
 			// Calculate the new fast/slow sync pivot point
 			pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
 			if err != nil {
 				panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
 			}
-			if latest > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
-				pivot = latest - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
+			if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
+				pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
 			}
 			// If the point is below the origin, move origin back to ensure state download
 			if pivot < origin {
@@ -432,9 +428,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 			}
 			glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
 		}
-		d.queue.Prepare(origin+1, d.mode, pivot)
+		d.queue.Prepare(origin+1, d.mode, pivot, latest)
 		if d.syncInitHook != nil {
-			d.syncInitHook(origin, latest)
+			d.syncInitHook(origin, height)
 		}
 		return d.spawnSync(origin+1,
 			func() error { return d.fetchHeaders(p, origin+1) },    // Headers are always retrieved
@@ -952,7 +948,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
 
 // fetchHeight retrieves the head header of the remote peer to aid in estimating
 // the total time a pending synchronisation would take.
-func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
+func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
 	glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
 
 	// Request the advertised remote head block and wait for the response
@@ -962,7 +958,7 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
 	for {
 		select {
 		case <-d.cancelCh:
-			return 0, errCancelBlockFetch
+			return nil, errCancelBlockFetch
 
 		case packet := <-d.headerCh:
 			// Discard anything not from the origin peer
@@ -974,13 +970,13 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
 			headers := packet.(*headerPack).headers
 			if len(headers) != 1 {
 				glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
-				return 0, errBadPeer
+				return nil, errBadPeer
 			}
-			return headers[0].Number.Uint64(), nil
+			return headers[0], nil
 
 		case <-timeout:
 			glog.V(logger.Debug).Infof("%v: head header timeout", p)
-			return 0, errTimeout
+			return nil, errTimeout
 
 		case <-d.bodyCh:
 		case <-d.stateCh:
@@ -1369,10 +1365,10 @@ func (d *Downloader) fetchNodeData() error {
 		deliver = func(packet dataPack) (int, error) {
 			start := time.Now()
 			return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
-				// If the peer gave us nothing, stalling fast sync, drop
-				if delivered == 0 {
-					glog.V(logger.Debug).Infof("peer %s: stalling state delivery, dropping", packet.PeerId())
-					d.dropPeer(packet.PeerId())
+				// If the peer replied to a stale (already re-requested) state entry, forgive it
+				if err == trie.ErrNotRequested {
+					glog.V(logger.Info).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId())
+					return
 				}
 				if err != nil {
 					// If the node data processing failed, the root hash is very wrong, abort
@@ -1381,17 +1377,21 @@ func (d *Downloader) fetchNodeData() error {
 					return
 				}
 				// Processing succeeded, notify state fetcher of continuation
-				if d.queue.PendingNodeData() > 0 {
+				pending := d.queue.PendingNodeData()
+				if pending > 0 {
 					select {
 					case d.stateWakeCh <- true:
 					default:
 					}
 				}
-				// Log a message to the user and return
 				d.syncStatsLock.Lock()
-				defer d.syncStatsLock.Unlock()
 				d.syncStatsStateDone += uint64(delivered)
-				glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
+				d.syncStatsLock.Unlock()
+
+				// Log a message to the user and return
+				if delivered > 0 {
+					glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d, pending at least %d", delivered, time.Since(start), d.syncStatsStateDone, pending)
+				}
 			})
 		}
 		expire   = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 195eae4ff..01897af6d 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -1262,13 +1262,19 @@ func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error,
 
 // Prepare configures the result cache to allow accepting and caching inbound
 // fetch results.
-func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64) {
+func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
+	// Prepare the queue for sync results
 	if q.resultOffset < offset {
 		q.resultOffset = offset
 	}
 	q.fastSyncPivot = pivot
 	q.mode = mode
+
+	// If this is a long-running fast sync, also start up the head state retrieval immediately
+	if mode == FastSync && pivot > 0 {
+		q.stateScheduler = state.NewStateSync(head.Root, q.stateDatabase)
+	}
 }
-- 
cgit v1.2.3