aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2019-05-17 00:10:58 +0800
committerGitHub <noreply@github.com>2019-05-17 00:10:58 +0800
commitf5d89cdb72c1e82e9deb54754bef8dd20bf12591 (patch)
treeb6e2ee16b90cd494eaefec59fb4aaaac71cc6a96 /eth
parent60386b3545659d99c7488e456c780606db101936 (diff)
parent9eba3a9fff2f47f5e094c36a7c905380b0ac8b1f (diff)
downloadgo-tangerine-f5d89cdb72c1e82e9deb54754bef8dd20bf12591.tar
go-tangerine-f5d89cdb72c1e82e9deb54754bef8dd20bf12591.tar.gz
go-tangerine-f5d89cdb72c1e82e9deb54754bef8dd20bf12591.tar.bz2
go-tangerine-f5d89cdb72c1e82e9deb54754bef8dd20bf12591.tar.lz
go-tangerine-f5d89cdb72c1e82e9deb54754bef8dd20bf12591.tar.xz
go-tangerine-f5d89cdb72c1e82e9deb54754bef8dd20bf12591.tar.zst
go-tangerine-f5d89cdb72c1e82e9deb54754bef8dd20bf12591.zip
Merge pull request #19244 from karalabe/freezer-2
cmd, core, eth, les, node: chain freezer on top of db rework
Diffstat (limited to 'eth')
-rw-r--r--eth/backend.go2
-rw-r--r--eth/config.go1
-rw-r--r--eth/downloader/downloader.go80
-rw-r--r--eth/downloader/downloader_test.go74
-rw-r--r--eth/downloader/testchain_test.go2
5 files changed, 128 insertions, 31 deletions
diff --git a/eth/backend.go b/eth/backend.go
index f69615776..6b9c98bf2 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -120,7 +120,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024)
// Assemble the Ethereum object
- chainDb, err := ctx.OpenDatabase("chaindata", config.DatabaseCache, config.DatabaseHandles, "eth/db/chaindata/")
+ chainDb, err := ctx.OpenDatabaseWithFreezer("chaindata", config.DatabaseCache, config.DatabaseHandles, config.DatabaseFreezer, "eth/db/chaindata/")
if err != nil {
return nil, err
}
diff --git a/eth/config.go b/eth/config.go
index fbe6597b6..ccd5674a7 100644
--- a/eth/config.go
+++ b/eth/config.go
@@ -114,6 +114,7 @@ type Config struct {
SkipBcVersionCheck bool `toml:"-"`
DatabaseHandles int `toml:"-"`
DatabaseCache int
+ DatabaseFreezer string
TrieCleanCache int
TrieDirtyCache int
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 0e19fe9e6..495fa0e74 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -25,7 +25,7 @@ import (
"sync/atomic"
"time"
- ethereum "github.com/ethereum/go-ethereum"
+ "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
@@ -46,20 +46,20 @@ var (
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
MaxStateFetch = 384 // Amount of node state values to allow fetching per request
- MaxForkAncestry = 3 * params.EpochDuration // Maximum chain reorganisation
- rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests
- rttMaxEstimate = 20 * time.Second // Maximum round-trip time to target for download requests
- rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value
- ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
- ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts
+ rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests
+ rttMaxEstimate = 20 * time.Second // Maximum round-trip time to target for download requests
+ rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value
+ ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
+ ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts
qosTuningPeers = 5 // Number of peers to tune based on (best peers)
qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value
- maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
- maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
- maxResultsProcess = 2048 // Number of content download results to import at once into the chain
+ maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
+ maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
+ maxResultsProcess = 2048 // Number of content download results to import at once into the chain
+ maxForkAncestry uint64 = params.ImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it)
reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection
reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs
@@ -129,6 +129,7 @@ type Downloader struct {
synchronising int32
notified int32
committed int32
+ ancientLimit uint64 // The maximum block number which can be regarded as ancient data.
// Channels
headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
@@ -206,7 +207,7 @@ type BlockChain interface {
InsertChain(types.Blocks) (int, error)
// InsertReceiptChain inserts a batch of receipts into the local chain.
- InsertReceiptChain(types.Blocks, []types.Receipts) (int, error)
+ InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error)
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
@@ -438,7 +439,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
defer func(start time.Time) {
- log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
+ log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start)))
}(time.Now())
// Look up the sync boundaries: the common ancestor and the target block
@@ -475,12 +476,49 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
if d.mode == FastSync && pivot != 0 {
d.committed = 0
}
+ if d.mode == FastSync {
+ // Set the ancient data limitation.
+ // If we are running fast sync, all block data older than ancientLimit will be
+ // written to the ancient store. More recent data will be written to the active
+ // database and will wait for the freezer to migrate.
+ //
+ // If there is a checkpoint available, then calculate the ancientLimit through
+ // that. Otherwise calculate the ancient limit through the advertised height
+ // of the remote peer.
+ //
+ // The reason for picking checkpoint first is that a malicious peer can give us
+ // a fake (very high) height, forcing the ancient limit to also be very high.
+ // The peer would start to feed us valid blocks until head, resulting in all of
+ // the blocks might be written into the ancient store. A following mini-reorg
+ // could cause issues.
+ if d.checkpoint != 0 && d.checkpoint > maxForkAncestry+1 {
+ d.ancientLimit = d.checkpoint
+ } else if height > maxForkAncestry+1 {
+ d.ancientLimit = height - maxForkAncestry - 1
+ }
+ frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
+ // If a part of blockchain data has already been written into active store,
+ // disable the ancient style insertion explicitly.
+ if origin >= frozen && frozen != 0 {
+ d.ancientLimit = 0
+ log.Info("Disabling direct-ancient mode", "origin", origin, "ancient", frozen-1)
+ } else if d.ancientLimit > 0 {
+ log.Debug("Enabling direct-ancient mode", "ancient", d.ancientLimit)
+ }
+ // Rewind the ancient store and blockchain if reorg happens.
+ if origin+1 < frozen {
+ var hashes []common.Hash
+ for i := origin + 1; i < d.lightchain.CurrentHeader().Number.Uint64(); i++ {
+ hashes = append(hashes, rawdb.ReadCanonicalHash(d.stateDB, i))
+ }
+ d.lightchain.Rollback(hashes)
+ }
+ }
// Initiate the sync using a concurrent header and content retrieval algorithm
d.queue.Prepare(origin+1, d.mode)
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
}
-
fetchers := []func() error{
func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
@@ -544,6 +582,9 @@ func (d *Downloader) cancel() {
func (d *Downloader) Cancel() {
d.cancel()
d.cancelWg.Wait()
+
+ d.ancientLimit = 0
+ log.Debug("Reset ancient limit to zero")
}
// Terminate interrupts the downloader, canceling all pending operations.
@@ -684,9 +725,9 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight)
// Recap floor value for binary search
- if localHeight >= MaxForkAncestry {
+ if localHeight >= maxForkAncestry {
// We're above the max reorg threshold, find the earliest fork point
- floor = int64(localHeight - MaxForkAncestry)
+ floor = int64(localHeight - maxForkAncestry)
}
// If we're doing a light sync, ensure the floor doesn't go below the CHT, as
// all headers before that point will be missing.
@@ -1315,7 +1356,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
// Keep a count of uncertain headers to roll back
- rollback := []*types.Header{}
+ var rollback []*types.Header
defer func() {
if len(rollback) > 0 {
// Flatten the headers and roll them back
@@ -1409,11 +1450,10 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
limit = len(headers)
}
chunk := headers[:limit]
-
// In case of header only syncing, validate the chunk immediately
if d.mode == FastSync || d.mode == LightSync {
// Collect the yet unknown headers to mark them as uncertain
- unknown := make([]*types.Header, 0, len(headers))
+ unknown := make([]*types.Header, 0, len(chunk))
for _, header := range chunk {
if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
unknown = append(unknown, header)
@@ -1663,7 +1703,7 @@ func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *state
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
receipts[i] = result.Receipts
}
- if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil {
+ if index, err := d.blockchain.InsertReceiptChain(blocks, receipts, d.ancientLimit); err != nil {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
return errInvalidChain
}
@@ -1675,7 +1715,7 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash())
// Commit the pivot block as the new head, will require full sync from here on
- if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}); err != nil {
+ if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}, d.ancientLimit); err != nil {
return err
}
if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil {
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index ac7f48abd..5b56ff161 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -37,7 +37,7 @@ import (
// Reduce some of the parameters to make the tester faster.
func init() {
- MaxForkAncestry = uint64(10000)
+ maxForkAncestry = 10000
blockCacheItems = 1024
fsHeaderContCheck = 500 * time.Millisecond
}
@@ -57,6 +57,11 @@ type downloadTester struct {
ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester
ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain
+ ancientHeaders map[common.Hash]*types.Header // Ancient headers belonging to the tester
+ ancientBlocks map[common.Hash]*types.Block // Ancient blocks belonging to the tester
+ ancientReceipts map[common.Hash]types.Receipts // Ancient receipts belonging to the tester
+ ancientChainTd map[common.Hash]*big.Int // Ancient total difficulties of the blocks in the local chain
+
lock sync.RWMutex
}
@@ -71,6 +76,12 @@ func newTester() *downloadTester {
ownBlocks: map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis},
ownReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil},
ownChainTd: map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()},
+
+ // Initialize ancient store with test genesis block
+ ancientHeaders: map[common.Hash]*types.Header{testGenesis.Hash(): testGenesis.Header()},
+ ancientBlocks: map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis},
+ ancientReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil},
+ ancientChainTd: map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()},
}
tester.stateDb = rawdb.NewMemoryDatabase()
tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00})
@@ -122,6 +133,9 @@ func (dl *downloadTester) HasFastBlock(hash common.Hash, number uint64) bool {
dl.lock.RLock()
defer dl.lock.RUnlock()
+ if _, ok := dl.ancientReceipts[hash]; ok {
+ return true
+ }
_, ok := dl.ownReceipts[hash]
return ok
}
@@ -131,6 +145,10 @@ func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header {
dl.lock.RLock()
defer dl.lock.RUnlock()
+ header := dl.ancientHeaders[hash]
+ if header != nil {
+ return header
+ }
return dl.ownHeaders[hash]
}
@@ -139,6 +157,10 @@ func (dl *downloadTester) GetBlockByHash(hash common.Hash) *types.Block {
dl.lock.RLock()
defer dl.lock.RUnlock()
+ block := dl.ancientBlocks[hash]
+ if block != nil {
+ return block
+ }
return dl.ownBlocks[hash]
}
@@ -148,6 +170,9 @@ func (dl *downloadTester) CurrentHeader() *types.Header {
defer dl.lock.RUnlock()
for i := len(dl.ownHashes) - 1; i >= 0; i-- {
+ if header := dl.ancientHeaders[dl.ownHashes[i]]; header != nil {
+ return header
+ }
if header := dl.ownHeaders[dl.ownHashes[i]]; header != nil {
return header
}
@@ -161,6 +186,12 @@ func (dl *downloadTester) CurrentBlock() *types.Block {
defer dl.lock.RUnlock()
for i := len(dl.ownHashes) - 1; i >= 0; i-- {
+ if block := dl.ancientBlocks[dl.ownHashes[i]]; block != nil {
+ if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil {
+ return block
+ }
+ return block
+ }
if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil {
if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil {
return block
@@ -176,6 +207,9 @@ func (dl *downloadTester) CurrentFastBlock() *types.Block {
defer dl.lock.RUnlock()
for i := len(dl.ownHashes) - 1; i >= 0; i-- {
+ if block := dl.ancientBlocks[dl.ownHashes[i]]; block != nil {
+ return block
+ }
if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil {
return block
}
@@ -198,6 +232,9 @@ func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int {
dl.lock.RLock()
defer dl.lock.RUnlock()
+ if td := dl.ancientChainTd[hash]; td != nil {
+ return td
+ }
return dl.ownChainTd[hash]
}
@@ -254,7 +291,7 @@ func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) {
}
// InsertReceiptChain injects a new batch of receipts into the simulated chain.
-func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts) (i int, err error) {
+func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts, ancientLimit uint64) (i int, err error) {
dl.lock.Lock()
defer dl.lock.Unlock()
@@ -262,11 +299,25 @@ func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []typ
if _, ok := dl.ownHeaders[blocks[i].Hash()]; !ok {
return i, errors.New("unknown owner")
}
- if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok {
- return i, errors.New("unknown parent")
+ if _, ok := dl.ancientBlocks[blocks[i].ParentHash()]; !ok {
+ if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok {
+ return i, errors.New("unknown parent")
+ }
+ }
+ if blocks[i].NumberU64() <= ancientLimit {
+ dl.ancientBlocks[blocks[i].Hash()] = blocks[i]
+ dl.ancientReceipts[blocks[i].Hash()] = receipts[i]
+
+ // Migrate from active db to ancient db
+ dl.ancientHeaders[blocks[i].Hash()] = blocks[i].Header()
+ dl.ancientChainTd[blocks[i].Hash()] = new(big.Int).Add(dl.ancientChainTd[blocks[i].ParentHash()], blocks[i].Difficulty())
+
+ delete(dl.ownHeaders, blocks[i].Hash())
+ delete(dl.ownChainTd, blocks[i].Hash())
+ } else {
+ dl.ownBlocks[blocks[i].Hash()] = blocks[i]
+ dl.ownReceipts[blocks[i].Hash()] = receipts[i]
}
- dl.ownBlocks[blocks[i].Hash()] = blocks[i]
- dl.ownReceipts[blocks[i].Hash()] = receipts[i]
}
return len(blocks), nil
}
@@ -284,6 +335,11 @@ func (dl *downloadTester) Rollback(hashes []common.Hash) {
delete(dl.ownHeaders, hashes[i])
delete(dl.ownReceipts, hashes[i])
delete(dl.ownBlocks, hashes[i])
+
+ delete(dl.ancientChainTd, hashes[i])
+ delete(dl.ancientHeaders, hashes[i])
+ delete(dl.ancientReceipts, hashes[i])
+ delete(dl.ancientBlocks, hashes[i])
}
}
@@ -411,13 +467,13 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng
if tester.downloader.mode == LightSync {
blocks, receipts = 1, 1
}
- if hs := len(tester.ownHeaders); hs != headers {
+ if hs := len(tester.ownHeaders) + len(tester.ancientHeaders) - 1; hs != headers {
t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers)
}
- if bs := len(tester.ownBlocks); bs != blocks {
+ if bs := len(tester.ownBlocks) + len(tester.ancientBlocks) - 1; bs != blocks {
t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, blocks)
}
- if rs := len(tester.ownReceipts); rs != receipts {
+ if rs := len(tester.ownReceipts) + len(tester.ancientReceipts) - 1; rs != receipts {
t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts)
}
}
diff --git a/eth/downloader/testchain_test.go b/eth/downloader/testchain_test.go
index 4ae342dc6..f410152f5 100644
--- a/eth/downloader/testchain_test.go
+++ b/eth/downloader/testchain_test.go
@@ -45,7 +45,7 @@ var testChainBase = newTestChain(blockCacheItems+200, testGenesis)
var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain
func init() {
- var forkLen = int(MaxForkAncestry + 50)
+ var forkLen = int(maxForkAncestry + 50)
var wg sync.WaitGroup
wg.Add(3)
go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }()