author    | Péter Szilágyi <peterke@gmail.com> | 2018-11-13 00:47:34 +0800
committer | Péter Szilágyi <peterke@gmail.com> | 2018-11-15 18:22:13 +0800
commit    | 434dd5bc0067cdf604d84426df9086015721dd36 (patch)
tree      | 279d85e32a36b8804d60c5a4b83b444514850782 /trie/database.go
parent    | 9a000601c6c4e4f8134caedba1957ffe28d2b659 (diff)
cmd, core, eth, light, trie: add trie read caching layer
Diffstat (limited to 'trie/database.go')
-rw-r--r-- | trie/database.go | 205
1 file changed, 130 insertions(+), 75 deletions(-)
diff --git a/trie/database.go b/trie/database.go
index d0691b637..71190b3f3 100644
--- a/trie/database.go
+++ b/trie/database.go
@@ -22,6 +22,7 @@ import (
     "sync"
     "time"

+    "github.com/allegro/bigcache"
     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/ethdb"
     "github.com/ethereum/go-ethereum/log"
@@ -30,6 +31,11 @@ import (
 )

 var (
+    memcacheCleanHitMeter   = metrics.NewRegisteredMeter("trie/memcache/clean/hit", nil)
+    memcacheCleanMissMeter  = metrics.NewRegisteredMeter("trie/memcache/clean/miss", nil)
+    memcacheCleanReadMeter  = metrics.NewRegisteredMeter("trie/memcache/clean/read", nil)
+    memcacheCleanWriteMeter = metrics.NewRegisteredMeter("trie/memcache/clean/write", nil)
+
     memcacheFlushTimeTimer  = metrics.NewRegisteredResettingTimer("trie/memcache/flush/time", nil)
     memcacheFlushNodesMeter = metrics.NewRegisteredMeter("trie/memcache/flush/nodes", nil)
     memcacheFlushSizeMeter  = metrics.NewRegisteredMeter("trie/memcache/flush/size", nil)
@@ -64,9 +70,10 @@ type DatabaseReader interface {
 type Database struct {
     diskdb ethdb.Database // Persistent storage for matured trie nodes

-    nodes  map[common.Hash]*cachedNode // Data and references relationships of a node
-    oldest common.Hash                 // Oldest tracked node, flush-list head
-    newest common.Hash                 // Newest tracked node, flush-list tail
+    cleans  *bigcache.BigCache          // GC friendly memory cache of clean node RLPs
+    dirties map[common.Hash]*cachedNode // Data and references relationships of dirty nodes
+    oldest  common.Hash                 // Oldest tracked node, flush-list head
+    newest  common.Hash                 // Newest tracked node, flush-list tail

     preimages map[common.Hash][]byte // Preimages of nodes from the secure trie
     seckeybuf [secureKeyLength]byte  // Ephemeral buffer for calculating preimage keys
@@ -79,7 +86,7 @@ type Database struct {
     flushnodes uint64             // Nodes flushed since last commit
     flushsize  common.StorageSize // Data storage flushed since last commit

-    nodesSize     common.StorageSize // Storage size of the nodes cache (exc. flushlist)
+    dirtiesSize   common.StorageSize // Storage size of the dirty node cache (exc. flushlist)
     preimagesSize common.StorageSize // Storage size of the preimages cache

     lock sync.RWMutex
@@ -262,11 +269,30 @@ func expandNode(hash hashNode, n node, cachegen uint16) node {
 }

 // NewDatabase creates a new trie database to store ephemeral trie content before
-// its written out to disk or garbage collected.
+// its written out to disk or garbage collected. No read cache is created, so all
+// data retrievals will hit the underlying disk database.
 func NewDatabase(diskdb ethdb.Database) *Database {
+    return NewDatabaseWithCache(diskdb, 0)
+}
+
+// NewDatabaseWithCache creates a new trie database to store ephemeral trie content
+// before its written out to disk or garbage collected. It also acts as a read cache
+// for nodes loaded from disk.
+func NewDatabaseWithCache(diskdb ethdb.Database, cache int) *Database {
+    var cleans *bigcache.BigCache
+    if cache > 0 {
+        cleans, _ = bigcache.NewBigCache(bigcache.Config{
+            Shards:             1024,
+            LifeWindow:         time.Hour,
+            MaxEntriesInWindow: cache * 1024,
+            MaxEntrySize:       512,
+            HardMaxCacheSize:   cache,
+        })
+    }
     return &Database{
         diskdb:    diskdb,
-        nodes:     map[common.Hash]*cachedNode{{}: {}},
+        cleans:    cleans,
+        dirties:   map[common.Hash]*cachedNode{{}: {}},
         preimages: make(map[common.Hash][]byte),
     }
 }
@@ -293,7 +319,7 @@ func (db *Database) InsertBlob(hash common.Hash, blob []byte) {
 // size tracking.
 func (db *Database) insert(hash common.Hash, blob []byte, node node) {
     // If the node's already cached, skip
-    if _, ok := db.nodes[hash]; ok {
+    if _, ok := db.dirties[hash]; ok {
         return
     }
     // Create the cached entry for this node
@@ -303,19 +329,19 @@ func (db *Database) insert(hash common.Hash, blob []byte, node node) {
         flushPrev: db.newest,
     }
     for _, child := range entry.childs() {
-        if c := db.nodes[child]; c != nil {
+        if c := db.dirties[child]; c != nil {
             c.parents++
         }
     }
-    db.nodes[hash] = entry
+    db.dirties[hash] = entry

     // Update the flush-list endpoints
     if db.oldest == (common.Hash{}) {
         db.oldest, db.newest = hash, hash
     } else {
-        db.nodes[db.newest].flushNext, db.newest = hash, hash
+        db.dirties[db.newest].flushNext, db.newest = hash, hash
     }
-    db.nodesSize += common.StorageSize(common.HashLength + entry.size)
+    db.dirtiesSize += common.StorageSize(common.HashLength + entry.size)
 }

 // insertPreimage writes a new trie node pre-image to the memory database if it's
@@ -333,35 +359,64 @@ func (db *Database) insertPreimage(hash common.Hash, preimage []byte) {
 // node retrieves a cached trie node from memory, or returns nil if none can be
 // found in the memory cache.
 func (db *Database) node(hash common.Hash, cachegen uint16) node {
-    // Retrieve the node from cache if available
+    // Retrieve the node from the clean cache if available
+    if db.cleans != nil {
+        if enc, err := db.cleans.Get(string(hash[:])); err == nil && enc != nil {
+            memcacheCleanHitMeter.Mark(1)
+            memcacheCleanReadMeter.Mark(int64(len(enc)))
+            return mustDecodeNode(hash[:], enc, cachegen)
+        }
+    }
+    // Retrieve the node from the dirty cache if available
     db.lock.RLock()
-    node := db.nodes[hash]
+    dirty := db.dirties[hash]
     db.lock.RUnlock()

-    if node != nil {
-        return node.obj(hash, cachegen)
+    if dirty != nil {
+        return dirty.obj(hash, cachegen)
     }
     // Content unavailable in memory, attempt to retrieve from disk
     enc, err := db.diskdb.Get(hash[:])
     if err != nil || enc == nil {
         return nil
     }
+    if db.cleans != nil {
+        db.cleans.Set(string(hash[:]), enc)
+        memcacheCleanMissMeter.Mark(1)
+        memcacheCleanWriteMeter.Mark(int64(len(enc)))
+    }
     return mustDecodeNode(hash[:], enc, cachegen)
 }

 // Node retrieves an encoded cached trie node from memory. If it cannot be found
 // cached, the method queries the persistent database for the content.
 func (db *Database) Node(hash common.Hash) ([]byte, error) {
-    // Retrieve the node from cache if available
+    // Retrieve the node from the clean cache if available
+    if db.cleans != nil {
+        if enc, err := db.cleans.Get(string(hash[:])); err == nil && enc != nil {
+            memcacheCleanHitMeter.Mark(1)
+            memcacheCleanReadMeter.Mark(int64(len(enc)))
+            return enc, nil
+        }
+    }
+    // Retrieve the node from the dirty cache if available
     db.lock.RLock()
-    node := db.nodes[hash]
+    dirty := db.dirties[hash]
     db.lock.RUnlock()

-    if node != nil {
-        return node.rlp(), nil
+    if dirty != nil {
+        return dirty.rlp(), nil
     }
     // Content unavailable in memory, attempt to retrieve from disk
-    return db.diskdb.Get(hash[:])
+    enc, err := db.diskdb.Get(hash[:])
+    if err == nil && enc != nil {
+        if db.cleans != nil {
+            db.cleans.Set(string(hash[:]), enc)
+            memcacheCleanMissMeter.Mark(1)
+            memcacheCleanWriteMeter.Mark(int64(len(enc)))
+        }
+    }
+    return enc, err
 }

 // preimage retrieves a cached trie node pre-image from memory. If it cannot be
@@ -395,8 +450,8 @@ func (db *Database) Nodes() []common.Hash {
     db.lock.RLock()
     defer db.lock.RUnlock()

-    var hashes = make([]common.Hash, 0, len(db.nodes))
-    for hash := range db.nodes {
+    var hashes = make([]common.Hash, 0, len(db.dirties))
+    for hash := range db.dirties {
         if hash != (common.Hash{}) { // Special case for "root" references/nodes
             hashes = append(hashes, hash)
         }
@@ -415,18 +470,18 @@ func (db *Database) Reference(child common.Hash, parent common.Hash) {
 // reference is the private locked version of Reference.
 func (db *Database) reference(child common.Hash, parent common.Hash) {
     // If the node does not exist, it's a node pulled from disk, skip
-    node, ok := db.nodes[child]
+    node, ok := db.dirties[child]
     if !ok {
         return
     }
     // If the reference already exists, only duplicate for roots
-    if db.nodes[parent].children == nil {
-        db.nodes[parent].children = make(map[common.Hash]uint16)
-    } else if _, ok = db.nodes[parent].children[child]; ok && parent != (common.Hash{}) {
+    if db.dirties[parent].children == nil {
+        db.dirties[parent].children = make(map[common.Hash]uint16)
+    } else if _, ok = db.dirties[parent].children[child]; ok && parent != (common.Hash{}) {
         return
     }
     node.parents++
-    db.nodes[parent].children[child]++
+    db.dirties[parent].children[child]++
 }

 // Dereference removes an existing reference from a root node.
@@ -439,25 +494,25 @@ func (db *Database) Dereference(root common.Hash) {
     db.lock.Lock()
     defer db.lock.Unlock()

-    nodes, storage, start := len(db.nodes), db.nodesSize, time.Now()
+    nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now()
     db.dereference(root, common.Hash{})

-    db.gcnodes += uint64(nodes - len(db.nodes))
-    db.gcsize += storage - db.nodesSize
+    db.gcnodes += uint64(nodes - len(db.dirties))
+    db.gcsize += storage - db.dirtiesSize
     db.gctime += time.Since(start)

     memcacheGCTimeTimer.Update(time.Since(start))
-    memcacheGCSizeMeter.Mark(int64(storage - db.nodesSize))
-    memcacheGCNodesMeter.Mark(int64(nodes - len(db.nodes)))
+    memcacheGCSizeMeter.Mark(int64(storage - db.dirtiesSize))
+    memcacheGCNodesMeter.Mark(int64(nodes - len(db.dirties)))

-    log.Debug("Dereferenced trie from memory database", "nodes", nodes-len(db.nodes), "size", storage-db.nodesSize, "time", time.Since(start),
-        "gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, "livenodes", len(db.nodes), "livesize", db.nodesSize)
+    log.Debug("Dereferenced trie from memory database", "nodes", nodes-len(db.dirties), "size", storage-db.dirtiesSize, "time", time.Since(start),
+        "gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, "livenodes", len(db.dirties), "livesize", db.dirtiesSize)
 }

 // dereference is the private locked version of Dereference.
 func (db *Database) dereference(child common.Hash, parent common.Hash) {
     // Dereference the parent-child
-    node := db.nodes[parent]
+    node := db.dirties[parent]

     if node.children != nil && node.children[child] > 0 {
         node.children[child]--
@@ -466,7 +521,7 @@ func (db *Database) dereference(child common.Hash, parent common.Hash) {
         }
     }
     // If the child does not exist, it's a previously committed node.
-    node, ok := db.nodes[child]
+    node, ok := db.dirties[child]
     if !ok {
         return
     }
@@ -483,20 +538,20 @@ func (db *Database) dereference(child common.Hash, parent common.Hash) {
         switch child {
         case db.oldest:
             db.oldest = node.flushNext
-            db.nodes[node.flushNext].flushPrev = common.Hash{}
+            db.dirties[node.flushNext].flushPrev = common.Hash{}
         case db.newest:
             db.newest = node.flushPrev
-            db.nodes[node.flushPrev].flushNext = common.Hash{}
+            db.dirties[node.flushPrev].flushNext = common.Hash{}
         default:
-            db.nodes[node.flushPrev].flushNext = node.flushNext
-            db.nodes[node.flushNext].flushPrev = node.flushPrev
+            db.dirties[node.flushPrev].flushNext = node.flushNext
+            db.dirties[node.flushNext].flushPrev = node.flushPrev
         }
         // Dereference all children and delete the node
         for _, hash := range node.childs() {
             db.dereference(hash, child)
         }
-        delete(db.nodes, child)
-        db.nodesSize -= common.StorageSize(common.HashLength + int(node.size))
+        delete(db.dirties, child)
+        db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
     }
 }
@@ -509,13 +564,13 @@ func (db *Database) Cap(limit common.StorageSize) error {
     // by only uncaching existing data when the database write finalizes.
     db.lock.RLock()

-    nodes, storage, start := len(db.nodes), db.nodesSize, time.Now()
+    nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now()
     batch := db.diskdb.NewBatch()

-    // db.nodesSize only contains the useful data in the cache, but when reporting
+    // db.dirtiesSize only contains the useful data in the cache, but when reporting
     // the total memory consumption, the maintenance metadata is also needed to be
     // counted. For every useful node, we track 2 extra hashes as the flushlist.
-    size := db.nodesSize + common.StorageSize((len(db.nodes)-1)*2*common.HashLength)
+    size := db.dirtiesSize + common.StorageSize((len(db.dirties)-1)*2*common.HashLength)

     // If the preimage cache got large enough, push to disk. If it's still small
     // leave for later to deduplicate writes.
@@ -540,7 +595,7 @@ func (db *Database) Cap(limit common.StorageSize) error {
     oldest := db.oldest
     for size > limit && oldest != (common.Hash{}) {
         // Fetch the oldest referenced node and push into the batch
-        node := db.nodes[oldest]
+        node := db.dirties[oldest]
         if err := batch.Put(oldest[:], node.rlp()); err != nil {
             db.lock.RUnlock()
             return err
@@ -578,25 +633,25 @@ func (db *Database) Cap(limit common.StorageSize) error {
         db.preimagesSize = 0
     }
     for db.oldest != oldest {
-        node := db.nodes[db.oldest]
-        delete(db.nodes, db.oldest)
+        node := db.dirties[db.oldest]
+        delete(db.dirties, db.oldest)
         db.oldest = node.flushNext

-        db.nodesSize -= common.StorageSize(common.HashLength + int(node.size))
+        db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
     }
     if db.oldest != (common.Hash{}) {
-        db.nodes[db.oldest].flushPrev = common.Hash{}
+        db.dirties[db.oldest].flushPrev = common.Hash{}
     }
-    db.flushnodes += uint64(nodes - len(db.nodes))
-    db.flushsize += storage - db.nodesSize
+    db.flushnodes += uint64(nodes - len(db.dirties))
+    db.flushsize += storage - db.dirtiesSize
     db.flushtime += time.Since(start)

     memcacheFlushTimeTimer.Update(time.Since(start))
-    memcacheFlushSizeMeter.Mark(int64(storage - db.nodesSize))
-    memcacheFlushNodesMeter.Mark(int64(nodes - len(db.nodes)))
+    memcacheFlushSizeMeter.Mark(int64(storage - db.dirtiesSize))
+    memcacheFlushNodesMeter.Mark(int64(nodes - len(db.dirties)))

-    log.Debug("Persisted nodes from memory database", "nodes", nodes-len(db.nodes), "size", storage-db.nodesSize, "time", time.Since(start),
-        "flushnodes", db.flushnodes, "flushsize", db.flushsize, "flushtime", db.flushtime, "livenodes", len(db.nodes), "livesize", db.nodesSize)
+    log.Debug("Persisted nodes from memory database", "nodes", nodes-len(db.dirties), "size", storage-db.dirtiesSize, "time", time.Since(start),
+        "flushnodes", db.flushnodes, "flushsize", db.flushsize, "flushtime", db.flushtime, "livenodes", len(db.dirties), "livesize", db.dirtiesSize)

     return nil
 }
@@ -630,7 +685,7 @@ func (db *Database) Commit(node common.Hash, report bool) error {
         }
     }
     // Move the trie itself into the batch, flushing if enough data is accumulated
-    nodes, storage := len(db.nodes), db.nodesSize
+    nodes, storage := len(db.dirties), db.dirtiesSize
     if err := db.commit(node, batch); err != nil {
         log.Error("Failed to commit trie from trie database", "err", err)
         db.lock.RUnlock()
@@ -654,15 +709,15 @@ func (db *Database) Commit(node common.Hash, report bool) error {
     db.uncache(node)

     memcacheCommitTimeTimer.Update(time.Since(start))
-    memcacheCommitSizeMeter.Mark(int64(storage - db.nodesSize))
-    memcacheCommitNodesMeter.Mark(int64(nodes - len(db.nodes)))
+    memcacheCommitSizeMeter.Mark(int64(storage - db.dirtiesSize))
+    memcacheCommitNodesMeter.Mark(int64(nodes - len(db.dirties)))

     logger := log.Info
     if !report {
         logger = log.Debug
     }
-    logger("Persisted trie from memory database", "nodes", nodes-len(db.nodes)+int(db.flushnodes), "size", storage-db.nodesSize+db.flushsize, "time", time.Since(start)+db.flushtime,
-        "gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, "livenodes", len(db.nodes), "livesize", db.nodesSize)
+    logger("Persisted trie from memory database", "nodes", nodes-len(db.dirties)+int(db.flushnodes), "size", storage-db.dirtiesSize+db.flushsize, "time", time.Since(start)+db.flushtime,
+        "gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, "livenodes", len(db.dirties), "livesize", db.dirtiesSize)

     // Reset the garbage collection statistics
     db.gcnodes, db.gcsize, db.gctime = 0, 0, 0
@@ -674,7 +729,7 @@ func (db *Database) Commit(node common.Hash, report bool) error {
 // commit is the private locked version of Commit.
 func (db *Database) commit(hash common.Hash, batch ethdb.Batch) error {
     // If the node does not exist, it's a previously committed node
-    node, ok := db.nodes[hash]
+    node, ok := db.dirties[hash]
     if !ok {
         return nil
     }
@@ -702,7 +757,7 @@ func (db *Database) commit(hash common.Hash, batch ethdb.Batch) error {
 // to disk.
 func (db *Database) uncache(hash common.Hash) {
     // If the node does not exist, we're done on this path
-    node, ok := db.nodes[hash]
+    node, ok := db.dirties[hash]
     if !ok {
         return
     }
@@ -710,20 +765,20 @@ func (db *Database) uncache(hash common.Hash) {
     switch hash {
     case db.oldest:
         db.oldest = node.flushNext
-        db.nodes[node.flushNext].flushPrev = common.Hash{}
+        db.dirties[node.flushNext].flushPrev = common.Hash{}
     case db.newest:
         db.newest = node.flushPrev
-        db.nodes[node.flushPrev].flushNext = common.Hash{}
+        db.dirties[node.flushPrev].flushNext = common.Hash{}
     default:
-        db.nodes[node.flushPrev].flushNext = node.flushNext
-        db.nodes[node.flushNext].flushPrev = node.flushPrev
+        db.dirties[node.flushPrev].flushNext = node.flushNext
+        db.dirties[node.flushNext].flushPrev = node.flushPrev
     }
     // Uncache the node's subtries and remove the node itself too
     for _, child := range node.childs() {
         db.uncache(child)
     }
-    delete(db.nodes, hash)
-    db.nodesSize -= common.StorageSize(common.HashLength + int(node.size))
+    delete(db.dirties, hash)
+    db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
 }

 // Size returns the current storage size of the memory cache in front of the
@@ -732,11 +787,11 @@ func (db *Database) Size() (common.StorageSize, common.StorageSize) {
     db.lock.RLock()
     defer db.lock.RUnlock()

-    // db.nodesSize only contains the useful data in the cache, but when reporting
+    // db.dirtiesSize only contains the useful data in the cache, but when reporting
     // the total memory consumption, the maintenance metadata is also needed to be
     // counted. For every useful node, we track 2 extra hashes as the flushlist.
-    var flushlistSize = common.StorageSize((len(db.nodes) - 1) * 2 * common.HashLength)
-    return db.nodesSize + flushlistSize, db.preimagesSize
+    var flushlistSize = common.StorageSize((len(db.dirties) - 1) * 2 * common.HashLength)
+    return db.dirtiesSize + flushlistSize, db.preimagesSize
 }

 // verifyIntegrity is a debug method to iterate over the entire trie stored in
@@ -749,12 +804,12 @@ func (db *Database) verifyIntegrity() {
     // Iterate over all the cached nodes and accumulate them into a set
     reachable := map[common.Hash]struct{}{{}: {}}

-    for child := range db.nodes[common.Hash{}].children {
+    for child := range db.dirties[common.Hash{}].children {
         db.accumulate(child, reachable)
     }
     // Find any unreachable but cached nodes
     unreachable := []string{}
-    for hash, node := range db.nodes {
+    for hash, node := range db.dirties {
         if _, ok := reachable[hash]; !ok {
             unreachable = append(unreachable, fmt.Sprintf("%x: {Node: %v, Parents: %d, Prev: %x, Next: %x}",
                 hash, node.node, node.parents, node.flushPrev, node.flushNext))
@@ -769,7 +824,7 @@ func (db *Database) verifyIntegrity() {
 // cached children found in memory.
 func (db *Database) accumulate(hash common.Hash, reachable map[common.Hash]struct{}) {
     // Mark the node reachable if present in the memory cache
-    node, ok := db.nodes[hash]
+    node, ok := db.dirties[hash]
     if !ok {
         return
     }
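The commit leaves the one-argument `NewDatabase` behaviorally unchanged (it now delegates with a cache size of zero) and adds `NewDatabaseWithCache`, whose `cache` argument is the clean-cache budget in megabytes: bigcache is configured with 1024 shards, a one-hour life window, `cache * 1024` expected entries of at most 512 bytes each, and a hard cap of `cache` MB. bigcache keeps entries in flat byte slabs that the Go garbage collector never has to walk, which is what the "GC friendly" comment on the `cleans` field refers to. Below is a minimal usage sketch, not part of the commit, assuming the go-ethereum APIs of this era (`ethdb.NewMemDatabase`, `trie.New`):

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/trie"
)

func main() {
	// In-memory key-value store standing in for the real chain database.
	diskdb := ethdb.NewMemDatabase()

	// Trie database with a 256 MB clean-node read cache; trie.NewDatabase
	// would create the same structure with the read cache disabled.
	triedb := trie.NewDatabaseWithCache(diskdb, 256)

	// Open an empty trie. Reads of committed nodes now check the clean cache
	// first, then the dirty cache, and only then fall back to diskdb,
	// populating the clean cache on the way back up.
	t, err := trie.New(common.Hash{}, triedb)
	if err != nil {
		panic(err)
	}
	t.Update([]byte("key"), []byte("value"))
	fmt.Printf("root: %x\n", t.Hash())
}
```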