aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2019-05-13 20:28:01 +0800
committerGitHub <noreply@github.com>2019-05-13 20:28:01 +0800
commit9effd642901e13765dcc1396392ba55a18f66ccf (patch)
tree57355cff7ea6c5efbb5702c6e62ec7b698d4ff15 /eth
parent40cdcf8c47ff094775aca08fd5d94051f9cf1dbb (diff)
downloadgo-tangerine-9effd642901e13765dcc1396392ba55a18f66ccf.tar
go-tangerine-9effd642901e13765dcc1396392ba55a18f66ccf.tar.gz
go-tangerine-9effd642901e13765dcc1396392ba55a18f66ccf.tar.bz2
go-tangerine-9effd642901e13765dcc1396392ba55a18f66ccf.tar.lz
go-tangerine-9effd642901e13765dcc1396392ba55a18f66ccf.tar.xz
go-tangerine-9effd642901e13765dcc1396392ba55a18f66ccf.tar.zst
go-tangerine-9effd642901e13765dcc1396392ba55a18f66ccf.zip
core, eth, trie: bloom filter for trie node dedup during fast sync (#19489)
* core, eth, trie: bloom filter for trie node dedup during fast sync * eth/downloader, trie: address review comments * core, ethdb, trie: restart fast-sync bloom construction now and again * eth/downloader: initialize fast sync bloom on startup * eth: reenable eth/62 until we properly remove it
Diffstat (limited to 'eth')
-rw-r--r--eth/backend.go5
-rw-r--r--eth/downloader/downloader.go36
-rw-r--r--eth/downloader/downloader_test.go2
-rw-r--r--eth/downloader/statesync.go3
-rw-r--r--eth/handler.go25
-rw-r--r--eth/handler_test.go4
-rw-r--r--eth/helper_test.go3
7 files changed, 53 insertions, 25 deletions
diff --git a/eth/backend.go b/eth/backend.go
index 3363054aa..f69615776 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -190,10 +190,11 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
}
eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)
- if eth.protocolManager, err = NewProtocolManager(chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, config.Whitelist); err != nil {
+ // Permit the downloader to use the trie cache allowance during fast sync
+ cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit
+ if eth.protocolManager, err = NewProtocolManager(chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil {
return nil, err
}
-
eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock)
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 95f2c2aee..0e19fe9e6 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
+ "github.com/ethereum/go-ethereum/trie"
)
var (
@@ -104,7 +105,9 @@ type Downloader struct {
genesis uint64 // Genesis block number to limit sync to (e.g. light client CHT)
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed
- stateDB ethdb.Database
+
+ stateDB ethdb.Database // Database to state sync into (and deduplicate via)
+ stateBloom *trie.SyncBloom // Bloom filter for fast trie node existence checks
rttEstimate uint64 // Round trip time to target for download requests
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
@@ -207,13 +210,13 @@ type BlockChain interface {
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
-func New(mode SyncMode, checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
+func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
if lightchain == nil {
lightchain = chain
}
dl := &Downloader{
- mode: mode,
stateDB: stateDb,
+ stateBloom: stateBloom,
mux: mux,
checkpoint: checkpoint,
queue: newQueue(),
@@ -255,13 +258,15 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
defer d.syncStatsLock.RUnlock()
current := uint64(0)
- switch d.mode {
- case FullSync:
+ switch {
+ case d.blockchain != nil && d.mode == FullSync:
current = d.blockchain.CurrentBlock().NumberU64()
- case FastSync:
+ case d.blockchain != nil && d.mode == FastSync:
current = d.blockchain.CurrentFastBlock().NumberU64()
- case LightSync:
+ case d.lightchain != nil:
current = d.lightchain.CurrentHeader().Number.Uint64()
+ default:
+ log.Error("Unknown downloader chain/mode combo", "light", d.lightchain != nil, "full", d.blockchain != nil, "mode", d.mode)
}
return ethereum.SyncProgress{
StartingBlock: d.syncStatsChainOrigin,
@@ -363,6 +368,12 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
log.Info("Block synchronisation started")
}
+ // If we are already full syncing, but have a fast-sync bloom filter laying
+ // around, make sure it does't use memory any more. This is a special case
+ // when the user attempts to fast sync a new empty network.
+ if mode == FullSync && d.stateBloom != nil {
+ d.stateBloom.Close()
+ }
// Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset()
d.peers.Reset()
@@ -1662,6 +1673,8 @@ func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *state
func (d *Downloader) commitPivotBlock(result *fetchResult) error {
block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash())
+
+ // Commit the pivot block as the new head, will require full sync from here on
if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}); err != nil {
return err
}
@@ -1669,6 +1682,15 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
return err
}
atomic.StoreInt32(&d.committed, 1)
+
+ // If we had a bloom filter for the state sync, deallocate it now. Note, we only
+ // deallocate internally, but keep the empty wrapper. This ensures that if we do
+ // a rollback after committing the pivot and restarting fast sync, we don't end
+ // up using a nil bloom. Empty bloom is fine, it just returns that it does not
+ // have the info we need, so reach down to the database instead.
+ if d.stateBloom != nil {
+ d.stateBloom.Close()
+ }
return nil
}
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index ec45eb06d..ac7f48abd 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -75,7 +75,7 @@ func newTester() *downloadTester {
tester.stateDb = rawdb.NewMemoryDatabase()
tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00})
- tester.downloader = New(FullSync, 0, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer)
+ tester.downloader = New(0, tester.stateDb, trie.NewSyncBloom(1, tester.stateDb), new(event.TypeMux), tester, nil, tester.dropPeer)
return tester
}
diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go
index f294600b3..7e156f46e 100644
--- a/eth/downloader/statesync.go
+++ b/eth/downloader/statesync.go
@@ -59,6 +59,7 @@ type stateSyncStats struct {
// syncState starts downloading state with the given root hash.
func (d *Downloader) syncState(root common.Hash) *stateSync {
+ // Create the state sync
s := newStateSync(d, root)
select {
case d.stateSyncStart <- s:
@@ -239,7 +240,7 @@ type stateTask struct {
func newStateSync(d *Downloader, root common.Hash) *stateSync {
return &stateSync{
d: d,
- sched: state.NewStateSync(root, d.stateDB),
+ sched: state.NewStateSync(root, d.stateDB, d.stateBloom),
keccak: sha3.NewLegacyKeccak256(),
tasks: make(map[common.Hash]*stateTask),
deliver: make(chan *stateReq),
diff --git a/eth/handler.go b/eth/handler.go
index 7b753ebb4..58add2eaf 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -39,6 +39,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/trie"
)
const (
@@ -105,7 +106,7 @@ type ProtocolManager struct {
// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the Ethereum network.
-func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
+func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkID: networkID,
@@ -120,12 +121,8 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
- // Figure out whether to allow fast sync or not
- if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
- log.Warn("Blockchain not empty, fast sync disabled")
- mode = downloader.FullSync
- }
- if mode == downloader.FastSync {
+ // If fast sync was requested and our database is empty, grant it
+ if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() == 0 {
manager.fastSync = uint32(1)
}
// If we have trusted checkpoints, enforce them on the chain
@@ -137,7 +134,8 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
for i, version := range ProtocolVersions {
// Skip protocol version if incompatible with the mode of operation
- if mode == downloader.FastSync && version < eth63 {
+ // TODO(karalabe): hard-drop eth/62 from the code base
+ if atomic.LoadUint32(&manager.fastSync) == 1 && version < eth63 {
continue
}
// Compatible; initialise the sub-protocol
@@ -171,9 +169,16 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
if len(manager.SubProtocols) == 0 {
return nil, errIncompatibleConfig
}
- // Construct the different synchronisation mechanisms
- manager.downloader = downloader.New(mode, manager.checkpointNumber, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
+ // Construct the downloader (long sync) and its backing state bloom if fast
+ // sync is requested. The downloader is responsible for deallocating the state
+ // bloom when it's done.
+ var stateBloom *trie.SyncBloom
+ if atomic.LoadUint32(&manager.fastSync) == 1 {
+ stateBloom = trie.NewSyncBloom(uint64(cacheLimit), chaindb)
+ }
+ manager.downloader = downloader.New(manager.checkpointNumber, chaindb, stateBloom, manager.eventMux, blockchain, nil, manager.removePeer)
+ // Construct the fetcher (short sync)
validator := func(header *types.Header) error {
return engine.VerifyHeader(blockchain, header, true)
}
diff --git a/eth/handler_test.go b/eth/handler_test.go
index 72a0b78ab..04831e4dc 100644
--- a/eth/handler_test.go
+++ b/eth/handler_test.go
@@ -528,7 +528,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
if err != nil {
t.Fatalf("failed to create new blockchain: %v", err)
}
- pm, err := NewProtocolManager(config, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), ethash.NewFaker(), blockchain, db, nil)
+ pm, err := NewProtocolManager(config, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), ethash.NewFaker(), blockchain, db, 1, nil)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
@@ -615,7 +615,7 @@ func testBroadcastBlock(t *testing.T, totalPeers, broadcastExpected int) {
if err != nil {
t.Fatalf("failed to create new blockchain: %v", err)
}
- pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db, nil)
+ pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db, 1, nil)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
diff --git a/eth/helper_test.go b/eth/helper_test.go
index e91429b8c..27e7189ed 100644
--- a/eth/helper_test.go
+++ b/eth/helper_test.go
@@ -66,8 +66,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
if _, err := blockchain.InsertChain(chain); err != nil {
panic(err)
}
-
- pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, nil)
+ pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, 1, nil)
if err != nil {
return nil, nil, err
}