diff options
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go')
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go | 228 |
1 files changed, 132 insertions, 96 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go index 0d7911eca..ccf390c9c 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go @@ -8,65 +8,84 @@ package leveldb import ( "encoding/binary" - "errors" + "fmt" + "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/memdb" ) -var ( - errBatchTooShort = errors.New("leveldb: batch is too short") - errBatchBadRecord = errors.New("leveldb: bad record in batch") -) +type ErrBatchCorrupted struct { + Reason string +} + +func (e *ErrBatchCorrupted) Error() string { + return fmt.Sprintf("leveldb: batch corrupted: %s", e.Reason) +} + +func newErrBatchCorrupted(reason string) error { + return errors.NewErrCorrupted(nil, &ErrBatchCorrupted{reason}) +} -const kBatchHdrLen = 8 + 4 +const ( + batchHdrLen = 8 + 4 + batchGrowRec = 3000 +) -type batchReplay interface { - put(key, value []byte, seq uint64) - delete(key []byte, seq uint64) +type BatchReplay interface { + Put(key, value []byte) + Delete(key []byte) } // Batch is a write batch. type Batch struct { - buf []byte + data []byte rLen, bLen int seq uint64 sync bool } func (b *Batch) grow(n int) { - off := len(b.buf) + off := len(b.data) if off == 0 { - // include headers - off = kBatchHdrLen - n += off + off = batchHdrLen + if b.data != nil { + b.data = b.data[:off] + } } - if cap(b.buf)-off >= n { - return + if cap(b.data)-off < n { + if b.data == nil { + b.data = make([]byte, off, off+n) + } else { + odata := b.data + div := 1 + if b.rLen > batchGrowRec { + div = b.rLen / batchGrowRec + } + b.data = make([]byte, off, off+n+(off-batchHdrLen)/div) + copy(b.data, odata) + } } - buf := make([]byte, 2*cap(b.buf)+n) - copy(buf, b.buf) - b.buf = buf[:off] } -func (b *Batch) appendRec(t vType, key, value []byte) { +func (b *Batch) appendRec(kt kType, key, value []byte) { n := 1 + binary.MaxVarintLen32 + len(key) - if t == tVal { + if kt == ktVal { n += binary.MaxVarintLen32 + len(value) } b.grow(n) - off := len(b.buf) - buf := b.buf[:off+n] - buf[off] = byte(t) + off := len(b.data) + data := b.data[:off+n] + data[off] = byte(kt) off += 1 - off += binary.PutUvarint(buf[off:], uint64(len(key))) - copy(buf[off:], key) + off += binary.PutUvarint(data[off:], uint64(len(key))) + copy(data[off:], key) off += len(key) - if t == tVal { - off += binary.PutUvarint(buf[off:], uint64(len(value))) - copy(buf[off:], value) + if kt == ktVal { + off += binary.PutUvarint(data[off:], uint64(len(value))) + copy(data[off:], value) off += len(value) } - b.buf = buf[:off] + b.data = data[:off] b.rLen++ // Include 8-byte ikey header b.bLen += len(key) + len(value) + 8 @@ -75,18 +94,51 @@ func (b *Batch) appendRec(t vType, key, value []byte) { // Put appends 'put operation' of the given key/value pair to the batch. // It is safe to modify the contents of the argument after Put returns. func (b *Batch) Put(key, value []byte) { - b.appendRec(tVal, key, value) + b.appendRec(ktVal, key, value) } // Delete appends 'delete operation' of the given key to the batch. // It is safe to modify the contents of the argument after Delete returns. func (b *Batch) Delete(key []byte) { - b.appendRec(tDel, key, nil) + b.appendRec(ktDel, key, nil) +} + +// Dump dumps batch contents. The returned slice can be loaded into the +// batch using Load method. +// The returned slice is not its own copy, so the contents should not be +// modified. +func (b *Batch) Dump() []byte { + return b.encode() +} + +// Load loads given slice into the batch. Previous contents of the batch +// will be discarded. +// The given slice will not be copied and will be used as batch buffer, so +// it is not safe to modify the contents of the slice. +func (b *Batch) Load(data []byte) error { + return b.decode(0, data) +} + +// Replay replays batch contents. +func (b *Batch) Replay(r BatchReplay) error { + return b.decodeRec(func(i int, kt kType, key, value []byte) { + switch kt { + case ktVal: + r.Put(key, value) + case ktDel: + r.Delete(key) + } + }) +} + +// Len returns number of records in the batch. +func (b *Batch) Len() int { + return b.rLen } // Reset resets the batch. func (b *Batch) Reset() { - b.buf = nil + b.data = b.data[:0] b.seq = 0 b.rLen = 0 b.bLen = 0 @@ -97,24 +149,10 @@ func (b *Batch) init(sync bool) { b.sync = sync } -func (b *Batch) put(key, value []byte, seq uint64) { - if b.rLen == 0 { - b.seq = seq - } - b.Put(key, value) -} - -func (b *Batch) delete(key []byte, seq uint64) { - if b.rLen == 0 { - b.seq = seq - } - b.Delete(key) -} - func (b *Batch) append(p *Batch) { if p.rLen > 0 { - b.grow(len(p.buf) - kBatchHdrLen) - b.buf = append(b.buf, p.buf[kBatchHdrLen:]...) + b.grow(len(p.data) - batchHdrLen) + b.data = append(b.data, p.data[batchHdrLen:]...) b.rLen += p.rLen } if p.sync { @@ -122,95 +160,93 @@ func (b *Batch) append(p *Batch) { } } -func (b *Batch) len() int { - return b.rLen -} - +// size returns sums of key/value pair length plus 8-bytes ikey. func (b *Batch) size() int { return b.bLen } func (b *Batch) encode() []byte { b.grow(0) - binary.LittleEndian.PutUint64(b.buf, b.seq) - binary.LittleEndian.PutUint32(b.buf[8:], uint32(b.rLen)) + binary.LittleEndian.PutUint64(b.data, b.seq) + binary.LittleEndian.PutUint32(b.data[8:], uint32(b.rLen)) - return b.buf + return b.data } -func (b *Batch) decode(buf []byte) error { - if len(buf) < kBatchHdrLen { - return errBatchTooShort +func (b *Batch) decode(prevSeq uint64, data []byte) error { + if len(data) < batchHdrLen { + return newErrBatchCorrupted("too short") } - b.seq = binary.LittleEndian.Uint64(buf) - b.rLen = int(binary.LittleEndian.Uint32(buf[8:])) + b.seq = binary.LittleEndian.Uint64(data) + if b.seq < prevSeq { + return newErrBatchCorrupted("invalid sequence number") + } + b.rLen = int(binary.LittleEndian.Uint32(data[8:])) + if b.rLen < 0 { + return newErrBatchCorrupted("invalid records length") + } // No need to be precise at this point, it won't be used anyway - b.bLen = len(buf) - kBatchHdrLen - b.buf = buf + b.bLen = len(data) - batchHdrLen + b.data = data return nil } -func (b *Batch) decodeRec(f func(i int, t vType, key, value []byte)) error { - off := kBatchHdrLen +func (b *Batch) decodeRec(f func(i int, kt kType, key, value []byte)) (err error) { + off := batchHdrLen for i := 0; i < b.rLen; i++ { - if off >= len(b.buf) { - return errors.New("leveldb: invalid batch record length") + if off >= len(b.data) { + return newErrBatchCorrupted("invalid records length") } - t := vType(b.buf[off]) - if t > tVal { - return errors.New("leveldb: invalid batch record type in batch") + kt := kType(b.data[off]) + if kt > ktVal { + return newErrBatchCorrupted("bad record: invalid type") } off += 1 - x, n := binary.Uvarint(b.buf[off:]) + x, n := binary.Uvarint(b.data[off:]) off += n - if n <= 0 || off+int(x) > len(b.buf) { - return errBatchBadRecord + if n <= 0 || off+int(x) > len(b.data) { + return newErrBatchCorrupted("bad record: invalid key length") } - key := b.buf[off : off+int(x)] + key := b.data[off : off+int(x)] off += int(x) - var value []byte - if t == tVal { - x, n := binary.Uvarint(b.buf[off:]) + if kt == ktVal { + x, n := binary.Uvarint(b.data[off:]) off += n - if n <= 0 || off+int(x) > len(b.buf) { - return errBatchBadRecord + if n <= 0 || off+int(x) > len(b.data) { + return newErrBatchCorrupted("bad record: invalid value length") } - value = b.buf[off : off+int(x)] + value = b.data[off : off+int(x)] off += int(x) } - f(i, t, key, value) + f(i, kt, key, value) } return nil } -func (b *Batch) replay(to batchReplay) error { - return b.decodeRec(func(i int, t vType, key, value []byte) { - switch t { - case tVal: - to.put(key, value, b.seq+uint64(i)) - case tDel: - to.delete(key, b.seq+uint64(i)) - } - }) -} - func (b *Batch) memReplay(to *memdb.DB) error { - return b.decodeRec(func(i int, t vType, key, value []byte) { - ikey := newIKey(key, b.seq+uint64(i), t) + return b.decodeRec(func(i int, kt kType, key, value []byte) { + ikey := newIkey(key, b.seq+uint64(i), kt) to.Put(ikey, value) }) } +func (b *Batch) memDecodeAndReplay(prevSeq uint64, data []byte, to *memdb.DB) error { + if err := b.decode(prevSeq, data); err != nil { + return err + } + return b.memReplay(to) +} + func (b *Batch) revertMemReplay(to *memdb.DB) error { - return b.decodeRec(func(i int, t vType, key, value []byte) { - ikey := newIKey(key, b.seq+uint64(i), t) + return b.decodeRec(func(i int, kt kType, key, value []byte) { + ikey := newIkey(key, b.seq+uint64(i), kt) to.Delete(ikey) }) } |