aboutsummaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go
diff options
context:
space:
mode:
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go')
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go228
1 files changed, 132 insertions, 96 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go
index 0d7911eca..ccf390c9c 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go
@@ -8,65 +8,84 @@ package leveldb
import (
"encoding/binary"
- "errors"
+ "fmt"
+ "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/memdb"
)
-var (
- errBatchTooShort = errors.New("leveldb: batch is too short")
- errBatchBadRecord = errors.New("leveldb: bad record in batch")
-)
+type ErrBatchCorrupted struct {
+ Reason string
+}
+
+func (e *ErrBatchCorrupted) Error() string {
+ return fmt.Sprintf("leveldb: batch corrupted: %s", e.Reason)
+}
+
+func newErrBatchCorrupted(reason string) error {
+ return errors.NewErrCorrupted(nil, &ErrBatchCorrupted{reason})
+}
-const kBatchHdrLen = 8 + 4
+const (
+ batchHdrLen = 8 + 4
+ batchGrowRec = 3000
+)
-type batchReplay interface {
- put(key, value []byte, seq uint64)
- delete(key []byte, seq uint64)
+type BatchReplay interface {
+ Put(key, value []byte)
+ Delete(key []byte)
}
// Batch is a write batch.
type Batch struct {
- buf []byte
+ data []byte
rLen, bLen int
seq uint64
sync bool
}
func (b *Batch) grow(n int) {
- off := len(b.buf)
+ off := len(b.data)
if off == 0 {
- // include headers
- off = kBatchHdrLen
- n += off
+ off = batchHdrLen
+ if b.data != nil {
+ b.data = b.data[:off]
+ }
}
- if cap(b.buf)-off >= n {
- return
+ if cap(b.data)-off < n {
+ if b.data == nil {
+ b.data = make([]byte, off, off+n)
+ } else {
+ odata := b.data
+ div := 1
+ if b.rLen > batchGrowRec {
+ div = b.rLen / batchGrowRec
+ }
+ b.data = make([]byte, off, off+n+(off-batchHdrLen)/div)
+ copy(b.data, odata)
+ }
}
- buf := make([]byte, 2*cap(b.buf)+n)
- copy(buf, b.buf)
- b.buf = buf[:off]
}
-func (b *Batch) appendRec(t vType, key, value []byte) {
+func (b *Batch) appendRec(kt kType, key, value []byte) {
n := 1 + binary.MaxVarintLen32 + len(key)
- if t == tVal {
+ if kt == ktVal {
n += binary.MaxVarintLen32 + len(value)
}
b.grow(n)
- off := len(b.buf)
- buf := b.buf[:off+n]
- buf[off] = byte(t)
+ off := len(b.data)
+ data := b.data[:off+n]
+ data[off] = byte(kt)
off += 1
- off += binary.PutUvarint(buf[off:], uint64(len(key)))
- copy(buf[off:], key)
+ off += binary.PutUvarint(data[off:], uint64(len(key)))
+ copy(data[off:], key)
off += len(key)
- if t == tVal {
- off += binary.PutUvarint(buf[off:], uint64(len(value)))
- copy(buf[off:], value)
+ if kt == ktVal {
+ off += binary.PutUvarint(data[off:], uint64(len(value)))
+ copy(data[off:], value)
off += len(value)
}
- b.buf = buf[:off]
+ b.data = data[:off]
b.rLen++
// Include 8-byte ikey header
b.bLen += len(key) + len(value) + 8
@@ -75,18 +94,51 @@ func (b *Batch) appendRec(t vType, key, value []byte) {
// Put appends 'put operation' of the given key/value pair to the batch.
// It is safe to modify the contents of the argument after Put returns.
func (b *Batch) Put(key, value []byte) {
- b.appendRec(tVal, key, value)
+ b.appendRec(ktVal, key, value)
}
// Delete appends 'delete operation' of the given key to the batch.
// It is safe to modify the contents of the argument after Delete returns.
func (b *Batch) Delete(key []byte) {
- b.appendRec(tDel, key, nil)
+ b.appendRec(ktDel, key, nil)
+}
+
+// Dump dumps batch contents. The returned slice can be loaded into the
+// batch using Load method.
+// The returned slice is not its own copy, so the contents should not be
+// modified.
+func (b *Batch) Dump() []byte {
+ return b.encode()
+}
+
+// Load loads given slice into the batch. Previous contents of the batch
+// will be discarded.
+// The given slice will not be copied and will be used as batch buffer, so
+// it is not safe to modify the contents of the slice.
+func (b *Batch) Load(data []byte) error {
+ return b.decode(0, data)
+}
+
+// Replay replays batch contents.
+func (b *Batch) Replay(r BatchReplay) error {
+ return b.decodeRec(func(i int, kt kType, key, value []byte) {
+ switch kt {
+ case ktVal:
+ r.Put(key, value)
+ case ktDel:
+ r.Delete(key)
+ }
+ })
+}
+
+// Len returns number of records in the batch.
+func (b *Batch) Len() int {
+ return b.rLen
}
// Reset resets the batch.
func (b *Batch) Reset() {
- b.buf = nil
+ b.data = b.data[:0]
b.seq = 0
b.rLen = 0
b.bLen = 0
@@ -97,24 +149,10 @@ func (b *Batch) init(sync bool) {
b.sync = sync
}
-func (b *Batch) put(key, value []byte, seq uint64) {
- if b.rLen == 0 {
- b.seq = seq
- }
- b.Put(key, value)
-}
-
-func (b *Batch) delete(key []byte, seq uint64) {
- if b.rLen == 0 {
- b.seq = seq
- }
- b.Delete(key)
-}
-
func (b *Batch) append(p *Batch) {
if p.rLen > 0 {
- b.grow(len(p.buf) - kBatchHdrLen)
- b.buf = append(b.buf, p.buf[kBatchHdrLen:]...)
+ b.grow(len(p.data) - batchHdrLen)
+ b.data = append(b.data, p.data[batchHdrLen:]...)
b.rLen += p.rLen
}
if p.sync {
@@ -122,95 +160,93 @@ func (b *Batch) append(p *Batch) {
}
}
-func (b *Batch) len() int {
- return b.rLen
-}
-
+// size returns sums of key/value pair length plus 8-bytes ikey.
func (b *Batch) size() int {
return b.bLen
}
func (b *Batch) encode() []byte {
b.grow(0)
- binary.LittleEndian.PutUint64(b.buf, b.seq)
- binary.LittleEndian.PutUint32(b.buf[8:], uint32(b.rLen))
+ binary.LittleEndian.PutUint64(b.data, b.seq)
+ binary.LittleEndian.PutUint32(b.data[8:], uint32(b.rLen))
- return b.buf
+ return b.data
}
-func (b *Batch) decode(buf []byte) error {
- if len(buf) < kBatchHdrLen {
- return errBatchTooShort
+func (b *Batch) decode(prevSeq uint64, data []byte) error {
+ if len(data) < batchHdrLen {
+ return newErrBatchCorrupted("too short")
}
- b.seq = binary.LittleEndian.Uint64(buf)
- b.rLen = int(binary.LittleEndian.Uint32(buf[8:]))
+ b.seq = binary.LittleEndian.Uint64(data)
+ if b.seq < prevSeq {
+ return newErrBatchCorrupted("invalid sequence number")
+ }
+ b.rLen = int(binary.LittleEndian.Uint32(data[8:]))
+ if b.rLen < 0 {
+ return newErrBatchCorrupted("invalid records length")
+ }
// No need to be precise at this point, it won't be used anyway
- b.bLen = len(buf) - kBatchHdrLen
- b.buf = buf
+ b.bLen = len(data) - batchHdrLen
+ b.data = data
return nil
}
-func (b *Batch) decodeRec(f func(i int, t vType, key, value []byte)) error {
- off := kBatchHdrLen
+func (b *Batch) decodeRec(f func(i int, kt kType, key, value []byte)) (err error) {
+ off := batchHdrLen
for i := 0; i < b.rLen; i++ {
- if off >= len(b.buf) {
- return errors.New("leveldb: invalid batch record length")
+ if off >= len(b.data) {
+ return newErrBatchCorrupted("invalid records length")
}
- t := vType(b.buf[off])
- if t > tVal {
- return errors.New("leveldb: invalid batch record type in batch")
+ kt := kType(b.data[off])
+ if kt > ktVal {
+ return newErrBatchCorrupted("bad record: invalid type")
}
off += 1
- x, n := binary.Uvarint(b.buf[off:])
+ x, n := binary.Uvarint(b.data[off:])
off += n
- if n <= 0 || off+int(x) > len(b.buf) {
- return errBatchBadRecord
+ if n <= 0 || off+int(x) > len(b.data) {
+ return newErrBatchCorrupted("bad record: invalid key length")
}
- key := b.buf[off : off+int(x)]
+ key := b.data[off : off+int(x)]
off += int(x)
-
var value []byte
- if t == tVal {
- x, n := binary.Uvarint(b.buf[off:])
+ if kt == ktVal {
+ x, n := binary.Uvarint(b.data[off:])
off += n
- if n <= 0 || off+int(x) > len(b.buf) {
- return errBatchBadRecord
+ if n <= 0 || off+int(x) > len(b.data) {
+ return newErrBatchCorrupted("bad record: invalid value length")
}
- value = b.buf[off : off+int(x)]
+ value = b.data[off : off+int(x)]
off += int(x)
}
- f(i, t, key, value)
+ f(i, kt, key, value)
}
return nil
}
-func (b *Batch) replay(to batchReplay) error {
- return b.decodeRec(func(i int, t vType, key, value []byte) {
- switch t {
- case tVal:
- to.put(key, value, b.seq+uint64(i))
- case tDel:
- to.delete(key, b.seq+uint64(i))
- }
- })
-}
-
func (b *Batch) memReplay(to *memdb.DB) error {
- return b.decodeRec(func(i int, t vType, key, value []byte) {
- ikey := newIKey(key, b.seq+uint64(i), t)
+ return b.decodeRec(func(i int, kt kType, key, value []byte) {
+ ikey := newIkey(key, b.seq+uint64(i), kt)
to.Put(ikey, value)
})
}
+func (b *Batch) memDecodeAndReplay(prevSeq uint64, data []byte, to *memdb.DB) error {
+ if err := b.decode(prevSeq, data); err != nil {
+ return err
+ }
+ return b.memReplay(to)
+}
+
func (b *Batch) revertMemReplay(to *memdb.DB) error {
- return b.decodeRec(func(i int, t vType, key, value []byte) {
- ikey := newIKey(key, b.seq+uint64(i), t)
+ return b.decodeRec(func(i int, kt kType, key, value []byte) {
+ ikey := newIkey(key, b.seq+uint64(i), kt)
to.Delete(ikey)
})
}