aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Godeps/Godeps.json30
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go345
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go37
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/comparer.go40
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go78
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go32
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go4
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go39
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_transaction.go68
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_util.go2
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go365
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors.go1
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors/errors.go6
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/iter.go15
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go43
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/key.go14
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go12
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go24
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go9
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go8
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go6
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go8
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/storage.go52
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go2
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go73
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/writer.go2
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util.go7
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go7
28 files changed, 823 insertions, 506 deletions
diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json
index 42a207200..0fc91e877 100644
--- a/Godeps/Godeps.json
+++ b/Godeps/Godeps.json
@@ -1,6 +1,6 @@
{
"ImportPath": "github.com/ethereum/go-ethereum",
- "GoVersion": "go1.5.2",
+ "GoVersion": "go1.7",
"GodepVersion": "v74",
"Packages": [
"./..."
@@ -111,6 +111,10 @@
"Rev": "51425a2415d21afadfd55cd93432c0bc69e9598d"
},
{
+ "ImportPath": "github.com/rcrowley/go-metrics/exp",
+ "Rev": "51425a2415d21afadfd55cd93432c0bc69e9598d"
+ },
+ {
"ImportPath": "github.com/rjeczalik/notify",
"Rev": "f627deca7a510d96f0ef9388f2d0e8b16d21f87f"
},
@@ -152,51 +156,51 @@
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb",
- "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
+ "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/cache",
- "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
+ "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/comparer",
- "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
+ "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/errors",
- "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
+ "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/filter",
- "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
+ "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/iterator",
- "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
+ "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/journal",
- "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
+ "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/memdb",
- "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
+ "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/opt",
- "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
+ "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/storage",
- "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
+ "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/table",
- "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
+ "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/util",
- "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
+ "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "golang.org/x/crypto/pbkdf2",
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go
index 501006717..225920002 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go
@@ -9,13 +9,15 @@ package leveldb
import (
"encoding/binary"
"fmt"
+ "io"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/memdb"
"github.com/syndtr/goleveldb/leveldb/storage"
)
-// ErrBatchCorrupted records reason of batch corruption.
+// ErrBatchCorrupted records reason of batch corruption. This error will be
+// wrapped with errors.ErrCorrupted.
type ErrBatchCorrupted struct {
Reason string
}
@@ -29,8 +31,9 @@ func newErrBatchCorrupted(reason string) error {
}
const (
- batchHdrLen = 8 + 4
- batchGrowRec = 3000
+ batchHeaderLen = 8 + 4
+ batchGrowRec = 3000
+ batchBufioSize = 16
)
// BatchReplay wraps basic batch operations.
@@ -39,34 +42,46 @@ type BatchReplay interface {
Delete(key []byte)
}
+type batchIndex struct {
+ keyType keyType
+ keyPos, keyLen int
+ valuePos, valueLen int
+}
+
+func (index batchIndex) k(data []byte) []byte {
+ return data[index.keyPos : index.keyPos+index.keyLen]
+}
+
+func (index batchIndex) v(data []byte) []byte {
+ if index.valueLen != 0 {
+ return data[index.valuePos : index.valuePos+index.valueLen]
+ }
+ return nil
+}
+
+func (index batchIndex) kv(data []byte) (key, value []byte) {
+ return index.k(data), index.v(data)
+}
+
// Batch is a write batch.
type Batch struct {
- data []byte
- rLen, bLen int
- seq uint64
- sync bool
+ data []byte
+ index []batchIndex
+
+ // internalLen is sums of key/value pair length plus 8-bytes internal key.
+ internalLen int
}
func (b *Batch) grow(n int) {
- off := len(b.data)
- if off == 0 {
- off = batchHdrLen
- if b.data != nil {
- b.data = b.data[:off]
- }
- }
- if cap(b.data)-off < n {
- if b.data == nil {
- b.data = make([]byte, off, off+n)
- } else {
- odata := b.data
- div := 1
- if b.rLen > batchGrowRec {
- div = b.rLen / batchGrowRec
- }
- b.data = make([]byte, off, off+n+(off-batchHdrLen)/div)
- copy(b.data, odata)
+ o := len(b.data)
+ if cap(b.data)-o < n {
+ div := 1
+ if len(b.index) > batchGrowRec {
+ div = len(b.index) / batchGrowRec
}
+ ndata := make([]byte, o, o+n+o/div)
+ copy(ndata, b.data)
+ b.data = ndata
}
}
@@ -76,32 +91,36 @@ func (b *Batch) appendRec(kt keyType, key, value []byte) {
n += binary.MaxVarintLen32 + len(value)
}
b.grow(n)
- off := len(b.data)
- data := b.data[:off+n]
- data[off] = byte(kt)
- off++
- off += binary.PutUvarint(data[off:], uint64(len(key)))
- copy(data[off:], key)
- off += len(key)
+ index := batchIndex{keyType: kt}
+ o := len(b.data)
+ data := b.data[:o+n]
+ data[o] = byte(kt)
+ o++
+ o += binary.PutUvarint(data[o:], uint64(len(key)))
+ index.keyPos = o
+ index.keyLen = len(key)
+ o += copy(data[o:], key)
if kt == keyTypeVal {
- off += binary.PutUvarint(data[off:], uint64(len(value)))
- copy(data[off:], value)
- off += len(value)
+ o += binary.PutUvarint(data[o:], uint64(len(value)))
+ index.valuePos = o
+ index.valueLen = len(value)
+ o += copy(data[o:], value)
}
- b.data = data[:off]
- b.rLen++
- // Include 8-byte ikey header
- b.bLen += len(key) + len(value) + 8
+ b.data = data[:o]
+ b.index = append(b.index, index)
+ b.internalLen += index.keyLen + index.valueLen + 8
}
// Put appends 'put operation' of the given key/value pair to the batch.
-// It is safe to modify the contents of the argument after Put returns.
+// It is safe to modify the contents of the argument after Put returns but not
+// before.
func (b *Batch) Put(key, value []byte) {
b.appendRec(keyTypeVal, key, value)
}
// Delete appends 'delete operation' of the given key to the batch.
-// It is safe to modify the contents of the argument after Delete returns.
+// It is safe to modify the contents of the argument after Delete returns but
+// not before.
func (b *Batch) Delete(key []byte) {
b.appendRec(keyTypeDel, key, nil)
}
@@ -111,7 +130,7 @@ func (b *Batch) Delete(key []byte) {
// The returned slice is not its own copy, so the contents should not be
// modified.
func (b *Batch) Dump() []byte {
- return b.encode()
+ return b.data
}
// Load loads given slice into the batch. Previous contents of the batch
@@ -119,144 +138,212 @@ func (b *Batch) Dump() []byte {
// The given slice will not be copied and will be used as batch buffer, so
// it is not safe to modify the contents of the slice.
func (b *Batch) Load(data []byte) error {
- return b.decode(0, data)
+ return b.decode(data, -1)
}
// Replay replays batch contents.
func (b *Batch) Replay(r BatchReplay) error {
- return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
- switch kt {
+ for _, index := range b.index {
+ switch index.keyType {
case keyTypeVal:
- r.Put(key, value)
+ r.Put(index.k(b.data), index.v(b.data))
case keyTypeDel:
- r.Delete(key)
+ r.Delete(index.k(b.data))
}
- return nil
- })
+ }
+ return nil
}
// Len returns number of records in the batch.
func (b *Batch) Len() int {
- return b.rLen
+ return len(b.index)
}
// Reset resets the batch.
func (b *Batch) Reset() {
b.data = b.data[:0]
- b.seq = 0
- b.rLen = 0
- b.bLen = 0
- b.sync = false
+ b.index = b.index[:0]
+ b.internalLen = 0
}
-func (b *Batch) init(sync bool) {
- b.sync = sync
+func (b *Batch) replayInternal(fn func(i int, kt keyType, k, v []byte) error) error {
+ for i, index := range b.index {
+ if err := fn(i, index.keyType, index.k(b.data), index.v(b.data)); err != nil {
+ return err
+ }
+ }
+ return nil
}
func (b *Batch) append(p *Batch) {
- if p.rLen > 0 {
- b.grow(len(p.data) - batchHdrLen)
- b.data = append(b.data, p.data[batchHdrLen:]...)
- b.rLen += p.rLen
- b.bLen += p.bLen
- }
- if p.sync {
- b.sync = true
+ ob := len(b.data)
+ oi := len(b.index)
+ b.data = append(b.data, p.data...)
+ b.index = append(b.index, p.index...)
+ b.internalLen += p.internalLen
+
+ // Updating index offset.
+ if ob != 0 {
+ for ; oi < len(b.index); oi++ {
+ index := &b.index[oi]
+ index.keyPos += ob
+ if index.valueLen != 0 {
+ index.valuePos += ob
+ }
+ }
}
}
-// size returns sums of key/value pair length plus 8-bytes ikey.
-func (b *Batch) size() int {
- return b.bLen
-}
-
-func (b *Batch) encode() []byte {
- b.grow(0)
- binary.LittleEndian.PutUint64(b.data, b.seq)
- binary.LittleEndian.PutUint32(b.data[8:], uint32(b.rLen))
-
- return b.data
+func (b *Batch) decode(data []byte, expectedLen int) error {
+ b.data = data
+ b.index = b.index[:0]
+ b.internalLen = 0
+ err := decodeBatch(data, func(i int, index batchIndex) error {
+ b.index = append(b.index, index)
+ b.internalLen += index.keyLen + index.valueLen + 8
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+ if expectedLen >= 0 && len(b.index) != expectedLen {
+ return newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", expectedLen, len(b.index)))
+ }
+ return nil
}
-func (b *Batch) decode(prevSeq uint64, data []byte) error {
- if len(data) < batchHdrLen {
- return newErrBatchCorrupted("too short")
+func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error {
+ var ik []byte
+ for i, index := range b.index {
+ ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
+ if err := mdb.Put(ik, index.v(b.data)); err != nil {
+ return err
+ }
}
+ return nil
+}
- b.seq = binary.LittleEndian.Uint64(data)
- if b.seq < prevSeq {
- return newErrBatchCorrupted("invalid sequence number")
- }
- b.rLen = int(binary.LittleEndian.Uint32(data[8:]))
- if b.rLen < 0 {
- return newErrBatchCorrupted("invalid records length")
+func (b *Batch) revertMem(seq uint64, mdb *memdb.DB) error {
+ var ik []byte
+ for i, index := range b.index {
+ ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
+ if err := mdb.Delete(ik); err != nil {
+ return err
+ }
}
- // No need to be precise at this point, it won't be used anyway
- b.bLen = len(data) - batchHdrLen
- b.data = data
-
return nil
}
-func (b *Batch) decodeRec(f func(i int, kt keyType, key, value []byte) error) error {
- off := batchHdrLen
- for i := 0; i < b.rLen; i++ {
- if off >= len(b.data) {
- return newErrBatchCorrupted("invalid records length")
- }
+func newBatch() interface{} {
+ return &Batch{}
+}
- kt := keyType(b.data[off])
- if kt > keyTypeVal {
- panic(kt)
- return newErrBatchCorrupted("bad record: invalid type")
+func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error {
+ var index batchIndex
+ for i, o := 0, 0; o < len(data); i++ {
+ // Key type.
+ index.keyType = keyType(data[o])
+ if index.keyType > keyTypeVal {
+ return newErrBatchCorrupted(fmt.Sprintf("bad record: invalid type %#x", uint(index.keyType)))
}
- off++
+ o++
- x, n := binary.Uvarint(b.data[off:])
- off += n
- if n <= 0 || off+int(x) > len(b.data) {
+ // Key.
+ x, n := binary.Uvarint(data[o:])
+ o += n
+ if n <= 0 || o+int(x) > len(data) {
return newErrBatchCorrupted("bad record: invalid key length")
}
- key := b.data[off : off+int(x)]
- off += int(x)
- var value []byte
- if kt == keyTypeVal {
- x, n := binary.Uvarint(b.data[off:])
- off += n
- if n <= 0 || off+int(x) > len(b.data) {
+ index.keyPos = o
+ index.keyLen = int(x)
+ o += index.keyLen
+
+ // Value.
+ if index.keyType == keyTypeVal {
+ x, n = binary.Uvarint(data[o:])
+ o += n
+ if n <= 0 || o+int(x) > len(data) {
return newErrBatchCorrupted("bad record: invalid value length")
}
- value = b.data[off : off+int(x)]
- off += int(x)
+ index.valuePos = o
+ index.valueLen = int(x)
+ o += index.valueLen
+ } else {
+ index.valuePos = 0
+ index.valueLen = 0
}
- if err := f(i, kt, key, value); err != nil {
+ if err := fn(i, index); err != nil {
return err
}
}
-
return nil
}
-func (b *Batch) memReplay(to *memdb.DB) error {
- var ikScratch []byte
- return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
- ikScratch = makeInternalKey(ikScratch, key, b.seq+uint64(i), kt)
- return to.Put(ikScratch, value)
+func decodeBatchToMem(data []byte, expectSeq uint64, mdb *memdb.DB) (seq uint64, batchLen int, err error) {
+ seq, batchLen, err = decodeBatchHeader(data)
+ if err != nil {
+ return 0, 0, err
+ }
+ if seq < expectSeq {
+ return 0, 0, newErrBatchCorrupted("invalid sequence number")
+ }
+ data = data[batchHeaderLen:]
+ var ik []byte
+ var decodedLen int
+ err = decodeBatch(data, func(i int, index batchIndex) error {
+ if i >= batchLen {
+ return newErrBatchCorrupted("invalid records length")
+ }
+ ik = makeInternalKey(ik, index.k(data), seq+uint64(i), index.keyType)
+ if err := mdb.Put(ik, index.v(data)); err != nil {
+ return err
+ }
+ decodedLen++
+ return nil
})
+ if err == nil && decodedLen != batchLen {
+ err = newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", batchLen, decodedLen))
+ }
+ return
}
-func (b *Batch) memDecodeAndReplay(prevSeq uint64, data []byte, to *memdb.DB) error {
- if err := b.decode(prevSeq, data); err != nil {
- return err
+func encodeBatchHeader(dst []byte, seq uint64, batchLen int) []byte {
+ dst = ensureBuffer(dst, batchHeaderLen)
+ binary.LittleEndian.PutUint64(dst, seq)
+ binary.LittleEndian.PutUint32(dst[8:], uint32(batchLen))
+ return dst
+}
+
+func decodeBatchHeader(data []byte) (seq uint64, batchLen int, err error) {
+ if len(data) < batchHeaderLen {
+ return 0, 0, newErrBatchCorrupted("too short")
+ }
+
+ seq = binary.LittleEndian.Uint64(data)
+ batchLen = int(binary.LittleEndian.Uint32(data[8:]))
+ if batchLen < 0 {
+ return 0, 0, newErrBatchCorrupted("invalid records length")
}
- return b.memReplay(to)
+ return
}
-func (b *Batch) revertMemReplay(to *memdb.DB) error {
- var ikScratch []byte
- return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
- ikScratch := makeInternalKey(ikScratch, key, b.seq+uint64(i), kt)
- return to.Delete(ikScratch)
- })
+func batchesLen(batches []*Batch) int {
+ batchLen := 0
+ for _, batch := range batches {
+ batchLen += batch.Len()
+ }
+ return batchLen
+}
+
+func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error {
+ if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil {
+ return err
+ }
+ for _, batch := range batches {
+ if _, err := wr.Write(batch.data); err != nil {
+ return err
+ }
+ }
+ return nil
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go
index a287d0e5e..c5940b232 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go
@@ -16,7 +16,7 @@ import (
)
// Cacher provides interface to implements a caching functionality.
-// An implementation must be goroutine-safe.
+// An implementation must be safe for concurrent use.
type Cacher interface {
// Capacity returns cache capacity.
Capacity() int
@@ -511,18 +511,12 @@ func (r *Cache) EvictAll() {
}
}
-// Close closes the 'cache map' and releases all 'cache node'.
+// Close closes the 'cache map' and forcefully releases all 'cache node'.
func (r *Cache) Close() error {
r.mu.Lock()
if !r.closed {
r.closed = true
- if r.cacher != nil {
- if err := r.cacher.Close(); err != nil {
- return err
- }
- }
-
h := (*mNode)(r.mHead)
h.initBuckets()
@@ -541,10 +535,37 @@ func (r *Cache) Close() error {
for _, f := range n.onDel {
f()
}
+ n.onDel = nil
}
}
}
r.mu.Unlock()
+
+ // Avoid deadlock.
+ if r.cacher != nil {
+ if err := r.cacher.Close(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// CloseWeak closes the 'cache map' and evict all 'cache node' from cacher, but
+// unlike Close it doesn't forcefully releases 'cache node'.
+func (r *Cache) CloseWeak() error {
+ r.mu.Lock()
+ if !r.closed {
+ r.closed = true
+ }
+ r.mu.Unlock()
+
+ // Avoid deadlock.
+ if r.cacher != nil {
+ r.cacher.EvictAll()
+ if err := r.cacher.Close(); err != nil {
+ return err
+ }
+ }
return nil
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/comparer.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/comparer.go
index 248bf7c21..448402b82 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/comparer.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/comparer.go
@@ -6,7 +6,9 @@
package leveldb
-import "github.com/syndtr/goleveldb/leveldb/comparer"
+import (
+ "github.com/syndtr/goleveldb/leveldb/comparer"
+)
type iComparer struct {
ucmp comparer.Comparer
@@ -33,12 +35,12 @@ func (icmp *iComparer) Name() string {
}
func (icmp *iComparer) Compare(a, b []byte) int {
- x := icmp.ucmp.Compare(internalKey(a).ukey(), internalKey(b).ukey())
+ x := icmp.uCompare(internalKey(a).ukey(), internalKey(b).ukey())
if x == 0 {
if m, n := internalKey(a).num(), internalKey(b).num(); m > n {
- x = -1
+ return -1
} else if m < n {
- x = 1
+ return 1
}
}
return x
@@ -46,30 +48,20 @@ func (icmp *iComparer) Compare(a, b []byte) int {
func (icmp *iComparer) Separator(dst, a, b []byte) []byte {
ua, ub := internalKey(a).ukey(), internalKey(b).ukey()
- dst = icmp.ucmp.Separator(dst, ua, ub)
- if dst == nil {
- return nil
+ dst = icmp.uSeparator(dst, ua, ub)
+ if dst != nil && len(dst) < len(ua) && icmp.uCompare(ua, dst) < 0 {
+ // Append earliest possible number.
+ return append(dst, keyMaxNumBytes...)
}
- if len(dst) < len(ua) && icmp.uCompare(ua, dst) < 0 {
- dst = append(dst, keyMaxNumBytes...)
- } else {
- // Did not close possibilities that n maybe longer than len(ub).
- dst = append(dst, a[len(a)-8:]...)
- }
- return dst
+ return nil
}
func (icmp *iComparer) Successor(dst, b []byte) []byte {
ub := internalKey(b).ukey()
- dst = icmp.ucmp.Successor(dst, ub)
- if dst == nil {
- return nil
- }
- if len(dst) < len(ub) && icmp.uCompare(ub, dst) < 0 {
- dst = append(dst, keyMaxNumBytes...)
- } else {
- // Did not close possibilities that n maybe longer than len(ub).
- dst = append(dst, b[len(b)-8:]...)
+ dst = icmp.uSuccessor(dst, ub)
+ if dst != nil && len(dst) < len(ub) && icmp.uCompare(ub, dst) < 0 {
+ // Append earliest possible number.
+ return append(dst, keyMaxNumBytes...)
}
- return dst
+ return nil
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
index eb6abd0fb..a02cb2c50 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
@@ -53,14 +53,13 @@ type DB struct {
aliveSnaps, aliveIters int32
// Write.
- writeC chan *Batch
+ batchPool sync.Pool
+ writeMergeC chan writeMerge
writeMergedC chan bool
writeLockC chan struct{}
writeAckC chan error
writeDelay time.Duration
writeDelayN int
- journalC chan *Batch
- journalAckC chan error
tr *Transaction
// Compaction.
@@ -94,12 +93,11 @@ func openDB(s *session) (*DB, error) {
// Snapshot
snapsList: list.New(),
// Write
- writeC: make(chan *Batch),
+ batchPool: sync.Pool{New: newBatch},
+ writeMergeC: make(chan writeMerge),
writeMergedC: make(chan bool),
writeLockC: make(chan struct{}, 1),
writeAckC: make(chan error),
- journalC: make(chan *Batch),
- journalAckC: make(chan error),
// Compaction
tcompCmdC: make(chan cCmd),
tcompPauseC: make(chan chan<- struct{}),
@@ -144,10 +142,10 @@ func openDB(s *session) (*DB, error) {
if readOnly {
db.SetReadOnly()
} else {
- db.closeW.Add(3)
+ db.closeW.Add(2)
go db.tCompaction()
go db.mCompaction()
- go db.jWriter()
+ // go db.jWriter()
}
s.logf("db@open done T·%v", time.Since(start))
@@ -162,10 +160,10 @@ func openDB(s *session) (*DB, error) {
// os.ErrExist error.
//
// Open will return an error with type of ErrCorrupted if corruption
-// detected in the DB. Corrupted DB can be recovered with Recover
-// function.
+// detected in the DB. Use errors.IsCorrupted to test whether an error is
+// due to corruption. Corrupted DB can be recovered with Recover function.
//
-// The returned DB instance is goroutine-safe.
+// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
s, err := newSession(stor, o)
@@ -202,13 +200,13 @@ func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
// os.ErrExist error.
//
// OpenFile uses standard file-system backed storage implementation as
-// desribed in the leveldb/storage package.
+// described in the leveldb/storage package.
//
// OpenFile will return an error with type of ErrCorrupted if corruption
-// detected in the DB. Corrupted DB can be recovered with Recover
-// function.
+// detected in the DB. Use errors.IsCorrupted to test whether an error is
+// due to corruption. Corrupted DB can be recovered with Recover function.
//
-// The returned DB instance is goroutine-safe.
+// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func OpenFile(path string, o *opt.Options) (db *DB, err error) {
stor, err := storage.OpenFile(path, o.GetReadOnly())
@@ -229,7 +227,7 @@ func OpenFile(path string, o *opt.Options) (db *DB, err error) {
// The DB must already exist or it will returns an error.
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
//
-// The returned DB instance is goroutine-safe.
+// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
s, err := newSession(stor, o)
@@ -255,10 +253,10 @@ func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
// The DB must already exist or it will returns an error.
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
//
-// RecoverFile uses standard file-system backed storage implementation as desribed
+// RecoverFile uses standard file-system backed storage implementation as described
// in the leveldb/storage package.
//
-// The returned DB instance is goroutine-safe.
+// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
stor, err := storage.OpenFile(path, false)
@@ -504,10 +502,11 @@ func (db *DB) recoverJournal() error {
checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
writeBuffer = db.s.o.GetWriteBuffer()
- jr *journal.Reader
- mdb = memdb.New(db.s.icmp, writeBuffer)
- buf = &util.Buffer{}
- batch = &Batch{}
+ jr *journal.Reader
+ mdb = memdb.New(db.s.icmp, writeBuffer)
+ buf = &util.Buffer{}
+ batchSeq uint64
+ batchLen int
)
for _, fd := range fds {
@@ -526,7 +525,7 @@ func (db *DB) recoverJournal() error {
}
// Flush memdb and remove obsolete journal file.
- if !ofd.Nil() {
+ if !ofd.Zero() {
if mdb.Len() > 0 {
if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
fr.Close()
@@ -569,7 +568,8 @@ func (db *DB) recoverJournal() error {
fr.Close()
return errors.SetFd(err, fd)
}
- if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
+ batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
+ if err != nil {
if !strict && errors.IsCorrupted(err) {
db.s.logf("journal error: %v (skipped)", err)
// We won't apply sequence number as it might be corrupted.
@@ -581,7 +581,7 @@ func (db *DB) recoverJournal() error {
}
// Save sequence number.
- db.seq = batch.seq + uint64(batch.Len())
+ db.seq = batchSeq + uint64(batchLen)
// Flush it if large enough.
if mdb.Size() >= writeBuffer {
@@ -624,7 +624,7 @@ func (db *DB) recoverJournal() error {
}
// Remove the last obsolete journal file.
- if !ofd.Nil() {
+ if !ofd.Zero() {
db.s.stor.Remove(ofd)
}
@@ -661,9 +661,10 @@ func (db *DB) recoverJournalRO() error {
db.logf("journal@recovery RO·Mode F·%d", len(fds))
var (
- jr *journal.Reader
- buf = &util.Buffer{}
- batch = &Batch{}
+ jr *journal.Reader
+ buf = &util.Buffer{}
+ batchSeq uint64
+ batchLen int
)
for _, fd := range fds {
@@ -703,7 +704,8 @@ func (db *DB) recoverJournalRO() error {
fr.Close()
return errors.SetFd(err, fd)
}
- if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
+ batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
+ if err != nil {
if !strict && errors.IsCorrupted(err) {
db.s.logf("journal error: %v (skipped)", err)
// We won't apply sequence number as it might be corrupted.
@@ -715,7 +717,7 @@ func (db *DB) recoverJournalRO() error {
}
// Save sequence number.
- db.seq = batch.seq + uint64(batch.Len())
+ db.seq = batchSeq + uint64(batchLen)
}
fr.Close()
@@ -856,7 +858,7 @@ func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
// NewIterator returns an iterator for the latest snapshot of the
// underlying DB.
-// The returned iterator is not goroutine-safe, but it is safe to use
+// The returned iterator is not safe for concurrent use, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its
// underlying DB. The resultant key/value pairs are guaranteed to be
@@ -1062,6 +1064,8 @@ func (db *DB) Close() error {
if db.journal != nil {
db.journal.Close()
db.journalWriter.Close()
+ db.journal = nil
+ db.journalWriter = nil
}
if db.writeDelayN > 0 {
@@ -1077,15 +1081,11 @@ func (db *DB) Close() error {
if err1 := db.closer.Close(); err == nil {
err = err1
}
+ db.closer = nil
}
- // NIL'ing pointers.
- db.s = nil
- db.mem = nil
- db.frozenMem = nil
- db.journal = nil
- db.journalWriter = nil
- db.closer = nil
+ // Clear memdbs.
+ db.clearMems()
return err
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
index 659f00dc6..2d0ad0753 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
@@ -96,7 +96,7 @@ noerr:
default:
goto haserr
}
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return
}
}
@@ -113,7 +113,7 @@ haserr:
goto hasperr
default:
}
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return
}
}
@@ -126,7 +126,7 @@ hasperr:
case db.writeLockC <- struct{}{}:
// Hold write lock, so that write won't pass-through.
db.compWriteLocking = true
- case _, _ = <-db.closeC:
+ case <-db.closeC:
if db.compWriteLocking {
// We should release the lock or Close will hang.
<-db.writeLockC
@@ -195,7 +195,7 @@ func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
db.logf("%s exiting (persistent error %q)", name, perr)
db.compactionExitTransact()
}
- case _, _ = <-db.closeC:
+ case <-db.closeC:
db.logf("%s exiting", name)
db.compactionExitTransact()
}
@@ -224,7 +224,7 @@ func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
}
select {
case <-backoffT.C:
- case _, _ = <-db.closeC:
+ case <-db.closeC:
db.logf("%s exiting", name)
db.compactionExitTransact()
}
@@ -288,7 +288,7 @@ func (db *DB) memCompaction() {
case <-db.compPerErrC:
close(resumeC)
resumeC = nil
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return
}
@@ -337,7 +337,7 @@ func (db *DB) memCompaction() {
select {
case <-resumeC:
close(resumeC)
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return
}
}
@@ -378,7 +378,7 @@ func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
select {
case ch := <-b.db.tcompPauseC:
b.db.pauseCompaction(ch)
- case _, _ = <-b.db.closeC:
+ case <-b.db.closeC:
b.db.compactionExitTransact()
default:
}
@@ -643,7 +643,7 @@ func (db *DB) tableNeedCompaction() bool {
func (db *DB) pauseCompaction(ch chan<- struct{}) {
select {
case ch <- struct{}{}:
- case _, _ = <-db.closeC:
+ case <-db.closeC:
db.compactionExitTransact()
}
}
@@ -697,14 +697,14 @@ func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
case compC <- cAuto{ch}:
case err = <-db.compErrC:
return
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return ErrClosed
}
// Wait cmd.
select {
case err = <-ch:
case err = <-db.compErrC:
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return ErrClosed
}
return err
@@ -719,14 +719,14 @@ func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (e
case compC <- cRange{level, min, max, ch}:
case err := <-db.compErrC:
return err
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return ErrClosed
}
// Wait cmd.
select {
case err = <-ch:
case err = <-db.compErrC:
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return ErrClosed
}
return err
@@ -758,7 +758,7 @@ func (db *DB) mCompaction() {
default:
panic("leveldb: unknown command")
}
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return
}
}
@@ -791,7 +791,7 @@ func (db *DB) tCompaction() {
case ch := <-db.tcompPauseC:
db.pauseCompaction(ch)
continue
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return
default:
}
@@ -806,7 +806,7 @@ func (db *DB) tCompaction() {
case ch := <-db.tcompPauseC:
db.pauseCompaction(ch)
continue
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return
}
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go
index 977f65ba5..2c69d2e53 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go
@@ -59,7 +59,7 @@ func (db *DB) releaseSnapshot(se *snapshotElement) {
}
}
-// Gets minimum sequence that not being snapshoted.
+// Gets minimum sequence that not being snapshotted.
func (db *DB) minSeq() uint64 {
db.snapsMu.Lock()
defer db.snapsMu.Unlock()
@@ -131,7 +131,7 @@ func (snap *Snapshot) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error)
}
// NewIterator returns an iterator for the snapshot of the underlying DB.
-// The returned iterator is not goroutine-safe, but it is safe to use
+// The returned iterator is not safe for concurrent use, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its
// underlying DB. The resultant key/value pairs are guaranteed to be
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go
index 40f454da1..85b02d24b 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go
@@ -67,12 +67,11 @@ func (db *DB) sampleSeek(ikey internalKey) {
}
func (db *DB) mpoolPut(mem *memdb.DB) {
- defer func() {
- recover()
- }()
- select {
- case db.memPool <- mem:
- default:
+ if !db.isClosed() {
+ select {
+ case db.memPool <- mem:
+ default:
+ }
}
}
@@ -100,7 +99,13 @@ func (db *DB) mpoolDrain() {
case <-db.memPool:
default:
}
- case _, _ = <-db.closeC:
+ case <-db.closeC:
+ ticker.Stop()
+ // Make sure the pool is drained.
+ select {
+ case <-db.memPool:
+ case <-time.After(time.Second):
+ }
close(db.memPool)
return
}
@@ -148,24 +153,26 @@ func (db *DB) newMem(n int) (mem *memDB, err error) {
func (db *DB) getMems() (e, f *memDB) {
db.memMu.RLock()
defer db.memMu.RUnlock()
- if db.mem == nil {
+ if db.mem != nil {
+ db.mem.incref()
+ } else if !db.isClosed() {
panic("nil effective mem")
}
- db.mem.incref()
if db.frozenMem != nil {
db.frozenMem.incref()
}
return db.mem, db.frozenMem
}
-// Get frozen memdb.
+// Get effective memdb.
func (db *DB) getEffectiveMem() *memDB {
db.memMu.RLock()
defer db.memMu.RUnlock()
- if db.mem == nil {
+ if db.mem != nil {
+ db.mem.incref()
+ } else if !db.isClosed() {
panic("nil effective mem")
}
- db.mem.incref()
return db.mem
}
@@ -200,6 +207,14 @@ func (db *DB) dropFrozenMem() {
db.memMu.Unlock()
}
+// Clear mems ptr; used by DB.Close().
+func (db *DB) clearMems() {
+ db.memMu.Lock()
+ db.mem = nil
+ db.frozenMem = nil
+ db.memMu.Unlock()
+}
+
// Set closed flag; return true if not already closed.
func (db *DB) setClosed() bool {
return atomic.CompareAndSwapUint32(&db.closed, 0, 1)
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_transaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_transaction.go
index fca88037b..b8f7e7d21 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_transaction.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_transaction.go
@@ -59,8 +59,8 @@ func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
}
// NewIterator returns an iterator for the latest snapshot of the transaction.
-// The returned iterator is not goroutine-safe, but it is safe to use multiple
-// iterators concurrently, with each in a dedicated goroutine.
+// The returned iterator is not safe for concurrent use, but it is safe to use
+// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently while writes to the
// transaction. The resultant key/value pairs are guaranteed to be consistent.
//
@@ -167,8 +167,8 @@ func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error {
if tr.closed {
return errTransactionDone
}
- return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
- return tr.put(kt, key, value)
+ return b.replayInternal(func(i int, kt keyType, k, v []byte) error {
+ return tr.put(kt, k, v)
})
}
@@ -179,7 +179,8 @@ func (tr *Transaction) setDone() {
<-tr.db.writeLockC
}
-// Commit commits the transaction.
+// Commit commits the transaction. If error is not nil, then the transaction is
+// not committed, it can then either be retried or discarded.
//
// Other methods should not be called after transaction has been committed.
func (tr *Transaction) Commit() error {
@@ -192,24 +193,27 @@ func (tr *Transaction) Commit() error {
if tr.closed {
return errTransactionDone
}
- defer tr.setDone()
if err := tr.flush(); err != nil {
- tr.discard()
+ // Return error, lets user decide either to retry or discard
+ // transaction.
return err
}
if len(tr.tables) != 0 {
// Committing transaction.
tr.rec.setSeqNum(tr.seq)
tr.db.compCommitLk.Lock()
- defer tr.db.compCommitLk.Unlock()
+ tr.stats.startTimer()
+ var cerr error
for retry := 0; retry < 3; retry++ {
- if err := tr.db.s.commit(&tr.rec); err != nil {
- tr.db.logf("transaction@commit error R·%d %q", retry, err)
+ cerr = tr.db.s.commit(&tr.rec)
+ if cerr != nil {
+ tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
select {
case <-time.After(time.Second):
- case _, _ = <-tr.db.closeC:
+ case <-tr.db.closeC:
tr.db.logf("transaction@commit exiting")
- return err
+ tr.db.compCommitLk.Unlock()
+ return cerr
}
} else {
// Success. Set db.seq.
@@ -217,9 +221,26 @@ func (tr *Transaction) Commit() error {
break
}
}
+ tr.stats.stopTimer()
+ if cerr != nil {
+ // Return error, lets user decide either to retry or discard
+ // transaction.
+ return cerr
+ }
+
+ // Update compaction stats. This is safe as long as we hold compCommitLk.
+ tr.db.compStats.addStat(0, &tr.stats)
+
// Trigger table auto-compaction.
tr.db.compTrigger(tr.db.tcompCmdC)
+ tr.db.compCommitLk.Unlock()
+
+ // Additionally, wait compaction when certain threshold reached.
+ // Ignore error, returns error only if transaction can't be committed.
+ tr.db.waitCompaction()
}
+ // Only mark as done if transaction committed successfully.
+ tr.setDone()
return nil
}
@@ -245,10 +266,20 @@ func (tr *Transaction) Discard() {
tr.lk.Unlock()
}
+func (db *DB) waitCompaction() error {
+ if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
+ return db.compTriggerWait(db.tcompCmdC)
+ }
+ return nil
+}
+
// OpenTransaction opens an atomic DB transaction. Only one transaction can be
-// opened at a time. Write will be blocked until the transaction is committed or
-// discarded.
-// The returned transaction handle is goroutine-safe.
+// opened at a time. Subsequent call to Write and OpenTransaction will be blocked
+// until in-flight transaction is committed or discarded.
+// The returned transaction handle is safe for concurrent use.
+//
+// Transaction is expensive and can overwhelm compaction, especially if
+// transaction size is small. Use with caution.
//
// The transaction must be closed once done, either by committing or discarding
// the transaction.
@@ -263,7 +294,7 @@ func (db *DB) OpenTransaction() (*Transaction, error) {
case db.writeLockC <- struct{}{}:
case err := <-db.compPerErrC:
return nil, err
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return nil, ErrClosed
}
@@ -278,6 +309,11 @@ func (db *DB) OpenTransaction() (*Transaction, error) {
}
}
+ // Wait compaction when certain threshold reached.
+ if err := db.waitCompaction(); err != nil {
+ return nil, err
+ }
+
tr := &Transaction{
db: db,
seq: db.seq,
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_util.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_util.go
index 7fd386ca4..7ecd960d2 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_util.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_util.go
@@ -62,7 +62,7 @@ func (db *DB) checkAndCleanFiles() error {
case storage.TypeManifest:
keep = fd.Num >= db.s.manifestFd.Num
case storage.TypeJournal:
- if !db.frozenJournalFd.Nil() {
+ if !db.frozenJournalFd.Zero() {
keep = fd.Num >= db.frozenJournalFd.Num
} else {
keep = fd.Num >= db.journalFd.Num
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
index 5576761fe..cc428b695 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
@@ -14,37 +14,23 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)
-func (db *DB) writeJournal(b *Batch) error {
- w, err := db.journal.Next()
+func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
+ wr, err := db.journal.Next()
if err != nil {
return err
}
- if _, err := w.Write(b.encode()); err != nil {
+ if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
return err
}
if err := db.journal.Flush(); err != nil {
return err
}
- if b.sync {
+ if sync {
return db.journalWriter.Sync()
}
return nil
}
-func (db *DB) jWriter() {
- defer db.closeW.Done()
- for {
- select {
- case b := <-db.journalC:
- if b != nil {
- db.journalAckC <- db.writeJournal(b)
- }
- case _, _ = <-db.closeC:
- return
- }
- }
-}
-
func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
// Wait for pending memdb compaction.
err = db.compTriggerWait(db.mcompCmdC)
@@ -69,24 +55,29 @@ func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
delayed := false
+ slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
+ pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
flush := func() (retry bool) {
- v := db.s.version()
- defer v.release()
mdb = db.getEffectiveMem()
+ if mdb == nil {
+ err = ErrClosed
+ return false
+ }
defer func() {
if retry {
mdb.decref()
mdb = nil
}
}()
+ tLen := db.s.tLen(0)
mdbFree = mdb.Free()
switch {
- case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed:
+ case tLen >= slowdownTrigger && !delayed:
delayed = true
time.Sleep(time.Millisecond)
case mdbFree >= n:
return false
- case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger():
+ case tLen >= pauseTrigger:
delayed = true
err = db.compTriggerWait(db.tcompCmdC)
if err != nil {
@@ -123,159 +114,250 @@ func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
return
}
-// Write apply the given batch to the DB. The batch will be applied
-// sequentially.
-//
-// It is safe to modify the contents of the arguments after Write returns.
-func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
- err = db.ok()
- if err != nil || b == nil || b.Len() == 0 {
- return
+type writeMerge struct {
+ sync bool
+ batch *Batch
+ keyType keyType
+ key, value []byte
+}
+
+func (db *DB) unlockWrite(overflow bool, merged int, err error) {
+ for i := 0; i < merged; i++ {
+ db.writeAckC <- err
+ }
+ if overflow {
+ // Pass lock to the next write (that failed to merge).
+ db.writeMergedC <- false
+ } else {
+ // Release lock.
+ <-db.writeLockC
}
+}
- b.init(wo.GetSync() && !db.s.o.GetNoSync())
+// ourBatch if defined should equal with batch.
+func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
+ // Try to flush memdb. This method would also trying to throttle writes
+ // if it is too fast and compaction cannot catch-up.
+ mdb, mdbFree, err := db.flush(batch.internalLen)
+ if err != nil {
+ db.unlockWrite(false, 0, err)
+ return err
+ }
+ defer mdb.decref()
- if b.size() > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
- // Writes using transaction.
- tr, err1 := db.OpenTransaction()
- if err1 != nil {
- return err1
+ var (
+ overflow bool
+ merged int
+ batches = []*Batch{batch}
+ )
+
+ if merge {
+ // Merge limit.
+ var mergeLimit int
+ if batch.internalLen > 128<<10 {
+ mergeLimit = (1 << 20) - batch.internalLen
+ } else {
+ mergeLimit = 128 << 10
}
- if err1 := tr.Write(b, wo); err1 != nil {
- tr.Discard()
- return err1
+ mergeCap := mdbFree - batch.internalLen
+ if mergeLimit > mergeCap {
+ mergeLimit = mergeCap
}
- return tr.Commit()
- }
- // The write happen synchronously.
- select {
- case db.writeC <- b:
- if <-db.writeMergedC {
- return <-db.writeAckC
+ merge:
+ for mergeLimit > 0 {
+ select {
+ case incoming := <-db.writeMergeC:
+ if incoming.batch != nil {
+ // Merge batch.
+ if incoming.batch.internalLen > mergeLimit {
+ overflow = true
+ break merge
+ }
+ batches = append(batches, incoming.batch)
+ mergeLimit -= incoming.batch.internalLen
+ } else {
+ // Merge put.
+ internalLen := len(incoming.key) + len(incoming.value) + 8
+ if internalLen > mergeLimit {
+ overflow = true
+ break merge
+ }
+ if ourBatch == nil {
+ ourBatch = db.batchPool.Get().(*Batch)
+ ourBatch.Reset()
+ batches = append(batches, ourBatch)
+ }
+ // We can use same batch since concurrent write doesn't
+ // guarantee write order.
+ ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
+ mergeLimit -= internalLen
+ }
+ sync = sync || incoming.sync
+ merged++
+ db.writeMergedC <- true
+
+ default:
+ break merge
+ }
}
- // Continue, the write lock already acquired by previous writer
- // and handed out to us.
- case db.writeLockC <- struct{}{}:
- case err = <-db.compPerErrC:
- return
- case _, _ = <-db.closeC:
- return ErrClosed
}
- merged := 0
- danglingMerge := false
- defer func() {
- for i := 0; i < merged; i++ {
- db.writeAckC <- err
- }
- if danglingMerge {
- // Only one dangling merge at most, so this is safe.
- db.writeMergedC <- false
- } else {
- <-db.writeLockC
+ // Seq number.
+ seq := db.seq + 1
+
+ // Write journal.
+ if err := db.writeJournal(batches, seq, sync); err != nil {
+ db.unlockWrite(overflow, merged, err)
+ return err
+ }
+
+ // Put batches.
+ for _, batch := range batches {
+ if err := batch.putMem(seq, mdb.DB); err != nil {
+ panic(err)
}
- }()
+ seq += uint64(batch.Len())
+ }
- mdb, mdbFree, err := db.flush(b.size())
- if err != nil {
- return
+ // Incr seq number.
+ db.addSeq(uint64(batchesLen(batches)))
+
+ // Rotate memdb if it's reach the threshold.
+ if batch.internalLen >= mdbFree {
+ db.rotateMem(0, false)
}
- defer mdb.decref()
- // Calculate maximum size of the batch.
- m := 1 << 20
- if x := b.size(); x <= 128<<10 {
- m = x + (128 << 10)
+ db.unlockWrite(overflow, merged, nil)
+ return nil
+}
+
+// Write apply the given batch to the DB. The batch records will be applied
+// sequentially. Write might be used concurrently, when used concurrently and
+// batch is small enough, write will try to merge the batches. Set NoWriteMerge
+// option to true to disable write merge.
+//
+// It is safe to modify the contents of the arguments after Write returns but
+// not before. Write will not modify content of the batch.
+func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
+ if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
+ return err
}
- m = minInt(m, mdbFree)
- // Merge with other batch.
-drain:
- for b.size() < m && !b.sync {
- select {
- case nb := <-db.writeC:
- if b.size()+nb.size() <= m {
- b.append(nb)
- db.writeMergedC <- true
- merged++
- } else {
- danglingMerge = true
- break drain
- }
- default:
- break drain
+ // If the batch size is larger than write buffer, it may justified to write
+ // using transaction instead. Using transaction the batch will be written
+ // into tables directly, skipping the journaling.
+ if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
+ tr, err := db.OpenTransaction()
+ if err != nil {
+ return err
+ }
+ if err := tr.Write(batch, wo); err != nil {
+ tr.Discard()
+ return err
}
+ return tr.Commit()
}
- // Set batch first seq number relative from last seq.
- b.seq = db.seq + 1
+ merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
+ sync := wo.GetSync() && !db.s.o.GetNoSync()
- // Write journal concurrently if it is large enough.
- if b.size() >= (128 << 10) {
- // Push the write batch to the journal writer
+ // Acquire write lock.
+ if merge {
select {
- case db.journalC <- b:
- // Write into memdb
- if berr := b.memReplay(mdb.DB); berr != nil {
- panic(berr)
+ case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
+ if <-db.writeMergedC {
+ // Write is merged.
+ return <-db.writeAckC
}
- case err = <-db.compPerErrC:
- return
- case _, _ = <-db.closeC:
- err = ErrClosed
- return
+ // Write is not merged, the write lock is handed to us. Continue.
+ case db.writeLockC <- struct{}{}:
+ // Write lock acquired.
+ case err := <-db.compPerErrC:
+ // Compaction error.
+ return err
+ case <-db.closeC:
+ // Closed
+ return ErrClosed
}
- // Wait for journal writer
+ } else {
select {
- case err = <-db.journalAckC:
- if err != nil {
- // Revert memdb if error detected
- if berr := b.revertMemReplay(mdb.DB); berr != nil {
- panic(berr)
- }
- return
+ case db.writeLockC <- struct{}{}:
+ // Write lock acquired.
+ case err := <-db.compPerErrC:
+ // Compaction error.
+ return err
+ case <-db.closeC:
+ // Closed
+ return ErrClosed
+ }
+ }
+
+ return db.writeLocked(batch, nil, merge, sync)
+}
+
+func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
+ if err := db.ok(); err != nil {
+ return err
+ }
+
+ merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
+ sync := wo.GetSync() && !db.s.o.GetNoSync()
+
+ // Acquire write lock.
+ if merge {
+ select {
+ case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
+ if <-db.writeMergedC {
+ // Write is merged.
+ return <-db.writeAckC
}
- case _, _ = <-db.closeC:
- err = ErrClosed
- return
+ // Write is not merged, the write lock is handed to us. Continue.
+ case db.writeLockC <- struct{}{}:
+ // Write lock acquired.
+ case err := <-db.compPerErrC:
+ // Compaction error.
+ return err
+ case <-db.closeC:
+ // Closed
+ return ErrClosed
}
} else {
- err = db.writeJournal(b)
- if err != nil {
- return
- }
- if berr := b.memReplay(mdb.DB); berr != nil {
- panic(berr)
+ select {
+ case db.writeLockC <- struct{}{}:
+ // Write lock acquired.
+ case err := <-db.compPerErrC:
+ // Compaction error.
+ return err
+ case <-db.closeC:
+ // Closed
+ return ErrClosed
}
}
- // Set last seq number.
- db.addSeq(uint64(b.Len()))
-
- if b.size() >= mdbFree {
- db.rotateMem(0, false)
- }
- return
+ batch := db.batchPool.Get().(*Batch)
+ batch.Reset()
+ batch.appendRec(kt, key, value)
+ return db.writeLocked(batch, batch, merge, sync)
}
// Put sets the value for the given key. It overwrites any previous value
-// for that key; a DB is not a multi-map.
+// for that key; a DB is not a multi-map. Write merge also applies for Put, see
+// Write.
//
-// It is safe to modify the contents of the arguments after Put returns.
+// It is safe to modify the contents of the arguments after Put returns but not
+// before.
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
- b := new(Batch)
- b.Put(key, value)
- return db.Write(b, wo)
+ return db.putRec(keyTypeVal, key, value, wo)
}
-// Delete deletes the value for the given key.
+// Delete deletes the value for the given key. Delete will not returns error if
+// key doesn't exist. Write merge also applies for Delete, see Write.
//
-// It is safe to modify the contents of the arguments after Delete returns.
+// It is safe to modify the contents of the arguments after Delete returns but
+// not before.
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
- b := new(Batch)
- b.Delete(key)
- return db.Write(b, wo)
+ return db.putRec(keyTypeDel, key, nil, wo)
}
func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
@@ -304,12 +386,15 @@ func (db *DB) CompactRange(r util.Range) error {
case db.writeLockC <- struct{}{}:
case err := <-db.compPerErrC:
return err
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return ErrClosed
}
// Check for overlaps in memdb.
mdb := db.getEffectiveMem()
+ if mdb == nil {
+ return ErrClosed
+ }
defer mdb.decref()
if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
// Memdb compaction.
@@ -341,7 +426,7 @@ func (db *DB) SetReadOnly() error {
db.compWriteLocking = true
case err := <-db.compPerErrC:
return err
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return ErrClosed
}
@@ -350,7 +435,7 @@ func (db *DB) SetReadOnly() error {
case db.compErrSetC <- ErrReadOnly:
case perr := <-db.compPerErrC:
return perr
- case _, _ = <-db.closeC:
+ case <-db.closeC:
return ErrClosed
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors.go
index c8bd66a5a..de2649812 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors.go
@@ -10,6 +10,7 @@ import (
"github.com/syndtr/goleveldb/leveldb/errors"
)
+// Common errors.
var (
ErrNotFound = errors.ErrNotFound
ErrReadOnly = errors.New("leveldb: read-only mode")
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors/errors.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors/errors.go
index 9a0f6e2c1..8d6146b6f 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors/errors.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors/errors.go
@@ -15,6 +15,7 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)
+// Common errors.
var (
ErrNotFound = New("leveldb: not found")
ErrReleased = util.ErrReleased
@@ -34,11 +35,10 @@ type ErrCorrupted struct {
}
func (e *ErrCorrupted) Error() string {
- if !e.Fd.Nil() {
+ if !e.Fd.Zero() {
return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd)
- } else {
- return e.Err.Error()
}
+ return e.Err.Error()
}
// NewErrCorrupted creates new ErrCorrupted error.
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/iter.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/iter.go
index c2522860b..3b5553274 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/iter.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/iter.go
@@ -21,13 +21,13 @@ var (
// IteratorSeeker is the interface that wraps the 'seeks method'.
type IteratorSeeker interface {
// First moves the iterator to the first key/value pair. If the iterator
- // only contains one key/value pair then First and Last whould moves
+ // only contains one key/value pair then First and Last would moves
// to the same key/value pair.
// It returns whether such pair exist.
First() bool
// Last moves the iterator to the last key/value pair. If the iterator
- // only contains one key/value pair then First and Last whould moves
+ // only contains one key/value pair then First and Last would moves
// to the same key/value pair.
// It returns whether such pair exist.
Last() bool
@@ -48,7 +48,7 @@ type IteratorSeeker interface {
Prev() bool
}
-// CommonIterator is the interface that wraps common interator methods.
+// CommonIterator is the interface that wraps common iterator methods.
type CommonIterator interface {
IteratorSeeker
@@ -71,14 +71,15 @@ type CommonIterator interface {
// Iterator iterates over a DB's key/value pairs in key order.
//
-// When encouter an error any 'seeks method' will return false and will
+// When encounter an error any 'seeks method' will return false and will
// yield no key/value pairs. The error can be queried by calling the Error
// method. Calling Release is still necessary.
//
// An iterator must be released after use, but it is not necessary to read
// an iterator until exhaustion.
-// Also, an iterator is not necessarily goroutine-safe, but it is safe to use
-// multiple iterators concurrently, with each in a dedicated goroutine.
+// Also, an iterator is not necessarily safe for concurrent use, but it is
+// safe to use multiple iterators concurrently, with each in a dedicated
+// goroutine.
type Iterator interface {
CommonIterator
@@ -98,7 +99,7 @@ type Iterator interface {
//
// ErrorCallbackSetter implemented by indexed and merged iterator.
type ErrorCallbackSetter interface {
- // SetErrorCallback allows set an error callback of the coresponding
+ // SetErrorCallback allows set an error callback of the corresponding
// iterator. Use nil to clear the callback.
SetErrorCallback(f func(err error))
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go
index 891098bb7..d094c3d0f 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go
@@ -180,34 +180,37 @@ func (r *Reader) nextChunk(first bool) error {
checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
chunkType := r.buf[r.j+6]
-
+ unprocBlock := r.n - r.j
if checksum == 0 && length == 0 && chunkType == 0 {
// Drop entire block.
- m := r.n - r.j
r.i = r.n
r.j = r.n
- return r.corrupt(m, "zero header", false)
- } else {
- m := r.n - r.j
- r.i = r.j + headerSize
- r.j = r.j + headerSize + int(length)
- if r.j > r.n {
- // Drop entire block.
- r.i = r.n
- r.j = r.n
- return r.corrupt(m, "chunk length overflows block", false)
- } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
- // Drop entire block.
- r.i = r.n
- r.j = r.n
- return r.corrupt(m, "checksum mismatch", false)
- }
+ return r.corrupt(unprocBlock, "zero header", false)
+ }
+ if chunkType < fullChunkType || chunkType > lastChunkType {
+ // Drop entire block.
+ r.i = r.n
+ r.j = r.n
+ return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false)
+ }
+ r.i = r.j + headerSize
+ r.j = r.j + headerSize + int(length)
+ if r.j > r.n {
+ // Drop entire block.
+ r.i = r.n
+ r.j = r.n
+ return r.corrupt(unprocBlock, "chunk length overflows block", false)
+ } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
+ // Drop entire block.
+ r.i = r.n
+ r.j = r.n
+ return r.corrupt(unprocBlock, "checksum mismatch", false)
}
if first && chunkType != fullChunkType && chunkType != firstChunkType {
- m := r.j - r.i
+ chunkLength := (r.j - r.i) + headerSize
r.i = r.j
// Report the error, but skip it.
- return r.corrupt(m+headerSize, "orphan chunk", true)
+ return r.corrupt(chunkLength, "orphan chunk", true)
}
r.last = chunkType == fullChunkType || chunkType == lastChunkType
return nil
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/key.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/key.go
index d0b80aaf9..ad8f51ec8 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/key.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/key.go
@@ -37,14 +37,14 @@ func (kt keyType) String() string {
case keyTypeVal:
return "v"
}
- return "x"
+ return fmt.Sprintf("<invalid:%#x>", uint(kt))
}
// Value types encoded as the last component of internal keys.
// Don't modify; this value are saved to disk.
const (
- keyTypeDel keyType = iota
- keyTypeVal
+ keyTypeDel = keyType(0)
+ keyTypeVal = keyType(1)
)
// keyTypeSeek defines the keyType that should be passed when constructing an
@@ -79,11 +79,7 @@ func makeInternalKey(dst, ukey []byte, seq uint64, kt keyType) internalKey {
panic("leveldb: invalid type")
}
- if n := len(ukey) + 8; cap(dst) < n {
- dst = make([]byte, n)
- } else {
- dst = dst[:n]
- }
+ dst = ensureBuffer(dst, len(ukey)+8)
copy(dst, ukey)
binary.LittleEndian.PutUint64(dst[len(ukey):], (seq<<8)|uint64(kt))
return internalKey(dst)
@@ -143,5 +139,5 @@ func (ik internalKey) String() string {
if ukey, seq, kt, err := parseInternalKey(ik); err == nil {
return fmt.Sprintf("%s,%s%d", shorten(string(ukey)), kt, seq)
}
- return "<invalid>"
+ return fmt.Sprintf("<invalid:%#x>", []byte(ik))
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go
index 1395bd928..18a19ed42 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go
@@ -17,6 +17,7 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)
+// Common errors.
var (
ErrNotFound = errors.ErrNotFound
ErrIterReleased = errors.New("leveldb/memdb: iterator released")
@@ -385,7 +386,7 @@ func (p *DB) Find(key []byte) (rkey, value []byte, err error) {
}
// NewIterator returns an iterator of the DB.
-// The returned iterator is not goroutine-safe, but it is safe to use
+// The returned iterator is not safe for concurrent use, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its
// underlying DB. However, the resultant key/value pairs are not guaranteed
@@ -411,7 +412,7 @@ func (p *DB) Capacity() int {
}
// Size returns sum of keys and values length. Note that deleted
-// key/value will not be accouted for, but it will still consume
+// key/value will not be accounted for, but it will still consume
// the buffer, since the buffer is append only.
func (p *DB) Size() int {
p.mu.RLock()
@@ -453,11 +454,14 @@ func (p *DB) Reset() {
p.mu.Unlock()
}
-// New creates a new initalized in-memory key/value DB. The capacity
+// New creates a new initialized in-memory key/value DB. The capacity
// is the initial key/value buffer capacity. The capacity is advisory,
// not enforced.
//
-// The returned DB instance is goroutine-safe.
+// This DB is append-only, deleting an entry would remove entry node but not
+// reclaim KV buffer.
+//
+// The returned DB instance is safe for concurrent use.
func New(cmp comparer.BasicComparer, capacity int) *DB {
p := &DB{
cmp: cmp,
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go
index 3d2bf1c02..44e7d9adc 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go
@@ -312,6 +312,11 @@ type Options struct {
// The default is false.
NoSync bool
+ // NoWriteMerge allows disabling write merge.
+ //
+ // The default is false.
+ NoWriteMerge bool
+
// OpenFilesCacher provides cache algorithm for open files caching.
// Specify NoCacher to disable caching algorithm.
//
@@ -543,6 +548,13 @@ func (o *Options) GetNoSync() bool {
return o.NoSync
}
+func (o *Options) GetNoWriteMerge() bool {
+ if o == nil {
+ return false
+ }
+ return o.NoWriteMerge
+}
+
func (o *Options) GetOpenFilesCacher() Cacher {
if o == nil || o.OpenFilesCacher == nil {
return DefaultOpenFilesCacher
@@ -629,6 +641,11 @@ func (ro *ReadOptions) GetStrict(strict Strict) bool {
// WriteOptions holds the optional parameters for 'write operation'. The
// 'write operation' includes Write, Put and Delete.
type WriteOptions struct {
+ // NoWriteMerge allows disabling write merge.
+ //
+ // The default is false.
+ NoWriteMerge bool
+
// Sync is whether to sync underlying writes from the OS buffer cache
// through to actual disk, if applicable. Setting Sync can result in
// slower writes.
@@ -644,6 +661,13 @@ type WriteOptions struct {
Sync bool
}
+func (wo *WriteOptions) GetNoWriteMerge() bool {
+ if wo == nil {
+ return false
+ }
+ return wo.NoWriteMerge
+}
+
func (wo *WriteOptions) GetSync() bool {
if wo == nil {
return false
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go
index b0d3fef1d..f3e747701 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go
@@ -18,7 +18,8 @@ import (
"github.com/syndtr/goleveldb/leveldb/storage"
)
-// ErrManifestCorrupted records manifest corruption.
+// ErrManifestCorrupted records manifest corruption. This error will be
+// wrapped with errors.ErrCorrupted.
type ErrManifestCorrupted struct {
Field string
Reason string
@@ -42,7 +43,7 @@ type session struct {
stSeqNum uint64 // last mem compacted seq; need external synchronization
stor storage.Storage
- storLock storage.Lock
+ storLock storage.Locker
o *cachedOptions
icmp *iComparer
tops *tOps
@@ -87,12 +88,12 @@ func (s *session) close() {
}
s.manifest = nil
s.manifestWriter = nil
- s.stVersion = nil
+ s.setVersion(&version{s: s, closing: true})
}
// Release session lock.
func (s *session) release() {
- s.storLock.Release()
+ s.storLock.Unlock()
}
// Create a new database session; need external synchronization.
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go
index 674182fb2..34ad61798 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go
@@ -50,6 +50,12 @@ func (s *session) version() *version {
return s.stVersion
}
+func (s *session) tLen(level int) int {
+ s.vmu.Lock()
+ defer s.vmu.Unlock()
+ return s.stVersion.tLen(level)
+}
+
// Set current version to v.
func (s *session) setVersion(v *version) {
s.vmu.Lock()
@@ -197,7 +203,7 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
if s.manifestWriter != nil {
s.manifestWriter.Close()
}
- if !s.manifestFd.Nil() {
+ if !s.manifestFd.Zero() {
s.stor.Remove(s.manifestFd)
}
s.manifestFd = fd
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go
index cbe1dc103..e53434cab 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go
@@ -32,7 +32,7 @@ type fileStorageLock struct {
fs *fileStorage
}
-func (lock *fileStorageLock) Release() {
+func (lock *fileStorageLock) Unlock() {
if lock.fs != nil {
lock.fs.mu.Lock()
defer lock.fs.mu.Unlock()
@@ -116,7 +116,7 @@ func OpenFile(path string, readOnly bool) (Storage, error) {
return fs, nil
}
-func (fs *fileStorage) Lock() (Lock, error) {
+func (fs *fileStorage) Lock() (Locker, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
@@ -323,7 +323,7 @@ func (fs *fileStorage) GetMeta() (fd FileDesc, err error) {
}
}
// Don't remove any files if there is no valid CURRENT file.
- if fd.Nil() {
+ if fd.Zero() {
if cerr != nil {
err = cerr
} else {
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go
index 9b70e1513..9b0421f03 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go
@@ -18,7 +18,7 @@ type memStorageLock struct {
ms *memStorage
}
-func (lock *memStorageLock) Release() {
+func (lock *memStorageLock) Unlock() {
ms := lock.ms
ms.mu.Lock()
defer ms.mu.Unlock()
@@ -43,7 +43,7 @@ func NewMemStorage() Storage {
}
}
-func (ms *memStorage) Lock() (Lock, error) {
+func (ms *memStorage) Lock() (Locker, error) {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.slock != nil {
@@ -69,7 +69,7 @@ func (ms *memStorage) SetMeta(fd FileDesc) error {
func (ms *memStorage) GetMeta() (FileDesc, error) {
ms.mu.Lock()
defer ms.mu.Unlock()
- if ms.meta.Nil() {
+ if ms.meta.Zero() {
return FileDesc{}, os.ErrNotExist
}
return ms.meta, nil
@@ -78,7 +78,7 @@ func (ms *memStorage) GetMeta() (FileDesc, error) {
func (ms *memStorage) List(ft FileType) ([]FileDesc, error) {
ms.mu.Lock()
var fds []FileDesc
- for x, _ := range ms.files {
+ for x := range ms.files {
fd := unpackFile(x)
if fd.Type&ft != 0 {
fds = append(fds, fd)
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/storage.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/storage.go
index 9b30b6727..c16bce6b6 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/storage.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/storage.go
@@ -11,12 +11,12 @@ import (
"errors"
"fmt"
"io"
-
- "github.com/syndtr/goleveldb/leveldb/util"
)
+// FileType represent a file type.
type FileType int
+// File types.
const (
TypeManifest FileType = 1 << iota
TypeJournal
@@ -40,6 +40,7 @@ func (t FileType) String() string {
return fmt.Sprintf("<unknown:%d>", t)
}
+// Common error.
var (
ErrInvalidFile = errors.New("leveldb/storage: invalid file for argument")
ErrLocked = errors.New("leveldb/storage: already locked")
@@ -55,11 +56,10 @@ type ErrCorrupted struct {
}
func (e *ErrCorrupted) Error() string {
- if !e.Fd.Nil() {
+ if !e.Fd.Zero() {
return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd)
- } else {
- return e.Err.Error()
}
+ return e.Err.Error()
}
// Syncer is the interface that wraps basic Sync method.
@@ -83,11 +83,12 @@ type Writer interface {
Syncer
}
-type Lock interface {
- util.Releaser
+// Locker is the interface that wraps Unlock method.
+type Locker interface {
+ Unlock()
}
-// FileDesc is a file descriptor.
+// FileDesc is a 'file descriptor'.
type FileDesc struct {
Type FileType
Num int64
@@ -108,12 +109,12 @@ func (fd FileDesc) String() string {
}
}
-// Nil returns true if fd == (FileDesc{}).
-func (fd FileDesc) Nil() bool {
+// Zero returns true if fd == (FileDesc{}).
+func (fd FileDesc) Zero() bool {
return fd == (FileDesc{})
}
-// FileDescOk returns true if fd is a valid file descriptor.
+// FileDescOk returns true if fd is a valid 'file descriptor'.
func FileDescOk(fd FileDesc) bool {
switch fd.Type {
case TypeManifest:
@@ -126,43 +127,44 @@ func FileDescOk(fd FileDesc) bool {
return fd.Num >= 0
}
-// Storage is the storage. A storage instance must be goroutine-safe.
+// Storage is the storage. A storage instance must be safe for concurrent use.
type Storage interface {
// Lock locks the storage. Any subsequent attempt to call Lock will fail
// until the last lock released.
- // After use the caller should call the Release method.
- Lock() (Lock, error)
+ // Caller should call Unlock method after use.
+ Lock() (Locker, error)
// Log logs a string. This is used for logging.
// An implementation may write to a file, stdout or simply do nothing.
Log(str string)
- // SetMeta sets to point to the given fd, which then can be acquired using
- // GetMeta method.
- // SetMeta should be implemented in such way that changes should happened
+ // SetMeta store 'file descriptor' that can later be acquired using GetMeta
+ // method. The 'file descriptor' should point to a valid file.
+ // SetMeta should be implemented in such way that changes should happen
// atomically.
SetMeta(fd FileDesc) error
- // GetManifest returns a manifest file.
- // Returns os.ErrNotExist if meta doesn't point to any fd, or point to fd
- // that doesn't exist.
+ // GetMeta returns 'file descriptor' stored in meta. The 'file descriptor'
+ // can be updated using SetMeta method.
+ // Returns os.ErrNotExist if meta doesn't store any 'file descriptor', or
+ // 'file descriptor' point to nonexistent file.
GetMeta() (FileDesc, error)
- // List returns fds that match the given file types.
+ // List returns file descriptors that match the given file types.
// The file types may be OR'ed together.
List(ft FileType) ([]FileDesc, error)
- // Open opens file with the given fd read-only.
+ // Open opens file with the given 'file descriptor' read-only.
// Returns os.ErrNotExist error if the file does not exist.
// Returns ErrClosed if the underlying storage is closed.
Open(fd FileDesc) (Reader, error)
- // Create creates file with the given fd, truncate if already exist and
- // opens write-only.
+ // Create creates file with the given 'file descriptor', truncate if already
+ // exist and opens write-only.
// Returns ErrClosed if the underlying storage is closed.
Create(fd FileDesc) (Writer, error)
- // Remove removes file with the given fd.
+ // Remove removes file with the given 'file descriptor'.
// Returns ErrClosed if the underlying storage is closed.
Remove(fd FileDesc) error
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go
index 310ba6c22..81d18a531 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go
@@ -434,7 +434,7 @@ func (t *tOps) close() {
t.bpool.Close()
t.cache.Close()
if t.bcache != nil {
- t.bcache.Close()
+ t.bcache.CloseWeak()
}
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go
index ae61bece9..c5be420b3 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go
@@ -26,12 +26,15 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)
+// Reader errors.
var (
ErrNotFound = errors.ErrNotFound
ErrReaderReleased = errors.New("leveldb/table: reader released")
ErrIterReleased = errors.New("leveldb/table: iterator released")
)
+// ErrCorrupted describes error due to corruption. This error will be wrapped
+// with errors.ErrCorrupted.
type ErrCorrupted struct {
Pos int64
Size int64
@@ -61,7 +64,7 @@ type block struct {
func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) {
index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):]))
- offset += 1 // shared always zero, since this is a restart point
+ offset++ // shared always zero, since this is a restart point
v1, n1 := binary.Uvarint(b.data[offset:]) // key length
_, n2 := binary.Uvarint(b.data[offset+n1:]) // value length
m := offset + n1 + n2
@@ -356,7 +359,7 @@ func (i *blockIter) Prev() bool {
i.value = nil
offset := i.block.restartOffset(ri)
if offset == i.offset {
- ri -= 1
+ ri--
if ri < 0 {
i.dir = dirSOI
return false
@@ -783,8 +786,8 @@ func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChe
// table. And a nil Range.Limit is treated as a key after all keys in
// the table.
//
-// The returned iterator is not goroutine-safe and should be released
-// when not used.
+// The returned iterator is not safe for concurrent use and should be released
+// after use.
//
// Also read Iterator documentation of the leveldb/iterator package.
func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
@@ -826,18 +829,21 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo
index := r.newBlockIter(indexBlock, nil, nil, true)
defer index.Release()
+
if !index.Seek(key) {
- err = index.Error()
- if err == nil {
+ if err = index.Error(); err == nil {
err = ErrNotFound
}
return
}
+
dataBH, n := decodeBlockHandle(index.Value())
if n == 0 {
r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
- return
+ return nil, nil, r.err
}
+
+ // The filter should only used for exact match.
if filtered && r.filter != nil {
filterBlock, frel, ferr := r.getFilterBlock(true)
if ferr == nil {
@@ -847,30 +853,53 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo
}
frel.Release()
} else if !errors.IsCorrupted(ferr) {
- err = ferr
- return
+ return nil, nil, ferr
}
}
+
data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
- defer data.Release()
if !data.Seek(key) {
- err = data.Error()
- if err == nil {
- err = ErrNotFound
+ data.Release()
+ if err = data.Error(); err != nil {
+ return
+ }
+
+ // The nearest greater-than key is the first key of the next block.
+ if !index.Next() {
+ if err = index.Error(); err == nil {
+ err = ErrNotFound
+ }
+ return
+ }
+
+ dataBH, n = decodeBlockHandle(index.Value())
+ if n == 0 {
+ r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
+ return nil, nil, r.err
+ }
+
+ data = r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
+ if !data.Next() {
+ data.Release()
+ if err = data.Error(); err == nil {
+ err = ErrNotFound
+ }
+ return
}
- return
}
- // Don't use block buffer, no need to copy the buffer.
+
+ // Key doesn't use block buffer, no need to copy the buffer.
rkey = data.Key()
if !noValue {
if r.bpool == nil {
value = data.Value()
} else {
- // Use block buffer, and since the buffer will be recycled, the buffer
- // need to be copied.
+ // Value does use block buffer, and since the buffer will be
+ // recycled, it need to be copied.
value = append([]byte{}, data.Value()...)
}
}
+ data.Release()
return
}
@@ -888,7 +917,7 @@ func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, val
return r.find(key, filtered, ro, false)
}
-// Find finds key that is greater than or equal to the given key.
+// FindKey finds key that is greater than or equal to the given key.
// It returns ErrNotFound if the table doesn't contain such key.
// If filtered is true then the nearest 'block' will be checked against
// 'filter data' (if present) and will immediately return ErrNotFound if
@@ -987,7 +1016,7 @@ func (r *Reader) Release() {
// NewReader creates a new initialized table reader for the file.
// The fi, cache and bpool is optional and can be nil.
//
-// The returned table reader instance is goroutine-safe.
+// The returned table reader instance is safe for concurrent use.
func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.NamespaceGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
if f == nil {
return nil, errors.New("leveldb/table: nil file")
@@ -1039,9 +1068,8 @@ func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.Name
if errors.IsCorrupted(err) {
r.err = err
return r, nil
- } else {
- return nil, err
}
+ return nil, err
}
// Set data end.
@@ -1086,9 +1114,8 @@ func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.Name
if errors.IsCorrupted(err) {
r.err = err
return r, nil
- } else {
- return nil, err
}
+ return nil, err
}
if r.filter != nil {
r.filterBlock, err = r.readFilterBlock(r.filterBH)
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/writer.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/writer.go
index 274dee6da..b96b271d8 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/writer.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/writer.go
@@ -349,7 +349,7 @@ func (w *Writer) Close() error {
// NewWriter creates a new initialized table writer for the file.
//
-// Table writer is not goroutine-safe.
+// Table writer is not safe for concurrent use.
func NewWriter(f io.Writer, o *opt.Options) *Writer {
w := &Writer{
writer: f,
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util.go
index 3b663d1cc..e572a329e 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util.go
@@ -89,3 +89,10 @@ func (p fdSorter) Swap(i, j int) {
func sortFds(fds []storage.FileDesc) {
sort.Sort(fdSorter(fds))
}
+
+func ensureBuffer(b []byte, n int) []byte {
+ if cap(b) < n {
+ return make([]byte, n)
+ }
+ return b[:n]
+}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go
index d274eeff2..c60f12c20 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go
@@ -34,7 +34,8 @@ type version struct {
cSeek unsafe.Pointer
- ref int
+ closing bool
+ ref int
// Succeeding version.
next *version
}
@@ -131,6 +132,10 @@ func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int
}
func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) {
+ if v.closing {
+ return nil, false, ErrClosed
+ }
+
ukey := ikey.ukey()
var (