aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/state/sync.go6
-rw-r--r--core/state/sync_test.go10
-rw-r--r--eth/downloader/downloader.go33
-rw-r--r--eth/downloader/downloader_test.go344
-rw-r--r--eth/downloader/queue.go45
-rw-r--r--trie/sync.go18
-rw-r--r--trie/sync_test.go12
7 files changed, 273 insertions, 195 deletions
diff --git a/core/state/sync.go b/core/state/sync.go
index ef2b4b84c..bab9c8e7e 100644
--- a/core/state/sync.go
+++ b/core/state/sync.go
@@ -59,8 +59,10 @@ func (s *StateSync) Missing(max int) []common.Hash {
return (*trie.TrieSync)(s).Missing(max)
}
-// Process injects a batch of retrieved trie nodes data.
-func (s *StateSync) Process(list []trie.SyncResult) (int, error) {
+// Process injects a batch of retrieved trie nodes data, returning if something
+// was committed to the database and also the index of an entry if processing of
+// it failed.
+func (s *StateSync) Process(list []trie.SyncResult) (bool, int, error) {
return (*trie.TrieSync)(s).Process(list)
}
diff --git a/core/state/sync_test.go b/core/state/sync_test.go
index 949df7301..f5390d80f 100644
--- a/core/state/sync_test.go
+++ b/core/state/sync_test.go
@@ -138,7 +138,7 @@ func testIterativeStateSync(t *testing.T, batch int) {
}
results[i] = trie.SyncResult{Hash: hash, Data: data}
}
- if index, err := sched.Process(results); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
queue = append(queue[:0], sched.Missing(batch)...)
@@ -168,7 +168,7 @@ func TestIterativeDelayedStateSync(t *testing.T) {
}
results[i] = trie.SyncResult{Hash: hash, Data: data}
}
- if index, err := sched.Process(results); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
queue = append(queue[len(results):], sched.Missing(0)...)
@@ -206,7 +206,7 @@ func testIterativeRandomStateSync(t *testing.T, batch int) {
results = append(results, trie.SyncResult{Hash: hash, Data: data})
}
// Feed the retrieved results back and queue new tasks
- if index, err := sched.Process(results); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
queue = make(map[common.Hash]struct{})
@@ -249,7 +249,7 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) {
}
}
// Feed the retrieved results back and queue new tasks
- if index, err := sched.Process(results); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
for _, hash := range sched.Missing(0) {
@@ -283,7 +283,7 @@ func TestIncompleteStateSync(t *testing.T) {
results[i] = trie.SyncResult{Hash: hash, Data: data}
}
// Process each of the state nodes
- if index, err := sched.Process(results); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
for _, result := range results {
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 987be2b7a..da20e48a2 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -64,12 +64,12 @@ var (
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
maxResultsProcess = 2048 // Number of content download results to import at once into the chain
- fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
- fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
- fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
- fsPivotInterval = 512 // Number of headers out of which to randomize the pivot point
- fsMinFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync
- fsCriticalTrials = 10 // Number of times to retry in the cricical section before bailing
+ fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
+ fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
+ fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
+ fsPivotInterval = 256 // Number of headers out of which to randomize the pivot point
+ fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync
+ fsCriticalTrials = uint32(32) // Number of times to retry in the cricical section before bailing
)
var (
@@ -105,7 +105,7 @@ type Downloader struct {
peers *peerSet // Set of active peers from which download can proceed
fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
- fsPivotFails int // Number of fast sync failures in the critical section
+ fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section
rttEstimate uint64 // Round trip time to target for download requests
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
@@ -361,7 +361,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// Set the requested sync mode, unless it's forbidden
d.mode = mode
- if d.mode == FastSync && d.fsPivotFails >= fsCriticalTrials {
+ if d.mode == FastSync && atomic.LoadUint32(&d.fsPivotFails) >= fsCriticalTrials {
d.mode = FullSync
}
// Retrieve the origin peer and initiate the downloading process
@@ -480,6 +480,11 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
d.queue.Close()
d.cancel()
wg.Wait()
+
+ // If sync failed in the critical section, bump the fail counter
+ if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
+ atomic.AddUint32(&d.fsPivotFails, 1)
+ }
return err
}
@@ -926,10 +931,10 @@ func (d *Downloader) fetchNodeData() error {
var (
deliver = func(packet dataPack) (int, error) {
start := time.Now()
- return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
+ return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) {
// If the peer returned old-requested data, forgive
if err == trie.ErrNotRequested {
- glog.V(logger.Info).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId())
+ glog.V(logger.Debug).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId())
return
}
if err != nil {
@@ -951,6 +956,11 @@ func (d *Downloader) fetchNodeData() error {
syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below
d.syncStatsLock.Unlock()
+ // If real database progress was made, reset any fast-sync pivot failure
+ if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 {
+ glog.V(logger.Debug).Infof("fast-sync progressed, resetting fail counter from %d", atomic.LoadUint32(&d.fsPivotFails))
+ atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
+ }
// Log a message to the user and return
if delivered > 0 {
glog.V(logger.Info).Infof("imported %3d state entries in %9v: processed %d, pending at least %d", delivered, common.PrettyDuration(time.Since(start)), syncStatsStateDone, pending)
@@ -1177,7 +1187,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// If we're already past the pivot point, this could be an attack, thread carefully
if rollback[len(rollback)-1].Number.Uint64() > pivot {
// If we didn't ever fail, lock in te pivot header (must! not! change!)
- if d.fsPivotFails == 0 {
+ if atomic.LoadUint32(&d.fsPivotFails) == 0 {
for _, header := range rollback {
if header.Number.Uint64() == pivot {
glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4])
@@ -1185,7 +1195,6 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
}
}
}
- d.fsPivotFails++
}
}
}()
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 366c248bb..2849712ab 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -37,25 +37,79 @@ import (
)
var (
- testdb, _ = ethdb.NewMemDatabase()
testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
testAddress = crypto.PubkeyToAddress(testKey.PublicKey)
- genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000))
)
// Reduce some of the parameters to make the tester faster.
func init() {
MaxForkAncestry = uint64(10000)
blockCacheLimit = 1024
+ fsCriticalTrials = 10
+}
+
+// downloadTester is a test simulator for mocking out local block chain.
+type downloadTester struct {
+ downloader *Downloader
+
+ genesis *types.Block // Genesis blocks used by the tester and peers
+ stateDb ethdb.Database // Database used by the tester for syncing from peers
+ peerDb ethdb.Database // Database of the peers containing all data
+
+ ownHashes []common.Hash // Hash chain belonging to the tester
+ ownHeaders map[common.Hash]*types.Header // Headers belonging to the tester
+ ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester
+ ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester
+ ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain
+
+ peerHashes map[string][]common.Hash // Hash chain belonging to different test peers
+ peerHeaders map[string]map[common.Hash]*types.Header // Headers belonging to different test peers
+ peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers
+ peerReceipts map[string]map[common.Hash]types.Receipts // Receipts belonging to different test peers
+ peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains
+
+ peerMissingStates map[string]map[common.Hash]bool // State entries that fast sync should not return
+
+ lock sync.RWMutex
+}
+
+// newTester creates a new downloader test mocker.
+func newTester() *downloadTester {
+ testdb, _ := ethdb.NewMemDatabase()
+ genesis := core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000))
+
+ tester := &downloadTester{
+ genesis: genesis,
+ peerDb: testdb,
+ ownHashes: []common.Hash{genesis.Hash()},
+ ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
+ ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
+ ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): nil},
+ ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()},
+ peerHashes: make(map[string][]common.Hash),
+ peerHeaders: make(map[string]map[common.Hash]*types.Header),
+ peerBlocks: make(map[string]map[common.Hash]*types.Block),
+ peerReceipts: make(map[string]map[common.Hash]types.Receipts),
+ peerChainTds: make(map[string]map[common.Hash]*big.Int),
+ peerMissingStates: make(map[string]map[common.Hash]bool),
+ }
+ tester.stateDb, _ = ethdb.NewMemDatabase()
+ tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00})
+
+ tester.downloader = New(tester.stateDb, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader,
+ tester.getBlock, tester.headHeader, tester.headBlock, tester.headFastBlock, tester.commitHeadBlock, tester.getTd,
+ tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.rollback, tester.dropPeer)
+
+ return tester
}
// makeChain creates a chain of n blocks starting at and including parent.
// the returned hash chain is ordered head->parent. In addition, every 3rd block
// contains a transaction and every 5th an uncle to allow testing correct block
// reassembly.
-func makeChain(n int, seed byte, parent *types.Block, parentReceipts types.Receipts, heavy bool) ([]common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]types.Receipts) {
+func (dl *downloadTester) makeChain(n int, seed byte, parent *types.Block, parentReceipts types.Receipts, heavy bool) ([]common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]types.Receipts) {
// Generate the block chain
- blocks, receipts := core.GenerateChain(nil, parent, testdb, n, func(i int, block *core.BlockGen) {
+ blocks, receipts := core.GenerateChain(nil, parent, dl.peerDb, n, func(i int, block *core.BlockGen) {
block.SetCoinbase(common.Address{seed})
// If a heavy chain is requested, delay blocks to raise difficulty
@@ -63,7 +117,7 @@ func makeChain(n int, seed byte, parent *types.Block, parentReceipts types.Recei
block.OffsetTime(-1)
}
// If the block number is multiple of 3, send a bonus transaction to the miner
- if parent == genesis && i%3 == 0 {
+ if parent == dl.genesis && i%3 == 0 {
tx, err := types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(testKey)
if err != nil {
panic(err)
@@ -102,19 +156,19 @@ func makeChain(n int, seed byte, parent *types.Block, parentReceipts types.Recei
// makeChainFork creates two chains of length n, such that h1[:f] and
// h2[:f] are different but have a common suffix of length n-f.
-func makeChainFork(n, f int, parent *types.Block, parentReceipts types.Receipts, balanced bool) ([]common.Hash, []common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]*types.Block, map[common.Hash]types.Receipts, map[common.Hash]types.Receipts) {
+func (dl *downloadTester) makeChainFork(n, f int, parent *types.Block, parentReceipts types.Receipts, balanced bool) ([]common.Hash, []common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]*types.Block, map[common.Hash]types.Receipts, map[common.Hash]types.Receipts) {
// Create the common suffix
- hashes, headers, blocks, receipts := makeChain(n-f, 0, parent, parentReceipts, false)
+ hashes, headers, blocks, receipts := dl.makeChain(n-f, 0, parent, parentReceipts, false)
// Create the forks, making the second heavyer if non balanced forks were requested
- hashes1, headers1, blocks1, receipts1 := makeChain(f, 1, blocks[hashes[0]], receipts[hashes[0]], false)
+ hashes1, headers1, blocks1, receipts1 := dl.makeChain(f, 1, blocks[hashes[0]], receipts[hashes[0]], false)
hashes1 = append(hashes1, hashes[1:]...)
heavy := false
if !balanced {
heavy = true
}
- hashes2, headers2, blocks2, receipts2 := makeChain(f, 2, blocks[hashes[0]], receipts[hashes[0]], heavy)
+ hashes2, headers2, blocks2, receipts2 := dl.makeChain(f, 2, blocks[hashes[0]], receipts[hashes[0]], heavy)
hashes2 = append(hashes2, hashes[1:]...)
for hash, header := range headers {
@@ -132,53 +186,6 @@ func makeChainFork(n, f int, parent *types.Block, parentReceipts types.Receipts,
return hashes1, hashes2, headers1, headers2, blocks1, blocks2, receipts1, receipts2
}
-// downloadTester is a test simulator for mocking out local block chain.
-type downloadTester struct {
- stateDb ethdb.Database
- downloader *Downloader
-
- ownHashes []common.Hash // Hash chain belonging to the tester
- ownHeaders map[common.Hash]*types.Header // Headers belonging to the tester
- ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester
- ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester
- ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain
-
- peerHashes map[string][]common.Hash // Hash chain belonging to different test peers
- peerHeaders map[string]map[common.Hash]*types.Header // Headers belonging to different test peers
- peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers
- peerReceipts map[string]map[common.Hash]types.Receipts // Receipts belonging to different test peers
- peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains
-
- peerMissingStates map[string]map[common.Hash]bool // State entries that fast sync should not return
-
- lock sync.RWMutex
-}
-
-// newTester creates a new downloader test mocker.
-func newTester() *downloadTester {
- tester := &downloadTester{
- ownHashes: []common.Hash{genesis.Hash()},
- ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
- ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
- ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): nil},
- ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()},
- peerHashes: make(map[string][]common.Hash),
- peerHeaders: make(map[string]map[common.Hash]*types.Header),
- peerBlocks: make(map[string]map[common.Hash]*types.Block),
- peerReceipts: make(map[string]map[common.Hash]types.Receipts),
- peerChainTds: make(map[string]map[common.Hash]*big.Int),
- peerMissingStates: make(map[string]map[common.Hash]bool),
- }
- tester.stateDb, _ = ethdb.NewMemDatabase()
- tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00})
-
- tester.downloader = New(tester.stateDb, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader,
- tester.getBlock, tester.headHeader, tester.headBlock, tester.headFastBlock, tester.commitHeadBlock, tester.getTd,
- tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.rollback, tester.dropPeer)
-
- return tester
-}
-
// terminate aborts any operations on the embedded downloader and releases all
// held resources.
func (dl *downloadTester) terminate() {
@@ -251,7 +258,7 @@ func (dl *downloadTester) headHeader() *types.Header {
return header
}
}
- return genesis.Header()
+ return dl.genesis.Header()
}
// headBlock retrieves the current head block from the canonical chain.
@@ -266,7 +273,7 @@ func (dl *downloadTester) headBlock() *types.Block {
}
}
}
- return genesis
+ return dl.genesis
}
// headFastBlock retrieves the current head fast-sync block from the canonical chain.
@@ -279,7 +286,7 @@ func (dl *downloadTester) headFastBlock() *types.Block {
return block
}
}
- return genesis
+ return dl.genesis
}
// commitHeadBlock manually sets the head block to a given hash.
@@ -351,7 +358,7 @@ func (dl *downloadTester) insertBlocks(blocks types.Blocks) (int, error) {
return len(blocks), nil
}
-// insertReceipts injects a new batch of blocks into the simulated chain.
+// insertReceipts injects a new batch of receipts into the simulated chain.
func (dl *downloadTester) insertReceipts(blocks types.Blocks, receipts []types.Receipts) (int, error) {
dl.lock.Lock()
defer dl.lock.Unlock()
@@ -586,7 +593,7 @@ func (dl *downloadTester) peerGetNodeDataFn(id string, delay time.Duration) func
results := make([][]byte, 0, len(hashes))
for _, hash := range hashes {
- if data, err := testdb.Get(hash.Bytes()); err == nil {
+ if data, err := dl.peerDb.Get(hash.Bytes()); err == nil {
if !dl.peerMissingStates[id][hash] {
results = append(results, data)
}
@@ -669,13 +676,13 @@ func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronis
func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
- // Create a small enough block chain to download
- targetBlocks := blockCacheLimit - 15
- hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
-
tester := newTester()
defer tester.terminate()
+ // Create a small enough block chain to download
+ targetBlocks := blockCacheLimit - 15
+ hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
+
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Synchronise with the peer and make sure all relevant data was retrieved
@@ -694,13 +701,13 @@ func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) }
func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
func testThrottling(t *testing.T, protocol int, mode SyncMode) {
- // Create a long block chain to download and the tester
- targetBlocks := 8 * blockCacheLimit
- hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
-
tester := newTester()
defer tester.terminate()
+ // Create a long block chain to download and the tester
+ targetBlocks := 8 * blockCacheLimit
+ hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
+
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Wrap the importer to allow stepping
@@ -782,13 +789,13 @@ func TestForkedSync64Light(t *testing.T) { testForkedSync(t, 64, LightSync) }
func testForkedSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
- // Create a long enough forked chain
- common, fork := MaxHashFetch, 2*MaxHashFetch
- hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true)
-
tester := newTester()
defer tester.terminate()
+ // Create a long enough forked chain
+ common, fork := MaxHashFetch, 2*MaxHashFetch
+ hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, true)
+
tester.newPeer("fork A", protocol, hashesA, headersA, blocksA, receiptsA)
tester.newPeer("fork B", protocol, hashesB, headersB, blocksB, receiptsB)
@@ -817,13 +824,13 @@ func TestHeavyForkedSync64Light(t *testing.T) { testHeavyForkedSync(t, 64, Light
func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
- // Create a long enough forked chain
- common, fork := MaxHashFetch, 4*MaxHashFetch
- hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, false)
-
tester := newTester()
defer tester.terminate()
+ // Create a long enough forked chain
+ common, fork := MaxHashFetch, 4*MaxHashFetch
+ hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, false)
+
tester.newPeer("light", protocol, hashesA, headersA, blocksA, receiptsA)
tester.newPeer("heavy", protocol, hashesB[fork/2:], headersB, blocksB, receiptsB)
@@ -853,13 +860,13 @@ func TestBoundedForkedSync64Light(t *testing.T) { testBoundedForkedSync(t, 64, L
func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
- // Create a long enough forked chain
- common, fork := 13, int(MaxForkAncestry+17)
- hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true)
-
tester := newTester()
defer tester.terminate()
+ // Create a long enough forked chain
+ common, fork := 13, int(MaxForkAncestry+17)
+ hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, true)
+
tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA)
tester.newPeer("rewriter", protocol, hashesB, headersB, blocksB, receiptsB)
@@ -888,13 +895,13 @@ func TestBoundedHeavyForkedSync64Light(t *testing.T) { testBoundedHeavyForkedSyn
func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
- // Create a long enough forked chain
- common, fork := 13, int(MaxForkAncestry+17)
- hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, false)
-
tester := newTester()
defer tester.terminate()
+ // Create a long enough forked chain
+ common, fork := 13, int(MaxForkAncestry+17)
+ hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, false)
+
tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA)
tester.newPeer("heavy-rewriter", protocol, hashesB[MaxForkAncestry-17:], headersB, blocksB, receiptsB) // Root the fork below the ancestor limit
@@ -958,6 +965,9 @@ func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) }
func testCancel(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
+ tester := newTester()
+ defer tester.terminate()
+
// Create a small enough block chain to download and the tester
targetBlocks := blockCacheLimit - 15
if targetBlocks >= MaxHashFetch {
@@ -966,10 +976,7 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) {
if targetBlocks >= MaxHeaderFetch {
targetBlocks = MaxHeaderFetch - 15
}
- hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
-
- tester := newTester()
- defer tester.terminate()
+ hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
@@ -999,13 +1006,13 @@ func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t,
func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
+ tester := newTester()
+ defer tester.terminate()
+
// Create various peers with various parts of the chain
targetPeers := 8
targetBlocks := targetPeers*blockCacheLimit - 15
- hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
-
- tester := newTester()
- defer tester.terminate()
+ hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
for i := 0; i < targetPeers; i++ {
id := fmt.Sprintf("peer #%d", i)
@@ -1029,14 +1036,14 @@ func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t,
func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
+ tester := newTester()
+ defer tester.terminate()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
+ hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
// Create peers of every type
- tester := newTester()
- defer tester.terminate()
-
tester.newPeer("peer 62", 62, hashes, headers, blocks, nil)
tester.newPeer("peer 63", 63, hashes, headers, blocks, receipts)
tester.newPeer("peer 64", 64, hashes, headers, blocks, receipts)
@@ -1068,13 +1075,13 @@ func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, L
func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
- // Create a block chain to download
- targetBlocks := 2*blockCacheLimit - 15
- hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
-
tester := newTester()
defer tester.terminate()
+ // Create a block chain to download
+ targetBlocks := 2*blockCacheLimit - 15
+ hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
+
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Instrument the downloader to signal body requests
@@ -1094,7 +1101,7 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
// Validate the number of block bodies that should have been requested
bodiesNeeded, receiptsNeeded := 0, 0
for _, block := range blocks {
- if mode != LightSync && block != genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) {
+ if mode != LightSync && block != tester.genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) {
bodiesNeeded++
}
}
@@ -1123,13 +1130,13 @@ func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 6
func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
- // Create a small enough block chain to download
- targetBlocks := blockCacheLimit - 15
- hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
-
tester := newTester()
defer tester.terminate()
+ // Create a small enough block chain to download
+ targetBlocks := blockCacheLimit - 15
+ hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
+
// Attempt a full sync with an attacker feeding gapped headers
tester.newPeer("attack", protocol, hashes, headers, blocks, receipts)
missing := targetBlocks / 2
@@ -1156,13 +1163,13 @@ func TestShiftedHeaderAttack64Fast(t *testing.T) { testShiftedHeaderAttack(t, 6
func TestShiftedHeaderAttack64Light(t *testing.T) { testShiftedHeaderAttack(t, 64, LightSync) }
func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
- // Create a small enough block chain to download
- targetBlocks := blockCacheLimit - 15
- hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
-
tester := newTester()
defer tester.terminate()
+ // Create a small enough block chain to download
+ targetBlocks := blockCacheLimit - 15
+ hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
+
// Attempt a full sync with an attacker feeding shifted headers
tester.newPeer("attack", protocol, hashes, headers, blocks, receipts)
delete(tester.peerHeaders["attack"], hashes[len(hashes)-2])
@@ -1188,13 +1195,13 @@ func TestInvalidHeaderRollback64Fast(t *testing.T) { testInvalidHeaderRollback(
func TestInvalidHeaderRollback64Light(t *testing.T) { testInvalidHeaderRollback(t, 64, LightSync) }
func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
- // Create a small enough block chain to download
- targetBlocks := 3*fsHeaderSafetyNet + fsMinFullBlocks
- hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
-
tester := newTester()
defer tester.terminate()
+ // Create a small enough block chain to download
+ targetBlocks := 3*fsHeaderSafetyNet + fsPivotInterval + fsMinFullBlocks
+ hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
+
// Attempt to sync with an attacker that feeds junk during the fast sync phase.
// This should result in the last fsHeaderSafetyNet headers being rolled back.
tester.newPeer("fast-attack", protocol, hashes, headers, blocks, receipts)
@@ -1286,7 +1293,7 @@ func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
tester := newTester()
defer tester.terminate()
- hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil, false)
+ hashes, headers, blocks, receipts := tester.makeChain(0, 0, tester.genesis, nil, false)
tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, headers, blocks, receipts)
if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer {
@@ -1333,7 +1340,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
for i, tt := range tests {
// Register a new peer and ensure it's presence
id := fmt.Sprintf("test %d", i)
- if err := tester.newPeer(id, protocol, []common.Hash{genesis.Hash()}, nil, nil, nil); err != nil {
+ if err := tester.newPeer(id, protocol, []common.Hash{tester.genesis.Hash()}, nil, nil, nil); err != nil {
t.Fatalf("test %d: failed to register new peer: %v", i, err)
}
if _, ok := tester.peerHashes[id]; !ok {
@@ -1342,7 +1349,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
// Simulate a synchronisation and check the required result
tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result }
- tester.downloader.Synchronise(id, genesis.Hash(), big.NewInt(1000), FullSync)
+ tester.downloader.Synchronise(id, tester.genesis.Hash(), big.NewInt(1000), FullSync)
if _, ok := tester.peerHashes[id]; !ok != tt.drop {
t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop)
}
@@ -1361,17 +1368,17 @@ func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync)
func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
+ tester := newTester()
+ defer tester.terminate()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
+ hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
// Set a sync init hook to catch progress changes
starting := make(chan struct{})
progress := make(chan struct{})
- tester := newTester()
- defer tester.terminate()
-
tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{}
<-progress
@@ -1434,17 +1441,17 @@ func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64,
func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
+ tester := newTester()
+ defer tester.terminate()
+
// Create a forked chain to simulate origin revertal
common, fork := MaxHashFetch, 2*MaxHashFetch
- hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true)
+ hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, true)
// Set a sync init hook to catch progress changes
starting := make(chan struct{})
progress := make(chan struct{})
- tester := newTester()
- defer tester.terminate()
-
tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{}
<-progress
@@ -1510,17 +1517,17 @@ func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64,
func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
+ tester := newTester()
+ defer tester.terminate()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
+ hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
// Set a sync init hook to catch progress changes
starting := make(chan struct{})
progress := make(chan struct{})
- tester := newTester()
- defer tester.terminate()
-
tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{}
<-progress
@@ -1587,17 +1594,17 @@ func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, L
func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
+ tester := newTester()
+ defer tester.terminate()
+
// Create a small block chain
targetBlocks := blockCacheLimit - 15
- hashes, headers, blocks, receipts := makeChain(targetBlocks+3, 0, genesis, nil, false)
+ hashes, headers, blocks, receipts := tester.makeChain(targetBlocks+3, 0, tester.genesis, nil, false)
// Set a sync init hook to catch progress changes
starting := make(chan struct{})
progress := make(chan struct{})
- tester := newTester()
- defer tester.terminate()
-
tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{}
<-progress
@@ -1664,10 +1671,16 @@ func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64,
func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
- hashes, headers, blocks, receipts := makeChain(5, 0, genesis, nil, false)
+
+ master := newTester()
+ defer master.terminate()
+
+ hashes, headers, blocks, receipts := master.makeChain(5, 0, master.genesis, nil, false)
fakeHeads := []*types.Header{{}, {}, {}, {}}
for i := 0; i < 200; i++ {
tester := newTester()
+ tester.peerDb = master.peerDb
+
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Whenever the downloader requests headers, flood it with
// a lot of unrequested header deliveries.
@@ -1703,44 +1716,81 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
// Tests that if fast sync aborts in the critical section, it can restart a few
// times before giving up.
-func TestFastCriticalRestarts63(t *testing.T) { testFastCriticalRestarts(t, 63) }
-func TestFastCriticalRestarts64(t *testing.T) { testFastCriticalRestarts(t, 64) }
+func TestFastCriticalRestartsFail63(t *testing.T) { testFastCriticalRestarts(t, 63, false) }
+func TestFastCriticalRestartsFail64(t *testing.T) { testFastCriticalRestarts(t, 64, false) }
+func TestFastCriticalRestartsCont63(t *testing.T) { testFastCriticalRestarts(t, 63, true) }
+func TestFastCriticalRestartsCont64(t *testing.T) { testFastCriticalRestarts(t, 64, true) }
-func testFastCriticalRestarts(t *testing.T, protocol int) {
- t.Parallel()
+func testFastCriticalRestarts(t *testing.T, protocol int, progress bool) {
+ tester := newTester()
+ defer tester.terminate()
// Create a large enough blockchin to actually fast sync on
targetBlocks := fsMinFullBlocks + 2*fsPivotInterval - 15
- hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
-
- // Create a tester peer with the critical section state roots missing (force failures)
- tester := newTester()
- defer tester.terminate()
+ hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
+ // Create a tester peer with a critical section header missing (force failures)
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
+ delete(tester.peerHeaders["peer"], hashes[fsMinFullBlocks-1])
+ tester.downloader.dropPeer = func(id string) {} // We reuse the same "faulty" peer throughout the test
+
+ // Remove all possible pivot state roots and slow down replies (test failure resets later)
for i := 0; i < fsPivotInterval; i++ {
tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true
}
- tester.downloader.dropPeer = func(id string) {} // We reuse the same "faulty" peer throughout the test
+ tester.downloader.peers.peers["peer"].getNodeData = tester.peerGetNodeDataFn("peer", 500*time.Millisecond) // Enough to reach the critical section
// Synchronise with the peer a few times and make sure they fail until the retry limit
- for i := 0; i < fsCriticalTrials; i++ {
+ for i := 0; i < int(fsCriticalTrials)-1; i++ {
// Attempt a sync and ensure it fails properly
if err := tester.sync("peer", nil, FastSync); err == nil {
t.Fatalf("failing fast sync succeeded: %v", err)
}
- time.Sleep(100 * time.Millisecond) // Make sure no in-flight requests remain
+ time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain
// If it's the first failure, pivot should be locked => reenable all others to detect pivot changes
if i == 0 {
+ if tester.downloader.fsPivotLock == nil {
+ time.Sleep(400 * time.Millisecond) // Make sure the first huge timeout expires too
+ t.Fatalf("pivot block not locked in after critical section failure")
+ }
tester.lock.Lock()
+ tester.peerHeaders["peer"][hashes[fsMinFullBlocks-1]] = headers[hashes[fsMinFullBlocks-1]]
tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true}
+ tester.downloader.peers.peers["peer"].getNodeData = tester.peerGetNodeDataFn("peer", 0)
tester.lock.Unlock()
}
}
+ // Return all nodes if we're testing fast sync progression
+ if progress {
+ tester.lock.Lock()
+ tester.peerMissingStates["peer"] = map[common.Hash]bool{}
+ tester.lock.Unlock()
+
+ if err := tester.sync("peer", nil, FastSync); err != nil {
+ t.Fatalf("failed to synchronise blocks in progressed fast sync: %v", err)
+ }
+ time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain
+
+ if fails := atomic.LoadUint32(&tester.downloader.fsPivotFails); fails != 1 {
+ t.Fatalf("progressed pivot trial count mismatch: have %v, want %v", fails, 1)
+ }
+ assertOwnChain(t, tester, targetBlocks+1)
+ } else {
+ if err := tester.sync("peer", nil, FastSync); err == nil {
+ t.Fatalf("succeeded to synchronise blocks in failed fast sync")
+ }
+ time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain
+
+ if fails := atomic.LoadUint32(&tester.downloader.fsPivotFails); fails != fsCriticalTrials {
+ t.Fatalf("failed pivot trial count mismatch: have %v, want %v", fails, fsCriticalTrials)
+ }
+ }
// Retry limit exhausted, downloader will switch to full sync, should succeed
if err := tester.sync("peer", nil, FastSync); err != nil {
t.Fatalf("failed to synchronise blocks in slow sync: %v", err)
}
- assertOwnChain(t, tester, targetBlocks+1)
+ // Note, we can't assert the chain here because the test asserter assumes sync
+ // completed using a single mode of operation, whereas fast-then-slow can result
+ // in arbitrary intermediate state that's not cleanly verifiable.
}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index fd239f7e4..b7ad92099 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -362,20 +362,20 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
// Make sure chain order is honoured and preserved throughout
hash := header.Hash()
if header.Number == nil || header.Number.Uint64() != from {
- glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ordering, expected %d", header.Number, hash[:4], from)
+ glog.V(logger.Warn).Infof("Header #%v [%x…] broke chain ordering, expected %d", header.Number, hash[:4], from)
break
}
if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash {
- glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ancestry", header.Number, hash[:4])
+ glog.V(logger.Warn).Infof("Header #%v [%x…] broke chain ancestry", header.Number, hash[:4])
break
}
// Make sure no duplicate requests are executed
if _, ok := q.blockTaskPool[hash]; ok {
- glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for block fetch", header.Number.Uint64(), hash[:4])
+ glog.V(logger.Warn).Infof("Header #%d [%x…] already scheduled for block fetch", header.Number.Uint64(), hash[:4])
continue
}
if _, ok := q.receiptTaskPool[hash]; ok {
- glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for receipt fetch", header.Number.Uint64(), hash[:4])
+ glog.V(logger.Warn).Infof("Header #%d [%x…] already scheduled for receipt fetch", header.Number.Uint64(), hash[:4])
continue
}
// Queue the header for content retrieval
@@ -388,7 +388,16 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
}
if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot {
- // Pivoting point of the fast sync, retrieve the state tries
+ // Pivoting point of the fast sync, switch the state retrieval to this
+ glog.V(logger.Debug).Infof("Switching state downloads to %d [%x…]", header.Number.Uint64(), header.Hash().Bytes()[:4])
+
+ q.stateTaskIndex = 0
+ q.stateTaskPool = make(map[common.Hash]int)
+ q.stateTaskQueue.Reset()
+ for _, req := range q.statePendPool {
+ req.Hashes = make(map[common.Hash]int) // Make sure executing requests fail, but don't disappear
+ }
+
q.stateSchedLock.Lock()
q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase)
q.stateSchedLock.Unlock()
@@ -866,10 +875,10 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
accepted := len(headers) == MaxHeaderFetch
if accepted {
if headers[0].Number.Uint64() != request.From {
- glog.V(logger.Detail).Infof("Peer %s: first header #%v [%x] broke chain ordering, expected %d", id, headers[0].Number, headers[0].Hash().Bytes()[:4], request.From)
+ glog.V(logger.Detail).Infof("Peer %s: first header #%v [%x…] broke chain ordering, expected %d", id, headers[0].Number, headers[0].Hash().Bytes()[:4], request.From)
accepted = false
} else if headers[len(headers)-1].Hash() != target {
- glog.V(logger.Detail).Infof("Peer %s: last header #%v [%x] broke skeleton structure, expected %x", id, headers[len(headers)-1].Number, headers[len(headers)-1].Hash().Bytes()[:4], target[:4])
+ glog.V(logger.Detail).Infof("Peer %s: last header #%v [%x…] broke skeleton structure, expected %x", id, headers[len(headers)-1].Number, headers[len(headers)-1].Hash().Bytes()[:4], target[:4])
accepted = false
}
}
@@ -877,12 +886,12 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
for i, header := range headers[1:] {
hash := header.Hash()
if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
- glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ordering, expected %d", id, header.Number, hash[:4], want)
+ glog.V(logger.Warn).Infof("Peer %s: header #%v [%x…] broke chain ordering, expected %d", id, header.Number, hash[:4], want)
accepted = false
break
}
if headers[i].Hash() != header.ParentHash {
- glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ancestry", id, header.Number, hash[:4])
+ glog.V(logger.Warn).Infof("Peer %s: header #%v [%x…] broke chain ancestry", id, header.Number, hash[:4])
accepted = false
break
}
@@ -1039,9 +1048,8 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
}
// DeliverNodeData injects a node state data retrieval response into the queue.
-// The method returns the number of node state entries originally requested, and
-// the number of them actually accepted from the delivery.
-func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) {
+// The method returns the number of node state accepted from the delivery.
+func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, bool, error)) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -1099,31 +1107,34 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
// deliverNodeData is the asynchronous node data processor that injects a batch
// of sync results into the state scheduler.
-func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) {
+func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(int, bool, error)) {
// Wake up WaitResults after the state has been written because it
// might be waiting for the pivot block state to get completed.
defer q.active.Signal()
// Process results one by one to permit task fetches in between
+ progressed := false
for i, result := range results {
q.stateSchedLock.Lock()
if q.stateScheduler == nil {
// Syncing aborted since this async delivery started, bail out
q.stateSchedLock.Unlock()
- callback(errNoFetchesPending, i)
+ callback(i, progressed, errNoFetchesPending)
return
}
- if _, err := q.stateScheduler.Process([]trie.SyncResult{result}); err != nil {
+ if prog, _, err := q.stateScheduler.Process([]trie.SyncResult{result}); err != nil {
// Processing a state result failed, bail out
q.stateSchedLock.Unlock()
- callback(err, i)
+ callback(i, progressed, err)
return
+ } else if prog {
+ progressed = true
}
// Item processing succeeded, release the lock (temporarily)
q.stateSchedLock.Unlock()
}
- callback(nil, len(results))
+ callback(len(results), progressed, nil)
}
// Prepare configures the result cache to allow accepting and caching inbound
diff --git a/trie/sync.go b/trie/sync.go
index 58b8a1fb6..2158ab750 100644
--- a/trie/sync.go
+++ b/trie/sync.go
@@ -142,34 +142,40 @@ func (s *TrieSync) Missing(max int) []common.Hash {
return requests
}
-// Process injects a batch of retrieved trie nodes data.
-func (s *TrieSync) Process(results []SyncResult) (int, error) {
+// Process injects a batch of retrieved trie nodes data, returning if something
+// was committed to the database and also the index of an entry if processing of
+// it failed.
+func (s *TrieSync) Process(results []SyncResult) (bool, int, error) {
+ committed := false
+
for i, item := range results {
// If the item was not requested, bail out
request := s.requests[item.Hash]
if request == nil {
- return i, ErrNotRequested
+ return committed, i, ErrNotRequested
}
// If the item is a raw entry request, commit directly
if request.raw {
request.data = item.Data
s.commit(request, nil)
+ committed = true
continue
}
// Decode the node data content and update the request
node, err := decodeNode(item.Hash[:], item.Data, 0)
if err != nil {
- return i, err
+ return committed, i, err
}
request.data = item.Data
// Create and schedule a request for all the children nodes
requests, err := s.children(request, node)
if err != nil {
- return i, err
+ return committed, i, err
}
if len(requests) == 0 && request.deps == 0 {
s.commit(request, nil)
+ committed = true
continue
}
request.deps += len(requests)
@@ -177,7 +183,7 @@ func (s *TrieSync) Process(results []SyncResult) (int, error) {
s.schedule(child)
}
}
- return 0, nil
+ return committed, 0, nil
}
// Pending returns the number of state entries currently pending for download.
diff --git a/trie/sync_test.go b/trie/sync_test.go
index a763dc564..5292fe5cb 100644
--- a/trie/sync_test.go
+++ b/trie/sync_test.go
@@ -122,7 +122,7 @@ func testIterativeTrieSync(t *testing.T, batch int) {
}
results[i] = SyncResult{hash, data}
}
- if index, err := sched.Process(results); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
queue = append(queue[:0], sched.Missing(batch)...)
@@ -152,7 +152,7 @@ func TestIterativeDelayedTrieSync(t *testing.T) {
}
results[i] = SyncResult{hash, data}
}
- if index, err := sched.Process(results); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
queue = append(queue[len(results):], sched.Missing(10000)...)
@@ -190,7 +190,7 @@ func testIterativeRandomTrieSync(t *testing.T, batch int) {
results = append(results, SyncResult{hash, data})
}
// Feed the retrieved results back and queue new tasks
- if index, err := sched.Process(results); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
queue = make(map[common.Hash]struct{})
@@ -231,7 +231,7 @@ func TestIterativeRandomDelayedTrieSync(t *testing.T) {
}
}
// Feed the retrieved results back and queue new tasks
- if index, err := sched.Process(results); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
for _, result := range results {
@@ -272,7 +272,7 @@ func TestDuplicateAvoidanceTrieSync(t *testing.T) {
results[i] = SyncResult{hash, data}
}
- if index, err := sched.Process(results); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
queue = append(queue[:0], sched.Missing(0)...)
@@ -304,7 +304,7 @@ func TestIncompleteTrieSync(t *testing.T) {
results[i] = SyncResult{hash, data}
}
// Process each of the trie nodes
- if index, err := sched.Process(results); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
for _, result := range results {