aboutsummaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go
diff options
context:
space:
mode:
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go')
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go345
1 files changed, 216 insertions, 129 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go
index 501006717..225920002 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go
@@ -9,13 +9,15 @@ package leveldb
import (
"encoding/binary"
"fmt"
+ "io"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/memdb"
"github.com/syndtr/goleveldb/leveldb/storage"
)
-// ErrBatchCorrupted records reason of batch corruption.
+// ErrBatchCorrupted records reason of batch corruption. This error will be
+// wrapped with errors.ErrCorrupted.
type ErrBatchCorrupted struct {
Reason string
}
@@ -29,8 +31,9 @@ func newErrBatchCorrupted(reason string) error {
}
const (
- batchHdrLen = 8 + 4
- batchGrowRec = 3000
+ batchHeaderLen = 8 + 4
+ batchGrowRec = 3000
+ batchBufioSize = 16
)
// BatchReplay wraps basic batch operations.
@@ -39,34 +42,46 @@ type BatchReplay interface {
Delete(key []byte)
}
+type batchIndex struct {
+ keyType keyType
+ keyPos, keyLen int
+ valuePos, valueLen int
+}
+
+func (index batchIndex) k(data []byte) []byte {
+ return data[index.keyPos : index.keyPos+index.keyLen]
+}
+
+func (index batchIndex) v(data []byte) []byte {
+ if index.valueLen != 0 {
+ return data[index.valuePos : index.valuePos+index.valueLen]
+ }
+ return nil
+}
+
+func (index batchIndex) kv(data []byte) (key, value []byte) {
+ return index.k(data), index.v(data)
+}
+
// Batch is a write batch.
type Batch struct {
- data []byte
- rLen, bLen int
- seq uint64
- sync bool
+ data []byte
+ index []batchIndex
+
+ // internalLen is sums of key/value pair length plus 8-bytes internal key.
+ internalLen int
}
func (b *Batch) grow(n int) {
- off := len(b.data)
- if off == 0 {
- off = batchHdrLen
- if b.data != nil {
- b.data = b.data[:off]
- }
- }
- if cap(b.data)-off < n {
- if b.data == nil {
- b.data = make([]byte, off, off+n)
- } else {
- odata := b.data
- div := 1
- if b.rLen > batchGrowRec {
- div = b.rLen / batchGrowRec
- }
- b.data = make([]byte, off, off+n+(off-batchHdrLen)/div)
- copy(b.data, odata)
+ o := len(b.data)
+ if cap(b.data)-o < n {
+ div := 1
+ if len(b.index) > batchGrowRec {
+ div = len(b.index) / batchGrowRec
}
+ ndata := make([]byte, o, o+n+o/div)
+ copy(ndata, b.data)
+ b.data = ndata
}
}
@@ -76,32 +91,36 @@ func (b *Batch) appendRec(kt keyType, key, value []byte) {
n += binary.MaxVarintLen32 + len(value)
}
b.grow(n)
- off := len(b.data)
- data := b.data[:off+n]
- data[off] = byte(kt)
- off++
- off += binary.PutUvarint(data[off:], uint64(len(key)))
- copy(data[off:], key)
- off += len(key)
+ index := batchIndex{keyType: kt}
+ o := len(b.data)
+ data := b.data[:o+n]
+ data[o] = byte(kt)
+ o++
+ o += binary.PutUvarint(data[o:], uint64(len(key)))
+ index.keyPos = o
+ index.keyLen = len(key)
+ o += copy(data[o:], key)
if kt == keyTypeVal {
- off += binary.PutUvarint(data[off:], uint64(len(value)))
- copy(data[off:], value)
- off += len(value)
+ o += binary.PutUvarint(data[o:], uint64(len(value)))
+ index.valuePos = o
+ index.valueLen = len(value)
+ o += copy(data[o:], value)
}
- b.data = data[:off]
- b.rLen++
- // Include 8-byte ikey header
- b.bLen += len(key) + len(value) + 8
+ b.data = data[:o]
+ b.index = append(b.index, index)
+ b.internalLen += index.keyLen + index.valueLen + 8
}
// Put appends 'put operation' of the given key/value pair to the batch.
-// It is safe to modify the contents of the argument after Put returns.
+// It is safe to modify the contents of the argument after Put returns but not
+// before.
func (b *Batch) Put(key, value []byte) {
b.appendRec(keyTypeVal, key, value)
}
// Delete appends 'delete operation' of the given key to the batch.
-// It is safe to modify the contents of the argument after Delete returns.
+// It is safe to modify the contents of the argument after Delete returns but
+// not before.
func (b *Batch) Delete(key []byte) {
b.appendRec(keyTypeDel, key, nil)
}
@@ -111,7 +130,7 @@ func (b *Batch) Delete(key []byte) {
// The returned slice is not its own copy, so the contents should not be
// modified.
func (b *Batch) Dump() []byte {
- return b.encode()
+ return b.data
}
// Load loads given slice into the batch. Previous contents of the batch
@@ -119,144 +138,212 @@ func (b *Batch) Dump() []byte {
// The given slice will not be copied and will be used as batch buffer, so
// it is not safe to modify the contents of the slice.
func (b *Batch) Load(data []byte) error {
- return b.decode(0, data)
+ return b.decode(data, -1)
}
// Replay replays batch contents.
func (b *Batch) Replay(r BatchReplay) error {
- return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
- switch kt {
+ for _, index := range b.index {
+ switch index.keyType {
case keyTypeVal:
- r.Put(key, value)
+ r.Put(index.k(b.data), index.v(b.data))
case keyTypeDel:
- r.Delete(key)
+ r.Delete(index.k(b.data))
}
- return nil
- })
+ }
+ return nil
}
// Len returns number of records in the batch.
func (b *Batch) Len() int {
- return b.rLen
+ return len(b.index)
}
// Reset resets the batch.
func (b *Batch) Reset() {
b.data = b.data[:0]
- b.seq = 0
- b.rLen = 0
- b.bLen = 0
- b.sync = false
+ b.index = b.index[:0]
+ b.internalLen = 0
}
-func (b *Batch) init(sync bool) {
- b.sync = sync
+func (b *Batch) replayInternal(fn func(i int, kt keyType, k, v []byte) error) error {
+ for i, index := range b.index {
+ if err := fn(i, index.keyType, index.k(b.data), index.v(b.data)); err != nil {
+ return err
+ }
+ }
+ return nil
}
func (b *Batch) append(p *Batch) {
- if p.rLen > 0 {
- b.grow(len(p.data) - batchHdrLen)
- b.data = append(b.data, p.data[batchHdrLen:]...)
- b.rLen += p.rLen
- b.bLen += p.bLen
- }
- if p.sync {
- b.sync = true
+ ob := len(b.data)
+ oi := len(b.index)
+ b.data = append(b.data, p.data...)
+ b.index = append(b.index, p.index...)
+ b.internalLen += p.internalLen
+
+ // Updating index offset.
+ if ob != 0 {
+ for ; oi < len(b.index); oi++ {
+ index := &b.index[oi]
+ index.keyPos += ob
+ if index.valueLen != 0 {
+ index.valuePos += ob
+ }
+ }
}
}
-// size returns sums of key/value pair length plus 8-bytes ikey.
-func (b *Batch) size() int {
- return b.bLen
-}
-
-func (b *Batch) encode() []byte {
- b.grow(0)
- binary.LittleEndian.PutUint64(b.data, b.seq)
- binary.LittleEndian.PutUint32(b.data[8:], uint32(b.rLen))
-
- return b.data
+func (b *Batch) decode(data []byte, expectedLen int) error {
+ b.data = data
+ b.index = b.index[:0]
+ b.internalLen = 0
+ err := decodeBatch(data, func(i int, index batchIndex) error {
+ b.index = append(b.index, index)
+ b.internalLen += index.keyLen + index.valueLen + 8
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+ if expectedLen >= 0 && len(b.index) != expectedLen {
+ return newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", expectedLen, len(b.index)))
+ }
+ return nil
}
-func (b *Batch) decode(prevSeq uint64, data []byte) error {
- if len(data) < batchHdrLen {
- return newErrBatchCorrupted("too short")
+func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error {
+ var ik []byte
+ for i, index := range b.index {
+ ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
+ if err := mdb.Put(ik, index.v(b.data)); err != nil {
+ return err
+ }
}
+ return nil
+}
- b.seq = binary.LittleEndian.Uint64(data)
- if b.seq < prevSeq {
- return newErrBatchCorrupted("invalid sequence number")
- }
- b.rLen = int(binary.LittleEndian.Uint32(data[8:]))
- if b.rLen < 0 {
- return newErrBatchCorrupted("invalid records length")
+func (b *Batch) revertMem(seq uint64, mdb *memdb.DB) error {
+ var ik []byte
+ for i, index := range b.index {
+ ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
+ if err := mdb.Delete(ik); err != nil {
+ return err
+ }
}
- // No need to be precise at this point, it won't be used anyway
- b.bLen = len(data) - batchHdrLen
- b.data = data
-
return nil
}
-func (b *Batch) decodeRec(f func(i int, kt keyType, key, value []byte) error) error {
- off := batchHdrLen
- for i := 0; i < b.rLen; i++ {
- if off >= len(b.data) {
- return newErrBatchCorrupted("invalid records length")
- }
+func newBatch() interface{} {
+ return &Batch{}
+}
- kt := keyType(b.data[off])
- if kt > keyTypeVal {
- panic(kt)
- return newErrBatchCorrupted("bad record: invalid type")
+func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error {
+ var index batchIndex
+ for i, o := 0, 0; o < len(data); i++ {
+ // Key type.
+ index.keyType = keyType(data[o])
+ if index.keyType > keyTypeVal {
+ return newErrBatchCorrupted(fmt.Sprintf("bad record: invalid type %#x", uint(index.keyType)))
}
- off++
+ o++
- x, n := binary.Uvarint(b.data[off:])
- off += n
- if n <= 0 || off+int(x) > len(b.data) {
+ // Key.
+ x, n := binary.Uvarint(data[o:])
+ o += n
+ if n <= 0 || o+int(x) > len(data) {
return newErrBatchCorrupted("bad record: invalid key length")
}
- key := b.data[off : off+int(x)]
- off += int(x)
- var value []byte
- if kt == keyTypeVal {
- x, n := binary.Uvarint(b.data[off:])
- off += n
- if n <= 0 || off+int(x) > len(b.data) {
+ index.keyPos = o
+ index.keyLen = int(x)
+ o += index.keyLen
+
+ // Value.
+ if index.keyType == keyTypeVal {
+ x, n = binary.Uvarint(data[o:])
+ o += n
+ if n <= 0 || o+int(x) > len(data) {
return newErrBatchCorrupted("bad record: invalid value length")
}
- value = b.data[off : off+int(x)]
- off += int(x)
+ index.valuePos = o
+ index.valueLen = int(x)
+ o += index.valueLen
+ } else {
+ index.valuePos = 0
+ index.valueLen = 0
}
- if err := f(i, kt, key, value); err != nil {
+ if err := fn(i, index); err != nil {
return err
}
}
-
return nil
}
-func (b *Batch) memReplay(to *memdb.DB) error {
- var ikScratch []byte
- return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
- ikScratch = makeInternalKey(ikScratch, key, b.seq+uint64(i), kt)
- return to.Put(ikScratch, value)
+func decodeBatchToMem(data []byte, expectSeq uint64, mdb *memdb.DB) (seq uint64, batchLen int, err error) {
+ seq, batchLen, err = decodeBatchHeader(data)
+ if err != nil {
+ return 0, 0, err
+ }
+ if seq < expectSeq {
+ return 0, 0, newErrBatchCorrupted("invalid sequence number")
+ }
+ data = data[batchHeaderLen:]
+ var ik []byte
+ var decodedLen int
+ err = decodeBatch(data, func(i int, index batchIndex) error {
+ if i >= batchLen {
+ return newErrBatchCorrupted("invalid records length")
+ }
+ ik = makeInternalKey(ik, index.k(data), seq+uint64(i), index.keyType)
+ if err := mdb.Put(ik, index.v(data)); err != nil {
+ return err
+ }
+ decodedLen++
+ return nil
})
+ if err == nil && decodedLen != batchLen {
+ err = newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", batchLen, decodedLen))
+ }
+ return
}
-func (b *Batch) memDecodeAndReplay(prevSeq uint64, data []byte, to *memdb.DB) error {
- if err := b.decode(prevSeq, data); err != nil {
- return err
+func encodeBatchHeader(dst []byte, seq uint64, batchLen int) []byte {
+ dst = ensureBuffer(dst, batchHeaderLen)
+ binary.LittleEndian.PutUint64(dst, seq)
+ binary.LittleEndian.PutUint32(dst[8:], uint32(batchLen))
+ return dst
+}
+
+func decodeBatchHeader(data []byte) (seq uint64, batchLen int, err error) {
+ if len(data) < batchHeaderLen {
+ return 0, 0, newErrBatchCorrupted("too short")
+ }
+
+ seq = binary.LittleEndian.Uint64(data)
+ batchLen = int(binary.LittleEndian.Uint32(data[8:]))
+ if batchLen < 0 {
+ return 0, 0, newErrBatchCorrupted("invalid records length")
}
- return b.memReplay(to)
+ return
}
-func (b *Batch) revertMemReplay(to *memdb.DB) error {
- var ikScratch []byte
- return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
- ikScratch := makeInternalKey(ikScratch, key, b.seq+uint64(i), kt)
- return to.Delete(ikScratch)
- })
+func batchesLen(batches []*Batch) int {
+ batchLen := 0
+ for _, batch := range batches {
+ batchLen += batch.Len()
+ }
+ return batchLen
+}
+
+func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error {
+ if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil {
+ return err
+ }
+ for _, batch := range batches {
+ if _, err := wr.Write(batch.data); err != nil {
+ return err
+ }
+ }
+ return nil
}