aboutsummaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_transaction.go
diff options
context:
space:
mode:
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_transaction.go')
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_transaction.go68
1 files changed, 52 insertions, 16 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_transaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_transaction.go
index fca88037b..b8f7e7d21 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_transaction.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_transaction.go
@@ -59,8 +59,8 @@ func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
}
// NewIterator returns an iterator for the latest snapshot of the transaction.
-// The returned iterator is not goroutine-safe, but it is safe to use multiple
-// iterators concurrently, with each in a dedicated goroutine.
+// The returned iterator is not safe for concurrent use, but it is safe to use
+// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently while writes to the
// transaction. The resultant key/value pairs are guaranteed to be consistent.
//
@@ -167,8 +167,8 @@ func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error {
if tr.closed {
return errTransactionDone
}
- return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
- return tr.put(kt, key, value)
+ return b.replayInternal(func(i int, kt keyType, k, v []byte) error {
+ return tr.put(kt, k, v)
})
}
@@ -179,7 +179,8 @@ func (tr *Transaction) setDone() {
<-tr.db.writeLockC
}
-// Commit commits the transaction.
+// Commit commits the transaction. If error is not nil, then the transaction is
+// not committed, it can then either be retried or discarded.
//
// Other methods should not be called after transaction has been committed.
func (tr *Transaction) Commit() error {
@@ -192,24 +193,27 @@ func (tr *Transaction) Commit() error {
if tr.closed {
return errTransactionDone
}
- defer tr.setDone()
if err := tr.flush(); err != nil {
- tr.discard()
+ // Return error, lets user decide either to retry or discard
+ // transaction.
return err
}
if len(tr.tables) != 0 {
// Committing transaction.
tr.rec.setSeqNum(tr.seq)
tr.db.compCommitLk.Lock()
- defer tr.db.compCommitLk.Unlock()
+ tr.stats.startTimer()
+ var cerr error
for retry := 0; retry < 3; retry++ {
- if err := tr.db.s.commit(&tr.rec); err != nil {
- tr.db.logf("transaction@commit error R·%d %q", retry, err)
+ cerr = tr.db.s.commit(&tr.rec)
+ if cerr != nil {
+ tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
select {
case <-time.After(time.Second):
- case _, _ = <-tr.db.closeC:
+ case <-tr.db.closeC:
tr.db.logf("transaction@commit exiting")
- return err
+ tr.db.compCommitLk.Unlock()
+ return cerr
}
} else {
// Success. Set db.seq.
@@ -217,9 +221,26 @@ func (tr *Transaction) Commit() error {
break
}
}
+ tr.stats.stopTimer()
+ if cerr != nil {
+ // Return error, lets user decide either to retry or discard
+ // transaction.
+ return cerr
+ }
+
+ // Update compaction stats. This is safe as long as we hold compCommitLk.
+ tr.db.compStats.addStat(0, &tr.stats)
+
// Trigger table auto-compaction.
tr.db.compTrigger(tr.db.tcompCmdC)
+ tr.db.compCommitLk.Unlock()
+
+ // Additionally, wait compaction when certain threshold reached.
+ // Ignore error, returns error only if transaction can't be committed.
+ tr.db.waitCompaction()
}
+ // Only mark as done if transaction committed successfully.
+ tr.setDone()
return nil
}
@@ -245,10 +266,20 @@ func (tr *Transaction) Discard() {
tr.lk.Unlock()
}
+func (db *DB) waitCompaction() error {
+ if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
+ return db.compTriggerWait(db.tcompCmdC)
+ }
+ return nil
+}
+
// OpenTransaction opens an atomic DB transaction. Only one transaction can be
-// opened at a time. Write will be blocked until the transaction is committed or
-// discarded.
-// The returned transaction handle is goroutine-safe.
+// opened at a time. Subsequent call to Write and OpenTransaction will be blocked
+// until in-flight transaction is committed or discarded.
+// The returned transaction handle is safe for concurrent use.
+//
+// Transaction is expensive and can overwhelm compaction, especially if
+// transaction size is small. Use with caution.
//
// The transaction must be closed once done, either by committing or discarding
// the transaction.
@@ -263,7 +294,7 @@ func (db *DB) OpenTransaction() (*Transaction, error) {
case db.writeLockC <- struct{}{}:
case err := <-db.compPerErrC:
return nil, err
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return nil, ErrClosed
}
@@ -278,6 +309,11 @@ func (db *DB) OpenTransaction() (*Transaction, error) {
}
}
+ // Wait compaction when certain threshold reached.
+ if err := db.waitCompaction(); err != nil {
+ return nil, err
+ }
+
tr := &Transaction{
db: db,
seq: db.seq,