author | Péter Szilágyi <peterke@gmail.com> | 2019-05-13 20:28:01 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-05-13 20:28:01 +0800 |
commit | 9effd642901e13765dcc1396392ba55a18f66ccf (patch) | |
tree | 57355cff7ea6c5efbb5702c6e62ec7b698d4ff15 /trie | |
parent | 40cdcf8c47ff094775aca08fd5d94051f9cf1dbb (diff) | |
core, eth, trie: bloom filter for trie node dedup during fast sync (#19489)
* core, eth, trie: bloom filter for trie node dedup during fast sync
* eth/downloader, trie: address review comments
* core, ethdb, trie: restart fast-sync bloom construction now and again
* eth/downloader: initialize fast sync bloom on startup
* eth: reenable eth/62 until we properly remove it
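
The change gates every "do we already have this trie node?" check behind an in-memory bloom filter: a negative answer from the bloom is definitive and skips the disk lookup entirely, a positive answer is double-checked against the database, and a confirmed false positive is counted on a fault meter. Below is a minimal sketch of that pattern written against the new API; the `nodeKnown` helper and its package are illustrative and not part of the commit.

```go
package example

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/trie"
)

// nodeKnown spells out the lookup pattern the diff repeats in AddSubTrie,
// AddRawEntry and children. A bloom "no" is final, a bloom "yes" is verified
// against the database, and a database miss after a bloom hit is a false
// positive (the real code bumps the trie/bloom/fault meter at that point).
func nodeKnown(bloom *trie.SyncBloom, db ethdb.Reader, hash common.Hash) bool {
	if !bloom.Contains(hash[:]) {
		return false // Definitely not stored locally, skip the disk lookup
	}
	if ok, _ := db.Has(hash[:]); ok {
		return true // Bloom hit confirmed on disk, genuine duplicate
	}
	return false // False positive: bloom said maybe, disk says no
}
```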
Diffstat (limited to 'trie')
-rw-r--r-- | trie/sync.go       |  37 |
-rw-r--r-- | trie/sync_bloom.go | 207 |
-rw-r--r-- | trie/sync_test.go  |  14 |
3 files changed, 241 insertions, 17 deletions
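
One implementation detail worth noting before the diff: the bloom library wants a 64-bit hash, and since trie node keys are already Keccak-256 hashes, the new `syncBloomHasher` wrapper simply reinterprets their first eight bytes instead of hashing again. A tiny standalone sketch of that conversion (function and variable names here are illustrative):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// miniHash mirrors the syncBloomHasher idea from the diff below: trie node
// hashes are already uniformly distributed, so their first 8 bytes serve as
// a 64-bit key for the bloom filter with no re-hashing required.
func miniHash(trieHash []byte) uint64 {
	return binary.BigEndian.Uint64(trieHash[:8])
}

func main() {
	// A made-up 32-byte hash purely for illustration.
	hash := make([]byte, 32)
	for i := range hash {
		hash[i] = byte(i)
	}
	fmt.Printf("%#x\n", miniHash(hash)) // 0x1020304050607
}
```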
diff --git a/trie/sync.go b/trie/sync.go
index 85f1b0f85..d9564d783 100644
--- a/trie/sync.go
+++ b/trie/sync.go
@@ -76,15 +76,17 @@ type Sync struct {
 	membatch *syncMemBatch            // Memory buffer to avoid frequent database writes
 	requests map[common.Hash]*request // Pending requests pertaining to a key hash
 	queue    *prque.Prque             // Priority queue with the pending requests
+	bloom    *SyncBloom               // Bloom filter for fast node existence checks
 }
 
 // NewSync creates a new trie data download scheduler.
-func NewSync(root common.Hash, database ethdb.Reader, callback LeafCallback) *Sync {
+func NewSync(root common.Hash, database ethdb.Reader, callback LeafCallback, bloom *SyncBloom) *Sync {
 	ts := &Sync{
 		database: database,
 		membatch: newSyncMemBatch(),
 		requests: make(map[common.Hash]*request),
 		queue:    prque.New(nil),
+		bloom:    bloom,
 	}
 	ts.AddSubTrie(root, 0, common.Hash{}, callback)
 	return ts
@@ -99,10 +101,14 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb
 	if _, ok := s.membatch.batch[root]; ok {
 		return
 	}
-	key := root.Bytes()
-	blob, _ := s.database.Get(key)
-	if local, err := decodeNode(key, blob); local != nil && err == nil {
-		return
+	if s.bloom.Contains(root[:]) {
+		// Bloom filter says this might be a duplicate, double check
+		blob, _ := s.database.Get(root[:])
+		if local, err := decodeNode(root[:], blob); local != nil && err == nil {
+			return
+		}
+		// False positive, bump fault meter
+		bloomFaultMeter.Mark(1)
 	}
 	// Assemble the new sub-trie sync request
 	req := &request{
@@ -134,8 +140,13 @@ func (s *Sync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) {
 	if _, ok := s.membatch.batch[hash]; ok {
 		return
 	}
-	if ok, _ := s.database.Has(hash.Bytes()); ok {
-		return
+	if s.bloom.Contains(hash[:]) {
+		// Bloom filter says this might be a duplicate, double check
+		if ok, _ := s.database.Has(hash[:]); ok {
+			return
+		}
+		// False positive, bump fault meter
+		bloomFaultMeter.Mark(1)
 	}
 	// Assemble the new sub-trie sync request
 	req := &request{
@@ -219,8 +230,9 @@ func (s *Sync) Commit(dbw ethdb.Writer) (int, error) {
 		if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil {
 			return i, err
 		}
+		s.bloom.Add(key[:])
 	}
-	written := len(s.membatch.order)
+	written := len(s.membatch.order) // TODO(karalabe): could an order change improve write performance?
 
 	// Drop the membatch data and return
 	s.membatch = newSyncMemBatch()
@@ -292,8 +304,13 @@ func (s *Sync) children(req *request, object node) ([]*request, error) {
 			if _, ok := s.membatch.batch[hash]; ok {
 				continue
 			}
-			if ok, _ := s.database.Has(node); ok {
-				continue
+			if s.bloom.Contains(node) {
+				// Bloom filter says this might be a duplicate, double check
+				if ok, _ := s.database.Has(node); ok {
+					continue
+				}
+				// False positive, bump fault meter
+				bloomFaultMeter.Mark(1)
 			}
 			// Locally unknown node, schedule for retrieval
 			requests = append(requests, &request{
diff --git a/trie/sync_bloom.go b/trie/sync_bloom.go
new file mode 100644
index 000000000..899a63add
--- /dev/null
+++ b/trie/sync_bloom.go
@@ -0,0 +1,207 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package trie
+
+import (
+	"encoding/binary"
+	"fmt"
+	"math"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/metrics"
+	"github.com/steakknife/bloomfilter"
+)
+
+var (
+	bloomAddMeter   = metrics.NewRegisteredMeter("trie/bloom/add", nil)
+	bloomLoadMeter  = metrics.NewRegisteredMeter("trie/bloom/load", nil)
+	bloomTestMeter  = metrics.NewRegisteredMeter("trie/bloom/test", nil)
+	bloomMissMeter  = metrics.NewRegisteredMeter("trie/bloom/miss", nil)
+	bloomFaultMeter = metrics.NewRegisteredMeter("trie/bloom/fault", nil)
+	bloomErrorGauge = metrics.NewRegisteredGauge("trie/bloom/error", nil)
+)
+
+// syncBloomHasher is a wrapper around a byte blob to satisfy the interface API
+// requirements of the bloom library used. It's used to convert a trie hash into
+// a 64 bit mini hash.
+type syncBloomHasher []byte
+
+func (f syncBloomHasher) Write(p []byte) (n int, err error) { panic("not implemented") }
+func (f syncBloomHasher) Sum(b []byte) []byte               { panic("not implemented") }
+func (f syncBloomHasher) Reset()                            { panic("not implemented") }
+func (f syncBloomHasher) BlockSize() int                    { panic("not implemented") }
+func (f syncBloomHasher) Size() int                         { return 8 }
+func (f syncBloomHasher) Sum64() uint64                     { return binary.BigEndian.Uint64(f) }
+
+// SyncBloom is a bloom filter used during fast sync to quickly decide if a trie
+// node already exists on disk or not. It self populates from the provided disk
+// database on creation in a background thread and will only start returning live
+// results once that's finished.
+type SyncBloom struct {
+	bloom  *bloomfilter.Filter
+	inited uint32
+	closer sync.Once
+	closed uint32
+	pend   sync.WaitGroup
+}
+
+// NewSyncBloom creates a new bloom filter of the given size (in megabytes) and
+// initializes it from the database. The bloom is hard coded to use 3 filters.
+func NewSyncBloom(memory uint64, database ethdb.Iteratee) *SyncBloom {
+	// Create the bloom filter to track known trie nodes
+	bloom, err := bloomfilter.New(memory*1024*1024*8, 3)
+	if err != nil {
+		panic(fmt.Sprintf("failed to create bloom: %v", err)) // Can't happen, here for sanity
+	}
+	log.Info("Allocated fast sync bloom", "size", common.StorageSize(memory*1024*1024))
+
+	// Assemble the fast sync bloom and init it from previous sessions
+	b := &SyncBloom{
+		bloom: bloom,
+	}
+	b.pend.Add(2)
+	go func() {
+		defer b.pend.Done()
+		b.init(database)
+	}()
+	go func() {
+		defer b.pend.Done()
+		b.meter()
+	}()
+	return b
+}
+
+// init iterates over the database, pushing every trie hash into the bloom filter.
+func (b *SyncBloom) init(database ethdb.Iteratee) {
+	// Iterate over the database, but restart every now and again to avoid holding
+	// a persistent snapshot since fast sync can push a ton of data concurrently,
+	// bloating the disk.
+	//
+	// Note, this is fine, because everything inserted into leveldb by fast sync is
+	// also pushed into the bloom directly, so we're not missing anything when the
+	// iterator is swapped out for a new one.
+	it := database.NewIterator()
+
+	var (
+		start = time.Now()
+		swap  = time.Now()
+	)
+	for it.Next() && atomic.LoadUint32(&b.closed) == 0 {
+		// If the database entry is a trie node, add it to the bloom
+		if key := it.Key(); len(key) == common.HashLength {
+			b.bloom.Add(syncBloomHasher(key))
+			bloomLoadMeter.Mark(1)
+		}
+		// If enough time elapsed since the last iterator swap, restart
+		if time.Since(swap) > 8*time.Second {
+			key := common.CopyBytes(it.Key())
+
+			it.Release()
+			it = database.NewIteratorWithStart(key)
+
+			log.Info("Initializing fast sync bloom", "items", b.bloom.N(), "errorrate", b.errorRate(), "elapsed", time.Since(start))
+			swap = time.Now()
+		}
+	}
+	it.Release()
+
+	// Mark the bloom filter inited and return
+	log.Info("Initialized fast sync bloom", "items", b.bloom.N(), "errorrate", b.errorRate(), "elapsed", time.Since(start))
+	atomic.StoreUint32(&b.inited, 1)
+}
+
+// meter periodically recalculates the false positive error rate of the bloom
+// filter and reports it in a metric.
+func (b *SyncBloom) meter() {
+	for {
+		// Report the current error ratio. No floats, lame, scale it up.
+		bloomErrorGauge.Update(int64(b.errorRate() * 100000))
+
+		// Wait one second, but check termination more frequently
+		for i := 0; i < 10; i++ {
+			if atomic.LoadUint32(&b.closed) == 1 {
+				return
+			}
+			time.Sleep(100 * time.Millisecond)
+		}
+	}
+}
+
+// Close terminates any background initializer still running and releases all the
+// memory allocated for the bloom.
+func (b *SyncBloom) Close() error {
+	b.closer.Do(func() {
+		// Ensure the initializer is stopped
+		atomic.StoreUint32(&b.closed, 1)
+		b.pend.Wait()
+
+		// Wipe the bloom, but mark it "uninited" just in case someone attempts an access
+		log.Info("Deallocated fast sync bloom", "items", b.bloom.N(), "errorrate", b.errorRate())
+
+		atomic.StoreUint32(&b.inited, 0)
+		b.bloom = nil
+	})
+	return nil
+}
+
+// Add inserts a new trie node hash into the bloom filter.
+func (b *SyncBloom) Add(hash []byte) {
+	if atomic.LoadUint32(&b.closed) == 1 {
+		return
+	}
+	b.bloom.Add(syncBloomHasher(hash))
+	bloomAddMeter.Mark(1)
+}
+
+// Contains tests if the bloom filter contains the given hash:
+//   - false: the bloom definitely does not contain hash
+//   - true:  the bloom maybe contains hash
+//
+// While the bloom is being initialized, any query will return true.
+func (b *SyncBloom) Contains(hash []byte) bool {
+	bloomTestMeter.Mark(1)
+	if atomic.LoadUint32(&b.inited) == 0 {
+		// We didn't load all the trie nodes from the previous run of Geth yet. As
+		// such, we can't say for sure if a hash is not present for anything. Until
+		// the init is done, we're faking "possible presence" for everything.
+		return true
+	}
+	// Bloom initialized, check the real one and report any successful misses
+	maybe := b.bloom.Contains(syncBloomHasher(hash))
+	if !maybe {
+		bloomMissMeter.Mark(1)
+	}
+	return maybe
+}
+
+// errorRate calculates the probability of a random containment test returning a
+// false positive.
+//
+// We're calculating it ourselves because the bloom library we used missed a
+// parenthesis in the formula and calculates it wrong. And it's discontinued...
+func (b *SyncBloom) errorRate() float64 {
+	k := float64(b.bloom.K())
+	n := float64(b.bloom.N())
+	m := float64(b.bloom.M())
+
+	return math.Pow(1.0-math.Exp((-k)*(n+0.5)/(m-1)), k)
+}
diff --git a/trie/sync_test.go b/trie/sync_test.go
index 0d8c29cfe..0621bb435 100644
--- a/trie/sync_test.go
+++ b/trie/sync_test.go
@@ -94,7 +94,7 @@ func TestEmptySync(t *testing.T) {
 	emptyB, _ := New(emptyRoot, dbB)
 
 	for i, trie := range []*Trie{emptyA, emptyB} {
-		if req := NewSync(trie.Hash(), memorydb.New(), nil).Missing(1); len(req) != 0 {
+		if req := NewSync(trie.Hash(), memorydb.New(), nil, NewSyncBloom(1, memorydb.New())).Missing(1); len(req) != 0 {
 			t.Errorf("test %d: content requested for empty trie: %v", i, req)
 		}
 	}
@@ -112,7 +112,7 @@ func testIterativeSync(t *testing.T, batch int) {
 	// Create a destination trie and sync with the scheduler
 	diskdb := memorydb.New()
 	triedb := NewDatabase(diskdb)
-	sched := NewSync(srcTrie.Hash(), diskdb, nil)
+	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))
 
 	queue := append([]common.Hash{}, sched.Missing(batch)...)
 	for len(queue) > 0 {
@@ -145,7 +145,7 @@ func TestIterativeDelayedSync(t *testing.T) {
 	// Create a destination trie and sync with the scheduler
 	diskdb := memorydb.New()
 	triedb := NewDatabase(diskdb)
-	sched := NewSync(srcTrie.Hash(), diskdb, nil)
+	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))
 
 	queue := append([]common.Hash{}, sched.Missing(10000)...)
 	for len(queue) > 0 {
@@ -183,7 +183,7 @@ func testIterativeRandomSync(t *testing.T, batch int) {
 	// Create a destination trie and sync with the scheduler
 	diskdb := memorydb.New()
 	triedb := NewDatabase(diskdb)
-	sched := NewSync(srcTrie.Hash(), diskdb, nil)
+	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))
 
 	queue := make(map[common.Hash]struct{})
 	for _, hash := range sched.Missing(batch) {
@@ -224,7 +224,7 @@ func TestIterativeRandomDelayedSync(t *testing.T) {
 	// Create a destination trie and sync with the scheduler
 	diskdb := memorydb.New()
 	triedb := NewDatabase(diskdb)
-	sched := NewSync(srcTrie.Hash(), diskdb, nil)
+	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))
 
 	queue := make(map[common.Hash]struct{})
 	for _, hash := range sched.Missing(10000) {
@@ -271,7 +271,7 @@ func TestDuplicateAvoidanceSync(t *testing.T) {
 	// Create a destination trie and sync with the scheduler
 	diskdb := memorydb.New()
 	triedb := NewDatabase(diskdb)
-	sched := NewSync(srcTrie.Hash(), diskdb, nil)
+	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))
 
 	queue := append([]common.Hash{}, sched.Missing(0)...)
 	requested := make(map[common.Hash]struct{})
@@ -311,7 +311,7 @@ func TestIncompleteSync(t *testing.T) {
 	// Create a destination trie and sync with the scheduler
 	diskdb := memorydb.New()
 	triedb := NewDatabase(diskdb)
-	sched := NewSync(srcTrie.Hash(), diskdb, nil)
+	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))
 
 	var added []common.Hash
 	queue := append([]common.Hash{}, sched.Missing(1)...)
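
For completeness, this is roughly how a caller outside the trie package wires the two pieces together, mirroring the test changes above. The snippet is an illustrative sketch only (the real fast-sync integration lives in eth/downloader, outside this diff); the 1 MB size and the zero root are chosen purely to keep the example self-contained.

```go
package main

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb/memorydb"
	"github.com/ethereum/go-ethereum/trie"
)

func main() {
	// In-memory database standing in for the node's persistent key-value store.
	diskdb := memorydb.New()

	// 1 MB filter, warmed from the database contents in a background goroutine.
	bloom := trie.NewSyncBloom(1, diskdb)
	defer bloom.Close()

	// The state root would normally come from a pivot block header; a zero
	// root is used here only so the example runs standalone.
	sched := trie.NewSync(common.Hash{}, diskdb, nil, bloom)

	// Missing returns the next batch of trie node hashes to fetch; a real
	// caller retrieves them from the network and feeds the data back via Process.
	_ = sched.Missing(128)
}
```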