aboutsummaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go
diff options
context:
space:
mode:
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go')
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go207
1 files changed, 152 insertions, 55 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go
index a13706142..d4db9d6dd 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go
@@ -8,106 +8,203 @@ package leveldb
import (
"sync/atomic"
+ "time"
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/memdb"
)
+type memDB struct {
+ db *DB
+ mdb *memdb.DB
+ ref int32
+}
+
+func (m *memDB) incref() {
+ atomic.AddInt32(&m.ref, 1)
+}
+
+func (m *memDB) decref() {
+ if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
+ // Only put back memdb with std capacity.
+ if m.mdb.Capacity() == m.db.s.o.GetWriteBuffer() {
+ m.mdb.Reset()
+ m.db.mpoolPut(m.mdb)
+ }
+ m.db = nil
+ m.mdb = nil
+ } else if ref < 0 {
+ panic("negative memdb ref")
+ }
+}
+
// Get latest sequence number.
-func (d *DB) getSeq() uint64 {
- return atomic.LoadUint64(&d.seq)
+func (db *DB) getSeq() uint64 {
+ return atomic.LoadUint64(&db.seq)
}
// Atomically adds delta to seq.
-func (d *DB) addSeq(delta uint64) {
- atomic.AddUint64(&d.seq, delta)
+func (db *DB) addSeq(delta uint64) {
+ atomic.AddUint64(&db.seq, delta)
+}
+
+func (db *DB) sampleSeek(ikey iKey) {
+ v := db.s.version()
+ if v.sampleSeek(ikey) {
+ // Trigger table compaction.
+ db.compSendTrigger(db.tcompCmdC)
+ }
+ v.release()
+}
+
+func (db *DB) mpoolPut(mem *memdb.DB) {
+ defer func() {
+ recover()
+ }()
+ select {
+ case db.memPool <- mem:
+ default:
+ }
+}
+
+func (db *DB) mpoolGet() *memdb.DB {
+ select {
+ case mem := <-db.memPool:
+ return mem
+ default:
+ return nil
+ }
+}
+
+func (db *DB) mpoolDrain() {
+ ticker := time.NewTicker(30 * time.Second)
+ for {
+ select {
+ case <-ticker.C:
+ select {
+ case <-db.memPool:
+ default:
+ }
+ case _, _ = <-db.closeC:
+ close(db.memPool)
+ return
+ }
+ }
}
// Create new memdb and froze the old one; need external synchronization.
// newMem only called synchronously by the writer.
-func (d *DB) newMem(n int) (mem *memdb.DB, err error) {
- s := d.s
-
- num := s.allocFileNum()
- file := s.getJournalFile(num)
+func (db *DB) newMem(n int) (mem *memDB, err error) {
+ num := db.s.allocFileNum()
+ file := db.s.getJournalFile(num)
w, err := file.Create()
if err != nil {
- s.reuseFileNum(num)
+ db.s.reuseFileNum(num)
return
}
- d.memMu.Lock()
- if d.journal == nil {
- d.journal = journal.NewWriter(w)
+
+ db.memMu.Lock()
+ defer db.memMu.Unlock()
+
+ if db.frozenMem != nil {
+ panic("still has frozen mem")
+ }
+
+ if db.journal == nil {
+ db.journal = journal.NewWriter(w)
} else {
- d.journal.Reset(w)
- d.journalWriter.Close()
- d.frozenJournalFile = d.journalFile
- }
- d.journalWriter = w
- d.journalFile = file
- d.frozenMem = d.mem
- d.mem = memdb.New(s.icmp, maxInt(d.s.o.GetWriteBuffer(), n))
- mem = d.mem
- // The seq only incremented by the writer.
- d.frozenSeq = d.seq
- d.memMu.Unlock()
+ db.journal.Reset(w)
+ db.journalWriter.Close()
+ db.frozenJournalFile = db.journalFile
+ }
+ db.journalWriter = w
+ db.journalFile = file
+ db.frozenMem = db.mem
+ mdb := db.mpoolGet()
+ if mdb == nil || mdb.Capacity() < n {
+ mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n))
+ }
+ mem = &memDB{
+ db: db,
+ mdb: mdb,
+ ref: 2,
+ }
+ db.mem = mem
+ // The seq only incremented by the writer. And whoever called newMem
+ // should hold write lock, so no need additional synchronization here.
+ db.frozenSeq = db.seq
return
}
// Get all memdbs.
-func (d *DB) getMems() (e *memdb.DB, f *memdb.DB) {
- d.memMu.RLock()
- defer d.memMu.RUnlock()
- return d.mem, d.frozenMem
+func (db *DB) getMems() (e, f *memDB) {
+ db.memMu.RLock()
+ defer db.memMu.RUnlock()
+ if db.mem == nil {
+ panic("nil effective mem")
+ }
+ db.mem.incref()
+ if db.frozenMem != nil {
+ db.frozenMem.incref()
+ }
+ return db.mem, db.frozenMem
}
// Get frozen memdb.
-func (d *DB) getEffectiveMem() *memdb.DB {
- d.memMu.RLock()
- defer d.memMu.RUnlock()
- return d.mem
+func (db *DB) getEffectiveMem() *memDB {
+ db.memMu.RLock()
+ defer db.memMu.RUnlock()
+ if db.mem == nil {
+ panic("nil effective mem")
+ }
+ db.mem.incref()
+ return db.mem
}
// Check whether we has frozen memdb.
-func (d *DB) hasFrozenMem() bool {
- d.memMu.RLock()
- defer d.memMu.RUnlock()
- return d.frozenMem != nil
+func (db *DB) hasFrozenMem() bool {
+ db.memMu.RLock()
+ defer db.memMu.RUnlock()
+ return db.frozenMem != nil
}
// Get frozen memdb.
-func (d *DB) getFrozenMem() *memdb.DB {
- d.memMu.RLock()
- defer d.memMu.RUnlock()
- return d.frozenMem
+func (db *DB) getFrozenMem() *memDB {
+ db.memMu.RLock()
+ defer db.memMu.RUnlock()
+ if db.frozenMem != nil {
+ db.frozenMem.incref()
+ }
+ return db.frozenMem
}
// Drop frozen memdb; assume that frozen memdb isn't nil.
-func (d *DB) dropFrozenMem() {
- d.memMu.Lock()
- if err := d.frozenJournalFile.Remove(); err != nil {
- d.s.logf("journal@remove removing @%d %q", d.frozenJournalFile.Num(), err)
+func (db *DB) dropFrozenMem() {
+ db.memMu.Lock()
+ if err := db.frozenJournalFile.Remove(); err != nil {
+ db.logf("journal@remove removing @%d %q", db.frozenJournalFile.Num(), err)
} else {
- d.s.logf("journal@remove removed @%d", d.frozenJournalFile.Num())
+ db.logf("journal@remove removed @%d", db.frozenJournalFile.Num())
}
- d.frozenJournalFile = nil
- d.frozenMem = nil
- d.memMu.Unlock()
+ db.frozenJournalFile = nil
+ db.frozenMem.decref()
+ db.frozenMem = nil
+ db.memMu.Unlock()
}
// Set closed flag; return true if not already closed.
-func (d *DB) setClosed() bool {
- return atomic.CompareAndSwapUint32(&d.closed, 0, 1)
+func (db *DB) setClosed() bool {
+ return atomic.CompareAndSwapUint32(&db.closed, 0, 1)
}
// Check whether DB was closed.
-func (d *DB) isClosed() bool {
- return atomic.LoadUint32(&d.closed) != 0
+func (db *DB) isClosed() bool {
+ return atomic.LoadUint32(&db.closed) != 0
}
// Check read ok status.
-func (d *DB) ok() error {
- if d.isClosed() {
+func (db *DB) ok() error {
+ if db.isClosed() {
return ErrClosed
}
return nil