diff options
author | obscuren <geffobscura@gmail.com> | 2015-04-08 04:19:01 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-04-08 04:19:01 +0800 |
commit | 7f32a08b6095bb5f1ff58168be70326ee0c29266 (patch) | |
tree | 8fec3ed0cb6ee6a99942f8ad699c85f8d204339b | |
parent | 758205b187e079080193c2fed2a21caff2377329 (diff) | |
download | go-tangerine-7f32a08b6095bb5f1ff58168be70326ee0c29266.tar go-tangerine-7f32a08b6095bb5f1ff58168be70326ee0c29266.tar.gz go-tangerine-7f32a08b6095bb5f1ff58168be70326ee0c29266.tar.bz2 go-tangerine-7f32a08b6095bb5f1ff58168be70326ee0c29266.tar.lz go-tangerine-7f32a08b6095bb5f1ff58168be70326ee0c29266.tar.xz go-tangerine-7f32a08b6095bb5f1ff58168be70326ee0c29266.tar.zst go-tangerine-7f32a08b6095bb5f1ff58168be70326ee0c29266.zip |
Queued level db writes and batch writes. Closes #647
-rw-r--r-- | common/db.go | 2 | ||||
-rw-r--r-- | ethdb/database.go | 111 | ||||
-rw-r--r-- | ethdb/database_test.go | 27 |
3 files changed, 94 insertions, 46 deletions
diff --git a/common/db.go b/common/db.go index 6505e61c6..408b1e755 100644 --- a/common/db.go +++ b/common/db.go @@ -4,9 +4,7 @@ package common type Database interface { Put(key []byte, value []byte) Get(key []byte) ([]byte, error) - //GetKeys() []*Key Delete(key []byte) error LastKnownTD() []byte Close() - Print() } diff --git a/ethdb/database.go b/ethdb/database.go index cc2df5fa0..eb562f852 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -1,17 +1,25 @@ package ethdb import ( - "fmt" + "sync" + "time" "github.com/ethereum/go-ethereum/compression/rle" - "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" ) type LDBDatabase struct { - db *leveldb.DB - comp bool + fn string + + mu sync.Mutex + db *leveldb.DB + + queue map[string][]byte + + quit chan struct{} } func NewLDBDatabase(file string) (*LDBDatabase, error) { @@ -20,35 +28,61 @@ func NewLDBDatabase(file string) (*LDBDatabase, error) { if err != nil { return nil, err } - database := &LDBDatabase{db: db, comp: true} + database := &LDBDatabase{ + fn: file, + db: db, + quit: make(chan struct{}), + } + database.makeQueue() + + go database.update() + return database, nil } +func (self *LDBDatabase) makeQueue() { + self.queue = make(map[string][]byte) +} + func (self *LDBDatabase) Put(key []byte, value []byte) { - if self.comp { + self.mu.Lock() + defer self.mu.Unlock() + + self.queue[string(key)] = value + /* value = rle.Compress(value) - } - err := self.db.Put(key, value, nil) - if err != nil { - fmt.Println("Error put", err) - } + err := self.db.Put(key, value, nil) + if err != nil { + fmt.Println("Error put", err) + } + */ } func (self *LDBDatabase) Get(key []byte) ([]byte, error) { + self.mu.Lock() + defer self.mu.Unlock() + + // Check queue first + if dat, ok := self.queue[string(key)]; ok { + return dat, nil + } + dat, err := self.db.Get(key, nil) if err != nil { return nil, err } - if self.comp { - return rle.Decompress(dat) - } - - return dat, nil + return rle.Decompress(dat) } func (self *LDBDatabase) Delete(key []byte) error { + self.mu.Lock() + defer self.mu.Unlock() + + // make sure it's not in the queue + delete(self.queue, string(key)) + return self.db.Delete(key, nil) } @@ -66,23 +100,46 @@ func (self *LDBDatabase) NewIterator() iterator.Iterator { return self.db.NewIterator(nil, nil) } -func (self *LDBDatabase) Write(batch *leveldb.Batch) error { +func (self *LDBDatabase) Flush() error { + self.mu.Lock() + defer self.mu.Unlock() + + batch := new(leveldb.Batch) + + for key, value := range self.queue { + batch.Put([]byte(key), rle.Compress(value)) + } + self.makeQueue() // reset the queue + return self.db.Write(batch, nil) } func (self *LDBDatabase) Close() { - // Close the leveldb database - self.db.Close() + self.quit <- struct{}{} + <-self.quit + glog.V(logger.Info).Infoln("flushed and closed db:", self.fn) } -func (self *LDBDatabase) Print() { - iter := self.db.NewIterator(nil, nil) - for iter.Next() { - key := iter.Key() - value := iter.Value() +func (self *LDBDatabase) update() { + ticker := time.NewTicker(1 * time.Minute) +done: + for { + select { + case <-ticker.C: + if err := self.Flush(); err != nil { + glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err) + } + case <-self.quit: + break done + } + } - fmt.Printf("%x(%d): ", key, len(key)) - node := common.NewValueFromBytes(value) - fmt.Printf("%v\n", node) + if err := self.Flush(); err != nil { + glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err) } + + // Close the leveldb database + self.db.Close() + + self.quit <- struct{}{} } diff --git a/ethdb/database_test.go b/ethdb/database_test.go index 7de30fd81..3cead8bcd 100644 --- a/ethdb/database_test.go +++ b/ethdb/database_test.go @@ -1,26 +1,19 @@ package ethdb -/* import ( - "bytes" - "testing" + "os" + "path" + + "github.com/ethereum/go-ethereum/common" ) -func TestCompression(t *testing.T) { - db, err := NewLDBDatabase("testdb") - if err != nil { - t.Fatal(err) +func newDb() *LDBDatabase { + file := path.Join("/", "tmp", "ldbtesttmpfile") + if common.FileExist(file) { + os.RemoveAll(file) } - in := make([]byte, 10) - db.Put([]byte("test1"), in) - out, err := db.Get([]byte("test1")) - if err != nil { - t.Fatal(err) - } + db, _ := NewLDBDatabase(file) - if bytes.Compare(out, in) != 0 { - t.Error("put get", in, out) - } + return db } -*/ |