diff options
author | Péter Szilágyi <peterke@gmail.com> | 2019-05-17 00:10:58 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-05-17 00:10:58 +0800 |
commit | f5d89cdb72c1e82e9deb54754bef8dd20bf12591 (patch) | |
tree | b6e2ee16b90cd494eaefec59fb4aaaac71cc6a96 /eth | |
parent | 60386b3545659d99c7488e456c780606db101936 (diff) | |
parent | 9eba3a9fff2f47f5e094c36a7c905380b0ac8b1f (diff) | |
download | go-tangerine-f5d89cdb72c1e82e9deb54754bef8dd20bf12591.tar go-tangerine-f5d89cdb72c1e82e9deb54754bef8dd20bf12591.tar.gz go-tangerine-f5d89cdb72c1e82e9deb54754bef8dd20bf12591.tar.bz2 go-tangerine-f5d89cdb72c1e82e9deb54754bef8dd20bf12591.tar.lz go-tangerine-f5d89cdb72c1e82e9deb54754bef8dd20bf12591.tar.xz go-tangerine-f5d89cdb72c1e82e9deb54754bef8dd20bf12591.tar.zst go-tangerine-f5d89cdb72c1e82e9deb54754bef8dd20bf12591.zip |
Merge pull request #19244 from karalabe/freezer-2
cmd, core, eth, les, node: chain freezer on top of db rework
Diffstat (limited to 'eth')
-rw-r--r-- | eth/backend.go | 2 | ||||
-rw-r--r-- | eth/config.go | 1 | ||||
-rw-r--r-- | eth/downloader/downloader.go | 80 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 74 | ||||
-rw-r--r-- | eth/downloader/testchain_test.go | 2 |
5 files changed, 128 insertions, 31 deletions
diff --git a/eth/backend.go b/eth/backend.go index f69615776..6b9c98bf2 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -120,7 +120,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024) // Assemble the Ethereum object - chainDb, err := ctx.OpenDatabase("chaindata", config.DatabaseCache, config.DatabaseHandles, "eth/db/chaindata/") + chainDb, err := ctx.OpenDatabaseWithFreezer("chaindata", config.DatabaseCache, config.DatabaseHandles, config.DatabaseFreezer, "eth/db/chaindata/") if err != nil { return nil, err } diff --git a/eth/config.go b/eth/config.go index fbe6597b6..ccd5674a7 100644 --- a/eth/config.go +++ b/eth/config.go @@ -114,6 +114,7 @@ type Config struct { SkipBcVersionCheck bool `toml:"-"` DatabaseHandles int `toml:"-"` DatabaseCache int + DatabaseFreezer string TrieCleanCache int TrieDirtyCache int diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 0e19fe9e6..495fa0e74 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -25,7 +25,7 @@ import ( "sync/atomic" "time" - ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" @@ -46,20 +46,20 @@ var ( MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request MaxStateFetch = 384 // Amount of node state values to allow fetching per request - MaxForkAncestry = 3 * params.EpochDuration // Maximum chain reorganisation - rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests - rttMaxEstimate = 20 * time.Second // Maximum round-trip time to target for download requests - rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value - ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion - ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts + rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests + rttMaxEstimate = 20 * time.Second // Maximum round-trip time to target for download requests + rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value + ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion + ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts qosTuningPeers = 5 // Number of peers to tune based on (best peers) qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value - maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) - maxHeadersProcess = 2048 // Number of header download results to import at once into the chain - maxResultsProcess = 2048 // Number of content download results to import at once into the chain + maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) + maxHeadersProcess = 2048 // Number of header download results to import at once into the chain + maxResultsProcess = 2048 // Number of content download results to import at once into the chain + maxForkAncestry uint64 = params.ImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it) reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs @@ -129,6 +129,7 @@ type Downloader struct { synchronising int32 notified int32 committed int32 + ancientLimit uint64 // The maximum block number which can be regarded as ancient data. // Channels headerCh chan dataPack // [eth/62] Channel receiving inbound block headers @@ -206,7 +207,7 @@ type BlockChain interface { InsertChain(types.Blocks) (int, error) // InsertReceiptChain inserts a batch of receipts into the local chain. - InsertReceiptChain(types.Blocks, []types.Receipts) (int, error) + InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error) } // New creates a new downloader to fetch hashes and blocks from remote peers. @@ -438,7 +439,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode) defer func(start time.Time) { - log.Debug("Synchronisation terminated", "elapsed", time.Since(start)) + log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start))) }(time.Now()) // Look up the sync boundaries: the common ancestor and the target block @@ -475,12 +476,49 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I if d.mode == FastSync && pivot != 0 { d.committed = 0 } + if d.mode == FastSync { + // Set the ancient data limitation. + // If we are running fast sync, all block data older than ancientLimit will be + // written to the ancient store. More recent data will be written to the active + // database and will wait for the freezer to migrate. + // + // If there is a checkpoint available, then calculate the ancientLimit through + // that. Otherwise calculate the ancient limit through the advertised height + // of the remote peer. + // + // The reason for picking checkpoint first is that a malicious peer can give us + // a fake (very high) height, forcing the ancient limit to also be very high. + // The peer would start to feed us valid blocks until head, resulting in all of + // the blocks might be written into the ancient store. A following mini-reorg + // could cause issues. + if d.checkpoint != 0 && d.checkpoint > maxForkAncestry+1 { + d.ancientLimit = d.checkpoint + } else if height > maxForkAncestry+1 { + d.ancientLimit = height - maxForkAncestry - 1 + } + frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here. + // If a part of blockchain data has already been written into active store, + // disable the ancient style insertion explicitly. + if origin >= frozen && frozen != 0 { + d.ancientLimit = 0 + log.Info("Disabling direct-ancient mode", "origin", origin, "ancient", frozen-1) + } else if d.ancientLimit > 0 { + log.Debug("Enabling direct-ancient mode", "ancient", d.ancientLimit) + } + // Rewind the ancient store and blockchain if reorg happens. + if origin+1 < frozen { + var hashes []common.Hash + for i := origin + 1; i < d.lightchain.CurrentHeader().Number.Uint64(); i++ { + hashes = append(hashes, rawdb.ReadCanonicalHash(d.stateDB, i)) + } + d.lightchain.Rollback(hashes) + } + } // Initiate the sync using a concurrent header and content retrieval algorithm d.queue.Prepare(origin+1, d.mode) if d.syncInitHook != nil { d.syncInitHook(origin, height) } - fetchers := []func() error{ func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync @@ -544,6 +582,9 @@ func (d *Downloader) cancel() { func (d *Downloader) Cancel() { d.cancel() d.cancelWg.Wait() + + d.ancientLimit = 0 + log.Debug("Reset ancient limit to zero") } // Terminate interrupts the downloader, canceling all pending operations. @@ -684,9 +725,9 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight) // Recap floor value for binary search - if localHeight >= MaxForkAncestry { + if localHeight >= maxForkAncestry { // We're above the max reorg threshold, find the earliest fork point - floor = int64(localHeight - MaxForkAncestry) + floor = int64(localHeight - maxForkAncestry) } // If we're doing a light sync, ensure the floor doesn't go below the CHT, as // all headers before that point will be missing. @@ -1315,7 +1356,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv // queue until the stream ends or a failure occurs. func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error { // Keep a count of uncertain headers to roll back - rollback := []*types.Header{} + var rollback []*types.Header defer func() { if len(rollback) > 0 { // Flatten the headers and roll them back @@ -1409,11 +1450,10 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er limit = len(headers) } chunk := headers[:limit] - // In case of header only syncing, validate the chunk immediately if d.mode == FastSync || d.mode == LightSync { // Collect the yet unknown headers to mark them as uncertain - unknown := make([]*types.Header, 0, len(headers)) + unknown := make([]*types.Header, 0, len(chunk)) for _, header := range chunk { if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) { unknown = append(unknown, header) @@ -1663,7 +1703,7 @@ func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *state blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) receipts[i] = result.Receipts } - if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil { + if index, err := d.blockchain.InsertReceiptChain(blocks, receipts, d.ancientLimit); err != nil { log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) return errInvalidChain } @@ -1675,7 +1715,7 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error { log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash()) // Commit the pivot block as the new head, will require full sync from here on - if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}); err != nil { + if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}, d.ancientLimit); err != nil { return err } if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index ac7f48abd..5b56ff161 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -37,7 +37,7 @@ import ( // Reduce some of the parameters to make the tester faster. func init() { - MaxForkAncestry = uint64(10000) + maxForkAncestry = 10000 blockCacheItems = 1024 fsHeaderContCheck = 500 * time.Millisecond } @@ -57,6 +57,11 @@ type downloadTester struct { ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain + ancientHeaders map[common.Hash]*types.Header // Ancient headers belonging to the tester + ancientBlocks map[common.Hash]*types.Block // Ancient blocks belonging to the tester + ancientReceipts map[common.Hash]types.Receipts // Ancient receipts belonging to the tester + ancientChainTd map[common.Hash]*big.Int // Ancient total difficulties of the blocks in the local chain + lock sync.RWMutex } @@ -71,6 +76,12 @@ func newTester() *downloadTester { ownBlocks: map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis}, ownReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil}, ownChainTd: map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()}, + + // Initialize ancient store with test genesis block + ancientHeaders: map[common.Hash]*types.Header{testGenesis.Hash(): testGenesis.Header()}, + ancientBlocks: map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis}, + ancientReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil}, + ancientChainTd: map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()}, } tester.stateDb = rawdb.NewMemoryDatabase() tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00}) @@ -122,6 +133,9 @@ func (dl *downloadTester) HasFastBlock(hash common.Hash, number uint64) bool { dl.lock.RLock() defer dl.lock.RUnlock() + if _, ok := dl.ancientReceipts[hash]; ok { + return true + } _, ok := dl.ownReceipts[hash] return ok } @@ -131,6 +145,10 @@ func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header { dl.lock.RLock() defer dl.lock.RUnlock() + header := dl.ancientHeaders[hash] + if header != nil { + return header + } return dl.ownHeaders[hash] } @@ -139,6 +157,10 @@ func (dl *downloadTester) GetBlockByHash(hash common.Hash) *types.Block { dl.lock.RLock() defer dl.lock.RUnlock() + block := dl.ancientBlocks[hash] + if block != nil { + return block + } return dl.ownBlocks[hash] } @@ -148,6 +170,9 @@ func (dl *downloadTester) CurrentHeader() *types.Header { defer dl.lock.RUnlock() for i := len(dl.ownHashes) - 1; i >= 0; i-- { + if header := dl.ancientHeaders[dl.ownHashes[i]]; header != nil { + return header + } if header := dl.ownHeaders[dl.ownHashes[i]]; header != nil { return header } @@ -161,6 +186,12 @@ func (dl *downloadTester) CurrentBlock() *types.Block { defer dl.lock.RUnlock() for i := len(dl.ownHashes) - 1; i >= 0; i-- { + if block := dl.ancientBlocks[dl.ownHashes[i]]; block != nil { + if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil { + return block + } + return block + } if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil { if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil { return block @@ -176,6 +207,9 @@ func (dl *downloadTester) CurrentFastBlock() *types.Block { defer dl.lock.RUnlock() for i := len(dl.ownHashes) - 1; i >= 0; i-- { + if block := dl.ancientBlocks[dl.ownHashes[i]]; block != nil { + return block + } if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil { return block } @@ -198,6 +232,9 @@ func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int { dl.lock.RLock() defer dl.lock.RUnlock() + if td := dl.ancientChainTd[hash]; td != nil { + return td + } return dl.ownChainTd[hash] } @@ -254,7 +291,7 @@ func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) { } // InsertReceiptChain injects a new batch of receipts into the simulated chain. -func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts) (i int, err error) { +func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts, ancientLimit uint64) (i int, err error) { dl.lock.Lock() defer dl.lock.Unlock() @@ -262,11 +299,25 @@ func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []typ if _, ok := dl.ownHeaders[blocks[i].Hash()]; !ok { return i, errors.New("unknown owner") } - if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok { - return i, errors.New("unknown parent") + if _, ok := dl.ancientBlocks[blocks[i].ParentHash()]; !ok { + if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok { + return i, errors.New("unknown parent") + } + } + if blocks[i].NumberU64() <= ancientLimit { + dl.ancientBlocks[blocks[i].Hash()] = blocks[i] + dl.ancientReceipts[blocks[i].Hash()] = receipts[i] + + // Migrate from active db to ancient db + dl.ancientHeaders[blocks[i].Hash()] = blocks[i].Header() + dl.ancientChainTd[blocks[i].Hash()] = new(big.Int).Add(dl.ancientChainTd[blocks[i].ParentHash()], blocks[i].Difficulty()) + + delete(dl.ownHeaders, blocks[i].Hash()) + delete(dl.ownChainTd, blocks[i].Hash()) + } else { + dl.ownBlocks[blocks[i].Hash()] = blocks[i] + dl.ownReceipts[blocks[i].Hash()] = receipts[i] } - dl.ownBlocks[blocks[i].Hash()] = blocks[i] - dl.ownReceipts[blocks[i].Hash()] = receipts[i] } return len(blocks), nil } @@ -284,6 +335,11 @@ func (dl *downloadTester) Rollback(hashes []common.Hash) { delete(dl.ownHeaders, hashes[i]) delete(dl.ownReceipts, hashes[i]) delete(dl.ownBlocks, hashes[i]) + + delete(dl.ancientChainTd, hashes[i]) + delete(dl.ancientHeaders, hashes[i]) + delete(dl.ancientReceipts, hashes[i]) + delete(dl.ancientBlocks, hashes[i]) } } @@ -411,13 +467,13 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng if tester.downloader.mode == LightSync { blocks, receipts = 1, 1 } - if hs := len(tester.ownHeaders); hs != headers { + if hs := len(tester.ownHeaders) + len(tester.ancientHeaders) - 1; hs != headers { t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers) } - if bs := len(tester.ownBlocks); bs != blocks { + if bs := len(tester.ownBlocks) + len(tester.ancientBlocks) - 1; bs != blocks { t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, blocks) } - if rs := len(tester.ownReceipts); rs != receipts { + if rs := len(tester.ownReceipts) + len(tester.ancientReceipts) - 1; rs != receipts { t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts) } } diff --git a/eth/downloader/testchain_test.go b/eth/downloader/testchain_test.go index 4ae342dc6..f410152f5 100644 --- a/eth/downloader/testchain_test.go +++ b/eth/downloader/testchain_test.go @@ -45,7 +45,7 @@ var testChainBase = newTestChain(blockCacheItems+200, testGenesis) var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain func init() { - var forkLen = int(MaxForkAncestry + 50) + var forkLen = int(maxForkAncestry + 50) var wg sync.WaitGroup wg.Add(3) go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }() |