diff options
28 files changed, 823 insertions, 506 deletions
diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 42a207200..0fc91e877 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -1,6 +1,6 @@ { "ImportPath": "github.com/ethereum/go-ethereum", - "GoVersion": "go1.5.2", + "GoVersion": "go1.7", "GodepVersion": "v74", "Packages": [ "./..." @@ -111,6 +111,10 @@ "Rev": "51425a2415d21afadfd55cd93432c0bc69e9598d" }, { + "ImportPath": "github.com/rcrowley/go-metrics/exp", + "Rev": "51425a2415d21afadfd55cd93432c0bc69e9598d" + }, + { "ImportPath": "github.com/rjeczalik/notify", "Rev": "f627deca7a510d96f0ef9388f2d0e8b16d21f87f" }, @@ -152,51 +156,51 @@ }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb", - "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" + "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/cache", - "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" + "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/comparer", - "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" + "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/errors", - "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" + "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/filter", - "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" + "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/iterator", - "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" + "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/journal", - "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" + "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/memdb", - "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" + "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/opt", - "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" + "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/storage", - "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" + "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/table", - "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" + "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/util", - "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" + "Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727" }, { "ImportPath": "golang.org/x/crypto/pbkdf2", diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go index 501006717..225920002 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go @@ -9,13 +9,15 @@ package leveldb import ( "encoding/binary" "fmt" + "io" "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/memdb" "github.com/syndtr/goleveldb/leveldb/storage" ) -// ErrBatchCorrupted records reason of batch corruption. +// ErrBatchCorrupted records reason of batch corruption. This error will be +// wrapped with errors.ErrCorrupted. type ErrBatchCorrupted struct { Reason string } @@ -29,8 +31,9 @@ func newErrBatchCorrupted(reason string) error { } const ( - batchHdrLen = 8 + 4 - batchGrowRec = 3000 + batchHeaderLen = 8 + 4 + batchGrowRec = 3000 + batchBufioSize = 16 ) // BatchReplay wraps basic batch operations. @@ -39,34 +42,46 @@ type BatchReplay interface { Delete(key []byte) } +type batchIndex struct { + keyType keyType + keyPos, keyLen int + valuePos, valueLen int +} + +func (index batchIndex) k(data []byte) []byte { + return data[index.keyPos : index.keyPos+index.keyLen] +} + +func (index batchIndex) v(data []byte) []byte { + if index.valueLen != 0 { + return data[index.valuePos : index.valuePos+index.valueLen] + } + return nil +} + +func (index batchIndex) kv(data []byte) (key, value []byte) { + return index.k(data), index.v(data) +} + // Batch is a write batch. type Batch struct { - data []byte - rLen, bLen int - seq uint64 - sync bool + data []byte + index []batchIndex + + // internalLen is sums of key/value pair length plus 8-bytes internal key. + internalLen int } func (b *Batch) grow(n int) { - off := len(b.data) - if off == 0 { - off = batchHdrLen - if b.data != nil { - b.data = b.data[:off] - } - } - if cap(b.data)-off < n { - if b.data == nil { - b.data = make([]byte, off, off+n) - } else { - odata := b.data - div := 1 - if b.rLen > batchGrowRec { - div = b.rLen / batchGrowRec - } - b.data = make([]byte, off, off+n+(off-batchHdrLen)/div) - copy(b.data, odata) + o := len(b.data) + if cap(b.data)-o < n { + div := 1 + if len(b.index) > batchGrowRec { + div = len(b.index) / batchGrowRec } + ndata := make([]byte, o, o+n+o/div) + copy(ndata, b.data) + b.data = ndata } } @@ -76,32 +91,36 @@ func (b *Batch) appendRec(kt keyType, key, value []byte) { n += binary.MaxVarintLen32 + len(value) } b.grow(n) - off := len(b.data) - data := b.data[:off+n] - data[off] = byte(kt) - off++ - off += binary.PutUvarint(data[off:], uint64(len(key))) - copy(data[off:], key) - off += len(key) + index := batchIndex{keyType: kt} + o := len(b.data) + data := b.data[:o+n] + data[o] = byte(kt) + o++ + o += binary.PutUvarint(data[o:], uint64(len(key))) + index.keyPos = o + index.keyLen = len(key) + o += copy(data[o:], key) if kt == keyTypeVal { - off += binary.PutUvarint(data[off:], uint64(len(value))) - copy(data[off:], value) - off += len(value) + o += binary.PutUvarint(data[o:], uint64(len(value))) + index.valuePos = o + index.valueLen = len(value) + o += copy(data[o:], value) } - b.data = data[:off] - b.rLen++ - // Include 8-byte ikey header - b.bLen += len(key) + len(value) + 8 + b.data = data[:o] + b.index = append(b.index, index) + b.internalLen += index.keyLen + index.valueLen + 8 } // Put appends 'put operation' of the given key/value pair to the batch. -// It is safe to modify the contents of the argument after Put returns. +// It is safe to modify the contents of the argument after Put returns but not +// before. func (b *Batch) Put(key, value []byte) { b.appendRec(keyTypeVal, key, value) } // Delete appends 'delete operation' of the given key to the batch. -// It is safe to modify the contents of the argument after Delete returns. +// It is safe to modify the contents of the argument after Delete returns but +// not before. func (b *Batch) Delete(key []byte) { b.appendRec(keyTypeDel, key, nil) } @@ -111,7 +130,7 @@ func (b *Batch) Delete(key []byte) { // The returned slice is not its own copy, so the contents should not be // modified. func (b *Batch) Dump() []byte { - return b.encode() + return b.data } // Load loads given slice into the batch. Previous contents of the batch @@ -119,144 +138,212 @@ func (b *Batch) Dump() []byte { // The given slice will not be copied and will be used as batch buffer, so // it is not safe to modify the contents of the slice. func (b *Batch) Load(data []byte) error { - return b.decode(0, data) + return b.decode(data, -1) } // Replay replays batch contents. func (b *Batch) Replay(r BatchReplay) error { - return b.decodeRec(func(i int, kt keyType, key, value []byte) error { - switch kt { + for _, index := range b.index { + switch index.keyType { case keyTypeVal: - r.Put(key, value) + r.Put(index.k(b.data), index.v(b.data)) case keyTypeDel: - r.Delete(key) + r.Delete(index.k(b.data)) } - return nil - }) + } + return nil } // Len returns number of records in the batch. func (b *Batch) Len() int { - return b.rLen + return len(b.index) } // Reset resets the batch. func (b *Batch) Reset() { b.data = b.data[:0] - b.seq = 0 - b.rLen = 0 - b.bLen = 0 - b.sync = false + b.index = b.index[:0] + b.internalLen = 0 } -func (b *Batch) init(sync bool) { - b.sync = sync +func (b *Batch) replayInternal(fn func(i int, kt keyType, k, v []byte) error) error { + for i, index := range b.index { + if err := fn(i, index.keyType, index.k(b.data), index.v(b.data)); err != nil { + return err + } + } + return nil } func (b *Batch) append(p *Batch) { - if p.rLen > 0 { - b.grow(len(p.data) - batchHdrLen) - b.data = append(b.data, p.data[batchHdrLen:]...) - b.rLen += p.rLen - b.bLen += p.bLen - } - if p.sync { - b.sync = true + ob := len(b.data) + oi := len(b.index) + b.data = append(b.data, p.data...) + b.index = append(b.index, p.index...) + b.internalLen += p.internalLen + + // Updating index offset. + if ob != 0 { + for ; oi < len(b.index); oi++ { + index := &b.index[oi] + index.keyPos += ob + if index.valueLen != 0 { + index.valuePos += ob + } + } } } -// size returns sums of key/value pair length plus 8-bytes ikey. -func (b *Batch) size() int { - return b.bLen -} - -func (b *Batch) encode() []byte { - b.grow(0) - binary.LittleEndian.PutUint64(b.data, b.seq) - binary.LittleEndian.PutUint32(b.data[8:], uint32(b.rLen)) - - return b.data +func (b *Batch) decode(data []byte, expectedLen int) error { + b.data = data + b.index = b.index[:0] + b.internalLen = 0 + err := decodeBatch(data, func(i int, index batchIndex) error { + b.index = append(b.index, index) + b.internalLen += index.keyLen + index.valueLen + 8 + return nil + }) + if err != nil { + return err + } + if expectedLen >= 0 && len(b.index) != expectedLen { + return newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", expectedLen, len(b.index))) + } + return nil } -func (b *Batch) decode(prevSeq uint64, data []byte) error { - if len(data) < batchHdrLen { - return newErrBatchCorrupted("too short") +func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error { + var ik []byte + for i, index := range b.index { + ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType) + if err := mdb.Put(ik, index.v(b.data)); err != nil { + return err + } } + return nil +} - b.seq = binary.LittleEndian.Uint64(data) - if b.seq < prevSeq { - return newErrBatchCorrupted("invalid sequence number") - } - b.rLen = int(binary.LittleEndian.Uint32(data[8:])) - if b.rLen < 0 { - return newErrBatchCorrupted("invalid records length") +func (b *Batch) revertMem(seq uint64, mdb *memdb.DB) error { + var ik []byte + for i, index := range b.index { + ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType) + if err := mdb.Delete(ik); err != nil { + return err + } } - // No need to be precise at this point, it won't be used anyway - b.bLen = len(data) - batchHdrLen - b.data = data - return nil } -func (b *Batch) decodeRec(f func(i int, kt keyType, key, value []byte) error) error { - off := batchHdrLen - for i := 0; i < b.rLen; i++ { - if off >= len(b.data) { - return newErrBatchCorrupted("invalid records length") - } +func newBatch() interface{} { + return &Batch{} +} - kt := keyType(b.data[off]) - if kt > keyTypeVal { - panic(kt) - return newErrBatchCorrupted("bad record: invalid type") +func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error { + var index batchIndex + for i, o := 0, 0; o < len(data); i++ { + // Key type. + index.keyType = keyType(data[o]) + if index.keyType > keyTypeVal { + return newErrBatchCorrupted(fmt.Sprintf("bad record: invalid type %#x", uint(index.keyType))) } - off++ + o++ - x, n := binary.Uvarint(b.data[off:]) - off += n - if n <= 0 || off+int(x) > len(b.data) { + // Key. + x, n := binary.Uvarint(data[o:]) + o += n + if n <= 0 || o+int(x) > len(data) { return newErrBatchCorrupted("bad record: invalid key length") } - key := b.data[off : off+int(x)] - off += int(x) - var value []byte - if kt == keyTypeVal { - x, n := binary.Uvarint(b.data[off:]) - off += n - if n <= 0 || off+int(x) > len(b.data) { + index.keyPos = o + index.keyLen = int(x) + o += index.keyLen + + // Value. + if index.keyType == keyTypeVal { + x, n = binary.Uvarint(data[o:]) + o += n + if n <= 0 || o+int(x) > len(data) { return newErrBatchCorrupted("bad record: invalid value length") } - value = b.data[off : off+int(x)] - off += int(x) + index.valuePos = o + index.valueLen = int(x) + o += index.valueLen + } else { + index.valuePos = 0 + index.valueLen = 0 } - if err := f(i, kt, key, value); err != nil { + if err := fn(i, index); err != nil { return err } } - return nil } -func (b *Batch) memReplay(to *memdb.DB) error { - var ikScratch []byte - return b.decodeRec(func(i int, kt keyType, key, value []byte) error { - ikScratch = makeInternalKey(ikScratch, key, b.seq+uint64(i), kt) - return to.Put(ikScratch, value) +func decodeBatchToMem(data []byte, expectSeq uint64, mdb *memdb.DB) (seq uint64, batchLen int, err error) { + seq, batchLen, err = decodeBatchHeader(data) + if err != nil { + return 0, 0, err + } + if seq < expectSeq { + return 0, 0, newErrBatchCorrupted("invalid sequence number") + } + data = data[batchHeaderLen:] + var ik []byte + var decodedLen int + err = decodeBatch(data, func(i int, index batchIndex) error { + if i >= batchLen { + return newErrBatchCorrupted("invalid records length") + } + ik = makeInternalKey(ik, index.k(data), seq+uint64(i), index.keyType) + if err := mdb.Put(ik, index.v(data)); err != nil { + return err + } + decodedLen++ + return nil }) + if err == nil && decodedLen != batchLen { + err = newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", batchLen, decodedLen)) + } + return } -func (b *Batch) memDecodeAndReplay(prevSeq uint64, data []byte, to *memdb.DB) error { - if err := b.decode(prevSeq, data); err != nil { - return err +func encodeBatchHeader(dst []byte, seq uint64, batchLen int) []byte { + dst = ensureBuffer(dst, batchHeaderLen) + binary.LittleEndian.PutUint64(dst, seq) + binary.LittleEndian.PutUint32(dst[8:], uint32(batchLen)) + return dst +} + +func decodeBatchHeader(data []byte) (seq uint64, batchLen int, err error) { + if len(data) < batchHeaderLen { + return 0, 0, newErrBatchCorrupted("too short") + } + + seq = binary.LittleEndian.Uint64(data) + batchLen = int(binary.LittleEndian.Uint32(data[8:])) + if batchLen < 0 { + return 0, 0, newErrBatchCorrupted("invalid records length") } - return b.memReplay(to) + return } -func (b *Batch) revertMemReplay(to *memdb.DB) error { - var ikScratch []byte - return b.decodeRec(func(i int, kt keyType, key, value []byte) error { - ikScratch := makeInternalKey(ikScratch, key, b.seq+uint64(i), kt) - return to.Delete(ikScratch) - }) +func batchesLen(batches []*Batch) int { + batchLen := 0 + for _, batch := range batches { + batchLen += batch.Len() + } + return batchLen +} + +func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error { + if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil { + return err + } + for _, batch := range batches { + if _, err := wr.Write(batch.data); err != nil { + return err + } + } + return nil } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go index a287d0e5e..c5940b232 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go @@ -16,7 +16,7 @@ import ( ) // Cacher provides interface to implements a caching functionality. -// An implementation must be goroutine-safe. +// An implementation must be safe for concurrent use. type Cacher interface { // Capacity returns cache capacity. Capacity() int @@ -511,18 +511,12 @@ func (r *Cache) EvictAll() { } } -// Close closes the 'cache map' and releases all 'cache node'. +// Close closes the 'cache map' and forcefully releases all 'cache node'. func (r *Cache) Close() error { r.mu.Lock() if !r.closed { r.closed = true - if r.cacher != nil { - if err := r.cacher.Close(); err != nil { - return err - } - } - h := (*mNode)(r.mHead) h.initBuckets() @@ -541,10 +535,37 @@ func (r *Cache) Close() error { for _, f := range n.onDel { f() } + n.onDel = nil } } } r.mu.Unlock() + + // Avoid deadlock. + if r.cacher != nil { + if err := r.cacher.Close(); err != nil { + return err + } + } + return nil +} + +// CloseWeak closes the 'cache map' and evict all 'cache node' from cacher, but +// unlike Close it doesn't forcefully releases 'cache node'. +func (r *Cache) CloseWeak() error { + r.mu.Lock() + if !r.closed { + r.closed = true + } + r.mu.Unlock() + + // Avoid deadlock. + if r.cacher != nil { + r.cacher.EvictAll() + if err := r.cacher.Close(); err != nil { + return err + } + } return nil } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/comparer.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/comparer.go index 248bf7c21..448402b82 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/comparer.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/comparer.go @@ -6,7 +6,9 @@ package leveldb -import "github.com/syndtr/goleveldb/leveldb/comparer" +import ( + "github.com/syndtr/goleveldb/leveldb/comparer" +) type iComparer struct { ucmp comparer.Comparer @@ -33,12 +35,12 @@ func (icmp *iComparer) Name() string { } func (icmp *iComparer) Compare(a, b []byte) int { - x := icmp.ucmp.Compare(internalKey(a).ukey(), internalKey(b).ukey()) + x := icmp.uCompare(internalKey(a).ukey(), internalKey(b).ukey()) if x == 0 { if m, n := internalKey(a).num(), internalKey(b).num(); m > n { - x = -1 + return -1 } else if m < n { - x = 1 + return 1 } } return x @@ -46,30 +48,20 @@ func (icmp *iComparer) Compare(a, b []byte) int { func (icmp *iComparer) Separator(dst, a, b []byte) []byte { ua, ub := internalKey(a).ukey(), internalKey(b).ukey() - dst = icmp.ucmp.Separator(dst, ua, ub) - if dst == nil { - return nil + dst = icmp.uSeparator(dst, ua, ub) + if dst != nil && len(dst) < len(ua) && icmp.uCompare(ua, dst) < 0 { + // Append earliest possible number. + return append(dst, keyMaxNumBytes...) } - if len(dst) < len(ua) && icmp.uCompare(ua, dst) < 0 { - dst = append(dst, keyMaxNumBytes...) - } else { - // Did not close possibilities that n maybe longer than len(ub). - dst = append(dst, a[len(a)-8:]...) - } - return dst + return nil } func (icmp *iComparer) Successor(dst, b []byte) []byte { ub := internalKey(b).ukey() - dst = icmp.ucmp.Successor(dst, ub) - if dst == nil { - return nil - } - if len(dst) < len(ub) && icmp.uCompare(ub, dst) < 0 { - dst = append(dst, keyMaxNumBytes...) - } else { - // Did not close possibilities that n maybe longer than len(ub). - dst = append(dst, b[len(b)-8:]...) + dst = icmp.uSuccessor(dst, ub) + if dst != nil && len(dst) < len(ub) && icmp.uCompare(ub, dst) < 0 { + // Append earliest possible number. + return append(dst, keyMaxNumBytes...) } - return dst + return nil } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go index eb6abd0fb..a02cb2c50 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go @@ -53,14 +53,13 @@ type DB struct { aliveSnaps, aliveIters int32 // Write. - writeC chan *Batch + batchPool sync.Pool + writeMergeC chan writeMerge writeMergedC chan bool writeLockC chan struct{} writeAckC chan error writeDelay time.Duration writeDelayN int - journalC chan *Batch - journalAckC chan error tr *Transaction // Compaction. @@ -94,12 +93,11 @@ func openDB(s *session) (*DB, error) { // Snapshot snapsList: list.New(), // Write - writeC: make(chan *Batch), + batchPool: sync.Pool{New: newBatch}, + writeMergeC: make(chan writeMerge), writeMergedC: make(chan bool), writeLockC: make(chan struct{}, 1), writeAckC: make(chan error), - journalC: make(chan *Batch), - journalAckC: make(chan error), // Compaction tcompCmdC: make(chan cCmd), tcompPauseC: make(chan chan<- struct{}), @@ -144,10 +142,10 @@ func openDB(s *session) (*DB, error) { if readOnly { db.SetReadOnly() } else { - db.closeW.Add(3) + db.closeW.Add(2) go db.tCompaction() go db.mCompaction() - go db.jWriter() + // go db.jWriter() } s.logf("db@open done T·%v", time.Since(start)) @@ -162,10 +160,10 @@ func openDB(s *session) (*DB, error) { // os.ErrExist error. // // Open will return an error with type of ErrCorrupted if corruption -// detected in the DB. Corrupted DB can be recovered with Recover -// function. +// detected in the DB. Use errors.IsCorrupted to test whether an error is +// due to corruption. Corrupted DB can be recovered with Recover function. // -// The returned DB instance is goroutine-safe. +// The returned DB instance is safe for concurrent use. // The DB must be closed after use, by calling Close method. func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) { s, err := newSession(stor, o) @@ -202,13 +200,13 @@ func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) { // os.ErrExist error. // // OpenFile uses standard file-system backed storage implementation as -// desribed in the leveldb/storage package. +// described in the leveldb/storage package. // // OpenFile will return an error with type of ErrCorrupted if corruption -// detected in the DB. Corrupted DB can be recovered with Recover -// function. +// detected in the DB. Use errors.IsCorrupted to test whether an error is +// due to corruption. Corrupted DB can be recovered with Recover function. // -// The returned DB instance is goroutine-safe. +// The returned DB instance is safe for concurrent use. // The DB must be closed after use, by calling Close method. func OpenFile(path string, o *opt.Options) (db *DB, err error) { stor, err := storage.OpenFile(path, o.GetReadOnly()) @@ -229,7 +227,7 @@ func OpenFile(path string, o *opt.Options) (db *DB, err error) { // The DB must already exist or it will returns an error. // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. // -// The returned DB instance is goroutine-safe. +// The returned DB instance is safe for concurrent use. // The DB must be closed after use, by calling Close method. func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) { s, err := newSession(stor, o) @@ -255,10 +253,10 @@ func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) { // The DB must already exist or it will returns an error. // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. // -// RecoverFile uses standard file-system backed storage implementation as desribed +// RecoverFile uses standard file-system backed storage implementation as described // in the leveldb/storage package. // -// The returned DB instance is goroutine-safe. +// The returned DB instance is safe for concurrent use. // The DB must be closed after use, by calling Close method. func RecoverFile(path string, o *opt.Options) (db *DB, err error) { stor, err := storage.OpenFile(path, false) @@ -504,10 +502,11 @@ func (db *DB) recoverJournal() error { checksum = db.s.o.GetStrict(opt.StrictJournalChecksum) writeBuffer = db.s.o.GetWriteBuffer() - jr *journal.Reader - mdb = memdb.New(db.s.icmp, writeBuffer) - buf = &util.Buffer{} - batch = &Batch{} + jr *journal.Reader + mdb = memdb.New(db.s.icmp, writeBuffer) + buf = &util.Buffer{} + batchSeq uint64 + batchLen int ) for _, fd := range fds { @@ -526,7 +525,7 @@ func (db *DB) recoverJournal() error { } // Flush memdb and remove obsolete journal file. - if !ofd.Nil() { + if !ofd.Zero() { if mdb.Len() > 0 { if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil { fr.Close() @@ -569,7 +568,8 @@ func (db *DB) recoverJournal() error { fr.Close() return errors.SetFd(err, fd) } - if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil { + batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb) + if err != nil { if !strict && errors.IsCorrupted(err) { db.s.logf("journal error: %v (skipped)", err) // We won't apply sequence number as it might be corrupted. @@ -581,7 +581,7 @@ func (db *DB) recoverJournal() error { } // Save sequence number. - db.seq = batch.seq + uint64(batch.Len()) + db.seq = batchSeq + uint64(batchLen) // Flush it if large enough. if mdb.Size() >= writeBuffer { @@ -624,7 +624,7 @@ func (db *DB) recoverJournal() error { } // Remove the last obsolete journal file. - if !ofd.Nil() { + if !ofd.Zero() { db.s.stor.Remove(ofd) } @@ -661,9 +661,10 @@ func (db *DB) recoverJournalRO() error { db.logf("journal@recovery RO·Mode F·%d", len(fds)) var ( - jr *journal.Reader - buf = &util.Buffer{} - batch = &Batch{} + jr *journal.Reader + buf = &util.Buffer{} + batchSeq uint64 + batchLen int ) for _, fd := range fds { @@ -703,7 +704,8 @@ func (db *DB) recoverJournalRO() error { fr.Close() return errors.SetFd(err, fd) } - if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil { + batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb) + if err != nil { if !strict && errors.IsCorrupted(err) { db.s.logf("journal error: %v (skipped)", err) // We won't apply sequence number as it might be corrupted. @@ -715,7 +717,7 @@ func (db *DB) recoverJournalRO() error { } // Save sequence number. - db.seq = batch.seq + uint64(batch.Len()) + db.seq = batchSeq + uint64(batchLen) } fr.Close() @@ -856,7 +858,7 @@ func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) { // NewIterator returns an iterator for the latest snapshot of the // underlying DB. -// The returned iterator is not goroutine-safe, but it is safe to use +// The returned iterator is not safe for concurrent use, but it is safe to use // multiple iterators concurrently, with each in a dedicated goroutine. // It is also safe to use an iterator concurrently with modifying its // underlying DB. The resultant key/value pairs are guaranteed to be @@ -1062,6 +1064,8 @@ func (db *DB) Close() error { if db.journal != nil { db.journal.Close() db.journalWriter.Close() + db.journal = nil + db.journalWriter = nil } if db.writeDelayN > 0 { @@ -1077,15 +1081,11 @@ func (db *DB) Close() error { if err1 := db.closer.Close(); err == nil { err = err1 } + db.closer = nil } - // NIL'ing pointers. - db.s = nil - db.mem = nil - db.frozenMem = nil - db.journal = nil - db.journalWriter = nil - db.closer = nil + // Clear memdbs. + db.clearMems() return err } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go index 659f00dc6..2d0ad0753 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -96,7 +96,7 @@ noerr: default: goto haserr } - case _, _ = <-db.closeC: + case <-db.closeC: return } } @@ -113,7 +113,7 @@ haserr: goto hasperr default: } - case _, _ = <-db.closeC: + case <-db.closeC: return } } @@ -126,7 +126,7 @@ hasperr: case db.writeLockC <- struct{}{}: // Hold write lock, so that write won't pass-through. db.compWriteLocking = true - case _, _ = <-db.closeC: + case <-db.closeC: if db.compWriteLocking { // We should release the lock or Close will hang. <-db.writeLockC @@ -195,7 +195,7 @@ func (db *DB) compactionTransact(name string, t compactionTransactInterface) { db.logf("%s exiting (persistent error %q)", name, perr) db.compactionExitTransact() } - case _, _ = <-db.closeC: + case <-db.closeC: db.logf("%s exiting", name) db.compactionExitTransact() } @@ -224,7 +224,7 @@ func (db *DB) compactionTransact(name string, t compactionTransactInterface) { } select { case <-backoffT.C: - case _, _ = <-db.closeC: + case <-db.closeC: db.logf("%s exiting", name) db.compactionExitTransact() } @@ -288,7 +288,7 @@ func (db *DB) memCompaction() { case <-db.compPerErrC: close(resumeC) resumeC = nil - case _, _ = <-db.closeC: + case <-db.closeC: return } @@ -337,7 +337,7 @@ func (db *DB) memCompaction() { select { case <-resumeC: close(resumeC) - case _, _ = <-db.closeC: + case <-db.closeC: return } } @@ -378,7 +378,7 @@ func (b *tableCompactionBuilder) appendKV(key, value []byte) error { select { case ch := <-b.db.tcompPauseC: b.db.pauseCompaction(ch) - case _, _ = <-b.db.closeC: + case <-b.db.closeC: b.db.compactionExitTransact() default: } @@ -643,7 +643,7 @@ func (db *DB) tableNeedCompaction() bool { func (db *DB) pauseCompaction(ch chan<- struct{}) { select { case ch <- struct{}{}: - case _, _ = <-db.closeC: + case <-db.closeC: db.compactionExitTransact() } } @@ -697,14 +697,14 @@ func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) { case compC <- cAuto{ch}: case err = <-db.compErrC: return - case _, _ = <-db.closeC: + case <-db.closeC: return ErrClosed } // Wait cmd. select { case err = <-ch: case err = <-db.compErrC: - case _, _ = <-db.closeC: + case <-db.closeC: return ErrClosed } return err @@ -719,14 +719,14 @@ func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (e case compC <- cRange{level, min, max, ch}: case err := <-db.compErrC: return err - case _, _ = <-db.closeC: + case <-db.closeC: return ErrClosed } // Wait cmd. select { case err = <-ch: case err = <-db.compErrC: - case _, _ = <-db.closeC: + case <-db.closeC: return ErrClosed } return err @@ -758,7 +758,7 @@ func (db *DB) mCompaction() { default: panic("leveldb: unknown command") } - case _, _ = <-db.closeC: + case <-db.closeC: return } } @@ -791,7 +791,7 @@ func (db *DB) tCompaction() { case ch := <-db.tcompPauseC: db.pauseCompaction(ch) continue - case _, _ = <-db.closeC: + case <-db.closeC: return default: } @@ -806,7 +806,7 @@ func (db *DB) tCompaction() { case ch := <-db.tcompPauseC: db.pauseCompaction(ch) continue - case _, _ = <-db.closeC: + case <-db.closeC: return } } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go index 977f65ba5..2c69d2e53 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go @@ -59,7 +59,7 @@ func (db *DB) releaseSnapshot(se *snapshotElement) { } } -// Gets minimum sequence that not being snapshoted. +// Gets minimum sequence that not being snapshotted. func (db *DB) minSeq() uint64 { db.snapsMu.Lock() defer db.snapsMu.Unlock() @@ -131,7 +131,7 @@ func (snap *Snapshot) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) } // NewIterator returns an iterator for the snapshot of the underlying DB. -// The returned iterator is not goroutine-safe, but it is safe to use +// The returned iterator is not safe for concurrent use, but it is safe to use // multiple iterators concurrently, with each in a dedicated goroutine. // It is also safe to use an iterator concurrently with modifying its // underlying DB. The resultant key/value pairs are guaranteed to be diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go index 40f454da1..85b02d24b 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go @@ -67,12 +67,11 @@ func (db *DB) sampleSeek(ikey internalKey) { } func (db *DB) mpoolPut(mem *memdb.DB) { - defer func() { - recover() - }() - select { - case db.memPool <- mem: - default: + if !db.isClosed() { + select { + case db.memPool <- mem: + default: + } } } @@ -100,7 +99,13 @@ func (db *DB) mpoolDrain() { case <-db.memPool: default: } - case _, _ = <-db.closeC: + case <-db.closeC: + ticker.Stop() + // Make sure the pool is drained. + select { + case <-db.memPool: + case <-time.After(time.Second): + } close(db.memPool) return } @@ -148,24 +153,26 @@ func (db *DB) newMem(n int) (mem *memDB, err error) { func (db *DB) getMems() (e, f *memDB) { db.memMu.RLock() defer db.memMu.RUnlock() - if db.mem == nil { + if db.mem != nil { + db.mem.incref() + } else if !db.isClosed() { panic("nil effective mem") } - db.mem.incref() if db.frozenMem != nil { db.frozenMem.incref() } return db.mem, db.frozenMem } -// Get frozen memdb. +// Get effective memdb. func (db *DB) getEffectiveMem() *memDB { db.memMu.RLock() defer db.memMu.RUnlock() - if db.mem == nil { + if db.mem != nil { + db.mem.incref() + } else if !db.isClosed() { panic("nil effective mem") } - db.mem.incref() return db.mem } @@ -200,6 +207,14 @@ func (db *DB) dropFrozenMem() { db.memMu.Unlock() } +// Clear mems ptr; used by DB.Close(). +func (db *DB) clearMems() { + db.memMu.Lock() + db.mem = nil + db.frozenMem = nil + db.memMu.Unlock() +} + // Set closed flag; return true if not already closed. func (db *DB) setClosed() bool { return atomic.CompareAndSwapUint32(&db.closed, 0, 1) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_transaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_transaction.go index fca88037b..b8f7e7d21 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_transaction.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_transaction.go @@ -59,8 +59,8 @@ func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) { } // NewIterator returns an iterator for the latest snapshot of the transaction. -// The returned iterator is not goroutine-safe, but it is safe to use multiple -// iterators concurrently, with each in a dedicated goroutine. +// The returned iterator is not safe for concurrent use, but it is safe to use +// multiple iterators concurrently, with each in a dedicated goroutine. // It is also safe to use an iterator concurrently while writes to the // transaction. The resultant key/value pairs are guaranteed to be consistent. // @@ -167,8 +167,8 @@ func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error { if tr.closed { return errTransactionDone } - return b.decodeRec(func(i int, kt keyType, key, value []byte) error { - return tr.put(kt, key, value) + return b.replayInternal(func(i int, kt keyType, k, v []byte) error { + return tr.put(kt, k, v) }) } @@ -179,7 +179,8 @@ func (tr *Transaction) setDone() { <-tr.db.writeLockC } -// Commit commits the transaction. +// Commit commits the transaction. If error is not nil, then the transaction is +// not committed, it can then either be retried or discarded. // // Other methods should not be called after transaction has been committed. func (tr *Transaction) Commit() error { @@ -192,24 +193,27 @@ func (tr *Transaction) Commit() error { if tr.closed { return errTransactionDone } - defer tr.setDone() if err := tr.flush(); err != nil { - tr.discard() + // Return error, lets user decide either to retry or discard + // transaction. return err } if len(tr.tables) != 0 { // Committing transaction. tr.rec.setSeqNum(tr.seq) tr.db.compCommitLk.Lock() - defer tr.db.compCommitLk.Unlock() + tr.stats.startTimer() + var cerr error for retry := 0; retry < 3; retry++ { - if err := tr.db.s.commit(&tr.rec); err != nil { - tr.db.logf("transaction@commit error R·%d %q", retry, err) + cerr = tr.db.s.commit(&tr.rec) + if cerr != nil { + tr.db.logf("transaction@commit error R·%d %q", retry, cerr) select { case <-time.After(time.Second): - case _, _ = <-tr.db.closeC: + case <-tr.db.closeC: tr.db.logf("transaction@commit exiting") - return err + tr.db.compCommitLk.Unlock() + return cerr } } else { // Success. Set db.seq. @@ -217,9 +221,26 @@ func (tr *Transaction) Commit() error { break } } + tr.stats.stopTimer() + if cerr != nil { + // Return error, lets user decide either to retry or discard + // transaction. + return cerr + } + + // Update compaction stats. This is safe as long as we hold compCommitLk. + tr.db.compStats.addStat(0, &tr.stats) + // Trigger table auto-compaction. tr.db.compTrigger(tr.db.tcompCmdC) + tr.db.compCommitLk.Unlock() + + // Additionally, wait compaction when certain threshold reached. + // Ignore error, returns error only if transaction can't be committed. + tr.db.waitCompaction() } + // Only mark as done if transaction committed successfully. + tr.setDone() return nil } @@ -245,10 +266,20 @@ func (tr *Transaction) Discard() { tr.lk.Unlock() } +func (db *DB) waitCompaction() error { + if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() { + return db.compTriggerWait(db.tcompCmdC) + } + return nil +} + // OpenTransaction opens an atomic DB transaction. Only one transaction can be -// opened at a time. Write will be blocked until the transaction is committed or -// discarded. -// The returned transaction handle is goroutine-safe. +// opened at a time. Subsequent call to Write and OpenTransaction will be blocked +// until in-flight transaction is committed or discarded. +// The returned transaction handle is safe for concurrent use. +// +// Transaction is expensive and can overwhelm compaction, especially if +// transaction size is small. Use with caution. // // The transaction must be closed once done, either by committing or discarding // the transaction. @@ -263,7 +294,7 @@ func (db *DB) OpenTransaction() (*Transaction, error) { case db.writeLockC <- struct{}{}: case err := <-db.compPerErrC: return nil, err - case _, _ = <-db.closeC: + case <-db.closeC: return nil, ErrClosed } @@ -278,6 +309,11 @@ func (db *DB) OpenTransaction() (*Transaction, error) { } } + // Wait compaction when certain threshold reached. + if err := db.waitCompaction(); err != nil { + return nil, err + } + tr := &Transaction{ db: db, seq: db.seq, diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_util.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_util.go index 7fd386ca4..7ecd960d2 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_util.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_util.go @@ -62,7 +62,7 @@ func (db *DB) checkAndCleanFiles() error { case storage.TypeManifest: keep = fd.Num >= db.s.manifestFd.Num case storage.TypeJournal: - if !db.frozenJournalFd.Nil() { + if !db.frozenJournalFd.Zero() { keep = fd.Num >= db.frozenJournalFd.Num } else { keep = fd.Num >= db.journalFd.Num diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go index 5576761fe..cc428b695 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go @@ -14,37 +14,23 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) -func (db *DB) writeJournal(b *Batch) error { - w, err := db.journal.Next() +func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error { + wr, err := db.journal.Next() if err != nil { return err } - if _, err := w.Write(b.encode()); err != nil { + if err := writeBatchesWithHeader(wr, batches, seq); err != nil { return err } if err := db.journal.Flush(); err != nil { return err } - if b.sync { + if sync { return db.journalWriter.Sync() } return nil } -func (db *DB) jWriter() { - defer db.closeW.Done() - for { - select { - case b := <-db.journalC: - if b != nil { - db.journalAckC <- db.writeJournal(b) - } - case _, _ = <-db.closeC: - return - } - } -} - func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) { // Wait for pending memdb compaction. err = db.compTriggerWait(db.mcompCmdC) @@ -69,24 +55,29 @@ func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) { func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) { delayed := false + slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger() + pauseTrigger := db.s.o.GetWriteL0PauseTrigger() flush := func() (retry bool) { - v := db.s.version() - defer v.release() mdb = db.getEffectiveMem() + if mdb == nil { + err = ErrClosed + return false + } defer func() { if retry { mdb.decref() mdb = nil } }() + tLen := db.s.tLen(0) mdbFree = mdb.Free() switch { - case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed: + case tLen >= slowdownTrigger && !delayed: delayed = true time.Sleep(time.Millisecond) case mdbFree >= n: return false - case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger(): + case tLen >= pauseTrigger: delayed = true err = db.compTriggerWait(db.tcompCmdC) if err != nil { @@ -123,159 +114,250 @@ func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) { return } -// Write apply the given batch to the DB. The batch will be applied -// sequentially. -// -// It is safe to modify the contents of the arguments after Write returns. -func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { - err = db.ok() - if err != nil || b == nil || b.Len() == 0 { - return +type writeMerge struct { + sync bool + batch *Batch + keyType keyType + key, value []byte +} + +func (db *DB) unlockWrite(overflow bool, merged int, err error) { + for i := 0; i < merged; i++ { + db.writeAckC <- err + } + if overflow { + // Pass lock to the next write (that failed to merge). + db.writeMergedC <- false + } else { + // Release lock. + <-db.writeLockC } +} - b.init(wo.GetSync() && !db.s.o.GetNoSync()) +// ourBatch if defined should equal with batch. +func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error { + // Try to flush memdb. This method would also trying to throttle writes + // if it is too fast and compaction cannot catch-up. + mdb, mdbFree, err := db.flush(batch.internalLen) + if err != nil { + db.unlockWrite(false, 0, err) + return err + } + defer mdb.decref() - if b.size() > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() { - // Writes using transaction. - tr, err1 := db.OpenTransaction() - if err1 != nil { - return err1 + var ( + overflow bool + merged int + batches = []*Batch{batch} + ) + + if merge { + // Merge limit. + var mergeLimit int + if batch.internalLen > 128<<10 { + mergeLimit = (1 << 20) - batch.internalLen + } else { + mergeLimit = 128 << 10 } - if err1 := tr.Write(b, wo); err1 != nil { - tr.Discard() - return err1 + mergeCap := mdbFree - batch.internalLen + if mergeLimit > mergeCap { + mergeLimit = mergeCap } - return tr.Commit() - } - // The write happen synchronously. - select { - case db.writeC <- b: - if <-db.writeMergedC { - return <-db.writeAckC + merge: + for mergeLimit > 0 { + select { + case incoming := <-db.writeMergeC: + if incoming.batch != nil { + // Merge batch. + if incoming.batch.internalLen > mergeLimit { + overflow = true + break merge + } + batches = append(batches, incoming.batch) + mergeLimit -= incoming.batch.internalLen + } else { + // Merge put. + internalLen := len(incoming.key) + len(incoming.value) + 8 + if internalLen > mergeLimit { + overflow = true + break merge + } + if ourBatch == nil { + ourBatch = db.batchPool.Get().(*Batch) + ourBatch.Reset() + batches = append(batches, ourBatch) + } + // We can use same batch since concurrent write doesn't + // guarantee write order. + ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value) + mergeLimit -= internalLen + } + sync = sync || incoming.sync + merged++ + db.writeMergedC <- true + + default: + break merge + } } - // Continue, the write lock already acquired by previous writer - // and handed out to us. - case db.writeLockC <- struct{}{}: - case err = <-db.compPerErrC: - return - case _, _ = <-db.closeC: - return ErrClosed } - merged := 0 - danglingMerge := false - defer func() { - for i := 0; i < merged; i++ { - db.writeAckC <- err - } - if danglingMerge { - // Only one dangling merge at most, so this is safe. - db.writeMergedC <- false - } else { - <-db.writeLockC + // Seq number. + seq := db.seq + 1 + + // Write journal. + if err := db.writeJournal(batches, seq, sync); err != nil { + db.unlockWrite(overflow, merged, err) + return err + } + + // Put batches. + for _, batch := range batches { + if err := batch.putMem(seq, mdb.DB); err != nil { + panic(err) } - }() + seq += uint64(batch.Len()) + } - mdb, mdbFree, err := db.flush(b.size()) - if err != nil { - return + // Incr seq number. + db.addSeq(uint64(batchesLen(batches))) + + // Rotate memdb if it's reach the threshold. + if batch.internalLen >= mdbFree { + db.rotateMem(0, false) } - defer mdb.decref() - // Calculate maximum size of the batch. - m := 1 << 20 - if x := b.size(); x <= 128<<10 { - m = x + (128 << 10) + db.unlockWrite(overflow, merged, nil) + return nil +} + +// Write apply the given batch to the DB. The batch records will be applied +// sequentially. Write might be used concurrently, when used concurrently and +// batch is small enough, write will try to merge the batches. Set NoWriteMerge +// option to true to disable write merge. +// +// It is safe to modify the contents of the arguments after Write returns but +// not before. Write will not modify content of the batch. +func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error { + if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 { + return err } - m = minInt(m, mdbFree) - // Merge with other batch. -drain: - for b.size() < m && !b.sync { - select { - case nb := <-db.writeC: - if b.size()+nb.size() <= m { - b.append(nb) - db.writeMergedC <- true - merged++ - } else { - danglingMerge = true - break drain - } - default: - break drain + // If the batch size is larger than write buffer, it may justified to write + // using transaction instead. Using transaction the batch will be written + // into tables directly, skipping the journaling. + if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() { + tr, err := db.OpenTransaction() + if err != nil { + return err + } + if err := tr.Write(batch, wo); err != nil { + tr.Discard() + return err } + return tr.Commit() } - // Set batch first seq number relative from last seq. - b.seq = db.seq + 1 + merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge() + sync := wo.GetSync() && !db.s.o.GetNoSync() - // Write journal concurrently if it is large enough. - if b.size() >= (128 << 10) { - // Push the write batch to the journal writer + // Acquire write lock. + if merge { select { - case db.journalC <- b: - // Write into memdb - if berr := b.memReplay(mdb.DB); berr != nil { - panic(berr) + case db.writeMergeC <- writeMerge{sync: sync, batch: batch}: + if <-db.writeMergedC { + // Write is merged. + return <-db.writeAckC } - case err = <-db.compPerErrC: - return - case _, _ = <-db.closeC: - err = ErrClosed - return + // Write is not merged, the write lock is handed to us. Continue. + case db.writeLockC <- struct{}{}: + // Write lock acquired. + case err := <-db.compPerErrC: + // Compaction error. + return err + case <-db.closeC: + // Closed + return ErrClosed } - // Wait for journal writer + } else { select { - case err = <-db.journalAckC: - if err != nil { - // Revert memdb if error detected - if berr := b.revertMemReplay(mdb.DB); berr != nil { - panic(berr) - } - return + case db.writeLockC <- struct{}{}: + // Write lock acquired. + case err := <-db.compPerErrC: + // Compaction error. + return err + case <-db.closeC: + // Closed + return ErrClosed + } + } + + return db.writeLocked(batch, nil, merge, sync) +} + +func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error { + if err := db.ok(); err != nil { + return err + } + + merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge() + sync := wo.GetSync() && !db.s.o.GetNoSync() + + // Acquire write lock. + if merge { + select { + case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}: + if <-db.writeMergedC { + // Write is merged. + return <-db.writeAckC } - case _, _ = <-db.closeC: - err = ErrClosed - return + // Write is not merged, the write lock is handed to us. Continue. + case db.writeLockC <- struct{}{}: + // Write lock acquired. + case err := <-db.compPerErrC: + // Compaction error. + return err + case <-db.closeC: + // Closed + return ErrClosed } } else { - err = db.writeJournal(b) - if err != nil { - return - } - if berr := b.memReplay(mdb.DB); berr != nil { - panic(berr) + select { + case db.writeLockC <- struct{}{}: + // Write lock acquired. + case err := <-db.compPerErrC: + // Compaction error. + return err + case <-db.closeC: + // Closed + return ErrClosed } } - // Set last seq number. - db.addSeq(uint64(b.Len())) - - if b.size() >= mdbFree { - db.rotateMem(0, false) - } - return + batch := db.batchPool.Get().(*Batch) + batch.Reset() + batch.appendRec(kt, key, value) + return db.writeLocked(batch, batch, merge, sync) } // Put sets the value for the given key. It overwrites any previous value -// for that key; a DB is not a multi-map. +// for that key; a DB is not a multi-map. Write merge also applies for Put, see +// Write. // -// It is safe to modify the contents of the arguments after Put returns. +// It is safe to modify the contents of the arguments after Put returns but not +// before. func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error { - b := new(Batch) - b.Put(key, value) - return db.Write(b, wo) + return db.putRec(keyTypeVal, key, value, wo) } -// Delete deletes the value for the given key. +// Delete deletes the value for the given key. Delete will not returns error if +// key doesn't exist. Write merge also applies for Delete, see Write. // -// It is safe to modify the contents of the arguments after Delete returns. +// It is safe to modify the contents of the arguments after Delete returns but +// not before. func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error { - b := new(Batch) - b.Delete(key) - return db.Write(b, wo) + return db.putRec(keyTypeDel, key, nil, wo) } func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool { @@ -304,12 +386,15 @@ func (db *DB) CompactRange(r util.Range) error { case db.writeLockC <- struct{}{}: case err := <-db.compPerErrC: return err - case _, _ = <-db.closeC: + case <-db.closeC: return ErrClosed } // Check for overlaps in memdb. mdb := db.getEffectiveMem() + if mdb == nil { + return ErrClosed + } defer mdb.decref() if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) { // Memdb compaction. @@ -341,7 +426,7 @@ func (db *DB) SetReadOnly() error { db.compWriteLocking = true case err := <-db.compPerErrC: return err - case _, _ = <-db.closeC: + case <-db.closeC: return ErrClosed } @@ -350,7 +435,7 @@ func (db *DB) SetReadOnly() error { case db.compErrSetC <- ErrReadOnly: case perr := <-db.compPerErrC: return perr - case _, _ = <-db.closeC: + case <-db.closeC: return ErrClosed } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors.go index c8bd66a5a..de2649812 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors.go @@ -10,6 +10,7 @@ import ( "github.com/syndtr/goleveldb/leveldb/errors" ) +// Common errors. var ( ErrNotFound = errors.ErrNotFound ErrReadOnly = errors.New("leveldb: read-only mode") diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors/errors.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors/errors.go index 9a0f6e2c1..8d6146b6f 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors/errors.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors/errors.go @@ -15,6 +15,7 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) +// Common errors. var ( ErrNotFound = New("leveldb: not found") ErrReleased = util.ErrReleased @@ -34,11 +35,10 @@ type ErrCorrupted struct { } func (e *ErrCorrupted) Error() string { - if !e.Fd.Nil() { + if !e.Fd.Zero() { return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd) - } else { - return e.Err.Error() } + return e.Err.Error() } // NewErrCorrupted creates new ErrCorrupted error. diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/iter.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/iter.go index c2522860b..3b5553274 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/iter.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/iter.go @@ -21,13 +21,13 @@ var ( // IteratorSeeker is the interface that wraps the 'seeks method'. type IteratorSeeker interface { // First moves the iterator to the first key/value pair. If the iterator - // only contains one key/value pair then First and Last whould moves + // only contains one key/value pair then First and Last would moves // to the same key/value pair. // It returns whether such pair exist. First() bool // Last moves the iterator to the last key/value pair. If the iterator - // only contains one key/value pair then First and Last whould moves + // only contains one key/value pair then First and Last would moves // to the same key/value pair. // It returns whether such pair exist. Last() bool @@ -48,7 +48,7 @@ type IteratorSeeker interface { Prev() bool } -// CommonIterator is the interface that wraps common interator methods. +// CommonIterator is the interface that wraps common iterator methods. type CommonIterator interface { IteratorSeeker @@ -71,14 +71,15 @@ type CommonIterator interface { // Iterator iterates over a DB's key/value pairs in key order. // -// When encouter an error any 'seeks method' will return false and will +// When encounter an error any 'seeks method' will return false and will // yield no key/value pairs. The error can be queried by calling the Error // method. Calling Release is still necessary. // // An iterator must be released after use, but it is not necessary to read // an iterator until exhaustion. -// Also, an iterator is not necessarily goroutine-safe, but it is safe to use -// multiple iterators concurrently, with each in a dedicated goroutine. +// Also, an iterator is not necessarily safe for concurrent use, but it is +// safe to use multiple iterators concurrently, with each in a dedicated +// goroutine. type Iterator interface { CommonIterator @@ -98,7 +99,7 @@ type Iterator interface { // // ErrorCallbackSetter implemented by indexed and merged iterator. type ErrorCallbackSetter interface { - // SetErrorCallback allows set an error callback of the coresponding + // SetErrorCallback allows set an error callback of the corresponding // iterator. Use nil to clear the callback. SetErrorCallback(f func(err error)) } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go index 891098bb7..d094c3d0f 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go @@ -180,34 +180,37 @@ func (r *Reader) nextChunk(first bool) error { checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4]) length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6]) chunkType := r.buf[r.j+6] - + unprocBlock := r.n - r.j if checksum == 0 && length == 0 && chunkType == 0 { // Drop entire block. - m := r.n - r.j r.i = r.n r.j = r.n - return r.corrupt(m, "zero header", false) - } else { - m := r.n - r.j - r.i = r.j + headerSize - r.j = r.j + headerSize + int(length) - if r.j > r.n { - // Drop entire block. - r.i = r.n - r.j = r.n - return r.corrupt(m, "chunk length overflows block", false) - } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() { - // Drop entire block. - r.i = r.n - r.j = r.n - return r.corrupt(m, "checksum mismatch", false) - } + return r.corrupt(unprocBlock, "zero header", false) + } + if chunkType < fullChunkType || chunkType > lastChunkType { + // Drop entire block. + r.i = r.n + r.j = r.n + return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false) + } + r.i = r.j + headerSize + r.j = r.j + headerSize + int(length) + if r.j > r.n { + // Drop entire block. + r.i = r.n + r.j = r.n + return r.corrupt(unprocBlock, "chunk length overflows block", false) + } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() { + // Drop entire block. + r.i = r.n + r.j = r.n + return r.corrupt(unprocBlock, "checksum mismatch", false) } if first && chunkType != fullChunkType && chunkType != firstChunkType { - m := r.j - r.i + chunkLength := (r.j - r.i) + headerSize r.i = r.j // Report the error, but skip it. - return r.corrupt(m+headerSize, "orphan chunk", true) + return r.corrupt(chunkLength, "orphan chunk", true) } r.last = chunkType == fullChunkType || chunkType == lastChunkType return nil diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/key.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/key.go index d0b80aaf9..ad8f51ec8 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/key.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/key.go @@ -37,14 +37,14 @@ func (kt keyType) String() string { case keyTypeVal: return "v" } - return "x" + return fmt.Sprintf("<invalid:%#x>", uint(kt)) } // Value types encoded as the last component of internal keys. // Don't modify; this value are saved to disk. const ( - keyTypeDel keyType = iota - keyTypeVal + keyTypeDel = keyType(0) + keyTypeVal = keyType(1) ) // keyTypeSeek defines the keyType that should be passed when constructing an @@ -79,11 +79,7 @@ func makeInternalKey(dst, ukey []byte, seq uint64, kt keyType) internalKey { panic("leveldb: invalid type") } - if n := len(ukey) + 8; cap(dst) < n { - dst = make([]byte, n) - } else { - dst = dst[:n] - } + dst = ensureBuffer(dst, len(ukey)+8) copy(dst, ukey) binary.LittleEndian.PutUint64(dst[len(ukey):], (seq<<8)|uint64(kt)) return internalKey(dst) @@ -143,5 +139,5 @@ func (ik internalKey) String() string { if ukey, seq, kt, err := parseInternalKey(ik); err == nil { return fmt.Sprintf("%s,%s%d", shorten(string(ukey)), kt, seq) } - return "<invalid>" + return fmt.Sprintf("<invalid:%#x>", []byte(ik)) } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go index 1395bd928..18a19ed42 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go @@ -17,6 +17,7 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) +// Common errors. var ( ErrNotFound = errors.ErrNotFound ErrIterReleased = errors.New("leveldb/memdb: iterator released") @@ -385,7 +386,7 @@ func (p *DB) Find(key []byte) (rkey, value []byte, err error) { } // NewIterator returns an iterator of the DB. -// The returned iterator is not goroutine-safe, but it is safe to use +// The returned iterator is not safe for concurrent use, but it is safe to use // multiple iterators concurrently, with each in a dedicated goroutine. // It is also safe to use an iterator concurrently with modifying its // underlying DB. However, the resultant key/value pairs are not guaranteed @@ -411,7 +412,7 @@ func (p *DB) Capacity() int { } // Size returns sum of keys and values length. Note that deleted -// key/value will not be accouted for, but it will still consume +// key/value will not be accounted for, but it will still consume // the buffer, since the buffer is append only. func (p *DB) Size() int { p.mu.RLock() @@ -453,11 +454,14 @@ func (p *DB) Reset() { p.mu.Unlock() } -// New creates a new initalized in-memory key/value DB. The capacity +// New creates a new initialized in-memory key/value DB. The capacity // is the initial key/value buffer capacity. The capacity is advisory, // not enforced. // -// The returned DB instance is goroutine-safe. +// This DB is append-only, deleting an entry would remove entry node but not +// reclaim KV buffer. +// +// The returned DB instance is safe for concurrent use. func New(cmp comparer.BasicComparer, capacity int) *DB { p := &DB{ cmp: cmp, diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go index 3d2bf1c02..44e7d9adc 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go @@ -312,6 +312,11 @@ type Options struct { // The default is false. NoSync bool + // NoWriteMerge allows disabling write merge. + // + // The default is false. + NoWriteMerge bool + // OpenFilesCacher provides cache algorithm for open files caching. // Specify NoCacher to disable caching algorithm. // @@ -543,6 +548,13 @@ func (o *Options) GetNoSync() bool { return o.NoSync } +func (o *Options) GetNoWriteMerge() bool { + if o == nil { + return false + } + return o.NoWriteMerge +} + func (o *Options) GetOpenFilesCacher() Cacher { if o == nil || o.OpenFilesCacher == nil { return DefaultOpenFilesCacher @@ -629,6 +641,11 @@ func (ro *ReadOptions) GetStrict(strict Strict) bool { // WriteOptions holds the optional parameters for 'write operation'. The // 'write operation' includes Write, Put and Delete. type WriteOptions struct { + // NoWriteMerge allows disabling write merge. + // + // The default is false. + NoWriteMerge bool + // Sync is whether to sync underlying writes from the OS buffer cache // through to actual disk, if applicable. Setting Sync can result in // slower writes. @@ -644,6 +661,13 @@ type WriteOptions struct { Sync bool } +func (wo *WriteOptions) GetNoWriteMerge() bool { + if wo == nil { + return false + } + return wo.NoWriteMerge +} + func (wo *WriteOptions) GetSync() bool { if wo == nil { return false diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go index b0d3fef1d..f3e747701 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go @@ -18,7 +18,8 @@ import ( "github.com/syndtr/goleveldb/leveldb/storage" ) -// ErrManifestCorrupted records manifest corruption. +// ErrManifestCorrupted records manifest corruption. This error will be +// wrapped with errors.ErrCorrupted. type ErrManifestCorrupted struct { Field string Reason string @@ -42,7 +43,7 @@ type session struct { stSeqNum uint64 // last mem compacted seq; need external synchronization stor storage.Storage - storLock storage.Lock + storLock storage.Locker o *cachedOptions icmp *iComparer tops *tOps @@ -87,12 +88,12 @@ func (s *session) close() { } s.manifest = nil s.manifestWriter = nil - s.stVersion = nil + s.setVersion(&version{s: s, closing: true}) } // Release session lock. func (s *session) release() { - s.storLock.Release() + s.storLock.Unlock() } // Create a new database session; need external synchronization. diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go index 674182fb2..34ad61798 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go @@ -50,6 +50,12 @@ func (s *session) version() *version { return s.stVersion } +func (s *session) tLen(level int) int { + s.vmu.Lock() + defer s.vmu.Unlock() + return s.stVersion.tLen(level) +} + // Set current version to v. func (s *session) setVersion(v *version) { s.vmu.Lock() @@ -197,7 +203,7 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { if s.manifestWriter != nil { s.manifestWriter.Close() } - if !s.manifestFd.Nil() { + if !s.manifestFd.Zero() { s.stor.Remove(s.manifestFd) } s.manifestFd = fd diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go index cbe1dc103..e53434cab 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go @@ -32,7 +32,7 @@ type fileStorageLock struct { fs *fileStorage } -func (lock *fileStorageLock) Release() { +func (lock *fileStorageLock) Unlock() { if lock.fs != nil { lock.fs.mu.Lock() defer lock.fs.mu.Unlock() @@ -116,7 +116,7 @@ func OpenFile(path string, readOnly bool) (Storage, error) { return fs, nil } -func (fs *fileStorage) Lock() (Lock, error) { +func (fs *fileStorage) Lock() (Locker, error) { fs.mu.Lock() defer fs.mu.Unlock() if fs.open < 0 { @@ -323,7 +323,7 @@ func (fs *fileStorage) GetMeta() (fd FileDesc, err error) { } } // Don't remove any files if there is no valid CURRENT file. - if fd.Nil() { + if fd.Zero() { if cerr != nil { err = cerr } else { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go index 9b70e1513..9b0421f03 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go @@ -18,7 +18,7 @@ type memStorageLock struct { ms *memStorage } -func (lock *memStorageLock) Release() { +func (lock *memStorageLock) Unlock() { ms := lock.ms ms.mu.Lock() defer ms.mu.Unlock() @@ -43,7 +43,7 @@ func NewMemStorage() Storage { } } -func (ms *memStorage) Lock() (Lock, error) { +func (ms *memStorage) Lock() (Locker, error) { ms.mu.Lock() defer ms.mu.Unlock() if ms.slock != nil { @@ -69,7 +69,7 @@ func (ms *memStorage) SetMeta(fd FileDesc) error { func (ms *memStorage) GetMeta() (FileDesc, error) { ms.mu.Lock() defer ms.mu.Unlock() - if ms.meta.Nil() { + if ms.meta.Zero() { return FileDesc{}, os.ErrNotExist } return ms.meta, nil @@ -78,7 +78,7 @@ func (ms *memStorage) GetMeta() (FileDesc, error) { func (ms *memStorage) List(ft FileType) ([]FileDesc, error) { ms.mu.Lock() var fds []FileDesc - for x, _ := range ms.files { + for x := range ms.files { fd := unpackFile(x) if fd.Type&ft != 0 { fds = append(fds, fd) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/storage.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/storage.go index 9b30b6727..c16bce6b6 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/storage.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/storage.go @@ -11,12 +11,12 @@ import ( "errors" "fmt" "io" - - "github.com/syndtr/goleveldb/leveldb/util" ) +// FileType represent a file type. type FileType int +// File types. const ( TypeManifest FileType = 1 << iota TypeJournal @@ -40,6 +40,7 @@ func (t FileType) String() string { return fmt.Sprintf("<unknown:%d>", t) } +// Common error. var ( ErrInvalidFile = errors.New("leveldb/storage: invalid file for argument") ErrLocked = errors.New("leveldb/storage: already locked") @@ -55,11 +56,10 @@ type ErrCorrupted struct { } func (e *ErrCorrupted) Error() string { - if !e.Fd.Nil() { + if !e.Fd.Zero() { return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd) - } else { - return e.Err.Error() } + return e.Err.Error() } // Syncer is the interface that wraps basic Sync method. @@ -83,11 +83,12 @@ type Writer interface { Syncer } -type Lock interface { - util.Releaser +// Locker is the interface that wraps Unlock method. +type Locker interface { + Unlock() } -// FileDesc is a file descriptor. +// FileDesc is a 'file descriptor'. type FileDesc struct { Type FileType Num int64 @@ -108,12 +109,12 @@ func (fd FileDesc) String() string { } } -// Nil returns true if fd == (FileDesc{}). -func (fd FileDesc) Nil() bool { +// Zero returns true if fd == (FileDesc{}). +func (fd FileDesc) Zero() bool { return fd == (FileDesc{}) } -// FileDescOk returns true if fd is a valid file descriptor. +// FileDescOk returns true if fd is a valid 'file descriptor'. func FileDescOk(fd FileDesc) bool { switch fd.Type { case TypeManifest: @@ -126,43 +127,44 @@ func FileDescOk(fd FileDesc) bool { return fd.Num >= 0 } -// Storage is the storage. A storage instance must be goroutine-safe. +// Storage is the storage. A storage instance must be safe for concurrent use. type Storage interface { // Lock locks the storage. Any subsequent attempt to call Lock will fail // until the last lock released. - // After use the caller should call the Release method. - Lock() (Lock, error) + // Caller should call Unlock method after use. + Lock() (Locker, error) // Log logs a string. This is used for logging. // An implementation may write to a file, stdout or simply do nothing. Log(str string) - // SetMeta sets to point to the given fd, which then can be acquired using - // GetMeta method. - // SetMeta should be implemented in such way that changes should happened + // SetMeta store 'file descriptor' that can later be acquired using GetMeta + // method. The 'file descriptor' should point to a valid file. + // SetMeta should be implemented in such way that changes should happen // atomically. SetMeta(fd FileDesc) error - // GetManifest returns a manifest file. - // Returns os.ErrNotExist if meta doesn't point to any fd, or point to fd - // that doesn't exist. + // GetMeta returns 'file descriptor' stored in meta. The 'file descriptor' + // can be updated using SetMeta method. + // Returns os.ErrNotExist if meta doesn't store any 'file descriptor', or + // 'file descriptor' point to nonexistent file. GetMeta() (FileDesc, error) - // List returns fds that match the given file types. + // List returns file descriptors that match the given file types. // The file types may be OR'ed together. List(ft FileType) ([]FileDesc, error) - // Open opens file with the given fd read-only. + // Open opens file with the given 'file descriptor' read-only. // Returns os.ErrNotExist error if the file does not exist. // Returns ErrClosed if the underlying storage is closed. Open(fd FileDesc) (Reader, error) - // Create creates file with the given fd, truncate if already exist and - // opens write-only. + // Create creates file with the given 'file descriptor', truncate if already + // exist and opens write-only. // Returns ErrClosed if the underlying storage is closed. Create(fd FileDesc) (Writer, error) - // Remove removes file with the given fd. + // Remove removes file with the given 'file descriptor'. // Returns ErrClosed if the underlying storage is closed. Remove(fd FileDesc) error diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go index 310ba6c22..81d18a531 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go @@ -434,7 +434,7 @@ func (t *tOps) close() { t.bpool.Close() t.cache.Close() if t.bcache != nil { - t.bcache.Close() + t.bcache.CloseWeak() } } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go index ae61bece9..c5be420b3 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go @@ -26,12 +26,15 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) +// Reader errors. var ( ErrNotFound = errors.ErrNotFound ErrReaderReleased = errors.New("leveldb/table: reader released") ErrIterReleased = errors.New("leveldb/table: iterator released") ) +// ErrCorrupted describes error due to corruption. This error will be wrapped +// with errors.ErrCorrupted. type ErrCorrupted struct { Pos int64 Size int64 @@ -61,7 +64,7 @@ type block struct { func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) { index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool { offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) - offset += 1 // shared always zero, since this is a restart point + offset++ // shared always zero, since this is a restart point v1, n1 := binary.Uvarint(b.data[offset:]) // key length _, n2 := binary.Uvarint(b.data[offset+n1:]) // value length m := offset + n1 + n2 @@ -356,7 +359,7 @@ func (i *blockIter) Prev() bool { i.value = nil offset := i.block.restartOffset(ri) if offset == i.offset { - ri -= 1 + ri-- if ri < 0 { i.dir = dirSOI return false @@ -783,8 +786,8 @@ func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChe // table. And a nil Range.Limit is treated as a key after all keys in // the table. // -// The returned iterator is not goroutine-safe and should be released -// when not used. +// The returned iterator is not safe for concurrent use and should be released +// after use. // // Also read Iterator documentation of the leveldb/iterator package. func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { @@ -826,18 +829,21 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo index := r.newBlockIter(indexBlock, nil, nil, true) defer index.Release() + if !index.Seek(key) { - err = index.Error() - if err == nil { + if err = index.Error(); err == nil { err = ErrNotFound } return } + dataBH, n := decodeBlockHandle(index.Value()) if n == 0 { r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle") - return + return nil, nil, r.err } + + // The filter should only used for exact match. if filtered && r.filter != nil { filterBlock, frel, ferr := r.getFilterBlock(true) if ferr == nil { @@ -847,30 +853,53 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo } frel.Release() } else if !errors.IsCorrupted(ferr) { - err = ferr - return + return nil, nil, ferr } } + data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache()) - defer data.Release() if !data.Seek(key) { - err = data.Error() - if err == nil { - err = ErrNotFound + data.Release() + if err = data.Error(); err != nil { + return + } + + // The nearest greater-than key is the first key of the next block. + if !index.Next() { + if err = index.Error(); err == nil { + err = ErrNotFound + } + return + } + + dataBH, n = decodeBlockHandle(index.Value()) + if n == 0 { + r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle") + return nil, nil, r.err + } + + data = r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache()) + if !data.Next() { + data.Release() + if err = data.Error(); err == nil { + err = ErrNotFound + } + return } - return } - // Don't use block buffer, no need to copy the buffer. + + // Key doesn't use block buffer, no need to copy the buffer. rkey = data.Key() if !noValue { if r.bpool == nil { value = data.Value() } else { - // Use block buffer, and since the buffer will be recycled, the buffer - // need to be copied. + // Value does use block buffer, and since the buffer will be + // recycled, it need to be copied. value = append([]byte{}, data.Value()...) } } + data.Release() return } @@ -888,7 +917,7 @@ func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, val return r.find(key, filtered, ro, false) } -// Find finds key that is greater than or equal to the given key. +// FindKey finds key that is greater than or equal to the given key. // It returns ErrNotFound if the table doesn't contain such key. // If filtered is true then the nearest 'block' will be checked against // 'filter data' (if present) and will immediately return ErrNotFound if @@ -987,7 +1016,7 @@ func (r *Reader) Release() { // NewReader creates a new initialized table reader for the file. // The fi, cache and bpool is optional and can be nil. // -// The returned table reader instance is goroutine-safe. +// The returned table reader instance is safe for concurrent use. func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.NamespaceGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) { if f == nil { return nil, errors.New("leveldb/table: nil file") @@ -1039,9 +1068,8 @@ func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.Name if errors.IsCorrupted(err) { r.err = err return r, nil - } else { - return nil, err } + return nil, err } // Set data end. @@ -1086,9 +1114,8 @@ func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.Name if errors.IsCorrupted(err) { r.err = err return r, nil - } else { - return nil, err } + return nil, err } if r.filter != nil { r.filterBlock, err = r.readFilterBlock(r.filterBH) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/writer.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/writer.go index 274dee6da..b96b271d8 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/writer.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/writer.go @@ -349,7 +349,7 @@ func (w *Writer) Close() error { // NewWriter creates a new initialized table writer for the file. // -// Table writer is not goroutine-safe. +// Table writer is not safe for concurrent use. func NewWriter(f io.Writer, o *opt.Options) *Writer { w := &Writer{ writer: f, diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util.go index 3b663d1cc..e572a329e 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util.go @@ -89,3 +89,10 @@ func (p fdSorter) Swap(i, j int) { func sortFds(fds []storage.FileDesc) { sort.Sort(fdSorter(fds)) } + +func ensureBuffer(b []byte, n int) []byte { + if cap(b) < n { + return make([]byte, n) + } + return b[:n] +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go index d274eeff2..c60f12c20 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go @@ -34,7 +34,8 @@ type version struct { cSeek unsafe.Pointer - ref int + closing bool + ref int // Succeeding version. next *version } @@ -131,6 +132,10 @@ func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int } func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) { + if v.closing { + return nil, false, ErrClosed + } + ukey := ikey.ukey() var ( |