diff options
Diffstat (limited to 'ethdb/database.go')
-rw-r--r-- | ethdb/database.go | 111 |
1 files changed, 84 insertions, 27 deletions
diff --git a/ethdb/database.go b/ethdb/database.go index cc2df5fa0..eb562f852 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -1,17 +1,25 @@ package ethdb import ( - "fmt" + "sync" + "time" "github.com/ethereum/go-ethereum/compression/rle" - "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" ) type LDBDatabase struct { - db *leveldb.DB - comp bool + fn string + + mu sync.Mutex + db *leveldb.DB + + queue map[string][]byte + + quit chan struct{} } func NewLDBDatabase(file string) (*LDBDatabase, error) { @@ -20,35 +28,61 @@ func NewLDBDatabase(file string) (*LDBDatabase, error) { if err != nil { return nil, err } - database := &LDBDatabase{db: db, comp: true} + database := &LDBDatabase{ + fn: file, + db: db, + quit: make(chan struct{}), + } + database.makeQueue() + + go database.update() + return database, nil } +func (self *LDBDatabase) makeQueue() { + self.queue = make(map[string][]byte) +} + func (self *LDBDatabase) Put(key []byte, value []byte) { - if self.comp { + self.mu.Lock() + defer self.mu.Unlock() + + self.queue[string(key)] = value + /* value = rle.Compress(value) - } - err := self.db.Put(key, value, nil) - if err != nil { - fmt.Println("Error put", err) - } + err := self.db.Put(key, value, nil) + if err != nil { + fmt.Println("Error put", err) + } + */ } func (self *LDBDatabase) Get(key []byte) ([]byte, error) { + self.mu.Lock() + defer self.mu.Unlock() + + // Check queue first + if dat, ok := self.queue[string(key)]; ok { + return dat, nil + } + dat, err := self.db.Get(key, nil) if err != nil { return nil, err } - if self.comp { - return rle.Decompress(dat) - } - - return dat, nil + return rle.Decompress(dat) } func (self *LDBDatabase) Delete(key []byte) error { + self.mu.Lock() + defer self.mu.Unlock() + + // make sure it's not in the queue + delete(self.queue, string(key)) + return self.db.Delete(key, nil) } @@ -66,23 +100,46 @@ func (self *LDBDatabase) NewIterator() iterator.Iterator { return self.db.NewIterator(nil, nil) } -func (self *LDBDatabase) Write(batch *leveldb.Batch) error { +func (self *LDBDatabase) Flush() error { + self.mu.Lock() + defer self.mu.Unlock() + + batch := new(leveldb.Batch) + + for key, value := range self.queue { + batch.Put([]byte(key), rle.Compress(value)) + } + self.makeQueue() // reset the queue + return self.db.Write(batch, nil) } func (self *LDBDatabase) Close() { - // Close the leveldb database - self.db.Close() + self.quit <- struct{}{} + <-self.quit + glog.V(logger.Info).Infoln("flushed and closed db:", self.fn) } -func (self *LDBDatabase) Print() { - iter := self.db.NewIterator(nil, nil) - for iter.Next() { - key := iter.Key() - value := iter.Value() +func (self *LDBDatabase) update() { + ticker := time.NewTicker(1 * time.Minute) +done: + for { + select { + case <-ticker.C: + if err := self.Flush(); err != nil { + glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err) + } + case <-self.quit: + break done + } + } - fmt.Printf("%x(%d): ", key, len(key)) - node := common.NewValueFromBytes(value) - fmt.Printf("%v\n", node) + if err := self.Flush(); err != nil { + glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err) } + + // Close the leveldb database + self.db.Close() + + self.quit <- struct{}{} } |