diff options
author | Péter Szilágyi <peterke@gmail.com> | 2019-05-13 20:28:01 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-05-13 20:28:01 +0800 |
commit | 9effd642901e13765dcc1396392ba55a18f66ccf (patch) | |
tree | 57355cff7ea6c5efbb5702c6e62ec7b698d4ff15 /eth | |
parent | 40cdcf8c47ff094775aca08fd5d94051f9cf1dbb (diff) | |
download | go-tangerine-9effd642901e13765dcc1396392ba55a18f66ccf.tar go-tangerine-9effd642901e13765dcc1396392ba55a18f66ccf.tar.gz go-tangerine-9effd642901e13765dcc1396392ba55a18f66ccf.tar.bz2 go-tangerine-9effd642901e13765dcc1396392ba55a18f66ccf.tar.lz go-tangerine-9effd642901e13765dcc1396392ba55a18f66ccf.tar.xz go-tangerine-9effd642901e13765dcc1396392ba55a18f66ccf.tar.zst go-tangerine-9effd642901e13765dcc1396392ba55a18f66ccf.zip |
core, eth, trie: bloom filter for trie node dedup during fast sync (#19489)
* core, eth, trie: bloom filter for trie node dedup during fast sync
* eth/downloader, trie: address review comments
* core, ethdb, trie: restart fast-sync bloom construction now and again
* eth/downloader: initialize fast sync bloom on startup
* eth: reenable eth/62 until we properly remove it
Diffstat (limited to 'eth')
-rw-r--r-- | eth/backend.go | 5 | ||||
-rw-r--r-- | eth/downloader/downloader.go | 36 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 2 | ||||
-rw-r--r-- | eth/downloader/statesync.go | 3 | ||||
-rw-r--r-- | eth/handler.go | 25 | ||||
-rw-r--r-- | eth/handler_test.go | 4 | ||||
-rw-r--r-- | eth/helper_test.go | 3 |
7 files changed, 53 insertions, 25 deletions
diff --git a/eth/backend.go b/eth/backend.go index 3363054aa..f69615776 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -190,10 +190,11 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { } eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain) - if eth.protocolManager, err = NewProtocolManager(chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, config.Whitelist); err != nil { + // Permit the downloader to use the trie cache allowance during fast sync + cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + if eth.protocolManager, err = NewProtocolManager(chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil { return nil, err } - eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock) eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 95f2c2aee..0e19fe9e6 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/trie" ) var ( @@ -104,7 +105,9 @@ type Downloader struct { genesis uint64 // Genesis block number to limit sync to (e.g. light client CHT) queue *queue // Scheduler for selecting the hashes to download peers *peerSet // Set of active peers from which download can proceed - stateDB ethdb.Database + + stateDB ethdb.Database // Database to state sync into (and deduplicate via) + stateBloom *trie.SyncBloom // Bloom filter for fast trie node existence checks rttEstimate uint64 // Round trip time to target for download requests rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) @@ -207,13 +210,13 @@ type BlockChain interface { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mode SyncMode, checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { +func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { if lightchain == nil { lightchain = chain } dl := &Downloader{ - mode: mode, stateDB: stateDb, + stateBloom: stateBloom, mux: mux, checkpoint: checkpoint, queue: newQueue(), @@ -255,13 +258,15 @@ func (d *Downloader) Progress() ethereum.SyncProgress { defer d.syncStatsLock.RUnlock() current := uint64(0) - switch d.mode { - case FullSync: + switch { + case d.blockchain != nil && d.mode == FullSync: current = d.blockchain.CurrentBlock().NumberU64() - case FastSync: + case d.blockchain != nil && d.mode == FastSync: current = d.blockchain.CurrentFastBlock().NumberU64() - case LightSync: + case d.lightchain != nil: current = d.lightchain.CurrentHeader().Number.Uint64() + default: + log.Error("Unknown downloader chain/mode combo", "light", d.lightchain != nil, "full", d.blockchain != nil, "mode", d.mode) } return ethereum.SyncProgress{ StartingBlock: d.syncStatsChainOrigin, @@ -363,6 +368,12 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { log.Info("Block synchronisation started") } + // If we are already full syncing, but have a fast-sync bloom filter laying + // around, make sure it does't use memory any more. This is a special case + // when the user attempts to fast sync a new empty network. + if mode == FullSync && d.stateBloom != nil { + d.stateBloom.Close() + } // Reset the queue, peer set and wake channels to clean any internal leftover state d.queue.Reset() d.peers.Reset() @@ -1662,6 +1673,8 @@ func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *state func (d *Downloader) commitPivotBlock(result *fetchResult) error { block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash()) + + // Commit the pivot block as the new head, will require full sync from here on if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}); err != nil { return err } @@ -1669,6 +1682,15 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error { return err } atomic.StoreInt32(&d.committed, 1) + + // If we had a bloom filter for the state sync, deallocate it now. Note, we only + // deallocate internally, but keep the empty wrapper. This ensures that if we do + // a rollback after committing the pivot and restarting fast sync, we don't end + // up using a nil bloom. Empty bloom is fine, it just returns that it does not + // have the info we need, so reach down to the database instead. + if d.stateBloom != nil { + d.stateBloom.Close() + } return nil } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index ec45eb06d..ac7f48abd 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -75,7 +75,7 @@ func newTester() *downloadTester { tester.stateDb = rawdb.NewMemoryDatabase() tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00}) - tester.downloader = New(FullSync, 0, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer) + tester.downloader = New(0, tester.stateDb, trie.NewSyncBloom(1, tester.stateDb), new(event.TypeMux), tester, nil, tester.dropPeer) return tester } diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index f294600b3..7e156f46e 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -59,6 +59,7 @@ type stateSyncStats struct { // syncState starts downloading state with the given root hash. func (d *Downloader) syncState(root common.Hash) *stateSync { + // Create the state sync s := newStateSync(d, root) select { case d.stateSyncStart <- s: @@ -239,7 +240,7 @@ type stateTask struct { func newStateSync(d *Downloader, root common.Hash) *stateSync { return &stateSync{ d: d, - sched: state.NewStateSync(root, d.stateDB), + sched: state.NewStateSync(root, d.stateDB, d.stateBloom), keccak: sha3.NewLegacyKeccak256(), tasks: make(map[common.Hash]*stateTask), deliver: make(chan *stateReq), diff --git a/eth/handler.go b/eth/handler.go index 7b753ebb4..58add2eaf 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -39,6 +39,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" ) const ( @@ -105,7 +106,7 @@ type ProtocolManager struct { // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the Ethereum network. -func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, whitelist map[uint64]common.Hash) (*ProtocolManager, error) { +func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ networkID: networkID, @@ -120,12 +121,8 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), } - // Figure out whether to allow fast sync or not - if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 { - log.Warn("Blockchain not empty, fast sync disabled") - mode = downloader.FullSync - } - if mode == downloader.FastSync { + // If fast sync was requested and our database is empty, grant it + if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() == 0 { manager.fastSync = uint32(1) } // If we have trusted checkpoints, enforce them on the chain @@ -137,7 +134,8 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions)) for i, version := range ProtocolVersions { // Skip protocol version if incompatible with the mode of operation - if mode == downloader.FastSync && version < eth63 { + // TODO(karalabe): hard-drop eth/62 from the code base + if atomic.LoadUint32(&manager.fastSync) == 1 && version < eth63 { continue } // Compatible; initialise the sub-protocol @@ -171,9 +169,16 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne if len(manager.SubProtocols) == 0 { return nil, errIncompatibleConfig } - // Construct the different synchronisation mechanisms - manager.downloader = downloader.New(mode, manager.checkpointNumber, chaindb, manager.eventMux, blockchain, nil, manager.removePeer) + // Construct the downloader (long sync) and its backing state bloom if fast + // sync is requested. The downloader is responsible for deallocating the state + // bloom when it's done. + var stateBloom *trie.SyncBloom + if atomic.LoadUint32(&manager.fastSync) == 1 { + stateBloom = trie.NewSyncBloom(uint64(cacheLimit), chaindb) + } + manager.downloader = downloader.New(manager.checkpointNumber, chaindb, stateBloom, manager.eventMux, blockchain, nil, manager.removePeer) + // Construct the fetcher (short sync) validator := func(header *types.Header) error { return engine.VerifyHeader(blockchain, header, true) } diff --git a/eth/handler_test.go b/eth/handler_test.go index 72a0b78ab..04831e4dc 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -528,7 +528,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo if err != nil { t.Fatalf("failed to create new blockchain: %v", err) } - pm, err := NewProtocolManager(config, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), ethash.NewFaker(), blockchain, db, nil) + pm, err := NewProtocolManager(config, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), ethash.NewFaker(), blockchain, db, 1, nil) if err != nil { t.Fatalf("failed to start test protocol manager: %v", err) } @@ -615,7 +615,7 @@ func testBroadcastBlock(t *testing.T, totalPeers, broadcastExpected int) { if err != nil { t.Fatalf("failed to create new blockchain: %v", err) } - pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db, nil) + pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db, 1, nil) if err != nil { t.Fatalf("failed to start test protocol manager: %v", err) } diff --git a/eth/helper_test.go b/eth/helper_test.go index e91429b8c..27e7189ed 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -66,8 +66,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func if _, err := blockchain.InsertChain(chain); err != nil { panic(err) } - - pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, nil) + pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, 1, nil) if err != nil { return nil, nil, err } |