diff options
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go')
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go | 180 |
1 files changed, 106 insertions, 74 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go index 4660e840c..e1cf30c53 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go @@ -14,84 +14,93 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) -func (d *DB) writeJournal(b *Batch) error { - w, err := d.journal.Next() +func (db *DB) writeJournal(b *Batch) error { + w, err := db.journal.Next() if err != nil { return err } if _, err := w.Write(b.encode()); err != nil { return err } - if err := d.journal.Flush(); err != nil { + if err := db.journal.Flush(); err != nil { return err } if b.sync { - return d.journalWriter.Sync() + return db.journalWriter.Sync() } return nil } -func (d *DB) jWriter() { - defer d.closeW.Done() +func (db *DB) jWriter() { + defer db.closeW.Done() for { select { - case b := <-d.journalC: + case b := <-db.journalC: if b != nil { - d.journalAckC <- d.writeJournal(b) + db.journalAckC <- db.writeJournal(b) } - case _, _ = <-d.closeC: + case _, _ = <-db.closeC: return } } } -func (d *DB) rotateMem(n int) (mem *memdb.DB, err error) { +func (db *DB) rotateMem(n int) (mem *memDB, err error) { // Wait for pending memdb compaction. - err = d.compSendIdle(d.mcompCmdC) + err = db.compSendIdle(db.mcompCmdC) if err != nil { return } // Create new memdb and journal. - mem, err = d.newMem(n) + mem, err = db.newMem(n) if err != nil { return } // Schedule memdb compaction. - d.compTrigger(d.mcompTriggerC) + db.compSendTrigger(db.mcompCmdC) return } -func (d *DB) flush(n int) (mem *memdb.DB, nn int, err error) { - s := d.s - +func (db *DB) flush(n int) (mem *memDB, nn int, err error) { delayed := false - flush := func() bool { - v := s.version() + flush := func() (retry bool) { + v := db.s.version() defer v.release() - mem = d.getEffectiveMem() - nn = mem.Free() + mem = db.getEffectiveMem() + defer func() { + if retry { + mem.decref() + mem = nil + } + }() + nn = mem.mdb.Free() switch { - case v.tLen(0) >= kL0_SlowdownWritesTrigger && !delayed: + case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed: delayed = true time.Sleep(time.Millisecond) case nn >= n: return false - case v.tLen(0) >= kL0_StopWritesTrigger: + case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger(): delayed = true - err = d.compSendIdle(d.tcompCmdC) + err = db.compSendIdle(db.tcompCmdC) if err != nil { return false } default: // Allow memdb to grow if it has no entry. - if mem.Len() == 0 { + if mem.mdb.Len() == 0 { nn = n - return false + } else { + mem.decref() + mem, err = db.rotateMem(n) + if err == nil { + nn = mem.mdb.Free() + } else { + nn = 0 + } } - mem, err = d.rotateMem(n) - nn = mem.Free() return false } return true @@ -100,7 +109,12 @@ func (d *DB) flush(n int) (mem *memdb.DB, nn int, err error) { for flush() { } if delayed { - s.logf("db@write delayed T·%v", time.Since(start)) + db.writeDelay += time.Since(start) + db.writeDelayN++ + } else if db.writeDelayN > 0 { + db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay) + db.writeDelay = 0 + db.writeDelayN = 0 } return } @@ -109,39 +123,45 @@ func (d *DB) flush(n int) (mem *memdb.DB, nn int, err error) { // sequentially. // // It is safe to modify the contents of the arguments after Write returns. -func (d *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { - err = d.ok() - if err != nil || b == nil || b.len() == 0 { +func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { + err = db.ok() + if err != nil || b == nil || b.Len() == 0 { return } b.init(wo.GetSync()) // The write happen synchronously. -retry: select { - case d.writeC <- b: - if <-d.writeMergedC { - return <-d.writeAckC + case db.writeC <- b: + if <-db.writeMergedC { + return <-db.writeAckC } - goto retry - case d.writeLockC <- struct{}{}: - case _, _ = <-d.closeC: + case db.writeLockC <- struct{}{}: + case err = <-db.compPerErrC: + return + case _, _ = <-db.closeC: return ErrClosed } merged := 0 + danglingMerge := false defer func() { - <-d.writeLockC + if danglingMerge { + db.writeMergedC <- false + } else { + <-db.writeLockC + } for i := 0; i < merged; i++ { - d.writeAckC <- err + db.writeAckC <- err } }() - mem, memFree, err := d.flush(b.size()) + mem, memFree, err := db.flush(b.size()) if err != nil { return } + defer mem.decref() // Calculate maximum size of the batch. m := 1 << 20 @@ -154,13 +174,13 @@ retry: drain: for b.size() < m && !b.sync { select { - case nb := <-d.writeC: + case nb := <-db.writeC: if b.size()+nb.size() <= m { b.append(nb) - d.writeMergedC <- true + db.writeMergedC <- true merged++ } else { - d.writeMergedC <- false + danglingMerge = true break drain } default: @@ -169,44 +189,52 @@ drain: } // Set batch first seq number relative from last seq. - b.seq = d.seq + 1 + b.seq = db.seq + 1 // Write journal concurrently if it is large enough. if b.size() >= (128 << 10) { // Push the write batch to the journal writer select { - case _, _ = <-d.closeC: + case db.journalC <- b: + // Write into memdb + if berr := b.memReplay(mem.mdb); berr != nil { + panic(berr) + } + case err = <-db.compPerErrC: + return + case _, _ = <-db.closeC: err = ErrClosed return - case d.journalC <- b: - // Write into memdb - b.memReplay(mem) } // Wait for journal writer select { - case _, _ = <-d.closeC: - err = ErrClosed - return - case err = <-d.journalAckC: + case err = <-db.journalAckC: if err != nil { // Revert memdb if error detected - b.revertMemReplay(mem) + if berr := b.revertMemReplay(mem.mdb); berr != nil { + panic(berr) + } return } + case _, _ = <-db.closeC: + err = ErrClosed + return } } else { - err = d.writeJournal(b) + err = db.writeJournal(b) if err != nil { return } - b.memReplay(mem) + if berr := b.memReplay(mem.mdb); berr != nil { + panic(berr) + } } // Set last seq number. - d.addSeq(uint64(b.len())) + db.addSeq(uint64(b.Len())) if b.size() >= memFree { - d.rotateMem(0) + db.rotateMem(0) } return } @@ -215,20 +243,20 @@ drain: // for that key; a DB is not a multi-map. // // It is safe to modify the contents of the arguments after Put returns. -func (d *DB) Put(key, value []byte, wo *opt.WriteOptions) error { +func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error { b := new(Batch) b.Put(key, value) - return d.Write(b, wo) + return db.Write(b, wo) } // Delete deletes the value for the given key. It returns ErrNotFound if // the DB does not contain the key. // // It is safe to modify the contents of the arguments after Delete returns. -func (d *DB) Delete(key []byte, wo *opt.WriteOptions) error { +func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error { b := new(Batch) b.Delete(key) - return d.Write(b, wo) + return db.Write(b, wo) } func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool { @@ -247,33 +275,37 @@ func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool { // A nil Range.Start is treated as a key before all keys in the DB. // And a nil Range.Limit is treated as a key after all keys in the DB. // Therefore if both is nil then it will compact entire DB. -func (d *DB) CompactRange(r util.Range) error { - if err := d.ok(); err != nil { +func (db *DB) CompactRange(r util.Range) error { + if err := db.ok(); err != nil { return err } + // Lock writer. select { - case d.writeLockC <- struct{}{}: - case _, _ = <-d.closeC: + case db.writeLockC <- struct{}{}: + case err := <-db.compPerErrC: + return err + case _, _ = <-db.closeC: return ErrClosed } // Check for overlaps in memdb. - mem := d.getEffectiveMem() - if isMemOverlaps(d.s.icmp, mem, r.Start, r.Limit) { + mem := db.getEffectiveMem() + defer mem.decref() + if isMemOverlaps(db.s.icmp, mem.mdb, r.Start, r.Limit) { // Memdb compaction. - if _, err := d.rotateMem(0); err != nil { - <-d.writeLockC + if _, err := db.rotateMem(0); err != nil { + <-db.writeLockC return err } - <-d.writeLockC - if err := d.compSendIdle(d.mcompCmdC); err != nil { + <-db.writeLockC + if err := db.compSendIdle(db.mcompCmdC); err != nil { return err } } else { - <-d.writeLockC + <-db.writeLockC } // Table compaction. - return d.compSendRange(d.tcompCmdC, -1, r.Start, r.Limit) + return db.compSendRange(db.tcompCmdC, -1, r.Start, r.Limit) } |