aboutsummaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
diff options
context:
space:
mode:
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go')
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go365
1 files changed, 225 insertions, 140 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
index 5576761fe..cc428b695 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
@@ -14,37 +14,23 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)
-func (db *DB) writeJournal(b *Batch) error {
- w, err := db.journal.Next()
+func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
+ wr, err := db.journal.Next()
if err != nil {
return err
}
- if _, err := w.Write(b.encode()); err != nil {
+ if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
return err
}
if err := db.journal.Flush(); err != nil {
return err
}
- if b.sync {
+ if sync {
return db.journalWriter.Sync()
}
return nil
}
-func (db *DB) jWriter() {
- defer db.closeW.Done()
- for {
- select {
- case b := <-db.journalC:
- if b != nil {
- db.journalAckC <- db.writeJournal(b)
- }
- case _, _ = <-db.closeC:
- return
- }
- }
-}
-
func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
// Wait for pending memdb compaction.
err = db.compTriggerWait(db.mcompCmdC)
@@ -69,24 +55,29 @@ func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
delayed := false
+ slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
+ pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
flush := func() (retry bool) {
- v := db.s.version()
- defer v.release()
mdb = db.getEffectiveMem()
+ if mdb == nil {
+ err = ErrClosed
+ return false
+ }
defer func() {
if retry {
mdb.decref()
mdb = nil
}
}()
+ tLen := db.s.tLen(0)
mdbFree = mdb.Free()
switch {
- case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed:
+ case tLen >= slowdownTrigger && !delayed:
delayed = true
time.Sleep(time.Millisecond)
case mdbFree >= n:
return false
- case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger():
+ case tLen >= pauseTrigger:
delayed = true
err = db.compTriggerWait(db.tcompCmdC)
if err != nil {
@@ -123,159 +114,250 @@ func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
return
}
-// Write apply the given batch to the DB. The batch will be applied
-// sequentially.
-//
-// It is safe to modify the contents of the arguments after Write returns.
-func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
- err = db.ok()
- if err != nil || b == nil || b.Len() == 0 {
- return
+type writeMerge struct {
+ sync bool
+ batch *Batch
+ keyType keyType
+ key, value []byte
+}
+
+func (db *DB) unlockWrite(overflow bool, merged int, err error) {
+ for i := 0; i < merged; i++ {
+ db.writeAckC <- err
+ }
+ if overflow {
+ // Pass lock to the next write (that failed to merge).
+ db.writeMergedC <- false
+ } else {
+ // Release lock.
+ <-db.writeLockC
}
+}
- b.init(wo.GetSync() && !db.s.o.GetNoSync())
+// ourBatch if defined should equal with batch.
+func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
+ // Try to flush memdb. This method would also trying to throttle writes
+ // if it is too fast and compaction cannot catch-up.
+ mdb, mdbFree, err := db.flush(batch.internalLen)
+ if err != nil {
+ db.unlockWrite(false, 0, err)
+ return err
+ }
+ defer mdb.decref()
- if b.size() > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
- // Writes using transaction.
- tr, err1 := db.OpenTransaction()
- if err1 != nil {
- return err1
+ var (
+ overflow bool
+ merged int
+ batches = []*Batch{batch}
+ )
+
+ if merge {
+ // Merge limit.
+ var mergeLimit int
+ if batch.internalLen > 128<<10 {
+ mergeLimit = (1 << 20) - batch.internalLen
+ } else {
+ mergeLimit = 128 << 10
}
- if err1 := tr.Write(b, wo); err1 != nil {
- tr.Discard()
- return err1
+ mergeCap := mdbFree - batch.internalLen
+ if mergeLimit > mergeCap {
+ mergeLimit = mergeCap
}
- return tr.Commit()
- }
- // The write happen synchronously.
- select {
- case db.writeC <- b:
- if <-db.writeMergedC {
- return <-db.writeAckC
+ merge:
+ for mergeLimit > 0 {
+ select {
+ case incoming := <-db.writeMergeC:
+ if incoming.batch != nil {
+ // Merge batch.
+ if incoming.batch.internalLen > mergeLimit {
+ overflow = true
+ break merge
+ }
+ batches = append(batches, incoming.batch)
+ mergeLimit -= incoming.batch.internalLen
+ } else {
+ // Merge put.
+ internalLen := len(incoming.key) + len(incoming.value) + 8
+ if internalLen > mergeLimit {
+ overflow = true
+ break merge
+ }
+ if ourBatch == nil {
+ ourBatch = db.batchPool.Get().(*Batch)
+ ourBatch.Reset()
+ batches = append(batches, ourBatch)
+ }
+ // We can use same batch since concurrent write doesn't
+ // guarantee write order.
+ ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
+ mergeLimit -= internalLen
+ }
+ sync = sync || incoming.sync
+ merged++
+ db.writeMergedC <- true
+
+ default:
+ break merge
+ }
}
- // Continue, the write lock already acquired by previous writer
- // and handed out to us.
- case db.writeLockC <- struct{}{}:
- case err = <-db.compPerErrC:
- return
- case _, _ = <-db.closeC:
- return ErrClosed
}
- merged := 0
- danglingMerge := false
- defer func() {
- for i := 0; i < merged; i++ {
- db.writeAckC <- err
- }
- if danglingMerge {
- // Only one dangling merge at most, so this is safe.
- db.writeMergedC <- false
- } else {
- <-db.writeLockC
+ // Seq number.
+ seq := db.seq + 1
+
+ // Write journal.
+ if err := db.writeJournal(batches, seq, sync); err != nil {
+ db.unlockWrite(overflow, merged, err)
+ return err
+ }
+
+ // Put batches.
+ for _, batch := range batches {
+ if err := batch.putMem(seq, mdb.DB); err != nil {
+ panic(err)
}
- }()
+ seq += uint64(batch.Len())
+ }
- mdb, mdbFree, err := db.flush(b.size())
- if err != nil {
- return
+ // Incr seq number.
+ db.addSeq(uint64(batchesLen(batches)))
+
+ // Rotate memdb if it's reach the threshold.
+ if batch.internalLen >= mdbFree {
+ db.rotateMem(0, false)
}
- defer mdb.decref()
- // Calculate maximum size of the batch.
- m := 1 << 20
- if x := b.size(); x <= 128<<10 {
- m = x + (128 << 10)
+ db.unlockWrite(overflow, merged, nil)
+ return nil
+}
+
+// Write apply the given batch to the DB. The batch records will be applied
+// sequentially. Write might be used concurrently, when used concurrently and
+// batch is small enough, write will try to merge the batches. Set NoWriteMerge
+// option to true to disable write merge.
+//
+// It is safe to modify the contents of the arguments after Write returns but
+// not before. Write will not modify content of the batch.
+func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
+ if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
+ return err
}
- m = minInt(m, mdbFree)
- // Merge with other batch.
-drain:
- for b.size() < m && !b.sync {
- select {
- case nb := <-db.writeC:
- if b.size()+nb.size() <= m {
- b.append(nb)
- db.writeMergedC <- true
- merged++
- } else {
- danglingMerge = true
- break drain
- }
- default:
- break drain
+ // If the batch size is larger than write buffer, it may justified to write
+ // using transaction instead. Using transaction the batch will be written
+ // into tables directly, skipping the journaling.
+ if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
+ tr, err := db.OpenTransaction()
+ if err != nil {
+ return err
+ }
+ if err := tr.Write(batch, wo); err != nil {
+ tr.Discard()
+ return err
}
+ return tr.Commit()
}
- // Set batch first seq number relative from last seq.
- b.seq = db.seq + 1
+ merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
+ sync := wo.GetSync() && !db.s.o.GetNoSync()
- // Write journal concurrently if it is large enough.
- if b.size() >= (128 << 10) {
- // Push the write batch to the journal writer
+ // Acquire write lock.
+ if merge {
select {
- case db.journalC <- b:
- // Write into memdb
- if berr := b.memReplay(mdb.DB); berr != nil {
- panic(berr)
+ case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
+ if <-db.writeMergedC {
+ // Write is merged.
+ return <-db.writeAckC
}
- case err = <-db.compPerErrC:
- return
- case _, _ = <-db.closeC:
- err = ErrClosed
- return
+ // Write is not merged, the write lock is handed to us. Continue.
+ case db.writeLockC <- struct{}{}:
+ // Write lock acquired.
+ case err := <-db.compPerErrC:
+ // Compaction error.
+ return err
+ case <-db.closeC:
+ // Closed
+ return ErrClosed
}
- // Wait for journal writer
+ } else {
select {
- case err = <-db.journalAckC:
- if err != nil {
- // Revert memdb if error detected
- if berr := b.revertMemReplay(mdb.DB); berr != nil {
- panic(berr)
- }
- return
+ case db.writeLockC <- struct{}{}:
+ // Write lock acquired.
+ case err := <-db.compPerErrC:
+ // Compaction error.
+ return err
+ case <-db.closeC:
+ // Closed
+ return ErrClosed
+ }
+ }
+
+ return db.writeLocked(batch, nil, merge, sync)
+}
+
+func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
+ if err := db.ok(); err != nil {
+ return err
+ }
+
+ merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
+ sync := wo.GetSync() && !db.s.o.GetNoSync()
+
+ // Acquire write lock.
+ if merge {
+ select {
+ case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
+ if <-db.writeMergedC {
+ // Write is merged.
+ return <-db.writeAckC
}
- case _, _ = <-db.closeC:
- err = ErrClosed
- return
+ // Write is not merged, the write lock is handed to us. Continue.
+ case db.writeLockC <- struct{}{}:
+ // Write lock acquired.
+ case err := <-db.compPerErrC:
+ // Compaction error.
+ return err
+ case <-db.closeC:
+ // Closed
+ return ErrClosed
}
} else {
- err = db.writeJournal(b)
- if err != nil {
- return
- }
- if berr := b.memReplay(mdb.DB); berr != nil {
- panic(berr)
+ select {
+ case db.writeLockC <- struct{}{}:
+ // Write lock acquired.
+ case err := <-db.compPerErrC:
+ // Compaction error.
+ return err
+ case <-db.closeC:
+ // Closed
+ return ErrClosed
}
}
- // Set last seq number.
- db.addSeq(uint64(b.Len()))
-
- if b.size() >= mdbFree {
- db.rotateMem(0, false)
- }
- return
+ batch := db.batchPool.Get().(*Batch)
+ batch.Reset()
+ batch.appendRec(kt, key, value)
+ return db.writeLocked(batch, batch, merge, sync)
}
// Put sets the value for the given key. It overwrites any previous value
-// for that key; a DB is not a multi-map.
+// for that key; a DB is not a multi-map. Write merge also applies for Put, see
+// Write.
//
-// It is safe to modify the contents of the arguments after Put returns.
+// It is safe to modify the contents of the arguments after Put returns but not
+// before.
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
- b := new(Batch)
- b.Put(key, value)
- return db.Write(b, wo)
+ return db.putRec(keyTypeVal, key, value, wo)
}
-// Delete deletes the value for the given key.
+// Delete deletes the value for the given key. Delete will not returns error if
+// key doesn't exist. Write merge also applies for Delete, see Write.
//
-// It is safe to modify the contents of the arguments after Delete returns.
+// It is safe to modify the contents of the arguments after Delete returns but
+// not before.
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
- b := new(Batch)
- b.Delete(key)
- return db.Write(b, wo)
+ return db.putRec(keyTypeDel, key, nil, wo)
}
func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
@@ -304,12 +386,15 @@ func (db *DB) CompactRange(r util.Range) error {
case db.writeLockC <- struct{}{}:
case err := <-db.compPerErrC:
return err
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return ErrClosed
}
// Check for overlaps in memdb.
mdb := db.getEffectiveMem()
+ if mdb == nil {
+ return ErrClosed
+ }
defer mdb.decref()
if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
// Memdb compaction.
@@ -341,7 +426,7 @@ func (db *DB) SetReadOnly() error {
db.compWriteLocking = true
case err := <-db.compPerErrC:
return err
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return ErrClosed
}
@@ -350,7 +435,7 @@ func (db *DB) SetReadOnly() error {
case db.compErrSetC <- ErrReadOnly:
case perr := <-db.compPerErrC:
return perr
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return ErrClosed
}