diff options
author | Péter Szilágyi <peterke@gmail.com> | 2019-03-07 16:21:40 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-07 16:21:40 +0800 |
commit | 72b21db2d31d77d956c09353457a0c2db45249b0 (patch) | |
tree | c0c750bfbbc5b5d8e9622847b676c1f4d99f4020 /ethdb | |
parent | f2d63103541ee3746ff0834e7c69d188af3572d2 (diff) | |
parent | 054412e33528e53f6deae940c870217b614707b9 (diff) | |
download | go-tangerine-72b21db2d31d77d956c09353457a0c2db45249b0.tar go-tangerine-72b21db2d31d77d956c09353457a0c2db45249b0.tar.gz go-tangerine-72b21db2d31d77d956c09353457a0c2db45249b0.tar.bz2 go-tangerine-72b21db2d31d77d956c09353457a0c2db45249b0.tar.lz go-tangerine-72b21db2d31d77d956c09353457a0c2db45249b0.tar.xz go-tangerine-72b21db2d31d77d956c09353457a0c2db45249b0.tar.zst go-tangerine-72b21db2d31d77d956c09353457a0c2db45249b0.zip |
Merge pull request #19021 from karalabe/database-cleanup
all: clean up and properly abstract database accesses
Diffstat (limited to 'ethdb')
-rw-r--r-- | ethdb/.gitignore | 12 | ||||
-rw-r--r-- | ethdb/batch.go (renamed from ethdb/interface.go) | 44 | ||||
-rw-r--r-- | ethdb/database.go | 434 | ||||
-rw-r--r-- | ethdb/database_js.go | 68 | ||||
-rw-r--r-- | ethdb/database_js_test.go | 25 | ||||
-rw-r--r-- | ethdb/database_test.go | 214 | ||||
-rw-r--r-- | ethdb/iterator.go | 61 | ||||
-rw-r--r-- | ethdb/leveldb/leveldb.go | 418 | ||||
-rw-r--r-- | ethdb/memory_database.go | 143 | ||||
-rw-r--r-- | ethdb/memorydb/memorydb.go | 298 | ||||
-rw-r--r-- | ethdb/memorydb/memorydb_test.go | 100 | ||||
-rw-r--r-- | ethdb/table.go | 51 | ||||
-rw-r--r-- | ethdb/table_batch.go | 51 |
13 files changed, 962 insertions, 957 deletions
diff --git a/ethdb/.gitignore b/ethdb/.gitignore deleted file mode 100644 index f725d58d1..000000000 --- a/ethdb/.gitignore +++ /dev/null @@ -1,12 +0,0 @@ -# See http://help.github.com/ignore-files/ for more about ignoring files. -# -# If you find yourself ignoring temporary files generated by your text editor -# or operating system, you probably want to add a global ignore instead: -# git config --global core.excludesfile ~/.gitignore_global - -/tmp -*/**/*un~ -*un~ -.DS_Store -*/**/.DS_Store - diff --git a/ethdb/interface.go b/ethdb/batch.go index af1355779..a6f015821 100644 --- a/ethdb/interface.go +++ b/ethdb/batch.go @@ -1,4 +1,4 @@ -// Copyright 2014 The go-ethereum Authors +// Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify @@ -16,37 +16,29 @@ package ethdb -// Code using batches should try to add this much data to the batch. -// The value was determined empirically. +// IdealBatchSize defines the size of the data batches should ideally add in one +// write. const IdealBatchSize = 100 * 1024 -// Putter wraps the database write operation supported by both batches and regular databases. -type Putter interface { - Put(key []byte, value []byte) error -} - -// Deleter wraps the database delete operation supported by both batches and regular databases. -type Deleter interface { - Delete(key []byte) error -} - -// Database wraps all database operations. All methods are safe for concurrent use. -type Database interface { - Putter - Deleter - Get(key []byte) ([]byte, error) - Has(key []byte) (bool, error) - Close() - NewBatch() Batch -} - // Batch is a write-only database that commits changes to its host database -// when Write is called. Batch cannot be used concurrently. +// when Write is called. A batch cannot be used concurrently. type Batch interface { - Putter + Writer Deleter - ValueSize() int // amount of data in the batch + + // ValueSize retrieves the amount of data queued up for writing. + ValueSize() int + + // Write flushes any accumulated data to disk. Write() error + // Reset resets the batch for reuse Reset() } + +// Batcher wraps the NewBatch method of a backing data store. +type Batcher interface { + // NewBatch creates a write-only database that buffers changes to its host db + // until a final write is called. + NewBatch() Batch +} diff --git a/ethdb/database.go b/ethdb/database.go index 17f1478e5..30208e146 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -1,4 +1,4 @@ -// Copyright 2014 The go-ethereum Authors +// Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify @@ -14,372 +14,72 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. -// +build !js - +// Package database defines the interfaces for an Ethereum data store. package ethdb -import ( - "fmt" - "strconv" - "strings" - "sync" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/errors" - "github.com/syndtr/goleveldb/leveldb/filter" - "github.com/syndtr/goleveldb/leveldb/iterator" - "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/syndtr/goleveldb/leveldb/util" -) - -const ( - writePauseWarningThrottler = 1 * time.Minute -) - -var OpenFileLimit = 64 - -type LDBDatabase struct { - fn string // filename for reporting - db *leveldb.DB // LevelDB instance - - compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction - compReadMeter metrics.Meter // Meter for measuring the data read during compaction - compWriteMeter metrics.Meter // Meter for measuring the data written during compaction - writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction - writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction - diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read - diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written - - quitLock sync.Mutex // Mutex protecting the quit channel access - quitChan chan chan error // Quit channel to stop the metrics collection before closing the database - - log log.Logger // Contextual logger tracking the database path -} - -// NewLDBDatabase returns a LevelDB wrapped object. -func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) { - logger := log.New("database", file) - - // Ensure we have some minimal caching and file guarantees - if cache < 16 { - cache = 16 - } - if handles < 16 { - handles = 16 - } - logger.Info("Allocated cache and file handles", "cache", common.StorageSize(cache*1024*1024), "handles", handles) - - // Open the db and recover any potential corruptions - db, err := leveldb.OpenFile(file, &opt.Options{ - OpenFilesCacheCapacity: handles, - BlockCacheCapacity: cache / 2 * opt.MiB, - WriteBuffer: cache / 4 * opt.MiB, // Two of these are used internally - Filter: filter.NewBloomFilter(10), - }) - if _, corrupted := err.(*errors.ErrCorrupted); corrupted { - db, err = leveldb.RecoverFile(file, nil) - } - // (Re)check for errors and abort if opening of the db failed - if err != nil { - return nil, err - } - return &LDBDatabase{ - fn: file, - db: db, - log: logger, - }, nil -} - -// Path returns the path to the database directory. -func (db *LDBDatabase) Path() string { - return db.fn -} - -// Put puts the given key / value to the queue -func (db *LDBDatabase) Put(key []byte, value []byte) error { - return db.db.Put(key, value, nil) -} - -func (db *LDBDatabase) Has(key []byte) (bool, error) { - return db.db.Has(key, nil) -} - -// Get returns the given key if it's present. -func (db *LDBDatabase) Get(key []byte) ([]byte, error) { - dat, err := db.db.Get(key, nil) - if err != nil { - return nil, err - } - return dat, nil -} - -// Delete deletes the key from the queue and database -func (db *LDBDatabase) Delete(key []byte) error { - return db.db.Delete(key, nil) -} - -func (db *LDBDatabase) NewIterator() iterator.Iterator { - return db.db.NewIterator(nil, nil) -} - -// NewIteratorWithPrefix returns a iterator to iterate over subset of database content with a particular prefix. -func (db *LDBDatabase) NewIteratorWithPrefix(prefix []byte) iterator.Iterator { - return db.db.NewIterator(util.BytesPrefix(prefix), nil) -} - -func (db *LDBDatabase) Close() { - // Stop the metrics collection to avoid internal database races - db.quitLock.Lock() - defer db.quitLock.Unlock() - - if db.quitChan != nil { - errc := make(chan error) - db.quitChan <- errc - if err := <-errc; err != nil { - db.log.Error("Metrics collection failed", "err", err) - } - db.quitChan = nil - } - err := db.db.Close() - if err == nil { - db.log.Info("Database closed") - } else { - db.log.Error("Failed to close database", "err", err) - } -} - -func (db *LDBDatabase) LDB() *leveldb.DB { - return db.db -} - -// Meter configures the database metrics collectors and -func (db *LDBDatabase) Meter(prefix string) { - // Initialize all the metrics collector at the requested prefix - db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil) - db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil) - db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil) - db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil) - db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil) - db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil) - db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil) - - // Create a quit channel for the periodic collector and run it - db.quitLock.Lock() - db.quitChan = make(chan chan error) - db.quitLock.Unlock() - - go db.meter(3 * time.Second) -} - -// meter periodically retrieves internal leveldb counters and reports them to -// the metrics subsystem. -// -// This is how a stats table look like (currently): -// Compactions -// Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB) -// -------+------------+---------------+---------------+---------------+--------------- -// 0 | 0 | 0.00000 | 1.27969 | 0.00000 | 12.31098 -// 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294 -// 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884 -// 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000 -// -// This is how the write delay look like (currently): -// DelayN:5 Delay:406.604657ms Paused: false -// -// This is how the iostats look like (currently): -// Read(MB):3895.04860 Write(MB):3654.64712 -func (db *LDBDatabase) meter(refresh time.Duration) { - // Create the counters to store current and previous compaction values - compactions := make([][]float64, 2) - for i := 0; i < 2; i++ { - compactions[i] = make([]float64, 3) - } - // Create storage for iostats. - var iostats [2]float64 - - // Create storage and warning log tracer for write delay. - var ( - delaystats [2]int64 - lastWritePaused time.Time - ) - - var ( - errc chan error - merr error - ) - - // Iterate ad infinitum and collect the stats - for i := 1; errc == nil && merr == nil; i++ { - // Retrieve the database stats - stats, err := db.db.GetProperty("leveldb.stats") - if err != nil { - db.log.Error("Failed to read database stats", "err", err) - merr = err - continue - } - // Find the compaction table, skip the header - lines := strings.Split(stats, "\n") - for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" { - lines = lines[1:] - } - if len(lines) <= 3 { - db.log.Error("Compaction table not found") - merr = errors.New("compaction table not found") - continue - } - lines = lines[3:] - - // Iterate over all the table rows, and accumulate the entries - for j := 0; j < len(compactions[i%2]); j++ { - compactions[i%2][j] = 0 - } - for _, line := range lines { - parts := strings.Split(line, "|") - if len(parts) != 6 { - break - } - for idx, counter := range parts[3:] { - value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64) - if err != nil { - db.log.Error("Compaction entry parsing failed", "err", err) - merr = err - continue - } - compactions[i%2][idx] += value - } - } - // Update all the requested meters - if db.compTimeMeter != nil { - db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000)) - } - if db.compReadMeter != nil { - db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024)) - } - if db.compWriteMeter != nil { - db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024)) - } - - // Retrieve the write delay statistic - writedelay, err := db.db.GetProperty("leveldb.writedelay") - if err != nil { - db.log.Error("Failed to read database write delay statistic", "err", err) - merr = err - continue - } - var ( - delayN int64 - delayDuration string - duration time.Duration - paused bool - ) - if n, err := fmt.Sscanf(writedelay, "DelayN:%d Delay:%s Paused:%t", &delayN, &delayDuration, &paused); n != 3 || err != nil { - db.log.Error("Write delay statistic not found") - merr = err - continue - } - duration, err = time.ParseDuration(delayDuration) - if err != nil { - db.log.Error("Failed to parse delay duration", "err", err) - merr = err - continue - } - if db.writeDelayNMeter != nil { - db.writeDelayNMeter.Mark(delayN - delaystats[0]) - } - if db.writeDelayMeter != nil { - db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1]) - } - // If a warning that db is performing compaction has been displayed, any subsequent - // warnings will be withheld for one minute not to overwhelm the user. - if paused && delayN-delaystats[0] == 0 && duration.Nanoseconds()-delaystats[1] == 0 && - time.Now().After(lastWritePaused.Add(writePauseWarningThrottler)) { - db.log.Warn("Database compacting, degraded performance") - lastWritePaused = time.Now() - } - delaystats[0], delaystats[1] = delayN, duration.Nanoseconds() - - // Retrieve the database iostats. - ioStats, err := db.db.GetProperty("leveldb.iostats") - if err != nil { - db.log.Error("Failed to read database iostats", "err", err) - merr = err - continue - } - var nRead, nWrite float64 - parts := strings.Split(ioStats, " ") - if len(parts) < 2 { - db.log.Error("Bad syntax of ioStats", "ioStats", ioStats) - merr = fmt.Errorf("bad syntax of ioStats %s", ioStats) - continue - } - if n, err := fmt.Sscanf(parts[0], "Read(MB):%f", &nRead); n != 1 || err != nil { - db.log.Error("Bad syntax of read entry", "entry", parts[0]) - merr = err - continue - } - if n, err := fmt.Sscanf(parts[1], "Write(MB):%f", &nWrite); n != 1 || err != nil { - db.log.Error("Bad syntax of write entry", "entry", parts[1]) - merr = err - continue - } - if db.diskReadMeter != nil { - db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024)) - } - if db.diskWriteMeter != nil { - db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024)) - } - iostats[0], iostats[1] = nRead, nWrite - - // Sleep a bit, then repeat the stats collection - select { - case errc = <-db.quitChan: - // Quit requesting, stop hammering the database - case <-time.After(refresh): - // Timeout, gather a new set of stats - } - } - - if errc == nil { - errc = <-db.quitChan - } - errc <- merr -} - -func (db *LDBDatabase) NewBatch() Batch { - return &ldbBatch{db: db.db, b: new(leveldb.Batch)} -} - -type ldbBatch struct { - db *leveldb.DB - b *leveldb.Batch - size int -} - -func (b *ldbBatch) Put(key, value []byte) error { - b.b.Put(key, value) - b.size += len(value) - return nil -} - -func (b *ldbBatch) Delete(key []byte) error { - b.b.Delete(key) - b.size += 1 - return nil -} - -func (b *ldbBatch) Write() error { - return b.db.Write(b.b, nil) -} - -func (b *ldbBatch) ValueSize() int { - return b.size -} - -func (b *ldbBatch) Reset() { - b.b.Reset() - b.size = 0 +import "io" + +// Reader wraps the Has and Get method of a backing data store. +type Reader interface { + // Has retrieves if a key is present in the key-value data store. + Has(key []byte) (bool, error) + + // Get retrieves the given key if it's present in the key-value data store. + Get(key []byte) ([]byte, error) +} + +// Writer wraps the Put method of a backing data store. +type Writer interface { + // Put inserts the given value into the key-value data store. + Put(key []byte, value []byte) error +} + +// Deleter wraps the Delete method of a backing data store. +type Deleter interface { + // Delete removes the key from the key-value data store. + Delete(key []byte) error +} + +// Stater wraps the Stat method of a backing data store. +type Stater interface { + // Stat returns a particular internal stat of the database. + Stat(property string) (string, error) +} + +// Compacter wraps the Compact method of a backing data store. +type Compacter interface { + // Compact flattens the underlying data store for the given key range. In essence, + // deleted and overwritten versions are discarded, and the data is rearranged to + // reduce the cost of operations needed to access them. + // + // A nil start is treated as a key before all keys in the data store; a nil limit + // is treated as a key after all keys in the data store. If both is nil then it + // will compact entire data store. + Compact(start []byte, limit []byte) error +} + +// KeyValueStore contains all the methods required to allow handling different +// key-value data stores backing the high level database. +type KeyValueStore interface { + Reader + Writer + Deleter + Batcher + Iteratee + Stater + Compacter + io.Closer +} + +// Database contains all the methods required by the high level database to not +// only access the key-value data store but also the chain freezer. +type Database interface { + Reader + Writer + Deleter + Batcher + Iteratee + Stater + Compacter + io.Closer } diff --git a/ethdb/database_js.go b/ethdb/database_js.go deleted file mode 100644 index ba6eeb5a2..000000000 --- a/ethdb/database_js.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2014 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// +build js - -package ethdb - -import ( - "errors" -) - -var errNotSupported = errors.New("ethdb: not supported") - -type LDBDatabase struct { -} - -// NewLDBDatabase returns a LevelDB wrapped object. -func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) { - return nil, errNotSupported -} - -// Path returns the path to the database directory. -func (db *LDBDatabase) Path() string { - return "" -} - -// Put puts the given key / value to the queue -func (db *LDBDatabase) Put(key []byte, value []byte) error { - return errNotSupported -} - -func (db *LDBDatabase) Has(key []byte) (bool, error) { - return false, errNotSupported -} - -// Get returns the given key if it's present. -func (db *LDBDatabase) Get(key []byte) ([]byte, error) { - return nil, errNotSupported -} - -// Delete deletes the key from the queue and database -func (db *LDBDatabase) Delete(key []byte) error { - return errNotSupported -} - -func (db *LDBDatabase) Close() { -} - -// Meter configures the database metrics collectors and -func (db *LDBDatabase) Meter(prefix string) { -} - -func (db *LDBDatabase) NewBatch() Batch { - return nil -} diff --git a/ethdb/database_js_test.go b/ethdb/database_js_test.go deleted file mode 100644 index b4c12ae0b..000000000 --- a/ethdb/database_js_test.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2014 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// +build js - -package ethdb_test - -import ( - "github.com/ethereum/go-ethereum/ethdb" -) - -var _ ethdb.Database = ðdb.LDBDatabase{} diff --git a/ethdb/database_test.go b/ethdb/database_test.go deleted file mode 100644 index 382fedbf9..000000000 --- a/ethdb/database_test.go +++ /dev/null @@ -1,214 +0,0 @@ -// Copyright 2014 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// +build !js - -package ethdb_test - -import ( - "bytes" - "fmt" - "io/ioutil" - "os" - "strconv" - "sync" - "testing" - - "github.com/ethereum/go-ethereum/ethdb" -) - -func newTestLDB() (*ethdb.LDBDatabase, func()) { - dirname, err := ioutil.TempDir(os.TempDir(), "ethdb_test_") - if err != nil { - panic("failed to create test file: " + err.Error()) - } - db, err := ethdb.NewLDBDatabase(dirname, 0, 0) - if err != nil { - panic("failed to create test database: " + err.Error()) - } - - return db, func() { - db.Close() - os.RemoveAll(dirname) - } -} - -var test_values = []string{"", "a", "1251", "\x00123\x00"} - -func TestLDB_PutGet(t *testing.T) { - db, remove := newTestLDB() - defer remove() - testPutGet(db, t) -} - -func TestMemoryDB_PutGet(t *testing.T) { - testPutGet(ethdb.NewMemDatabase(), t) -} - -func testPutGet(db ethdb.Database, t *testing.T) { - t.Parallel() - - for _, k := range test_values { - err := db.Put([]byte(k), nil) - if err != nil { - t.Fatalf("put failed: %v", err) - } - } - - for _, k := range test_values { - data, err := db.Get([]byte(k)) - if err != nil { - t.Fatalf("get failed: %v", err) - } - if len(data) != 0 { - t.Fatalf("get returned wrong result, got %q expected nil", string(data)) - } - } - - _, err := db.Get([]byte("non-exist-key")) - if err == nil { - t.Fatalf("expect to return a not found error") - } - - for _, v := range test_values { - err := db.Put([]byte(v), []byte(v)) - if err != nil { - t.Fatalf("put failed: %v", err) - } - } - - for _, v := range test_values { - data, err := db.Get([]byte(v)) - if err != nil { - t.Fatalf("get failed: %v", err) - } - if !bytes.Equal(data, []byte(v)) { - t.Fatalf("get returned wrong result, got %q expected %q", string(data), v) - } - } - - for _, v := range test_values { - err := db.Put([]byte(v), []byte("?")) - if err != nil { - t.Fatalf("put override failed: %v", err) - } - } - - for _, v := range test_values { - data, err := db.Get([]byte(v)) - if err != nil { - t.Fatalf("get failed: %v", err) - } - if !bytes.Equal(data, []byte("?")) { - t.Fatalf("get returned wrong result, got %q expected ?", string(data)) - } - } - - for _, v := range test_values { - orig, err := db.Get([]byte(v)) - if err != nil { - t.Fatalf("get failed: %v", err) - } - orig[0] = byte(0xff) - data, err := db.Get([]byte(v)) - if err != nil { - t.Fatalf("get failed: %v", err) - } - if !bytes.Equal(data, []byte("?")) { - t.Fatalf("get returned wrong result, got %q expected ?", string(data)) - } - } - - for _, v := range test_values { - err := db.Delete([]byte(v)) - if err != nil { - t.Fatalf("delete %q failed: %v", v, err) - } - } - - for _, v := range test_values { - _, err := db.Get([]byte(v)) - if err == nil { - t.Fatalf("got deleted value %q", v) - } - } -} - -func TestLDB_ParallelPutGet(t *testing.T) { - db, remove := newTestLDB() - defer remove() - testParallelPutGet(db, t) -} - -func TestMemoryDB_ParallelPutGet(t *testing.T) { - testParallelPutGet(ethdb.NewMemDatabase(), t) -} - -func testParallelPutGet(db ethdb.Database, t *testing.T) { - const n = 8 - var pending sync.WaitGroup - - pending.Add(n) - for i := 0; i < n; i++ { - go func(key string) { - defer pending.Done() - err := db.Put([]byte(key), []byte("v"+key)) - if err != nil { - panic("put failed: " + err.Error()) - } - }(strconv.Itoa(i)) - } - pending.Wait() - - pending.Add(n) - for i := 0; i < n; i++ { - go func(key string) { - defer pending.Done() - data, err := db.Get([]byte(key)) - if err != nil { - panic("get failed: " + err.Error()) - } - if !bytes.Equal(data, []byte("v"+key)) { - panic(fmt.Sprintf("get failed, got %q expected %q", []byte(data), []byte("v"+key))) - } - }(strconv.Itoa(i)) - } - pending.Wait() - - pending.Add(n) - for i := 0; i < n; i++ { - go func(key string) { - defer pending.Done() - err := db.Delete([]byte(key)) - if err != nil { - panic("delete failed: " + err.Error()) - } - }(strconv.Itoa(i)) - } - pending.Wait() - - pending.Add(n) - for i := 0; i < n; i++ { - go func(key string) { - defer pending.Done() - _, err := db.Get([]byte(key)) - if err == nil { - panic("get succeeded") - } - }(strconv.Itoa(i)) - } - pending.Wait() -} diff --git a/ethdb/iterator.go b/ethdb/iterator.go new file mode 100644 index 000000000..f3cee7ec9 --- /dev/null +++ b/ethdb/iterator.go @@ -0,0 +1,61 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package ethdb + +// Iterator iterates over a database's key/value pairs in ascending key order. +// +// When it encounters an error any seek will return false and will yield no key/ +// value pairs. The error can be queried by calling the Error method. Calling +// Release is still necessary. +// +// An iterator must be released after use, but it is not necessary to read an +// iterator until exhaustion. An iterator is not safe for concurrent use, but it +// is safe to use multiple iterators concurrently. +type Iterator interface { + // Next moves the iterator to the next key/value pair. It returns whether the + // iterator is exhausted. + Next() bool + + // Error returns any accumulated error. Exhausting all the key/value pairs + // is not considered to be an error. + Error() error + + // Key returns the key of the current key/value pair, or nil if done. The caller + // should not modify the contents of the returned slice, and its contents may + // change on the next call to Next. + Key() []byte + + // Value returns the value of the current key/value pair, or nil if done. The + // caller should not modify the contents of the returned slice, and its contents + // may change on the next call to Next. + Value() []byte + + // Release releases associated resources. Release should always succeed and can + // be called multiple times without causing error. + Release() +} + +// Iteratee wraps the NewIterator methods of a backing data store. +type Iteratee interface { + // NewIterator creates a binary-alphabetical iterator over the entire keyspace + // contained within the key-value database. + NewIterator() Iterator + + // NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset + // of database content with a particular key prefix. + NewIteratorWithPrefix(prefix []byte) Iterator +} diff --git a/ethdb/leveldb/leveldb.go b/ethdb/leveldb/leveldb.go new file mode 100644 index 000000000..11bba6b08 --- /dev/null +++ b/ethdb/leveldb/leveldb.go @@ -0,0 +1,418 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// +build !js + +// Package leveldb implements the key-value database layer based on LevelDB. +package leveldb + +import ( + "fmt" + "strconv" + "strings" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/filter" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/util" +) + +const ( + // leveldbDegradationWarnInterval specifies how often warning should be printed + // if the leveldb database cannot keep up with requested writes. + leveldbDegradationWarnInterval = time.Minute + + // leveldbMinCache is the minimum amount of memory in megabytes to allocate to + // leveldb read and write caching, split half and half. + leveldbMinCache = 16 + + // leveldbMinHandles is the minimum number of files handles to allocate to the + // open database files. + leveldbMinHandles = 16 + + // metricsGatheringInterval specifies the interval to retrieve leveldb database + // compaction, io and pause stats to report to the user. + metricsGatheringInterval = 3 * time.Second +) + +// LevelDBDatabase is a persistent key-value store. Apart from basic data storage +// functionality it also supports batch writes and iterating over the keyspace in +// binary-alphabetical order. +type LevelDBDatabase struct { + fn string // filename for reporting + db *leveldb.DB // LevelDB instance + + compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction + compReadMeter metrics.Meter // Meter for measuring the data read during compaction + compWriteMeter metrics.Meter // Meter for measuring the data written during compaction + writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction + writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction + diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read + diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written + + quitLock sync.Mutex // Mutex protecting the quit channel access + quitChan chan chan error // Quit channel to stop the metrics collection before closing the database + + log log.Logger // Contextual logger tracking the database path +} + +// New returns a wrapped LevelDB object. The namespace is the prefix that the +// metrics reporting should use for surfacing internal stats. +func New(file string, cache int, handles int, namespace string) (*LevelDBDatabase, error) { + // Ensure we have some minimal caching and file guarantees + if cache < leveldbMinCache { + cache = leveldbMinCache + } + if handles < leveldbMinHandles { + handles = leveldbMinHandles + } + logger := log.New("database", file) + logger.Info("Allocated cache and file handles", "cache", common.StorageSize(cache*1024*1024), "handles", handles) + + // Open the db and recover any potential corruptions + db, err := leveldb.OpenFile(file, &opt.Options{ + OpenFilesCacheCapacity: handles, + BlockCacheCapacity: cache / 2 * opt.MiB, + WriteBuffer: cache / 4 * opt.MiB, // Two of these are used internally + Filter: filter.NewBloomFilter(10), + }) + if _, corrupted := err.(*errors.ErrCorrupted); corrupted { + db, err = leveldb.RecoverFile(file, nil) + } + if err != nil { + return nil, err + } + // Assemble the wrapper with all the registered metrics + ldb := &LevelDBDatabase{ + fn: file, + db: db, + log: logger, + quitChan: make(chan chan error), + } + ldb.compTimeMeter = metrics.NewRegisteredMeter(namespace+"compact/time", nil) + ldb.compReadMeter = metrics.NewRegisteredMeter(namespace+"compact/input", nil) + ldb.compWriteMeter = metrics.NewRegisteredMeter(namespace+"compact/output", nil) + ldb.diskReadMeter = metrics.NewRegisteredMeter(namespace+"disk/read", nil) + ldb.diskWriteMeter = metrics.NewRegisteredMeter(namespace+"disk/write", nil) + ldb.writeDelayMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/duration", nil) + ldb.writeDelayNMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/counter", nil) + + // Start up the metrics gathering and return + go ldb.meter(metricsGatheringInterval) + return ldb, nil +} + +// Close stops the metrics collection, flushes any pending data to disk and closes +// all io accesses to the underlying key-value store. +func (db *LevelDBDatabase) Close() error { + db.quitLock.Lock() + defer db.quitLock.Unlock() + + if db.quitChan != nil { + errc := make(chan error) + db.quitChan <- errc + if err := <-errc; err != nil { + db.log.Error("Metrics collection failed", "err", err) + } + db.quitChan = nil + } + return db.db.Close() +} + +// Has retrieves if a key is present in the key-value store. +func (db *LevelDBDatabase) Has(key []byte) (bool, error) { + return db.db.Has(key, nil) +} + +// Get retrieves the given key if it's present in the key-value store. +func (db *LevelDBDatabase) Get(key []byte) ([]byte, error) { + dat, err := db.db.Get(key, nil) + if err != nil { + return nil, err + } + return dat, nil +} + +// Put inserts the given value into the key-value store. +func (db *LevelDBDatabase) Put(key []byte, value []byte) error { + return db.db.Put(key, value, nil) +} + +// Delete removes the key from the key-value store. +func (db *LevelDBDatabase) Delete(key []byte) error { + return db.db.Delete(key, nil) +} + +// NewBatch creates a write-only key-value store that buffers changes to its host +// database until a final write is called. +func (db *LevelDBDatabase) NewBatch() ethdb.Batch { + return &levelDBBatch{ + db: db.db, + b: new(leveldb.Batch), + } +} + +// NewIterator creates a binary-alphabetical iterator over the entire keyspace +// contained within the leveldb database. +func (db *LevelDBDatabase) NewIterator() ethdb.Iterator { + return db.NewIteratorWithPrefix(nil) +} + +// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset +// of database content with a particular key prefix. +func (db *LevelDBDatabase) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator { + return db.db.NewIterator(util.BytesPrefix(prefix), nil) +} + +// Stat returns a particular internal stat of the database. +func (db *LevelDBDatabase) Stat(property string) (string, error) { + return db.db.GetProperty(property) +} + +// Compact flattens the underlying data store for the given key range. In essence, +// deleted and overwritten versions are discarded, and the data is rearranged to +// reduce the cost of operations needed to access them. +// +// A nil start is treated as a key before all keys in the data store; a nil limit +// is treated as a key after all keys in the data store. If both is nil then it +// will compact entire data store. +func (db *LevelDBDatabase) Compact(start []byte, limit []byte) error { + return db.db.CompactRange(util.Range{Start: start, Limit: limit}) +} + +// Path returns the path to the database directory. +func (db *LevelDBDatabase) Path() string { + return db.fn +} + +// meter periodically retrieves internal leveldb counters and reports them to +// the metrics subsystem. +// +// This is how a LevelDB stats table looks like (currently): +// Compactions +// Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB) +// -------+------------+---------------+---------------+---------------+--------------- +// 0 | 0 | 0.00000 | 1.27969 | 0.00000 | 12.31098 +// 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294 +// 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884 +// 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000 +// +// This is how the write delay look like (currently): +// DelayN:5 Delay:406.604657ms Paused: false +// +// This is how the iostats look like (currently): +// Read(MB):3895.04860 Write(MB):3654.64712 +func (db *LevelDBDatabase) meter(refresh time.Duration) { + // Create the counters to store current and previous compaction values + compactions := make([][]float64, 2) + for i := 0; i < 2; i++ { + compactions[i] = make([]float64, 3) + } + // Create storage for iostats. + var iostats [2]float64 + + // Create storage and warning log tracer for write delay. + var ( + delaystats [2]int64 + lastWritePaused time.Time + ) + + var ( + errc chan error + merr error + ) + + // Iterate ad infinitum and collect the stats + for i := 1; errc == nil && merr == nil; i++ { + // Retrieve the database stats + stats, err := db.db.GetProperty("leveldb.stats") + if err != nil { + db.log.Error("Failed to read database stats", "err", err) + merr = err + continue + } + // Find the compaction table, skip the header + lines := strings.Split(stats, "\n") + for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" { + lines = lines[1:] + } + if len(lines) <= 3 { + db.log.Error("Compaction leveldbTable not found") + merr = errors.New("compaction leveldbTable not found") + continue + } + lines = lines[3:] + + // Iterate over all the leveldbTable rows, and accumulate the entries + for j := 0; j < len(compactions[i%2]); j++ { + compactions[i%2][j] = 0 + } + for _, line := range lines { + parts := strings.Split(line, "|") + if len(parts) != 6 { + break + } + for idx, counter := range parts[3:] { + value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64) + if err != nil { + db.log.Error("Compaction entry parsing failed", "err", err) + merr = err + continue + } + compactions[i%2][idx] += value + } + } + // Update all the requested meters + if db.compTimeMeter != nil { + db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000)) + } + if db.compReadMeter != nil { + db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024)) + } + if db.compWriteMeter != nil { + db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024)) + } + + // Retrieve the write delay statistic + writedelay, err := db.db.GetProperty("leveldb.writedelay") + if err != nil { + db.log.Error("Failed to read database write delay statistic", "err", err) + merr = err + continue + } + var ( + delayN int64 + delayDuration string + duration time.Duration + paused bool + ) + if n, err := fmt.Sscanf(writedelay, "DelayN:%d Delay:%s Paused:%t", &delayN, &delayDuration, &paused); n != 3 || err != nil { + db.log.Error("Write delay statistic not found") + merr = err + continue + } + duration, err = time.ParseDuration(delayDuration) + if err != nil { + db.log.Error("Failed to parse delay duration", "err", err) + merr = err + continue + } + if db.writeDelayNMeter != nil { + db.writeDelayNMeter.Mark(delayN - delaystats[0]) + } + if db.writeDelayMeter != nil { + db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1]) + } + // If a warning that db is performing compaction has been displayed, any subsequent + // warnings will be withheld for one minute not to overwhelm the user. + if paused && delayN-delaystats[0] == 0 && duration.Nanoseconds()-delaystats[1] == 0 && + time.Now().After(lastWritePaused.Add(leveldbDegradationWarnInterval)) { + db.log.Warn("Database compacting, degraded performance") + lastWritePaused = time.Now() + } + delaystats[0], delaystats[1] = delayN, duration.Nanoseconds() + + // Retrieve the database iostats. + ioStats, err := db.db.GetProperty("leveldb.iostats") + if err != nil { + db.log.Error("Failed to read database iostats", "err", err) + merr = err + continue + } + var nRead, nWrite float64 + parts := strings.Split(ioStats, " ") + if len(parts) < 2 { + db.log.Error("Bad syntax of ioStats", "ioStats", ioStats) + merr = fmt.Errorf("bad syntax of ioStats %s", ioStats) + continue + } + if n, err := fmt.Sscanf(parts[0], "Read(MB):%f", &nRead); n != 1 || err != nil { + db.log.Error("Bad syntax of read entry", "entry", parts[0]) + merr = err + continue + } + if n, err := fmt.Sscanf(parts[1], "Write(MB):%f", &nWrite); n != 1 || err != nil { + db.log.Error("Bad syntax of write entry", "entry", parts[1]) + merr = err + continue + } + if db.diskReadMeter != nil { + db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024)) + } + if db.diskWriteMeter != nil { + db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024)) + } + iostats[0], iostats[1] = nRead, nWrite + + // Sleep a bit, then repeat the stats collection + select { + case errc = <-db.quitChan: + // Quit requesting, stop hammering the database + case <-time.After(refresh): + // Timeout, gather a new set of stats + } + } + + if errc == nil { + errc = <-db.quitChan + } + errc <- merr +} + +// levelDBBatch is a write-only leveldb batch that commits changes to its host +// database when Write is called. A batch cannot be used concurrently. +type levelDBBatch struct { + db *leveldb.DB + b *leveldb.Batch + size int +} + +// Put inserts the given value into the batch for later committing. +func (b *levelDBBatch) Put(key, value []byte) error { + b.b.Put(key, value) + b.size += len(value) + return nil +} + +// Delete inserts the a key removal into the batch for later committing. +func (b *levelDBBatch) Delete(key []byte) error { + b.b.Delete(key) + b.size += 1 + return nil +} + +// ValueSize retrieves the amount of data queued up for writing. +func (b *levelDBBatch) ValueSize() int { + return b.size +} + +// Write flushes any accumulated data to disk. +func (b *levelDBBatch) Write() error { + return b.db.Write(b.b, nil) +} + +// Reset resets the batch for reuse. +func (b *levelDBBatch) Reset() { + b.b.Reset() + b.size = 0 +} diff --git a/ethdb/memory_database.go b/ethdb/memory_database.go deleted file mode 100644 index 727f2f7ca..000000000 --- a/ethdb/memory_database.go +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright 2014 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package ethdb - -import ( - "errors" - "sync" - - "github.com/ethereum/go-ethereum/common" -) - -/* - * This is a test memory database. Do not use for any production it does not get persisted - */ -type MemDatabase struct { - db map[string][]byte - lock sync.RWMutex -} - -func NewMemDatabase() *MemDatabase { - return &MemDatabase{ - db: make(map[string][]byte), - } -} - -func NewMemDatabaseWithCap(size int) *MemDatabase { - return &MemDatabase{ - db: make(map[string][]byte, size), - } -} - -func (db *MemDatabase) Put(key []byte, value []byte) error { - db.lock.Lock() - defer db.lock.Unlock() - - db.db[string(key)] = common.CopyBytes(value) - return nil -} - -func (db *MemDatabase) Has(key []byte) (bool, error) { - db.lock.RLock() - defer db.lock.RUnlock() - - _, ok := db.db[string(key)] - return ok, nil -} - -func (db *MemDatabase) Get(key []byte) ([]byte, error) { - db.lock.RLock() - defer db.lock.RUnlock() - - if entry, ok := db.db[string(key)]; ok { - return common.CopyBytes(entry), nil - } - return nil, errors.New("not found") -} - -func (db *MemDatabase) Keys() [][]byte { - db.lock.RLock() - defer db.lock.RUnlock() - - keys := [][]byte{} - for key := range db.db { - keys = append(keys, []byte(key)) - } - return keys -} - -func (db *MemDatabase) Delete(key []byte) error { - db.lock.Lock() - defer db.lock.Unlock() - - delete(db.db, string(key)) - return nil -} - -func (db *MemDatabase) Close() {} - -func (db *MemDatabase) NewBatch() Batch { - return &memBatch{db: db} -} - -func (db *MemDatabase) Len() int { return len(db.db) } - -type kv struct { - k, v []byte - del bool -} - -type memBatch struct { - db *MemDatabase - writes []kv - size int -} - -func (b *memBatch) Put(key, value []byte) error { - b.writes = append(b.writes, kv{common.CopyBytes(key), common.CopyBytes(value), false}) - b.size += len(value) - return nil -} - -func (b *memBatch) Delete(key []byte) error { - b.writes = append(b.writes, kv{common.CopyBytes(key), nil, true}) - b.size += 1 - return nil -} - -func (b *memBatch) Write() error { - b.db.lock.Lock() - defer b.db.lock.Unlock() - - for _, kv := range b.writes { - if kv.del { - delete(b.db.db, string(kv.k)) - continue - } - b.db.db[string(kv.k)] = kv.v - } - return nil -} - -func (b *memBatch) ValueSize() int { - return b.size -} - -func (b *memBatch) Reset() { - b.writes = b.writes[:0] - b.size = 0 -} diff --git a/ethdb/memorydb/memorydb.go b/ethdb/memorydb/memorydb.go new file mode 100644 index 000000000..9c6bd48be --- /dev/null +++ b/ethdb/memorydb/memorydb.go @@ -0,0 +1,298 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package memorydb implements the key-value database layer based on memory maps. +package memorydb + +import ( + "errors" + "sort" + "strings" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" +) + +var ( + // errMemorydbClosed is returned if a memory database was already closed at the + // invocation of a data access operation. + errMemorydbClosed = errors.New("database closed") + + // errMemorydbNotFound is returned if a key is requested that is not found in + // the provided memory database. + errMemorydbNotFound = errors.New("not found") +) + +// MemoryDatabase is an ephemeral key-value store. Apart from basic data storage +// functionality it also supports batch writes and iterating over the keyspace in +// binary-alphabetical order. +type MemoryDatabase struct { + db map[string][]byte + lock sync.RWMutex +} + +// New returns a wrapped map with all the required database interface methods +// implemented. +func New() *MemoryDatabase { + return &MemoryDatabase{ + db: make(map[string][]byte), + } +} + +// NewWithCap returns a wrapped map pre-allocated to the provided capcity with +// all the required database interface methods implemented. +func NewWithCap(size int) *MemoryDatabase { + return &MemoryDatabase{ + db: make(map[string][]byte, size), + } +} + +// Close deallocates the internal map and ensures any consecutive data access op +// failes with an error. +func (db *MemoryDatabase) Close() error { + db.lock.Lock() + defer db.lock.Unlock() + + db.db = nil + return nil +} + +// Has retrieves if a key is present in the key-value store. +func (db *MemoryDatabase) Has(key []byte) (bool, error) { + db.lock.RLock() + defer db.lock.RUnlock() + + if db.db == nil { + return false, errMemorydbClosed + } + _, ok := db.db[string(key)] + return ok, nil +} + +// Get retrieves the given key if it's present in the key-value store. +func (db *MemoryDatabase) Get(key []byte) ([]byte, error) { + db.lock.RLock() + defer db.lock.RUnlock() + + if db.db == nil { + return nil, errMemorydbClosed + } + if entry, ok := db.db[string(key)]; ok { + return common.CopyBytes(entry), nil + } + return nil, errMemorydbNotFound +} + +// Put inserts the given value into the key-value store. +func (db *MemoryDatabase) Put(key []byte, value []byte) error { + db.lock.Lock() + defer db.lock.Unlock() + + if db.db == nil { + return errMemorydbClosed + } + db.db[string(key)] = common.CopyBytes(value) + return nil +} + +// Delete removes the key from the key-value store. +func (db *MemoryDatabase) Delete(key []byte) error { + db.lock.Lock() + defer db.lock.Unlock() + + if db.db == nil { + return errMemorydbClosed + } + delete(db.db, string(key)) + return nil +} + +// NewBatch creates a write-only key-value store that buffers changes to its host +// database until a final write is called. +func (db *MemoryDatabase) NewBatch() ethdb.Batch { + return &memoryBatch{ + db: db, + } +} + +// NewIterator creates a binary-alphabetical iterator over the entire keyspace +// contained within the memory database. +func (db *MemoryDatabase) NewIterator() ethdb.Iterator { + return db.NewIteratorWithPrefix(nil) +} + +// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset +// of database content with a particular key prefix. +func (db *MemoryDatabase) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator { + db.lock.RLock() + defer db.lock.RUnlock() + + var ( + pr = string(prefix) + keys = make([]string, 0, len(db.db)) + values = make([][]byte, 0, len(db.db)) + ) + // Collect the keys from the memory database corresponding to the given prefix + for key := range db.db { + if strings.HasPrefix(key, pr) { + keys = append(keys, key) + } + } + // Sort the items and retrieve the associated values + sort.Strings(keys) + for _, key := range keys { + values = append(values, db.db[key]) + } + return &memoryIterator{ + keys: keys, + values: values, + } +} + +// Stat returns a particular internal stat of the database. +func (db *MemoryDatabase) Stat(property string) (string, error) { + return "", errors.New("unknown property") +} + +// Compact is not supported on a memory database. +func (db *MemoryDatabase) Compact(start []byte, limit []byte) error { + return errors.New("unsupported operation") +} + +// Len returns the number of entries currently present in the memory database. +// +// Note, this method is only used for testing (i.e. not public in general) and +// does not have explicit checks for closed-ness to allow simpler testing code. +func (db *MemoryDatabase) Len() int { + db.lock.RLock() + defer db.lock.RUnlock() + + return len(db.db) +} + +// keyvalue is a key-value tuple tagged with a deletion field to allow creating +// memory-database write batches. +type keyvalue struct { + key []byte + value []byte + delete bool +} + +// memoryBatch is a write-only memory batch that commits changes to its host +// database when Write is called. A batch cannot be used concurrently. +type memoryBatch struct { + db *MemoryDatabase + writes []keyvalue + size int +} + +// Put inserts the given value into the batch for later committing. +func (b *memoryBatch) Put(key, value []byte) error { + b.writes = append(b.writes, keyvalue{common.CopyBytes(key), common.CopyBytes(value), false}) + b.size += len(value) + return nil +} + +// Delete inserts the a key removal into the batch for later committing. +func (b *memoryBatch) Delete(key []byte) error { + b.writes = append(b.writes, keyvalue{common.CopyBytes(key), nil, true}) + b.size += 1 + return nil +} + +// ValueSize retrieves the amount of data queued up for writing. +func (b *memoryBatch) ValueSize() int { + return b.size +} + +// Write flushes any accumulated data to the memory database. +func (b *memoryBatch) Write() error { + b.db.lock.Lock() + defer b.db.lock.Unlock() + + for _, keyvalue := range b.writes { + if keyvalue.delete { + delete(b.db.db, string(keyvalue.key)) + continue + } + b.db.db[string(keyvalue.key)] = keyvalue.value + } + return nil +} + +// Reset resets the batch for reuse. +func (b *memoryBatch) Reset() { + b.writes = b.writes[:0] + b.size = 0 +} + +// memoryIterator can walk over the (potentially partial) keyspace of a memory +// key value store. Internally it is a deep copy of the entire iterated state, +// sorted by keys. +type memoryIterator struct { + inited bool + keys []string + values [][]byte +} + +// Next moves the iterator to the next key/value pair. It returns whether the +// iterator is exhausted. +func (it *memoryIterator) Next() bool { + // If the iterator was not yet initialized, do it now + if !it.inited { + it.inited = true + return len(it.keys) > 0 + } + // Iterator already initialize, advance it + if len(it.keys) > 0 { + it.keys = it.keys[1:] + it.values = it.values[1:] + } + return len(it.keys) > 0 +} + +// Error returns any accumulated error. Exhausting all the key/value pairs +// is not considered to be an error. A memory iterator cannot encounter errors. +func (it *memoryIterator) Error() error { + return nil +} + +// Key returns the key of the current key/value pair, or nil if done. The caller +// should not modify the contents of the returned slice, and its contents may +// change on the next call to Next. +func (it *memoryIterator) Key() []byte { + if len(it.keys) > 0 { + return []byte(it.keys[0]) + } + return nil +} + +// Value returns the value of the current key/value pair, or nil if done. The +// caller should not modify the contents of the returned slice, and its contents +// may change on the next call to Next. +func (it *memoryIterator) Value() []byte { + if len(it.values) > 0 { + return it.values[0] + } + return nil +} + +// Release releases associated resources. Release should always succeed and can +// be called multiple times without causing error. +func (it *memoryIterator) Release() { + it.keys, it.values = nil, nil +} diff --git a/ethdb/memorydb/memorydb_test.go b/ethdb/memorydb/memorydb_test.go new file mode 100644 index 000000000..4210a0f7c --- /dev/null +++ b/ethdb/memorydb/memorydb_test.go @@ -0,0 +1,100 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package memorydb + +import ( + "bytes" + "testing" +) + +// Tests that key-value iteration on top of a memory database works. +func TestMemoryDBIterator(t *testing.T) { + tests := []struct { + content map[string]string + prefix string + order []string + }{ + // Empty databases should be iterable + {map[string]string{}, "", nil}, + {map[string]string{}, "non-existent-prefix", nil}, + + // Single-item databases should be iterable + {map[string]string{"key": "val"}, "", []string{"key"}}, + {map[string]string{"key": "val"}, "k", []string{"key"}}, + {map[string]string{"key": "val"}, "l", nil}, + + // Multi-item databases should be fully iterable + { + map[string]string{"k1": "v1", "k5": "v5", "k2": "v2", "k4": "v4", "k3": "v3"}, + "", + []string{"k1", "k2", "k3", "k4", "k5"}, + }, + { + map[string]string{"k1": "v1", "k5": "v5", "k2": "v2", "k4": "v4", "k3": "v3"}, + "k", + []string{"k1", "k2", "k3", "k4", "k5"}, + }, + { + map[string]string{"k1": "v1", "k5": "v5", "k2": "v2", "k4": "v4", "k3": "v3"}, + "l", + nil, + }, + // Multi-item databases should be prefix-iterable + { + map[string]string{ + "ka1": "va1", "ka5": "va5", "ka2": "va2", "ka4": "va4", "ka3": "va3", + "kb1": "vb1", "kb5": "vb5", "kb2": "vb2", "kb4": "vb4", "kb3": "vb3", + }, + "ka", + []string{"ka1", "ka2", "ka3", "ka4", "ka5"}, + }, + { + map[string]string{ + "ka1": "va1", "ka5": "va5", "ka2": "va2", "ka4": "va4", "ka3": "va3", + "kb1": "vb1", "kb5": "vb5", "kb2": "vb2", "kb4": "vb4", "kb3": "vb3", + }, + "kc", + nil, + }, + } + for i, tt := range tests { + // Create the key-value data store + db := New() + for key, val := range tt.content { + if err := db.Put([]byte(key), []byte(val)); err != nil { + t.Fatalf("test %d: failed to insert item %s:%s into database: %v", i, key, val, err) + } + } + // Iterate over the database with the given configs and verify the results + it, idx := db.NewIteratorWithPrefix([]byte(tt.prefix)), 0 + for it.Next() { + if !bytes.Equal(it.Key(), []byte(tt.order[idx])) { + t.Errorf("test %d: item %d: key mismatch: have %s, want %s", i, idx, string(it.Key()), tt.order[idx]) + } + if !bytes.Equal(it.Value(), []byte(tt.content[tt.order[idx]])) { + t.Errorf("test %d: item %d: value mismatch: have %s, want %s", i, idx, string(it.Value()), tt.content[tt.order[idx]]) + } + idx++ + } + if err := it.Error(); err != nil { + t.Errorf("test %d: iteration failed: %v", i, err) + } + if idx != len(tt.order) { + t.Errorf("test %d: iteration terminated prematurely: have %d, want %d", i, idx, len(tt.order)) + } + } +} diff --git a/ethdb/table.go b/ethdb/table.go deleted file mode 100644 index 28069c078..000000000 --- a/ethdb/table.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2014 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package ethdb - -type table struct { - db Database - prefix string -} - -// NewTable returns a Database object that prefixes all keys with a given -// string. -func NewTable(db Database, prefix string) Database { - return &table{ - db: db, - prefix: prefix, - } -} - -func (dt *table) Put(key []byte, value []byte) error { - return dt.db.Put(append([]byte(dt.prefix), key...), value) -} - -func (dt *table) Has(key []byte) (bool, error) { - return dt.db.Has(append([]byte(dt.prefix), key...)) -} - -func (dt *table) Get(key []byte) ([]byte, error) { - return dt.db.Get(append([]byte(dt.prefix), key...)) -} - -func (dt *table) Delete(key []byte) error { - return dt.db.Delete(append([]byte(dt.prefix), key...)) -} - -func (dt *table) Close() { - // Do nothing; don't close the underlying DB. -} diff --git a/ethdb/table_batch.go b/ethdb/table_batch.go deleted file mode 100644 index ae83e79ce..000000000 --- a/ethdb/table_batch.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2014 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package ethdb - -type tableBatch struct { - batch Batch - prefix string -} - -// NewTableBatch returns a Batch object which prefixes all keys with a given string. -func NewTableBatch(db Database, prefix string) Batch { - return &tableBatch{db.NewBatch(), prefix} -} - -func (dt *table) NewBatch() Batch { - return &tableBatch{dt.db.NewBatch(), dt.prefix} -} - -func (tb *tableBatch) Put(key, value []byte) error { - return tb.batch.Put(append([]byte(tb.prefix), key...), value) -} - -func (tb *tableBatch) Delete(key []byte) error { - return tb.batch.Delete(append([]byte(tb.prefix), key...)) -} - -func (tb *tableBatch) Write() error { - return tb.batch.Write() -} - -func (tb *tableBatch) ValueSize() int { - return tb.batch.ValueSize() -} - -func (tb *tableBatch) Reset() { - tb.batch.Reset() -} |