aboutsummaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
diff options
context:
space:
mode:
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go')
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go113
1 files changed, 80 insertions, 33 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
index e1cf30c53..5200be6fc 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
@@ -45,9 +45,9 @@ func (db *DB) jWriter() {
}
}
-func (db *DB) rotateMem(n int) (mem *memDB, err error) {
+func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
// Wait for pending memdb compaction.
- err = db.compSendIdle(db.mcompCmdC)
+ err = db.compTriggerWait(db.mcompCmdC)
if err != nil {
return
}
@@ -59,46 +59,50 @@ func (db *DB) rotateMem(n int) (mem *memDB, err error) {
}
// Schedule memdb compaction.
- db.compSendTrigger(db.mcompCmdC)
+ if wait {
+ err = db.compTriggerWait(db.mcompCmdC)
+ } else {
+ db.compTrigger(db.mcompCmdC)
+ }
return
}
-func (db *DB) flush(n int) (mem *memDB, nn int, err error) {
+func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
delayed := false
flush := func() (retry bool) {
v := db.s.version()
defer v.release()
- mem = db.getEffectiveMem()
+ mdb = db.getEffectiveMem()
defer func() {
if retry {
- mem.decref()
- mem = nil
+ mdb.decref()
+ mdb = nil
}
}()
- nn = mem.mdb.Free()
+ mdbFree = mdb.Free()
switch {
case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed:
delayed = true
time.Sleep(time.Millisecond)
- case nn >= n:
+ case mdbFree >= n:
return false
case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger():
delayed = true
- err = db.compSendIdle(db.tcompCmdC)
+ err = db.compTriggerWait(db.tcompCmdC)
if err != nil {
return false
}
default:
// Allow memdb to grow if it has no entry.
- if mem.mdb.Len() == 0 {
- nn = n
+ if mdb.Len() == 0 {
+ mdbFree = n
} else {
- mem.decref()
- mem, err = db.rotateMem(n)
+ mdb.decref()
+ mdb, err = db.rotateMem(n, false)
if err == nil {
- nn = mem.mdb.Free()
+ mdbFree = mdb.Free()
} else {
- nn = 0
+ mdbFree = 0
}
}
return false
@@ -129,7 +133,20 @@ func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
return
}
- b.init(wo.GetSync())
+ b.init(wo.GetSync() && !db.s.o.GetNoSync())
+
+ if b.size() > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
+ // Writes using transaction.
+ tr, err1 := db.OpenTransaction()
+ if err1 != nil {
+ return err1
+ }
+ if err1 := tr.Write(b, wo); err1 != nil {
+ tr.Discard()
+ return err1
+ }
+ return tr.Commit()
+ }
// The write happen synchronously.
select {
@@ -137,6 +154,8 @@ func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
if <-db.writeMergedC {
return <-db.writeAckC
}
+ // Continue, the write lock already acquired by previous writer
+ // and handed out to us.
case db.writeLockC <- struct{}{}:
case err = <-db.compPerErrC:
return
@@ -148,6 +167,7 @@ func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
danglingMerge := false
defer func() {
if danglingMerge {
+ // Only one dangling merge at most, so this is safe.
db.writeMergedC <- false
} else {
<-db.writeLockC
@@ -157,18 +177,18 @@ func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
}
}()
- mem, memFree, err := db.flush(b.size())
+ mdb, mdbFree, err := db.flush(b.size())
if err != nil {
return
}
- defer mem.decref()
+ defer mdb.decref()
// Calculate maximum size of the batch.
m := 1 << 20
if x := b.size(); x <= 128<<10 {
m = x + (128 << 10)
}
- m = minInt(m, memFree)
+ m = minInt(m, mdbFree)
// Merge with other batch.
drain:
@@ -197,7 +217,7 @@ drain:
select {
case db.journalC <- b:
// Write into memdb
- if berr := b.memReplay(mem.mdb); berr != nil {
+ if berr := b.memReplay(mdb.DB); berr != nil {
panic(berr)
}
case err = <-db.compPerErrC:
@@ -211,7 +231,7 @@ drain:
case err = <-db.journalAckC:
if err != nil {
// Revert memdb if error detected
- if berr := b.revertMemReplay(mem.mdb); berr != nil {
+ if berr := b.revertMemReplay(mdb.DB); berr != nil {
panic(berr)
}
return
@@ -225,7 +245,7 @@ drain:
if err != nil {
return
}
- if berr := b.memReplay(mem.mdb); berr != nil {
+ if berr := b.memReplay(mdb.DB); berr != nil {
panic(berr)
}
}
@@ -233,8 +253,8 @@ drain:
// Set last seq number.
db.addSeq(uint64(b.Len()))
- if b.size() >= memFree {
- db.rotateMem(0)
+ if b.size() >= mdbFree {
+ db.rotateMem(0, false)
}
return
}
@@ -249,8 +269,7 @@ func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
return db.Write(b, wo)
}
-// Delete deletes the value for the given key. It returns ErrNotFound if
-// the DB does not contain the key.
+// Delete deletes the value for the given key.
//
// It is safe to modify the contents of the arguments after Delete returns.
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
@@ -290,16 +309,16 @@ func (db *DB) CompactRange(r util.Range) error {
}
// Check for overlaps in memdb.
- mem := db.getEffectiveMem()
- defer mem.decref()
- if isMemOverlaps(db.s.icmp, mem.mdb, r.Start, r.Limit) {
+ mdb := db.getEffectiveMem()
+ defer mdb.decref()
+ if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
// Memdb compaction.
- if _, err := db.rotateMem(0); err != nil {
+ if _, err := db.rotateMem(0, false); err != nil {
<-db.writeLockC
return err
}
<-db.writeLockC
- if err := db.compSendIdle(db.mcompCmdC); err != nil {
+ if err := db.compTriggerWait(db.mcompCmdC); err != nil {
return err
}
} else {
@@ -307,5 +326,33 @@ func (db *DB) CompactRange(r util.Range) error {
}
// Table compaction.
- return db.compSendRange(db.tcompCmdC, -1, r.Start, r.Limit)
+ return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
+}
+
+// SetReadOnly makes DB read-only. It will stay read-only until reopened.
+func (db *DB) SetReadOnly() error {
+ if err := db.ok(); err != nil {
+ return err
+ }
+
+ // Lock writer.
+ select {
+ case db.writeLockC <- struct{}{}:
+ db.compWriteLocking = true
+ case err := <-db.compPerErrC:
+ return err
+ case _, _ = <-db.closeC:
+ return ErrClosed
+ }
+
+ // Set compaction read-only.
+ select {
+ case db.compErrSetC <- ErrReadOnly:
+ case perr := <-db.compPerErrC:
+ return perr
+ case _, _ = <-db.closeC:
+ return ErrClosed
+ }
+
+ return nil
}