diff options
author | Péter Szilágyi <peterke@gmail.com> | 2016-11-01 19:31:12 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-11-01 19:31:12 +0800 |
commit | f4d878f3d83187d27411c0ea5ebd55a82b27e35e (patch) | |
tree | 0a684c4ebb5ce6846862df6c10d0209991122751 | |
parent | 61acd18e7967d67fd6db24be360586cf017d577b (diff) | |
parent | e1b4acfb6e6a0f930afeb79749ac56c381609258 (diff) | |
download | go-tangerine-f4d878f3d83187d27411c0ea5ebd55a82b27e35e.tar go-tangerine-f4d878f3d83187d27411c0ea5ebd55a82b27e35e.tar.gz go-tangerine-f4d878f3d83187d27411c0ea5ebd55a82b27e35e.tar.bz2 go-tangerine-f4d878f3d83187d27411c0ea5ebd55a82b27e35e.tar.lz go-tangerine-f4d878f3d83187d27411c0ea5ebd55a82b27e35e.tar.xz go-tangerine-f4d878f3d83187d27411c0ea5ebd55a82b27e35e.tar.zst go-tangerine-f4d878f3d83187d27411c0ea5ebd55a82b27e35e.zip |
Merge pull request #3216 from karalabe/fastsync-bigdb-tuning
core/state, eth/downloader, trie: reset fast-failure on progress
-rw-r--r-- | core/state/sync.go | 6 | ||||
-rw-r--r-- | core/state/sync_test.go | 10 | ||||
-rw-r--r-- | eth/downloader/downloader.go | 33 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 344 | ||||
-rw-r--r-- | eth/downloader/queue.go | 45 | ||||
-rw-r--r-- | trie/sync.go | 18 | ||||
-rw-r--r-- | trie/sync_test.go | 12 |
7 files changed, 273 insertions, 195 deletions
diff --git a/core/state/sync.go b/core/state/sync.go index ef2b4b84c..bab9c8e7e 100644 --- a/core/state/sync.go +++ b/core/state/sync.go @@ -59,8 +59,10 @@ func (s *StateSync) Missing(max int) []common.Hash { return (*trie.TrieSync)(s).Missing(max) } -// Process injects a batch of retrieved trie nodes data. -func (s *StateSync) Process(list []trie.SyncResult) (int, error) { +// Process injects a batch of retrieved trie nodes data, returning if something +// was committed to the database and also the index of an entry if processing of +// it failed. +func (s *StateSync) Process(list []trie.SyncResult) (bool, int, error) { return (*trie.TrieSync)(s).Process(list) } diff --git a/core/state/sync_test.go b/core/state/sync_test.go index 949df7301..f5390d80f 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -138,7 +138,7 @@ func testIterativeStateSync(t *testing.T, batch int) { } results[i] = trie.SyncResult{Hash: hash, Data: data} } - if index, err := sched.Process(results); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } queue = append(queue[:0], sched.Missing(batch)...) @@ -168,7 +168,7 @@ func TestIterativeDelayedStateSync(t *testing.T) { } results[i] = trie.SyncResult{Hash: hash, Data: data} } - if index, err := sched.Process(results); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } queue = append(queue[len(results):], sched.Missing(0)...) @@ -206,7 +206,7 @@ func testIterativeRandomStateSync(t *testing.T, batch int) { results = append(results, trie.SyncResult{Hash: hash, Data: data}) } // Feed the retrieved results back and queue new tasks - if index, err := sched.Process(results); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } queue = make(map[common.Hash]struct{}) @@ -249,7 +249,7 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) { } } // Feed the retrieved results back and queue new tasks - if index, err := sched.Process(results); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } for _, hash := range sched.Missing(0) { @@ -283,7 +283,7 @@ func TestIncompleteStateSync(t *testing.T) { results[i] = trie.SyncResult{Hash: hash, Data: data} } // Process each of the state nodes - if index, err := sched.Process(results); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } for _, result := range results { diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 987be2b7a..da20e48a2 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -64,12 +64,12 @@ var ( maxHeadersProcess = 2048 // Number of header download results to import at once into the chain maxResultsProcess = 2048 // Number of content download results to import at once into the chain - fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync - fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected - fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it - fsPivotInterval = 512 // Number of headers out of which to randomize the pivot point - fsMinFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync - fsCriticalTrials = 10 // Number of times to retry in the cricical section before bailing + fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync + fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected + fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it + fsPivotInterval = 256 // Number of headers out of which to randomize the pivot point + fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync + fsCriticalTrials = uint32(32) // Number of times to retry in the cricical section before bailing ) var ( @@ -105,7 +105,7 @@ type Downloader struct { peers *peerSet // Set of active peers from which download can proceed fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries) - fsPivotFails int // Number of fast sync failures in the critical section + fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section rttEstimate uint64 // Round trip time to target for download requests rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) @@ -361,7 +361,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode // Set the requested sync mode, unless it's forbidden d.mode = mode - if d.mode == FastSync && d.fsPivotFails >= fsCriticalTrials { + if d.mode == FastSync && atomic.LoadUint32(&d.fsPivotFails) >= fsCriticalTrials { d.mode = FullSync } // Retrieve the origin peer and initiate the downloading process @@ -480,6 +480,11 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error { d.queue.Close() d.cancel() wg.Wait() + + // If sync failed in the critical section, bump the fail counter + if err != nil && d.mode == FastSync && d.fsPivotLock != nil { + atomic.AddUint32(&d.fsPivotFails, 1) + } return err } @@ -926,10 +931,10 @@ func (d *Downloader) fetchNodeData() error { var ( deliver = func(packet dataPack) (int, error) { start := time.Now() - return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) { + return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) { // If the peer returned old-requested data, forgive if err == trie.ErrNotRequested { - glog.V(logger.Info).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId()) + glog.V(logger.Debug).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId()) return } if err != nil { @@ -951,6 +956,11 @@ func (d *Downloader) fetchNodeData() error { syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below d.syncStatsLock.Unlock() + // If real database progress was made, reset any fast-sync pivot failure + if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 { + glog.V(logger.Debug).Infof("fast-sync progressed, resetting fail counter from %d", atomic.LoadUint32(&d.fsPivotFails)) + atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block + } // Log a message to the user and return if delivered > 0 { glog.V(logger.Info).Infof("imported %3d state entries in %9v: processed %d, pending at least %d", delivered, common.PrettyDuration(time.Since(start)), syncStatsStateDone, pending) @@ -1177,7 +1187,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // If we're already past the pivot point, this could be an attack, thread carefully if rollback[len(rollback)-1].Number.Uint64() > pivot { // If we didn't ever fail, lock in te pivot header (must! not! change!) - if d.fsPivotFails == 0 { + if atomic.LoadUint32(&d.fsPivotFails) == 0 { for _, header := range rollback { if header.Number.Uint64() == pivot { glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4]) @@ -1185,7 +1195,6 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { } } } - d.fsPivotFails++ } } }() diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 366c248bb..2849712ab 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -37,25 +37,79 @@ import ( ) var ( - testdb, _ = ethdb.NewMemDatabase() testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") testAddress = crypto.PubkeyToAddress(testKey.PublicKey) - genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000)) ) // Reduce some of the parameters to make the tester faster. func init() { MaxForkAncestry = uint64(10000) blockCacheLimit = 1024 + fsCriticalTrials = 10 +} + +// downloadTester is a test simulator for mocking out local block chain. +type downloadTester struct { + downloader *Downloader + + genesis *types.Block // Genesis blocks used by the tester and peers + stateDb ethdb.Database // Database used by the tester for syncing from peers + peerDb ethdb.Database // Database of the peers containing all data + + ownHashes []common.Hash // Hash chain belonging to the tester + ownHeaders map[common.Hash]*types.Header // Headers belonging to the tester + ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester + ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester + ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain + + peerHashes map[string][]common.Hash // Hash chain belonging to different test peers + peerHeaders map[string]map[common.Hash]*types.Header // Headers belonging to different test peers + peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers + peerReceipts map[string]map[common.Hash]types.Receipts // Receipts belonging to different test peers + peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains + + peerMissingStates map[string]map[common.Hash]bool // State entries that fast sync should not return + + lock sync.RWMutex +} + +// newTester creates a new downloader test mocker. +func newTester() *downloadTester { + testdb, _ := ethdb.NewMemDatabase() + genesis := core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000)) + + tester := &downloadTester{ + genesis: genesis, + peerDb: testdb, + ownHashes: []common.Hash{genesis.Hash()}, + ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()}, + ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis}, + ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): nil}, + ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()}, + peerHashes: make(map[string][]common.Hash), + peerHeaders: make(map[string]map[common.Hash]*types.Header), + peerBlocks: make(map[string]map[common.Hash]*types.Block), + peerReceipts: make(map[string]map[common.Hash]types.Receipts), + peerChainTds: make(map[string]map[common.Hash]*big.Int), + peerMissingStates: make(map[string]map[common.Hash]bool), + } + tester.stateDb, _ = ethdb.NewMemDatabase() + tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00}) + + tester.downloader = New(tester.stateDb, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader, + tester.getBlock, tester.headHeader, tester.headBlock, tester.headFastBlock, tester.commitHeadBlock, tester.getTd, + tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.rollback, tester.dropPeer) + + return tester } // makeChain creates a chain of n blocks starting at and including parent. // the returned hash chain is ordered head->parent. In addition, every 3rd block // contains a transaction and every 5th an uncle to allow testing correct block // reassembly. -func makeChain(n int, seed byte, parent *types.Block, parentReceipts types.Receipts, heavy bool) ([]common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]types.Receipts) { +func (dl *downloadTester) makeChain(n int, seed byte, parent *types.Block, parentReceipts types.Receipts, heavy bool) ([]common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]types.Receipts) { // Generate the block chain - blocks, receipts := core.GenerateChain(nil, parent, testdb, n, func(i int, block *core.BlockGen) { + blocks, receipts := core.GenerateChain(nil, parent, dl.peerDb, n, func(i int, block *core.BlockGen) { block.SetCoinbase(common.Address{seed}) // If a heavy chain is requested, delay blocks to raise difficulty @@ -63,7 +117,7 @@ func makeChain(n int, seed byte, parent *types.Block, parentReceipts types.Recei block.OffsetTime(-1) } // If the block number is multiple of 3, send a bonus transaction to the miner - if parent == genesis && i%3 == 0 { + if parent == dl.genesis && i%3 == 0 { tx, err := types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(testKey) if err != nil { panic(err) @@ -102,19 +156,19 @@ func makeChain(n int, seed byte, parent *types.Block, parentReceipts types.Recei // makeChainFork creates two chains of length n, such that h1[:f] and // h2[:f] are different but have a common suffix of length n-f. -func makeChainFork(n, f int, parent *types.Block, parentReceipts types.Receipts, balanced bool) ([]common.Hash, []common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]*types.Block, map[common.Hash]types.Receipts, map[common.Hash]types.Receipts) { +func (dl *downloadTester) makeChainFork(n, f int, parent *types.Block, parentReceipts types.Receipts, balanced bool) ([]common.Hash, []common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]*types.Block, map[common.Hash]types.Receipts, map[common.Hash]types.Receipts) { // Create the common suffix - hashes, headers, blocks, receipts := makeChain(n-f, 0, parent, parentReceipts, false) + hashes, headers, blocks, receipts := dl.makeChain(n-f, 0, parent, parentReceipts, false) // Create the forks, making the second heavyer if non balanced forks were requested - hashes1, headers1, blocks1, receipts1 := makeChain(f, 1, blocks[hashes[0]], receipts[hashes[0]], false) + hashes1, headers1, blocks1, receipts1 := dl.makeChain(f, 1, blocks[hashes[0]], receipts[hashes[0]], false) hashes1 = append(hashes1, hashes[1:]...) heavy := false if !balanced { heavy = true } - hashes2, headers2, blocks2, receipts2 := makeChain(f, 2, blocks[hashes[0]], receipts[hashes[0]], heavy) + hashes2, headers2, blocks2, receipts2 := dl.makeChain(f, 2, blocks[hashes[0]], receipts[hashes[0]], heavy) hashes2 = append(hashes2, hashes[1:]...) for hash, header := range headers { @@ -132,53 +186,6 @@ func makeChainFork(n, f int, parent *types.Block, parentReceipts types.Receipts, return hashes1, hashes2, headers1, headers2, blocks1, blocks2, receipts1, receipts2 } -// downloadTester is a test simulator for mocking out local block chain. -type downloadTester struct { - stateDb ethdb.Database - downloader *Downloader - - ownHashes []common.Hash // Hash chain belonging to the tester - ownHeaders map[common.Hash]*types.Header // Headers belonging to the tester - ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester - ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester - ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain - - peerHashes map[string][]common.Hash // Hash chain belonging to different test peers - peerHeaders map[string]map[common.Hash]*types.Header // Headers belonging to different test peers - peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers - peerReceipts map[string]map[common.Hash]types.Receipts // Receipts belonging to different test peers - peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains - - peerMissingStates map[string]map[common.Hash]bool // State entries that fast sync should not return - - lock sync.RWMutex -} - -// newTester creates a new downloader test mocker. -func newTester() *downloadTester { - tester := &downloadTester{ - ownHashes: []common.Hash{genesis.Hash()}, - ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()}, - ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis}, - ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): nil}, - ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()}, - peerHashes: make(map[string][]common.Hash), - peerHeaders: make(map[string]map[common.Hash]*types.Header), - peerBlocks: make(map[string]map[common.Hash]*types.Block), - peerReceipts: make(map[string]map[common.Hash]types.Receipts), - peerChainTds: make(map[string]map[common.Hash]*big.Int), - peerMissingStates: make(map[string]map[common.Hash]bool), - } - tester.stateDb, _ = ethdb.NewMemDatabase() - tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00}) - - tester.downloader = New(tester.stateDb, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader, - tester.getBlock, tester.headHeader, tester.headBlock, tester.headFastBlock, tester.commitHeadBlock, tester.getTd, - tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.rollback, tester.dropPeer) - - return tester -} - // terminate aborts any operations on the embedded downloader and releases all // held resources. func (dl *downloadTester) terminate() { @@ -251,7 +258,7 @@ func (dl *downloadTester) headHeader() *types.Header { return header } } - return genesis.Header() + return dl.genesis.Header() } // headBlock retrieves the current head block from the canonical chain. @@ -266,7 +273,7 @@ func (dl *downloadTester) headBlock() *types.Block { } } } - return genesis + return dl.genesis } // headFastBlock retrieves the current head fast-sync block from the canonical chain. @@ -279,7 +286,7 @@ func (dl *downloadTester) headFastBlock() *types.Block { return block } } - return genesis + return dl.genesis } // commitHeadBlock manually sets the head block to a given hash. @@ -351,7 +358,7 @@ func (dl *downloadTester) insertBlocks(blocks types.Blocks) (int, error) { return len(blocks), nil } -// insertReceipts injects a new batch of blocks into the simulated chain. +// insertReceipts injects a new batch of receipts into the simulated chain. func (dl *downloadTester) insertReceipts(blocks types.Blocks, receipts []types.Receipts) (int, error) { dl.lock.Lock() defer dl.lock.Unlock() @@ -586,7 +593,7 @@ func (dl *downloadTester) peerGetNodeDataFn(id string, delay time.Duration) func results := make([][]byte, 0, len(hashes)) for _, hash := range hashes { - if data, err := testdb.Get(hash.Bytes()); err == nil { + if data, err := dl.peerDb.Get(hash.Bytes()); err == nil { if !dl.peerMissingStates[id][hash] { results = append(results, data) } @@ -669,13 +676,13 @@ func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronis func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) { t.Parallel() - // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 - hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) - tester := newTester() defer tester.terminate() + // Create a small enough block chain to download + targetBlocks := blockCacheLimit - 15 + hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) // Synchronise with the peer and make sure all relevant data was retrieved @@ -694,13 +701,13 @@ func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) } func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) } func testThrottling(t *testing.T, protocol int, mode SyncMode) { - // Create a long block chain to download and the tester - targetBlocks := 8 * blockCacheLimit - hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) - tester := newTester() defer tester.terminate() + // Create a long block chain to download and the tester + targetBlocks := 8 * blockCacheLimit + hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) // Wrap the importer to allow stepping @@ -782,13 +789,13 @@ func TestForkedSync64Light(t *testing.T) { testForkedSync(t, 64, LightSync) } func testForkedSync(t *testing.T, protocol int, mode SyncMode) { t.Parallel() - // Create a long enough forked chain - common, fork := MaxHashFetch, 2*MaxHashFetch - hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true) - tester := newTester() defer tester.terminate() + // Create a long enough forked chain + common, fork := MaxHashFetch, 2*MaxHashFetch + hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, true) + tester.newPeer("fork A", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("fork B", protocol, hashesB, headersB, blocksB, receiptsB) @@ -817,13 +824,13 @@ func TestHeavyForkedSync64Light(t *testing.T) { testHeavyForkedSync(t, 64, Light func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) { t.Parallel() - // Create a long enough forked chain - common, fork := MaxHashFetch, 4*MaxHashFetch - hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, false) - tester := newTester() defer tester.terminate() + // Create a long enough forked chain + common, fork := MaxHashFetch, 4*MaxHashFetch + hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, false) + tester.newPeer("light", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("heavy", protocol, hashesB[fork/2:], headersB, blocksB, receiptsB) @@ -853,13 +860,13 @@ func TestBoundedForkedSync64Light(t *testing.T) { testBoundedForkedSync(t, 64, L func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) { t.Parallel() - // Create a long enough forked chain - common, fork := 13, int(MaxForkAncestry+17) - hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true) - tester := newTester() defer tester.terminate() + // Create a long enough forked chain + common, fork := 13, int(MaxForkAncestry+17) + hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, true) + tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("rewriter", protocol, hashesB, headersB, blocksB, receiptsB) @@ -888,13 +895,13 @@ func TestBoundedHeavyForkedSync64Light(t *testing.T) { testBoundedHeavyForkedSyn func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) { t.Parallel() - // Create a long enough forked chain - common, fork := 13, int(MaxForkAncestry+17) - hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, false) - tester := newTester() defer tester.terminate() + // Create a long enough forked chain + common, fork := 13, int(MaxForkAncestry+17) + hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, false) + tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("heavy-rewriter", protocol, hashesB[MaxForkAncestry-17:], headersB, blocksB, receiptsB) // Root the fork below the ancestor limit @@ -958,6 +965,9 @@ func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) } func testCancel(t *testing.T, protocol int, mode SyncMode) { t.Parallel() + tester := newTester() + defer tester.terminate() + // Create a small enough block chain to download and the tester targetBlocks := blockCacheLimit - 15 if targetBlocks >= MaxHashFetch { @@ -966,10 +976,7 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) { if targetBlocks >= MaxHeaderFetch { targetBlocks = MaxHeaderFetch - 15 } - hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) - - tester := newTester() - defer tester.terminate() + hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) @@ -999,13 +1006,13 @@ func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) { t.Parallel() + tester := newTester() + defer tester.terminate() + // Create various peers with various parts of the chain targetPeers := 8 targetBlocks := targetPeers*blockCacheLimit - 15 - hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) - - tester := newTester() - defer tester.terminate() + hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) for i := 0; i < targetPeers; i++ { id := fmt.Sprintf("peer #%d", i) @@ -1029,14 +1036,14 @@ func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) { t.Parallel() + tester := newTester() + defer tester.terminate() + // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 - hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) + hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) // Create peers of every type - tester := newTester() - defer tester.terminate() - tester.newPeer("peer 62", 62, hashes, headers, blocks, nil) tester.newPeer("peer 63", 63, hashes, headers, blocks, receipts) tester.newPeer("peer 64", 64, hashes, headers, blocks, receipts) @@ -1068,13 +1075,13 @@ func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, L func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { t.Parallel() - // Create a block chain to download - targetBlocks := 2*blockCacheLimit - 15 - hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) - tester := newTester() defer tester.terminate() + // Create a block chain to download + targetBlocks := 2*blockCacheLimit - 15 + hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) // Instrument the downloader to signal body requests @@ -1094,7 +1101,7 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { // Validate the number of block bodies that should have been requested bodiesNeeded, receiptsNeeded := 0, 0 for _, block := range blocks { - if mode != LightSync && block != genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) { + if mode != LightSync && block != tester.genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) { bodiesNeeded++ } } @@ -1123,13 +1130,13 @@ func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 6 func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) { t.Parallel() - // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 - hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) - tester := newTester() defer tester.terminate() + // Create a small enough block chain to download + targetBlocks := blockCacheLimit - 15 + hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) + // Attempt a full sync with an attacker feeding gapped headers tester.newPeer("attack", protocol, hashes, headers, blocks, receipts) missing := targetBlocks / 2 @@ -1156,13 +1163,13 @@ func TestShiftedHeaderAttack64Fast(t *testing.T) { testShiftedHeaderAttack(t, 6 func TestShiftedHeaderAttack64Light(t *testing.T) { testShiftedHeaderAttack(t, 64, LightSync) } func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) { - // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 - hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) - tester := newTester() defer tester.terminate() + // Create a small enough block chain to download + targetBlocks := blockCacheLimit - 15 + hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) + // Attempt a full sync with an attacker feeding shifted headers tester.newPeer("attack", protocol, hashes, headers, blocks, receipts) delete(tester.peerHeaders["attack"], hashes[len(hashes)-2]) @@ -1188,13 +1195,13 @@ func TestInvalidHeaderRollback64Fast(t *testing.T) { testInvalidHeaderRollback( func TestInvalidHeaderRollback64Light(t *testing.T) { testInvalidHeaderRollback(t, 64, LightSync) } func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { - // Create a small enough block chain to download - targetBlocks := 3*fsHeaderSafetyNet + fsMinFullBlocks - hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) - tester := newTester() defer tester.terminate() + // Create a small enough block chain to download + targetBlocks := 3*fsHeaderSafetyNet + fsPivotInterval + fsMinFullBlocks + hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) + // Attempt to sync with an attacker that feeds junk during the fast sync phase. // This should result in the last fsHeaderSafetyNet headers being rolled back. tester.newPeer("fast-attack", protocol, hashes, headers, blocks, receipts) @@ -1286,7 +1293,7 @@ func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil, false) + hashes, headers, blocks, receipts := tester.makeChain(0, 0, tester.genesis, nil, false) tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, headers, blocks, receipts) if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer { @@ -1333,7 +1340,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) - if err := tester.newPeer(id, protocol, []common.Hash{genesis.Hash()}, nil, nil, nil); err != nil { + if err := tester.newPeer(id, protocol, []common.Hash{tester.genesis.Hash()}, nil, nil, nil); err != nil { t.Fatalf("test %d: failed to register new peer: %v", i, err) } if _, ok := tester.peerHashes[id]; !ok { @@ -1342,7 +1349,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { // Simulate a synchronisation and check the required result tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result } - tester.downloader.Synchronise(id, genesis.Hash(), big.NewInt(1000), FullSync) + tester.downloader.Synchronise(id, tester.genesis.Hash(), big.NewInt(1000), FullSync) if _, ok := tester.peerHashes[id]; !ok != tt.drop { t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop) } @@ -1361,17 +1368,17 @@ func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { t.Parallel() + tester := newTester() + defer tester.terminate() + // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 - hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) + hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) // Set a sync init hook to catch progress changes starting := make(chan struct{}) progress := make(chan struct{}) - tester := newTester() - defer tester.terminate() - tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress @@ -1434,17 +1441,17 @@ func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { t.Parallel() + tester := newTester() + defer tester.terminate() + // Create a forked chain to simulate origin revertal common, fork := MaxHashFetch, 2*MaxHashFetch - hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true) + hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, true) // Set a sync init hook to catch progress changes starting := make(chan struct{}) progress := make(chan struct{}) - tester := newTester() - defer tester.terminate() - tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress @@ -1510,17 +1517,17 @@ func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { t.Parallel() + tester := newTester() + defer tester.terminate() + // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 - hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) + hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) // Set a sync init hook to catch progress changes starting := make(chan struct{}) progress := make(chan struct{}) - tester := newTester() - defer tester.terminate() - tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress @@ -1587,17 +1594,17 @@ func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, L func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { t.Parallel() + tester := newTester() + defer tester.terminate() + // Create a small block chain targetBlocks := blockCacheLimit - 15 - hashes, headers, blocks, receipts := makeChain(targetBlocks+3, 0, genesis, nil, false) + hashes, headers, blocks, receipts := tester.makeChain(targetBlocks+3, 0, tester.genesis, nil, false) // Set a sync init hook to catch progress changes starting := make(chan struct{}) progress := make(chan struct{}) - tester := newTester() - defer tester.terminate() - tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress @@ -1664,10 +1671,16 @@ func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { t.Parallel() - hashes, headers, blocks, receipts := makeChain(5, 0, genesis, nil, false) + + master := newTester() + defer master.terminate() + + hashes, headers, blocks, receipts := master.makeChain(5, 0, master.genesis, nil, false) fakeHeads := []*types.Header{{}, {}, {}, {}} for i := 0; i < 200; i++ { tester := newTester() + tester.peerDb = master.peerDb + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) // Whenever the downloader requests headers, flood it with // a lot of unrequested header deliveries. @@ -1703,44 +1716,81 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { // Tests that if fast sync aborts in the critical section, it can restart a few // times before giving up. -func TestFastCriticalRestarts63(t *testing.T) { testFastCriticalRestarts(t, 63) } -func TestFastCriticalRestarts64(t *testing.T) { testFastCriticalRestarts(t, 64) } +func TestFastCriticalRestartsFail63(t *testing.T) { testFastCriticalRestarts(t, 63, false) } +func TestFastCriticalRestartsFail64(t *testing.T) { testFastCriticalRestarts(t, 64, false) } +func TestFastCriticalRestartsCont63(t *testing.T) { testFastCriticalRestarts(t, 63, true) } +func TestFastCriticalRestartsCont64(t *testing.T) { testFastCriticalRestarts(t, 64, true) } -func testFastCriticalRestarts(t *testing.T, protocol int) { - t.Parallel() +func testFastCriticalRestarts(t *testing.T, protocol int, progress bool) { + tester := newTester() + defer tester.terminate() // Create a large enough blockchin to actually fast sync on targetBlocks := fsMinFullBlocks + 2*fsPivotInterval - 15 - hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) - - // Create a tester peer with the critical section state roots missing (force failures) - tester := newTester() - defer tester.terminate() + hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) + // Create a tester peer with a critical section header missing (force failures) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) + delete(tester.peerHeaders["peer"], hashes[fsMinFullBlocks-1]) + tester.downloader.dropPeer = func(id string) {} // We reuse the same "faulty" peer throughout the test + + // Remove all possible pivot state roots and slow down replies (test failure resets later) for i := 0; i < fsPivotInterval; i++ { tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true } - tester.downloader.dropPeer = func(id string) {} // We reuse the same "faulty" peer throughout the test + tester.downloader.peers.peers["peer"].getNodeData = tester.peerGetNodeDataFn("peer", 500*time.Millisecond) // Enough to reach the critical section // Synchronise with the peer a few times and make sure they fail until the retry limit - for i := 0; i < fsCriticalTrials; i++ { + for i := 0; i < int(fsCriticalTrials)-1; i++ { // Attempt a sync and ensure it fails properly if err := tester.sync("peer", nil, FastSync); err == nil { t.Fatalf("failing fast sync succeeded: %v", err) } - time.Sleep(100 * time.Millisecond) // Make sure no in-flight requests remain + time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain // If it's the first failure, pivot should be locked => reenable all others to detect pivot changes if i == 0 { + if tester.downloader.fsPivotLock == nil { + time.Sleep(400 * time.Millisecond) // Make sure the first huge timeout expires too + t.Fatalf("pivot block not locked in after critical section failure") + } tester.lock.Lock() + tester.peerHeaders["peer"][hashes[fsMinFullBlocks-1]] = headers[hashes[fsMinFullBlocks-1]] tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true} + tester.downloader.peers.peers["peer"].getNodeData = tester.peerGetNodeDataFn("peer", 0) tester.lock.Unlock() } } + // Return all nodes if we're testing fast sync progression + if progress { + tester.lock.Lock() + tester.peerMissingStates["peer"] = map[common.Hash]bool{} + tester.lock.Unlock() + + if err := tester.sync("peer", nil, FastSync); err != nil { + t.Fatalf("failed to synchronise blocks in progressed fast sync: %v", err) + } + time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain + + if fails := atomic.LoadUint32(&tester.downloader.fsPivotFails); fails != 1 { + t.Fatalf("progressed pivot trial count mismatch: have %v, want %v", fails, 1) + } + assertOwnChain(t, tester, targetBlocks+1) + } else { + if err := tester.sync("peer", nil, FastSync); err == nil { + t.Fatalf("succeeded to synchronise blocks in failed fast sync") + } + time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain + + if fails := atomic.LoadUint32(&tester.downloader.fsPivotFails); fails != fsCriticalTrials { + t.Fatalf("failed pivot trial count mismatch: have %v, want %v", fails, fsCriticalTrials) + } + } // Retry limit exhausted, downloader will switch to full sync, should succeed if err := tester.sync("peer", nil, FastSync); err != nil { t.Fatalf("failed to synchronise blocks in slow sync: %v", err) } - assertOwnChain(t, tester, targetBlocks+1) + // Note, we can't assert the chain here because the test asserter assumes sync + // completed using a single mode of operation, whereas fast-then-slow can result + // in arbitrary intermediate state that's not cleanly verifiable. } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index fd239f7e4..b7ad92099 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -362,20 +362,20 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { // Make sure chain order is honoured and preserved throughout hash := header.Hash() if header.Number == nil || header.Number.Uint64() != from { - glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ordering, expected %d", header.Number, hash[:4], from) + glog.V(logger.Warn).Infof("Header #%v [%x…] broke chain ordering, expected %d", header.Number, hash[:4], from) break } if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash { - glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ancestry", header.Number, hash[:4]) + glog.V(logger.Warn).Infof("Header #%v [%x…] broke chain ancestry", header.Number, hash[:4]) break } // Make sure no duplicate requests are executed if _, ok := q.blockTaskPool[hash]; ok { - glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for block fetch", header.Number.Uint64(), hash[:4]) + glog.V(logger.Warn).Infof("Header #%d [%x…] already scheduled for block fetch", header.Number.Uint64(), hash[:4]) continue } if _, ok := q.receiptTaskPool[hash]; ok { - glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for receipt fetch", header.Number.Uint64(), hash[:4]) + glog.V(logger.Warn).Infof("Header #%d [%x…] already scheduled for receipt fetch", header.Number.Uint64(), hash[:4]) continue } // Queue the header for content retrieval @@ -388,7 +388,16 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64())) } if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot { - // Pivoting point of the fast sync, retrieve the state tries + // Pivoting point of the fast sync, switch the state retrieval to this + glog.V(logger.Debug).Infof("Switching state downloads to %d [%x…]", header.Number.Uint64(), header.Hash().Bytes()[:4]) + + q.stateTaskIndex = 0 + q.stateTaskPool = make(map[common.Hash]int) + q.stateTaskQueue.Reset() + for _, req := range q.statePendPool { + req.Hashes = make(map[common.Hash]int) // Make sure executing requests fail, but don't disappear + } + q.stateSchedLock.Lock() q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase) q.stateSchedLock.Unlock() @@ -866,10 +875,10 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh accepted := len(headers) == MaxHeaderFetch if accepted { if headers[0].Number.Uint64() != request.From { - glog.V(logger.Detail).Infof("Peer %s: first header #%v [%x] broke chain ordering, expected %d", id, headers[0].Number, headers[0].Hash().Bytes()[:4], request.From) + glog.V(logger.Detail).Infof("Peer %s: first header #%v [%x…] broke chain ordering, expected %d", id, headers[0].Number, headers[0].Hash().Bytes()[:4], request.From) accepted = false } else if headers[len(headers)-1].Hash() != target { - glog.V(logger.Detail).Infof("Peer %s: last header #%v [%x] broke skeleton structure, expected %x", id, headers[len(headers)-1].Number, headers[len(headers)-1].Hash().Bytes()[:4], target[:4]) + glog.V(logger.Detail).Infof("Peer %s: last header #%v [%x…] broke skeleton structure, expected %x", id, headers[len(headers)-1].Number, headers[len(headers)-1].Hash().Bytes()[:4], target[:4]) accepted = false } } @@ -877,12 +886,12 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh for i, header := range headers[1:] { hash := header.Hash() if want := request.From + 1 + uint64(i); header.Number.Uint64() != want { - glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ordering, expected %d", id, header.Number, hash[:4], want) + glog.V(logger.Warn).Infof("Peer %s: header #%v [%x…] broke chain ordering, expected %d", id, header.Number, hash[:4], want) accepted = false break } if headers[i].Hash() != header.ParentHash { - glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ancestry", id, header.Number, hash[:4]) + glog.V(logger.Warn).Infof("Peer %s: header #%v [%x…] broke chain ancestry", id, header.Number, hash[:4]) accepted = false break } @@ -1039,9 +1048,8 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ } // DeliverNodeData injects a node state data retrieval response into the queue. -// The method returns the number of node state entries originally requested, and -// the number of them actually accepted from the delivery. -func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) { +// The method returns the number of node state accepted from the delivery. +func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, bool, error)) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -1099,31 +1107,34 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i // deliverNodeData is the asynchronous node data processor that injects a batch // of sync results into the state scheduler. -func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) { +func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(int, bool, error)) { // Wake up WaitResults after the state has been written because it // might be waiting for the pivot block state to get completed. defer q.active.Signal() // Process results one by one to permit task fetches in between + progressed := false for i, result := range results { q.stateSchedLock.Lock() if q.stateScheduler == nil { // Syncing aborted since this async delivery started, bail out q.stateSchedLock.Unlock() - callback(errNoFetchesPending, i) + callback(i, progressed, errNoFetchesPending) return } - if _, err := q.stateScheduler.Process([]trie.SyncResult{result}); err != nil { + if prog, _, err := q.stateScheduler.Process([]trie.SyncResult{result}); err != nil { // Processing a state result failed, bail out q.stateSchedLock.Unlock() - callback(err, i) + callback(i, progressed, err) return + } else if prog { + progressed = true } // Item processing succeeded, release the lock (temporarily) q.stateSchedLock.Unlock() } - callback(nil, len(results)) + callback(len(results), progressed, nil) } // Prepare configures the result cache to allow accepting and caching inbound diff --git a/trie/sync.go b/trie/sync.go index 58b8a1fb6..2158ab750 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -142,34 +142,40 @@ func (s *TrieSync) Missing(max int) []common.Hash { return requests } -// Process injects a batch of retrieved trie nodes data. -func (s *TrieSync) Process(results []SyncResult) (int, error) { +// Process injects a batch of retrieved trie nodes data, returning if something +// was committed to the database and also the index of an entry if processing of +// it failed. +func (s *TrieSync) Process(results []SyncResult) (bool, int, error) { + committed := false + for i, item := range results { // If the item was not requested, bail out request := s.requests[item.Hash] if request == nil { - return i, ErrNotRequested + return committed, i, ErrNotRequested } // If the item is a raw entry request, commit directly if request.raw { request.data = item.Data s.commit(request, nil) + committed = true continue } // Decode the node data content and update the request node, err := decodeNode(item.Hash[:], item.Data, 0) if err != nil { - return i, err + return committed, i, err } request.data = item.Data // Create and schedule a request for all the children nodes requests, err := s.children(request, node) if err != nil { - return i, err + return committed, i, err } if len(requests) == 0 && request.deps == 0 { s.commit(request, nil) + committed = true continue } request.deps += len(requests) @@ -177,7 +183,7 @@ func (s *TrieSync) Process(results []SyncResult) (int, error) { s.schedule(child) } } - return 0, nil + return committed, 0, nil } // Pending returns the number of state entries currently pending for download. diff --git a/trie/sync_test.go b/trie/sync_test.go index a763dc564..5292fe5cb 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -122,7 +122,7 @@ func testIterativeTrieSync(t *testing.T, batch int) { } results[i] = SyncResult{hash, data} } - if index, err := sched.Process(results); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } queue = append(queue[:0], sched.Missing(batch)...) @@ -152,7 +152,7 @@ func TestIterativeDelayedTrieSync(t *testing.T) { } results[i] = SyncResult{hash, data} } - if index, err := sched.Process(results); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } queue = append(queue[len(results):], sched.Missing(10000)...) @@ -190,7 +190,7 @@ func testIterativeRandomTrieSync(t *testing.T, batch int) { results = append(results, SyncResult{hash, data}) } // Feed the retrieved results back and queue new tasks - if index, err := sched.Process(results); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } queue = make(map[common.Hash]struct{}) @@ -231,7 +231,7 @@ func TestIterativeRandomDelayedTrieSync(t *testing.T) { } } // Feed the retrieved results back and queue new tasks - if index, err := sched.Process(results); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } for _, result := range results { @@ -272,7 +272,7 @@ func TestDuplicateAvoidanceTrieSync(t *testing.T) { results[i] = SyncResult{hash, data} } - if index, err := sched.Process(results); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } queue = append(queue[:0], sched.Missing(0)...) @@ -304,7 +304,7 @@ func TestIncompleteTrieSync(t *testing.T) { results[i] = SyncResult{hash, data} } // Process each of the trie nodes - if index, err := sched.Process(results); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } for _, result := range results { |