author    | Martin Holst Swende <martin@swende.se>     | 2019-03-26 22:48:31 +0800
committer | Felix Lange <fjl@users.noreply.github.com> | 2019-03-26 22:48:31 +0800
commit    | 59e195324643e8f3a18396b529e3350e550fdecc (patch)
tree      | 80850abf25e40b1e3a5bd359dacc654b651d8c73 /trie
parent    | df717abc999add34c5725ab86dce1fcee968ca10 (diff)
core, ethdb, trie: move dirty data to clean cache on flush (#19307)
This PR is a more advanced form of the dirty-to-clean cacher (#18995):
we reuse the database write batches we just flushed as the datasets to uncache,
saving one dirty-trie iteration and one dirty-trie RLP re-encoding per block.
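To make the mechanism concrete, below is a minimal, self-contained Go sketch of batch-replay-driven uncaching. It is a sketch under simplified assumptions, not the actual go-ethereum code: batch, keyValueWriter and uncacher are illustrative stand-ins for ethdb.Batch, the ethdb write interface and the cleaner type introduced by this diff, and the short string keys stand in for 32-byte node hashes.

// uncache_sketch.go -- a minimal sketch of batch-replay uncaching.
// All names here are illustrative, not the real go-ethereum types.
package main

import "fmt"

// keyValueWriter is the replay target: anything accepting Put/Delete.
type keyValueWriter interface {
    Put(key, value []byte) error
    Delete(key []byte) error
}

// kv is one buffered write operation.
type kv struct {
    key, value []byte
}

// batch buffers writes so they can be flushed to disk in one go and,
// crucially, replayed afterwards.
type batch struct {
    writes []kv
}

func (b *batch) Put(key, value []byte) error {
    b.writes = append(b.writes, kv{key, value})
    return nil
}

// replay feeds every buffered write back into w. This is the mechanism the
// PR exploits: the batch that was just persisted is also the exact set of
// keys that may now leave the dirty cache.
func (b *batch) replay(w keyValueWriter) error {
    for _, op := range b.writes {
        if err := w.Put(op.key, op.value); err != nil {
            return err
        }
    }
    return nil
}

// uncacher moves replayed entries from a dirty map to a clean map, playing
// the role of the cleaner type in the diff.
type uncacher struct {
    dirty, clean map[string][]byte
}

func (u *uncacher) Put(key, value []byte) error {
    delete(u.dirty, string(key))
    u.clean[string(key)] = value // value is already encoded; no re-encoding
    return nil
}

func (u *uncacher) Delete(key []byte) error {
    panic("no deletes expected during a commit")
}

func main() {
    b := &batch{}
    b.Put([]byte("node-a"), []byte{0x01}) // keys stand in for 32-byte hashes
    b.Put([]byte("node-b"), []byte{0x02})

    // ... here the real code would batch.Write() to the disk database ...

    u := &uncacher{
        dirty: map[string][]byte{"node-a": {0x01}, "node-b": {0x02}},
        clean: map[string][]byte{},
    }
    b.replay(u)
    fmt.Println("dirty:", len(u.dirty), "clean:", len(u.clean)) // dirty: 0 clean: 2
}

The design point is that the batch doubles as a free record of exactly which nodes just became clean, so no second walk over the dirty trie and no RLP re-encoding is needed to decide what to evict.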
Diffstat (limited to 'trie')
-rw-r--r-- | trie/database.go | 120
1 file changed, 72 insertions, 48 deletions
diff --git a/trie/database.go b/trie/database.go
index 6df1a7f79..3bbcb6ade 100644
--- a/trie/database.go
+++ b/trie/database.go
@@ -59,6 +59,11 @@ const secureKeyLength = 11 + 32
 // Database is an intermediate write layer between the trie data structures and
 // the disk database. The aim is to accumulate trie writes in-memory and only
 // periodically flush a couple tries to disk, garbage collecting the remainder.
+//
+// Note, the trie Database is **not** thread safe in its mutations, but it **is**
+// thread safe in providing individual, independent node access. The rationale
+// behind this split design is to provide read access to RPC handlers and sync
+// servers even while the trie is executing expensive garbage collection.
 type Database struct {
     diskdb ethdb.KeyValueStore // Persistent storage for matured trie nodes
 
@@ -465,8 +470,8 @@ func (db *Database) Nodes() []common.Hash {
 
 // Reference adds a new reference from a parent node to a child node.
 func (db *Database) Reference(child common.Hash, parent common.Hash) {
-    db.lock.RLock()
-    defer db.lock.RUnlock()
+    db.lock.Lock()
+    defer db.lock.Unlock()
 
     db.reference(child, parent)
 }
@@ -561,13 +566,14 @@ func (db *Database) dereference(child common.Hash, parent common.Hash) {
 
 // Cap iteratively flushes old but still referenced trie nodes until the total
 // memory usage goes below the given threshold.
+//
+// Note, this method is a non-synchronized mutator. It is unsafe to call this
+// concurrently with other mutators.
 func (db *Database) Cap(limit common.StorageSize) error {
     // Create a database batch to flush persistent data out. It is important that
     // outside code doesn't see an inconsistent state (referenced data removed from
     // memory cache during commit but not yet in persistent storage). This is ensured
     // by only uncaching existing data when the database write finalizes.
-    db.lock.RLock()
-
     nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now()
     batch := db.diskdb.NewBatch()
 
@@ -583,12 +589,10 @@ func (db *Database) Cap(limit common.StorageSize) error {
     for hash, preimage := range db.preimages {
         if err := batch.Put(db.secureKey(hash[:]), preimage); err != nil {
             log.Error("Failed to commit preimage from trie database", "err", err)
-            db.lock.RUnlock()
             return err
         }
         if batch.ValueSize() > ethdb.IdealBatchSize {
             if err := batch.Write(); err != nil {
-                db.lock.RUnlock()
                 return err
             }
             batch.Reset()
@@ -601,14 +605,12 @@ func (db *Database) Cap(limit common.StorageSize) error {
         // Fetch the oldest referenced node and push into the batch
         node := db.dirties[oldest]
         if err := batch.Put(oldest[:], node.rlp()); err != nil {
-            db.lock.RUnlock()
             return err
         }
         // If we exceeded the ideal batch size, commit and reset
         if batch.ValueSize() >= ethdb.IdealBatchSize {
             if err := batch.Write(); err != nil {
                 log.Error("Failed to write flush list to disk", "err", err)
-                db.lock.RUnlock()
                 return err
             }
             batch.Reset()
@@ -623,11 +625,8 @@ func (db *Database) Cap(limit common.StorageSize) error {
     // Flush out any remainder data from the last batch
     if err := batch.Write(); err != nil {
         log.Error("Failed to write flush list to disk", "err", err)
-        db.lock.RUnlock()
         return err
     }
-    db.lock.RUnlock()
-
     // Write successful, clear out the flushed data
     db.lock.Lock()
     defer db.lock.Unlock()
@@ -661,16 +660,16 @@ func (db *Database) Cap(limit common.StorageSize) error {
 }
 
 // Commit iterates over all the children of a particular node, writes them out
-// to disk, forcefully tearing down all references in both directions.
+// to disk, forcefully tearing down all references in both directions. As a side
+// effect, all pre-images accumulated up to this point are also written.
 //
-// As a side effect, all pre-images accumulated up to this point are also written.
+// Note, this method is a non-synchronized mutator. It is unsafe to call this
+// concurrently with other mutators.
 func (db *Database) Commit(node common.Hash, report bool) error {
     // Create a database batch to flush persistent data out. It is important that
     // outside code doesn't see an inconsistent state (referenced data removed from
     // memory cache during commit but not yet in persistent storage). This is ensured
     // by only uncaching existing data when the database write finalizes.
-    db.lock.RLock()
-
     start := time.Now()
     batch := db.diskdb.NewBatch()
 
@@ -678,41 +677,47 @@ func (db *Database) Commit(node common.Hash, report bool) error {
     for hash, preimage := range db.preimages {
         if err := batch.Put(db.secureKey(hash[:]), preimage); err != nil {
             log.Error("Failed to commit preimage from trie database", "err", err)
-            db.lock.RUnlock()
             return err
         }
+        // If the batch is too large, flush to disk
         if batch.ValueSize() > ethdb.IdealBatchSize {
             if err := batch.Write(); err != nil {
-                db.lock.RUnlock()
                 return err
             }
             batch.Reset()
         }
     }
+    // Since we're going to replay trie node writes into the clean cache, flush out
+    // any batched pre-images before continuing.
+    if err := batch.Write(); err != nil {
+        return err
+    }
+    batch.Reset()
+
     // Move the trie itself into the batch, flushing if enough data is accumulated
     nodes, storage := len(db.dirties), db.dirtiesSize
-    if err := db.commit(node, batch); err != nil {
+
+    uncacher := &cleaner{db}
+    if err := db.commit(node, batch, uncacher); err != nil {
         log.Error("Failed to commit trie from trie database", "err", err)
-        db.lock.RUnlock()
         return err
     }
-    // Write batch ready, unlock for readers during persistence
+    // Trie mostly committed to disk, flush any batch leftovers
     if err := batch.Write(); err != nil {
         log.Error("Failed to write trie to disk", "err", err)
-        db.lock.RUnlock()
         return err
     }
-    db.lock.RUnlock()
-
-    // Write successful, clear out the flushed data
+    // Uncache any leftovers in the last batch
     db.lock.Lock()
    defer db.lock.Unlock()
 
+    batch.Replay(uncacher)
+    batch.Reset()
+
+    // Reset the storage counters and bump metrics
     db.preimages = make(map[common.Hash][]byte)
     db.preimagesSize = 0
 
-    db.uncache(node)
-
     memcacheCommitTimeTimer.Update(time.Since(start))
     memcacheCommitSizeMeter.Mark(int64(storage - db.dirtiesSize))
     memcacheCommitNodesMeter.Mark(int64(nodes - len(db.dirties)))
@@ -732,14 +737,14 @@ func (db *Database) Commit(node common.Hash, report bool) error {
 }
 
 // commit is the private locked version of Commit.
-func (db *Database) commit(hash common.Hash, batch ethdb.Batch) error {
+func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner) error {
     // If the node does not exist, it's a previously committed node
     node, ok := db.dirties[hash]
     if !ok {
         return nil
     }
     for _, child := range node.childs() {
-        if err := db.commit(child, batch); err != nil {
+        if err := db.commit(child, batch, uncacher); err != nil {
             return err
         }
     }
@@ -751,39 +756,58 @@ func (db *Database) commit(hash common.Hash, batch ethdb.Batch) error {
         if err := batch.Write(); err != nil {
             return err
         }
+        db.lock.Lock()
+        batch.Replay(uncacher)
         batch.Reset()
+        db.lock.Unlock()
     }
     return nil
 }
 
-// uncache is the post-processing step of a commit operation where the already
-// persisted trie is removed from the cache. The reason behind the two-phase
-// commit is to ensure consistent data availability while moving from memory
-// to disk.
-func (db *Database) uncache(hash common.Hash) {
+// cleaner is a database batch replayer that takes a batch of write operations
+// and cleans up the trie database from anything written to disk.
+type cleaner struct {
+    db *Database
+}
+
+// Put reacts to database writes and implements dirty data uncaching. This is the
+// post-processing step of a commit operation where the already persisted trie is
+// removed from the dirty cache and moved into the clean cache. The reason behind
+// the two-phase commit is to ensure data availability while moving from
+// memory to disk.
+func (c *cleaner) Put(key []byte, rlp []byte) error {
+    hash := common.BytesToHash(key)
+
     // If the node does not exist, we're done on this path
-    node, ok := db.dirties[hash]
+    node, ok := c.db.dirties[hash]
     if !ok {
-        return
+        return nil
     }
     // Node still exists, remove it from the flush-list
     switch hash {
-    case db.oldest:
-        db.oldest = node.flushNext
-        db.dirties[node.flushNext].flushPrev = common.Hash{}
-    case db.newest:
-        db.newest = node.flushPrev
-        db.dirties[node.flushPrev].flushNext = common.Hash{}
+    case c.db.oldest:
+        c.db.oldest = node.flushNext
+        c.db.dirties[node.flushNext].flushPrev = common.Hash{}
+    case c.db.newest:
+        c.db.newest = node.flushPrev
+        c.db.dirties[node.flushPrev].flushNext = common.Hash{}
     default:
-        db.dirties[node.flushPrev].flushNext = node.flushNext
-        db.dirties[node.flushNext].flushPrev = node.flushPrev
+        c.db.dirties[node.flushPrev].flushNext = node.flushNext
+        c.db.dirties[node.flushNext].flushPrev = node.flushPrev
     }
-    // Uncache the node's subtries and remove the node itself too
-    for _, child := range node.childs() {
-        db.uncache(child)
+    // Remove the node from the dirty cache
+    delete(c.db.dirties, hash)
+    c.db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
+
+    // Move the flushed node into the clean cache to prevent insta-reloads
+    if c.db.cleans != nil {
+        c.db.cleans.Set(string(hash[:]), rlp)
     }
-    delete(db.dirties, hash)
-    db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
+    return nil
+}
+
+func (c *cleaner) Delete(key []byte) error {
+    panic("Not implemented")
 }
 
 // Size returns the current storage size of the memory cache in front of the
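A note on the cleaner above: the subtle part of Put is unlinking the uncached node from the flush-list, the doubly-linked list (threaded through the dirties map by hash) that Cap walks oldest-first when evicting. The following self-contained sketch isolates just that unlink step with illustrative types; it is not the real trie internals, which also maintain node sizes and child references.

// flushlist_sketch.go -- the flush-list unlink from cleaner.Put, isolated.
// Types and names are illustrative, not the real trie internals.
package main

import "fmt"

type hash string

var zero hash // the zero value marks both ends of the list

// node carries the flush-list links, mirroring flushPrev/flushNext above.
type node struct {
    flushPrev, flushNext hash
}

// db holds the dirty nodes plus the oldest/newest ends of the flush-list.
type db struct {
    dirties        map[hash]*node
    oldest, newest hash
}

// unlink removes h from the flush-list, mirroring the switch in cleaner.Put:
// end nodes patch the oldest/newest markers, interior nodes stitch their two
// neighbours together.
func (d *db) unlink(h hash) {
    n := d.dirties[h]
    switch h {
    case d.oldest:
        d.oldest = n.flushNext
        if n.flushNext != zero {
            d.dirties[n.flushNext].flushPrev = zero
        }
    case d.newest:
        d.newest = n.flushPrev
        if n.flushPrev != zero {
            d.dirties[n.flushPrev].flushNext = zero
        }
    default:
        d.dirties[n.flushPrev].flushNext = n.flushNext
        d.dirties[n.flushNext].flushPrev = n.flushPrev
    }
    delete(d.dirties, h)
}

func main() {
    d := &db{
        dirties: map[hash]*node{
            "a": {flushNext: "b"},
            "b": {flushPrev: "a", flushNext: "c"},
            "c": {flushPrev: "b"},
        },
        oldest: "a",
        newest: "c",
    }
    d.unlink("b")                                             // interior case
    fmt.Println(d.oldest, d.dirties["a"].flushNext, d.newest) // a c c
}

The zero-hash guards keep this toy safe when the list shrinks to a single element; the real code performs those assignments unguarded, presumably relying on the surrounding structure to keep the zero-hash lookups valid.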