 Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go | 180 ++++++++++++++---------
 1 file changed, 106 insertions(+), 74 deletions(-)
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
index 4660e840c..e1cf30c53 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
@@ -14,84 +14,93 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)
-func (d *DB) writeJournal(b *Batch) error {
- w, err := d.journal.Next()
+func (db *DB) writeJournal(b *Batch) error {
+ w, err := db.journal.Next()
if err != nil {
return err
}
if _, err := w.Write(b.encode()); err != nil {
return err
}
- if err := d.journal.Flush(); err != nil {
+ if err := db.journal.Flush(); err != nil {
return err
}
if b.sync {
- return d.journalWriter.Sync()
+ return db.journalWriter.Sync()
}
return nil
}
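
writeJournal encodes the batch, appends it to the current write-ahead journal and flushes it; only a batch built with the sync flag also forces journalWriter.Sync() before the write is acknowledged. A minimal sketch of requesting that durability through the public API (the path and key below are illustrative, not part of this change):

package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/opt"
)

func main() {
	db, err := leveldb.OpenFile("example.db", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Sync:true sets b.sync on the internal batch, so writeJournal calls
	// journalWriter.Sync() before the write is acknowledged.
	wo := &opt.WriteOptions{Sync: true}
	if err := db.Put([]byte("key"), []byte("value"), wo); err != nil {
		log.Fatal(err)
	}
}
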
-func (d *DB) jWriter() {
- defer d.closeW.Done()
+func (db *DB) jWriter() {
+ defer db.closeW.Done()
for {
select {
- case b := <-d.journalC:
+ case b := <-db.journalC:
if b != nil {
- d.journalAckC <- d.writeJournal(b)
+ db.journalAckC <- db.writeJournal(b)
}
- case _, _ = <-d.closeC:
+ case _, _ = <-db.closeC:
return
}
}
}
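
jWriter is the dedicated journal-writer goroutine used for large batches: it receives a batch on journalC, answers with the result of writeJournal on journalAckC, and exits once closeC is closed. The same select/acknowledge worker pattern in isolation, using hypothetical names that are not part of goleveldb:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// worker mirrors the jWriter shape: one job channel, one ack channel,
// one close channel, and a WaitGroup so shutdown can wait for the loop.
type worker struct {
	jobC   chan []byte
	ackC   chan error
	closeC chan struct{}
	wg     sync.WaitGroup
}

func (w *worker) run() {
	defer w.wg.Done()
	for {
		select {
		case job := <-w.jobC:
			if job != nil {
				// In goleveldb this is db.journalAckC <- db.writeJournal(b).
				w.ackC <- w.write(job)
			}
		case <-w.closeC:
			return
		}
	}
}

func (w *worker) write(job []byte) error {
	if len(job) == 0 {
		return errors.New("empty job")
	}
	return nil
}

func main() {
	w := &worker{
		jobC:   make(chan []byte),
		ackC:   make(chan error),
		closeC: make(chan struct{}),
	}
	w.wg.Add(1)
	go w.run()

	w.jobC <- []byte("batch")
	fmt.Println("ack:", <-w.ackC)

	close(w.closeC)
	w.wg.Wait()
}
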
-func (d *DB) rotateMem(n int) (mem *memdb.DB, err error) {
+func (db *DB) rotateMem(n int) (mem *memDB, err error) {
// Wait for pending memdb compaction.
- err = d.compSendIdle(d.mcompCmdC)
+ err = db.compSendIdle(db.mcompCmdC)
if err != nil {
return
}
// Create new memdb and journal.
- mem, err = d.newMem(n)
+ mem, err = db.newMem(n)
if err != nil {
return
}
// Schedule memdb compaction.
- d.compTrigger(d.mcompTriggerC)
+ db.compSendTrigger(db.mcompCmdC)
return
}
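
rotateMem waits for any pending memdb compaction, creates a fresh memdb and journal via newMem, and schedules the frozen memdb for compaction; this revision also changes it to return the ref-counted *memDB wrapper instead of the raw *memdb.DB. How often rotation happens is governed by the write buffer size, which callers can tune through the public options (the value below is only an example):

package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/opt"
)

func main() {
	// A larger write buffer lets the active memdb absorb more writes before
	// flush has to rotate it and swap in a fresh one.
	o := &opt.Options{
		WriteBuffer: 8 << 20, // 8 MiB; the package default is smaller
	}
	db, err := leveldb.OpenFile("example.db", o)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
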
-func (d *DB) flush(n int) (mem *memdb.DB, nn int, err error) {
- s := d.s
-
+func (db *DB) flush(n int) (mem *memDB, nn int, err error) {
delayed := false
- flush := func() bool {
- v := s.version()
+ flush := func() (retry bool) {
+ v := db.s.version()
defer v.release()
- mem = d.getEffectiveMem()
- nn = mem.Free()
+ mem = db.getEffectiveMem()
+ defer func() {
+ if retry {
+ mem.decref()
+ mem = nil
+ }
+ }()
+ nn = mem.mdb.Free()
switch {
- case v.tLen(0) >= kL0_SlowdownWritesTrigger && !delayed:
+ case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed:
delayed = true
time.Sleep(time.Millisecond)
case nn >= n:
return false
- case v.tLen(0) >= kL0_StopWritesTrigger:
+ case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger():
delayed = true
- err = d.compSendIdle(d.tcompCmdC)
+ err = db.compSendIdle(db.tcompCmdC)
if err != nil {
return false
}
default:
// Allow memdb to grow if it has no entry.
- if mem.Len() == 0 {
+ if mem.mdb.Len() == 0 {
nn = n
- return false
+ } else {
+ mem.decref()
+ mem, err = db.rotateMem(n)
+ if err == nil {
+ nn = mem.mdb.Free()
+ } else {
+ nn = 0
+ }
}
- mem, err = d.rotateMem(n)
- nn = mem.Free()
return false
}
return true
@@ -100,7 +109,12 @@ func (d *DB) flush(n int) (mem *memdb.DB, nn int, err error) {
for flush() {
}
if delayed {
- s.logf("db@write delayed T·%v", time.Since(start))
+ db.writeDelay += time.Since(start)
+ db.writeDelayN++
+ } else if db.writeDelayN > 0 {
+ db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
+ db.writeDelay = 0
+ db.writeDelayN = 0
}
return
}
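
flush is the write-side backpressure loop: once level-0 reaches the slowdown trigger it sleeps a millisecond per attempt, and at the pause trigger it blocks until table compaction becomes idle; the rewritten tail accumulates those stalls in writeDelay/writeDelayN and logs one summary when the stall ends instead of logging every delayed write. The old hard-coded kL0_* constants become per-DB options; a sketch of raising them (the values are illustrative):

package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/opt"
)

func main() {
	// Raising the triggers trades read amplification for fewer write stalls;
	// lowering them throttles writers sooner.
	o := &opt.Options{
		WriteL0SlowdownTrigger: 16, // start the 1ms-per-attempt delay later
		WriteL0PauseTrigger:    24, // block writes only at a higher L0 count
	}
	db, err := leveldb.OpenFile("example.db", o)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
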
@@ -109,39 +123,45 @@ func (d *DB) flush(n int) (mem *memdb.DB, nn int, err error) {
// sequentially.
//
// It is safe to modify the contents of the arguments after Write returns.
-func (d *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
- err = d.ok()
- if err != nil || b == nil || b.len() == 0 {
+func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
+ err = db.ok()
+ if err != nil || b == nil || b.Len() == 0 {
return
}
b.init(wo.GetSync())
// The write happen synchronously.
-retry:
select {
- case d.writeC <- b:
- if <-d.writeMergedC {
- return <-d.writeAckC
+ case db.writeC <- b:
+ if <-db.writeMergedC {
+ return <-db.writeAckC
}
- goto retry
- case d.writeLockC <- struct{}{}:
- case _, _ = <-d.closeC:
+ case db.writeLockC <- struct{}{}:
+ case err = <-db.compPerErrC:
+ return
+ case _, _ = <-db.closeC:
return ErrClosed
}
merged := 0
+ danglingMerge := false
defer func() {
- <-d.writeLockC
+ if danglingMerge {
+ db.writeMergedC <- false
+ } else {
+ <-db.writeLockC
+ }
for i := 0; i < merged; i++ {
- d.writeAckC <- err
+ db.writeAckC <- err
}
}()
- mem, memFree, err := d.flush(b.size())
+ mem, memFree, err := db.flush(b.size())
if err != nil {
return
}
+ defer mem.decref()
// Calculate maximum size of the batch.
m := 1 << 20
@@ -154,13 +174,13 @@ retry:
drain:
for b.size() < m && !b.sync {
select {
- case nb := <-d.writeC:
+ case nb := <-db.writeC:
if b.size()+nb.size() <= m {
b.append(nb)
- d.writeMergedC <- true
+ db.writeMergedC <- true
merged++
} else {
- d.writeMergedC <- false
+ danglingMerge = true
break drain
}
default:
@@ -169,44 +189,52 @@ drain:
}
// Set batch first seq number relative from last seq.
- b.seq = d.seq + 1
+ b.seq = db.seq + 1
// Write journal concurrently if it is large enough.
if b.size() >= (128 << 10) {
// Push the write batch to the journal writer
select {
- case _, _ = <-d.closeC:
+ case db.journalC <- b:
+ // Write into memdb
+ if berr := b.memReplay(mem.mdb); berr != nil {
+ panic(berr)
+ }
+ case err = <-db.compPerErrC:
+ return
+ case _, _ = <-db.closeC:
err = ErrClosed
return
- case d.journalC <- b:
- // Write into memdb
- b.memReplay(mem)
}
// Wait for journal writer
select {
- case _, _ = <-d.closeC:
- err = ErrClosed
- return
- case err = <-d.journalAckC:
+ case err = <-db.journalAckC:
if err != nil {
// Revert memdb if error detected
- b.revertMemReplay(mem)
+ if berr := b.revertMemReplay(mem.mdb); berr != nil {
+ panic(berr)
+ }
return
}
+ case _, _ = <-db.closeC:
+ err = ErrClosed
+ return
}
} else {
- err = d.writeJournal(b)
+ err = db.writeJournal(b)
if err != nil {
return
}
- b.memReplay(mem)
+ if berr := b.memReplay(mem.mdb); berr != nil {
+ panic(berr)
+ }
}
// Set last seq number.
- d.addSeq(uint64(b.len()))
+ db.addSeq(uint64(b.Len()))
if b.size() >= memFree {
- d.rotateMem(0)
+ db.rotateMem(0)
}
return
}
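
Write is the single entry point behind all mutations: it acquires the write lock (or hands its batch to the current holder via writeC), merges queued batches up to a size cap, journals them (through the concurrent jWriter path once the batch reaches 128 KiB), replays them into the memdb, and finally advances the sequence number; the new danglingMerge flag makes sure a rejected merge candidate still gets its writeMergedC answer even if a later step fails. Typical use of the public API (keys and values are illustrative):

package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
)

func main() {
	db, err := leveldb.OpenFile("example.db", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// All operations in one batch are journaled together and applied
	// atomically by Write.
	b := new(leveldb.Batch)
	b.Put([]byte("a"), []byte("1"))
	b.Put([]byte("b"), []byte("2"))
	b.Delete([]byte("stale"))
	if err := db.Write(b, nil); err != nil {
		log.Fatal(err)
	}
}
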
@@ -215,20 +243,20 @@ drain:
// for that key; a DB is not a multi-map.
//
// It is safe to modify the contents of the arguments after Put returns.
-func (d *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
+func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
b := new(Batch)
b.Put(key, value)
- return d.Write(b, wo)
+ return db.Write(b, wo)
}
// Delete deletes the value for the given key. It returns ErrNotFound if
// the DB does not contain the key.
//
// It is safe to modify the contents of the arguments after Delete returns.
-func (d *DB) Delete(key []byte, wo *opt.WriteOptions) error {
+func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
b := new(Batch)
b.Delete(key)
- return d.Write(b, wo)
+ return db.Write(b, wo)
}
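
Put and Delete are thin wrappers that build a one-entry Batch and pass it to Write, so they share its merging, journaling, and error behaviour. Equivalent calls through the public API (keys are illustrative):

package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
)

func main() {
	db, err := leveldb.OpenFile("example.db", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Each call below is one single-entry batch handed to Write.
	if err := db.Put([]byte("k"), []byte("v"), nil); err != nil {
		log.Fatal(err)
	}
	if err := db.Delete([]byte("k"), nil); err != nil {
		log.Fatal(err)
	}
}
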
func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
@@ -247,33 +275,37 @@ func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
// A nil Range.Start is treated as a key before all keys in the DB.
// And a nil Range.Limit is treated as a key after all keys in the DB.
// Therefore if both is nil then it will compact entire DB.
-func (d *DB) CompactRange(r util.Range) error {
- if err := d.ok(); err != nil {
+func (db *DB) CompactRange(r util.Range) error {
+ if err := db.ok(); err != nil {
return err
}
+ // Lock writer.
select {
- case d.writeLockC <- struct{}{}:
- case _, _ = <-d.closeC:
+ case db.writeLockC <- struct{}{}:
+ case err := <-db.compPerErrC:
+ return err
+ case _, _ = <-db.closeC:
return ErrClosed
}
// Check for overlaps in memdb.
- mem := d.getEffectiveMem()
- if isMemOverlaps(d.s.icmp, mem, r.Start, r.Limit) {
+ mem := db.getEffectiveMem()
+ defer mem.decref()
+ if isMemOverlaps(db.s.icmp, mem.mdb, r.Start, r.Limit) {
// Memdb compaction.
- if _, err := d.rotateMem(0); err != nil {
- <-d.writeLockC
+ if _, err := db.rotateMem(0); err != nil {
+ <-db.writeLockC
return err
}
- <-d.writeLockC
- if err := d.compSendIdle(d.mcompCmdC); err != nil {
+ <-db.writeLockC
+ if err := db.compSendIdle(db.mcompCmdC); err != nil {
return err
}
} else {
- <-d.writeLockC
+ <-db.writeLockC
}
// Table compaction.
- return d.compSendRange(d.tcompCmdC, -1, r.Start, r.Limit)
+ return db.compSendRange(db.tcompCmdC, -1, r.Start, r.Limit)
}
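
CompactRange now takes the write lock through the same select that also surfaces persistent compaction errors (compPerErrC), rotates the memdb only when the requested range overlaps it, and then requests a table compaction over the range. A usage sketch; per the doc comment above, a zero util.Range (nil Start and Limit) compacts the entire DB:

package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/util"
)

func main() {
	db, err := leveldb.OpenFile("example.db", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Compact only keys in ["a", "m"); pass util.Range{} to compact everything.
	r := util.Range{Start: []byte("a"), Limit: []byte("m")}
	if err := db.CompactRange(r); err != nil {
		log.Fatal(err)
	}
}
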