aboutsummaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go
diff options
context:
space:
mode:
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go')
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go701
1 files changed, 480 insertions, 221 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go
index 8acb9f720..6f38e84b3 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go
@@ -8,27 +8,41 @@ package table
import (
"encoding/binary"
- "errors"
"fmt"
"io"
"sort"
"strings"
+ "sync"
- "code.google.com/p/snappy-go/snappy"
+ "github.com/syndtr/gosnappy/snappy"
"github.com/syndtr/goleveldb/leveldb/cache"
"github.com/syndtr/goleveldb/leveldb/comparer"
+ "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
+ "github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)
var (
- ErrNotFound = util.ErrNotFound
- ErrIterReleased = errors.New("leveldb/table: iterator released")
+ ErrNotFound = errors.ErrNotFound
+ ErrReaderReleased = errors.New("leveldb/table: reader released")
+ ErrIterReleased = errors.New("leveldb/table: iterator released")
)
+type ErrCorrupted struct {
+ Pos int64
+ Size int64
+ Kind string
+ Reason string
+}
+
+func (e *ErrCorrupted) Error() string {
+ return fmt.Sprintf("leveldb/table: corruption on %s (pos=%d): %s", e.Kind, e.Pos, e.Reason)
+}
+
func max(x, y int) int {
if x > y {
return x
@@ -37,40 +51,33 @@ func max(x, y int) int {
}
type block struct {
- cmp comparer.BasicComparer
+ bpool *util.BufferPool
+ bh blockHandle
data []byte
restartsLen int
restartsOffset int
- // Whether checksum is verified and valid.
- checksum bool
}
-func (b *block) seek(rstart, rlimit int, key []byte) (index, offset int, err error) {
- n := b.restartsOffset
- data := b.data
- cmp := b.cmp
-
+func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) {
index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
- offset := int(binary.LittleEndian.Uint32(data[n+4*(rstart+i):]))
- offset += 1 // shared always zero, since this is a restart point
- v1, n1 := binary.Uvarint(data[offset:]) // key length
- _, n2 := binary.Uvarint(data[offset+n1:]) // value length
+ offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):]))
+ offset += 1 // shared always zero, since this is a restart point
+ v1, n1 := binary.Uvarint(b.data[offset:]) // key length
+ _, n2 := binary.Uvarint(b.data[offset+n1:]) // value length
m := offset + n1 + n2
- return cmp.Compare(data[m:m+int(v1)], key) > 0
+ return cmp.Compare(b.data[m:m+int(v1)], key) > 0
}) + rstart - 1
if index < rstart {
// The smallest key is greater-than key sought.
index = rstart
}
- offset = int(binary.LittleEndian.Uint32(data[n+4*index:]))
+ offset = int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))
return
}
func (b *block) restartIndex(rstart, rlimit, offset int) int {
- n := b.restartsOffset
- data := b.data
return sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
- return int(binary.LittleEndian.Uint32(data[n+4*(rstart+i):])) > offset
+ return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) > offset
}) + rstart - 1
}
@@ -81,7 +88,7 @@ func (b *block) restartOffset(index int) int {
func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error) {
if offset >= b.restartsOffset {
if offset != b.restartsOffset {
- err = errors.New("leveldb/table: Reader: BlockEntry: invalid block (block entries offset not aligned)")
+ err = &ErrCorrupted{Reason: "entries offset not aligned"}
}
return
}
@@ -91,7 +98,7 @@ func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error)
m := n0 + n1 + n2
n = m + int(v1) + int(v2)
if n0 <= 0 || n1 <= 0 || n2 <= 0 || offset+n > b.restartsOffset {
- err = errors.New("leveldb/table: Reader: invalid block (block entries corrupted)")
+ err = &ErrCorrupted{Reason: "entries corrupted"}
return
}
key = b.data[offset+m : offset+m+int(v1)]
@@ -100,43 +107,10 @@ func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error)
return
}
-func (b *block) newIterator(slice *util.Range, inclLimit bool, cache util.Releaser) *blockIter {
- bi := &blockIter{
- block: b,
- cache: cache,
- // Valid key should never be nil.
- key: make([]byte, 0),
- dir: dirSOI,
- riStart: 0,
- riLimit: b.restartsLen,
- offsetStart: 0,
- offsetRealStart: 0,
- offsetLimit: b.restartsOffset,
- }
- if slice != nil {
- if slice.Start != nil {
- if bi.Seek(slice.Start) {
- bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset)
- bi.offsetStart = b.restartOffset(bi.riStart)
- bi.offsetRealStart = bi.prevOffset
- } else {
- bi.riStart = b.restartsLen
- bi.offsetStart = b.restartsOffset
- bi.offsetRealStart = b.restartsOffset
- }
- }
- if slice.Limit != nil {
- if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) {
- bi.offsetLimit = bi.prevOffset
- bi.riLimit = bi.restartIndex + 1
- }
- }
- bi.reset()
- if bi.offsetStart > bi.offsetLimit {
- bi.sErr(errors.New("leveldb/table: Reader: invalid slice range"))
- }
- }
- return bi
+func (b *block) Release() {
+ b.bpool.Put(b.data)
+ b.bpool = nil
+ b.data = nil
}
type dir int
@@ -150,10 +124,12 @@ const (
)
type blockIter struct {
- block *block
- cache, releaser util.Releaser
- key, value []byte
- offset int
+ tr *Reader
+ block *block
+ blockReleaser util.Releaser
+ releaser util.Releaser
+ key, value []byte
+ offset int
// Previous offset, only filled by Next.
prevOffset int
prevNode []int
@@ -250,7 +226,7 @@ func (i *blockIter) Seek(key []byte) bool {
return false
}
- ri, offset, err := i.block.seek(i.riStart, i.riLimit, key)
+ ri, offset, err := i.block.seek(i.tr.cmp, i.riStart, i.riLimit, key)
if err != nil {
i.sErr(err)
return false
@@ -261,7 +237,7 @@ func (i *blockIter) Seek(key []byte) bool {
i.dir = dirForward
}
for i.Next() {
- if i.block.cmp.Compare(i.key, key) >= 0 {
+ if i.tr.cmp.Compare(i.key, key) >= 0 {
return true
}
}
@@ -286,7 +262,7 @@ func (i *blockIter) Next() bool {
for i.offset < i.offsetRealStart {
key, value, nShared, n, err := i.block.entry(i.offset)
if err != nil {
- i.sErr(err)
+ i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
return false
}
if n == 0 {
@@ -300,13 +276,13 @@ func (i *blockIter) Next() bool {
if i.offset >= i.offsetLimit {
i.dir = dirEOI
if i.offset != i.offsetLimit {
- i.sErr(errors.New("leveldb/table: Reader: Next: invalid block (block entries offset not aligned)"))
+ i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned"))
}
return false
}
key, value, nShared, n, err := i.block.entry(i.offset)
if err != nil {
- i.sErr(err)
+ i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
return false
}
if n == 0 {
@@ -391,7 +367,7 @@ func (i *blockIter) Prev() bool {
for {
key, value, nShared, n, err := i.block.entry(offset)
if err != nil {
- i.sErr(err)
+ i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
return false
}
if offset >= i.offsetRealStart {
@@ -410,7 +386,7 @@ func (i *blockIter) Prev() bool {
// Stop if target offset reached.
if offset >= i.offset {
if offset != i.offset {
- i.sErr(errors.New("leveldb/table: Reader: Prev: invalid block (block entries offset not aligned)"))
+ i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned"))
return false
}
@@ -437,25 +413,33 @@ func (i *blockIter) Value() []byte {
}
func (i *blockIter) Release() {
- i.prevNode = nil
- i.prevKeys = nil
- i.key = nil
- i.value = nil
- i.dir = dirReleased
- if i.cache != nil {
- i.cache.Release()
- i.cache = nil
- }
- if i.releaser != nil {
- i.releaser.Release()
- i.releaser = nil
+ if i.dir != dirReleased {
+ i.tr = nil
+ i.block = nil
+ i.prevNode = nil
+ i.prevKeys = nil
+ i.key = nil
+ i.value = nil
+ i.dir = dirReleased
+ if i.blockReleaser != nil {
+ i.blockReleaser.Release()
+ i.blockReleaser = nil
+ }
+ if i.releaser != nil {
+ i.releaser.Release()
+ i.releaser = nil
+ }
}
}
func (i *blockIter) SetReleaser(releaser util.Releaser) {
- if i.dir > dirReleased {
- i.releaser = releaser
+ if i.dir == dirReleased {
+ panic(util.ErrReleased)
}
+ if i.releaser != nil && releaser != nil {
+ panic(util.ErrHasReleaser)
+ }
+ i.releaser = releaser
}
func (i *blockIter) Valid() bool {
@@ -467,21 +451,21 @@ func (i *blockIter) Error() error {
}
type filterBlock struct {
- filter filter.Filter
+ bpool *util.BufferPool
data []byte
oOffset int
baseLg uint
filtersNum int
}
-func (b *filterBlock) contains(offset uint64, key []byte) bool {
+func (b *filterBlock) contains(filter filter.Filter, offset uint64, key []byte) bool {
i := int(offset >> b.baseLg)
if i < b.filtersNum {
o := b.data[b.oOffset+i*4:]
n := int(binary.LittleEndian.Uint32(o))
m := int(binary.LittleEndian.Uint32(o[4:]))
if n < m && m <= b.oOffset {
- return b.filter.Contains(b.data[n:m], key)
+ return filter.Contains(b.data[n:m], key)
} else if n == m {
return false
}
@@ -489,12 +473,17 @@ func (b *filterBlock) contains(offset uint64, key []byte) bool {
return true
}
+func (b *filterBlock) Release() {
+ b.bpool.Put(b.data)
+ b.bpool = nil
+ b.data = nil
+}
+
type indexIter struct {
- blockIter
- tableReader *Reader
- slice *util.Range
+ *blockIter
+ tr *Reader
+ slice *util.Range
// Options
- checksum bool
fillCache bool
}
@@ -505,95 +494,173 @@ func (i *indexIter) Get() iterator.Iterator {
}
dataBH, n := decodeBlockHandle(value)
if n == 0 {
- return iterator.NewEmptyIterator(errors.New("leveldb/table: Reader: invalid table (bad data block handle)"))
+ return iterator.NewEmptyIterator(i.tr.newErrCorruptedBH(i.tr.indexBH, "bad data block handle"))
}
+
var slice *util.Range
if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) {
slice = i.slice
}
- return i.tableReader.getDataIter(dataBH, slice, i.checksum, i.fillCache)
+ return i.tr.getDataIterErr(dataBH, slice, i.tr.verifyChecksum, i.fillCache)
}
// Reader is a table reader.
type Reader struct {
+ mu sync.RWMutex
+ fi *storage.FileInfo
reader io.ReaderAt
- cache cache.Namespace
+ cache *cache.CacheGetter
err error
+ bpool *util.BufferPool
// Options
- cmp comparer.Comparer
- filter filter.Filter
- checksum bool
- strictIter bool
+ o *opt.Options
+ cmp comparer.Comparer
+ filter filter.Filter
+ verifyChecksum bool
- dataEnd int64
- indexBlock *block
- filterBlock *filterBlock
+ dataEnd int64
+ metaBH, indexBH, filterBH blockHandle
+ indexBlock *block
+ filterBlock *filterBlock
}
-func verifyChecksum(data []byte) bool {
- n := len(data) - 4
- checksum0 := binary.LittleEndian.Uint32(data[n:])
- checksum1 := util.NewCRC(data[:n]).Value()
- return checksum0 == checksum1
+func (r *Reader) blockKind(bh blockHandle) string {
+ switch bh.offset {
+ case r.metaBH.offset:
+ return "meta-block"
+ case r.indexBH.offset:
+ return "index-block"
+ case r.filterBH.offset:
+ if r.filterBH.length > 0 {
+ return "filter-block"
+ }
+ }
+ return "data-block"
}
-func (r *Reader) readRawBlock(bh blockHandle, checksum bool) ([]byte, error) {
- data := make([]byte, bh.length+blockTrailerLen)
+func (r *Reader) newErrCorrupted(pos, size int64, kind, reason string) error {
+ return &errors.ErrCorrupted{File: r.fi, Err: &ErrCorrupted{Pos: pos, Size: size, Kind: kind, Reason: reason}}
+}
+
+func (r *Reader) newErrCorruptedBH(bh blockHandle, reason string) error {
+ return r.newErrCorrupted(int64(bh.offset), int64(bh.length), r.blockKind(bh), reason)
+}
+
+func (r *Reader) fixErrCorruptedBH(bh blockHandle, err error) error {
+ if cerr, ok := err.(*ErrCorrupted); ok {
+ cerr.Pos = int64(bh.offset)
+ cerr.Size = int64(bh.length)
+ cerr.Kind = r.blockKind(bh)
+ return &errors.ErrCorrupted{File: r.fi, Err: cerr}
+ }
+ return err
+}
+
+func (r *Reader) readRawBlock(bh blockHandle, verifyChecksum bool) ([]byte, error) {
+ data := r.bpool.Get(int(bh.length + blockTrailerLen))
if _, err := r.reader.ReadAt(data, int64(bh.offset)); err != nil && err != io.EOF {
return nil, err
}
- if checksum || r.checksum {
- if !verifyChecksum(data) {
- return nil, errors.New("leveldb/table: Reader: invalid block (checksum mismatch)")
+
+ if verifyChecksum {
+ n := bh.length + 1
+ checksum0 := binary.LittleEndian.Uint32(data[n:])
+ checksum1 := util.NewCRC(data[:n]).Value()
+ if checksum0 != checksum1 {
+ r.bpool.Put(data)
+ return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("checksum mismatch, want=%#x got=%#x", checksum0, checksum1))
}
}
+
switch data[bh.length] {
case blockTypeNoCompression:
data = data[:bh.length]
case blockTypeSnappyCompression:
- var err error
- data, err = snappy.Decode(nil, data[:bh.length])
+ decLen, err := snappy.DecodedLen(data[:bh.length])
if err != nil {
- return nil, err
+ return nil, r.newErrCorruptedBH(bh, err.Error())
+ }
+ decData := r.bpool.Get(decLen)
+ decData, err = snappy.Decode(decData, data[:bh.length])
+ r.bpool.Put(data)
+ if err != nil {
+ r.bpool.Put(decData)
+ return nil, r.newErrCorruptedBH(bh, err.Error())
}
+ data = decData
default:
- return nil, fmt.Errorf("leveldb/table: Reader: unknown block compression type: %d", data[bh.length])
+ r.bpool.Put(data)
+ return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("unknown compression type %#x", data[bh.length]))
}
return data, nil
}
-func (r *Reader) readBlock(bh blockHandle, checksum bool) (*block, error) {
- data, err := r.readRawBlock(bh, checksum)
+func (r *Reader) readBlock(bh blockHandle, verifyChecksum bool) (*block, error) {
+ data, err := r.readRawBlock(bh, verifyChecksum)
if err != nil {
return nil, err
}
restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:]))
b := &block{
- cmp: r.cmp,
+ bpool: r.bpool,
+ bh: bh,
data: data,
restartsLen: restartsLen,
restartsOffset: len(data) - (restartsLen+1)*4,
- checksum: checksum || r.checksum,
}
return b, nil
}
-func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterBlock, error) {
+func (r *Reader) readBlockCached(bh blockHandle, verifyChecksum, fillCache bool) (*block, util.Releaser, error) {
+ if r.cache != nil {
+ var (
+ err error
+ ch *cache.Handle
+ )
+ if fillCache {
+ ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
+ var b *block
+ b, err = r.readBlock(bh, verifyChecksum)
+ if err != nil {
+ return 0, nil
+ }
+ return cap(b.data), b
+ })
+ } else {
+ ch = r.cache.Get(bh.offset, nil)
+ }
+ if ch != nil {
+ b, ok := ch.Value().(*block)
+ if !ok {
+ ch.Release()
+ return nil, nil, errors.New("leveldb/table: inconsistent block type")
+ }
+ return b, ch, err
+ } else if err != nil {
+ return nil, nil, err
+ }
+ }
+
+ b, err := r.readBlock(bh, verifyChecksum)
+ return b, b, err
+}
+
+func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) {
data, err := r.readRawBlock(bh, true)
if err != nil {
return nil, err
}
n := len(data)
if n < 5 {
- return nil, errors.New("leveldb/table: Reader: invalid filter block (too short)")
+ return nil, r.newErrCorruptedBH(bh, "too short")
}
m := n - 5
oOffset := int(binary.LittleEndian.Uint32(data[m:]))
if oOffset > m {
- return nil, errors.New("leveldb/table: Reader: invalid filter block (invalid offset)")
+ return nil, r.newErrCorruptedBH(bh, "invalid data-offsets offset")
}
b := &filterBlock{
- filter: filter,
+ bpool: r.bpool,
data: data,
oOffset: oOffset,
baseLg: uint(data[n-1]),
@@ -602,44 +669,111 @@ func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterB
return b, nil
}
-func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator {
+func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterBlock, util.Releaser, error) {
if r.cache != nil {
- // Get/set block cache.
- var err error
- cache, ok := r.cache.Get(dataBH.offset, func() (ok bool, value interface{}, charge int, fin cache.SetFin) {
- if !fillCache {
- return
+ var (
+ err error
+ ch *cache.Handle
+ )
+ if fillCache {
+ ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
+ var b *filterBlock
+ b, err = r.readFilterBlock(bh)
+ if err != nil {
+ return 0, nil
+ }
+ return cap(b.data), b
+ })
+ } else {
+ ch = r.cache.Get(bh.offset, nil)
+ }
+ if ch != nil {
+ b, ok := ch.Value().(*filterBlock)
+ if !ok {
+ ch.Release()
+ return nil, nil, errors.New("leveldb/table: inconsistent block type")
}
- var dataBlock *block
- dataBlock, err = r.readBlock(dataBH, checksum)
- if err == nil {
- ok = true
- value = dataBlock
- charge = int(dataBH.length)
+ return b, ch, err
+ } else if err != nil {
+ return nil, nil, err
+ }
+ }
+
+ b, err := r.readFilterBlock(bh)
+ return b, b, err
+}
+
+func (r *Reader) getIndexBlock(fillCache bool) (b *block, rel util.Releaser, err error) {
+ if r.indexBlock == nil {
+ return r.readBlockCached(r.indexBH, true, fillCache)
+ }
+ return r.indexBlock, util.NoopReleaser{}, nil
+}
+
+func (r *Reader) getFilterBlock(fillCache bool) (*filterBlock, util.Releaser, error) {
+ if r.filterBlock == nil {
+ return r.readFilterBlockCached(r.filterBH, fillCache)
+ }
+ return r.filterBlock, util.NoopReleaser{}, nil
+}
+
+func (r *Reader) newBlockIter(b *block, bReleaser util.Releaser, slice *util.Range, inclLimit bool) *blockIter {
+ bi := &blockIter{
+ tr: r,
+ block: b,
+ blockReleaser: bReleaser,
+ // Valid key should never be nil.
+ key: make([]byte, 0),
+ dir: dirSOI,
+ riStart: 0,
+ riLimit: b.restartsLen,
+ offsetStart: 0,
+ offsetRealStart: 0,
+ offsetLimit: b.restartsOffset,
+ }
+ if slice != nil {
+ if slice.Start != nil {
+ if bi.Seek(slice.Start) {
+ bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset)
+ bi.offsetStart = b.restartOffset(bi.riStart)
+ bi.offsetRealStart = bi.prevOffset
+ } else {
+ bi.riStart = b.restartsLen
+ bi.offsetStart = b.restartsOffset
+ bi.offsetRealStart = b.restartsOffset
}
- return
- })
- if err != nil {
- return iterator.NewEmptyIterator(err)
}
- if ok {
- dataBlock := cache.Value().(*block)
- if !dataBlock.checksum && (r.checksum || checksum) {
- if !verifyChecksum(dataBlock.data) {
- return iterator.NewEmptyIterator(errors.New("leveldb/table: Reader: invalid block (checksum mismatch)"))
- }
- dataBlock.checksum = true
+ if slice.Limit != nil {
+ if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) {
+ bi.offsetLimit = bi.prevOffset
+ bi.riLimit = bi.restartIndex + 1
}
- iter := dataBlock.newIterator(slice, false, cache)
- return iter
+ }
+ bi.reset()
+ if bi.offsetStart > bi.offsetLimit {
+ bi.sErr(errors.New("leveldb/table: invalid slice range"))
}
}
- dataBlock, err := r.readBlock(dataBH, checksum)
+ return bi
+}
+
+func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator {
+ b, rel, err := r.readBlockCached(dataBH, verifyChecksum, fillCache)
if err != nil {
return iterator.NewEmptyIterator(err)
}
- iter := dataBlock.newIterator(slice, false, nil)
- return iter
+ return r.newBlockIter(b, rel, slice, false)
+}
+
+func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+
+ if r.err != nil {
+ return iterator.NewEmptyIterator(r.err)
+ }
+
+ return r.getDataIter(dataBH, slice, verifyChecksum, fillCache)
}
// NewIterator creates an iterator from the table.
@@ -653,35 +787,44 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi
// when not used.
//
// Also read Iterator documentation of the leveldb/iterator package.
-
func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+
if r.err != nil {
return iterator.NewEmptyIterator(r.err)
}
+ fillCache := !ro.GetDontFillCache()
+ indexBlock, rel, err := r.getIndexBlock(fillCache)
+ if err != nil {
+ return iterator.NewEmptyIterator(err)
+ }
index := &indexIter{
- blockIter: *r.indexBlock.newIterator(slice, true, nil),
- tableReader: r,
- slice: slice,
- checksum: ro.GetStrict(opt.StrictBlockChecksum),
- fillCache: !ro.GetDontFillCache(),
+ blockIter: r.newBlockIter(indexBlock, rel, slice, true),
+ tr: r,
+ slice: slice,
+ fillCache: !ro.GetDontFillCache(),
}
- return iterator.NewIndexedIterator(index, r.strictIter || ro.GetStrict(opt.StrictIterator), false)
+ return iterator.NewIndexedIterator(index, opt.GetStrict(r.o, ro, opt.StrictReader))
}
-// Find finds key/value pair whose key is greater than or equal to the
-// given key. It returns ErrNotFound if the table doesn't contain
-// such pair.
-//
-// The caller should not modify the contents of the returned slice, but
-// it is safe to modify the contents of the argument after Find returns.
-func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err error) {
+func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bool) (rkey, value []byte, err error) {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+
if r.err != nil {
err = r.err
return
}
- index := r.indexBlock.newIterator(nil, true, nil)
+ indexBlock, rel, err := r.getIndexBlock(true)
+ if err != nil {
+ return
+ }
+ defer rel.Release()
+
+ index := r.newBlockIter(indexBlock, nil, nil, true)
defer index.Release()
if !index.Seek(key) {
err = index.Error()
@@ -692,14 +835,23 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err
}
dataBH, n := decodeBlockHandle(index.Value())
if n == 0 {
- err = errors.New("leveldb/table: Reader: invalid table (bad data block handle)")
+ r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
return
}
- if r.filterBlock != nil && !r.filterBlock.contains(dataBH.offset, key) {
- err = ErrNotFound
- return
+ if filtered && r.filter != nil {
+ filterBlock, frel, ferr := r.getFilterBlock(true)
+ if ferr == nil {
+ if !filterBlock.contains(r.filter, dataBH.offset, key) {
+ frel.Release()
+ return nil, nil, ErrNotFound
+ }
+ frel.Release()
+ } else if !errors.IsCorrupted(ferr) {
+ err = ferr
+ return
+ }
}
- data := r.getDataIter(dataBH, nil, ro.GetStrict(opt.StrictBlockChecksum), !ro.GetDontFillCache())
+ data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
defer data.Release()
if !data.Seek(key) {
err = data.Error()
@@ -708,23 +860,64 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err
}
return
}
+ // Don't use block buffer, no need to copy the buffer.
rkey = data.Key()
- value = data.Value()
+ if !noValue {
+ if r.bpool == nil {
+ value = data.Value()
+ } else {
+ // Use block buffer, and since the buffer will be recycled, the buffer
+ // need to be copied.
+ value = append([]byte{}, data.Value()...)
+ }
+ }
+ return
+}
+
+// Find finds key/value pair whose key is greater than or equal to the
+// given key. It returns ErrNotFound if the table doesn't contain
+// such pair.
+// If filtered is true then the nearest 'block' will be checked against
+// 'filter data' (if present) and will immediately return ErrNotFound if
+// 'filter data' indicates that such pair doesn't exist.
+//
+// The caller may modify the contents of the returned slice as it is its
+// own copy.
+// It is safe to modify the contents of the argument after Find returns.
+func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, value []byte, err error) {
+ return r.find(key, filtered, ro, false)
+}
+
+// Find finds key that is greater than or equal to the given key.
+// It returns ErrNotFound if the table doesn't contain such key.
+// If filtered is true then the nearest 'block' will be checked against
+// 'filter data' (if present) and will immediately return ErrNotFound if
+// 'filter data' indicates that such key doesn't exist.
+//
+// The caller may modify the contents of the returned slice as it is its
+// own copy.
+// It is safe to modify the contents of the argument after Find returns.
+func (r *Reader) FindKey(key []byte, filtered bool, ro *opt.ReadOptions) (rkey []byte, err error) {
+ rkey, _, err = r.find(key, filtered, ro, true)
return
}
// Get gets the value for the given key. It returns errors.ErrNotFound
// if the table does not contain the key.
//
-// The caller should not modify the contents of the returned slice, but
-// it is safe to modify the contents of the argument after Get returns.
+// The caller may modify the contents of the returned slice as it is its
+// own copy.
+// It is safe to modify the contents of the argument after Find returns.
func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+
if r.err != nil {
err = r.err
return
}
- rkey, value, err := r.Find(key, ro)
+ rkey, value, err := r.find(key, false, ro, false)
if err == nil && r.cmp.Compare(rkey, key) != 0 {
value = nil
err = ErrNotFound
@@ -736,17 +929,26 @@ func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error)
//
// It is safe to modify the contents of the argument after Get returns.
func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+
if r.err != nil {
err = r.err
return
}
- index := r.indexBlock.newIterator(nil, true, nil)
+ indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true)
+ if err != nil {
+ return
+ }
+ defer rel.Release()
+
+ index := r.newBlockIter(indexBlock, nil, nil, true)
defer index.Release()
if index.Seek(key) {
dataBH, n := decodeBlockHandle(index.Value())
if n == 0 {
- err = errors.New("leveldb/table: Reader: invalid table (bad data block handle)")
+ r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
return
}
offset = int64(dataBH.offset)
@@ -759,90 +961,147 @@ func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
return
}
-// NewReader creates a new initialized table reader for the file.
-// The cache is optional and can be nil.
-func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, o *opt.Options) *Reader {
- r := &Reader{
- reader: f,
- cache: cache,
- cmp: o.GetComparer(),
- checksum: o.GetStrict(opt.StrictBlockChecksum),
- strictIter: o.GetStrict(opt.StrictIterator),
+// Release implements util.Releaser.
+// It also close the file if it is an io.Closer.
+func (r *Reader) Release() {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ if closer, ok := r.reader.(io.Closer); ok {
+ closer.Close()
+ }
+ if r.indexBlock != nil {
+ r.indexBlock.Release()
+ r.indexBlock = nil
}
+ if r.filterBlock != nil {
+ r.filterBlock.Release()
+ r.filterBlock = nil
+ }
+ r.reader = nil
+ r.cache = nil
+ r.bpool = nil
+ r.err = ErrReaderReleased
+}
+
+// NewReader creates a new initialized table reader for the file.
+// The fi, cache and bpool is optional and can be nil.
+//
+// The returned table reader instance is goroutine-safe.
+func NewReader(f io.ReaderAt, size int64, fi *storage.FileInfo, cache *cache.CacheGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
if f == nil {
- r.err = errors.New("leveldb/table: Reader: nil file")
- return r
+ return nil, errors.New("leveldb/table: nil file")
}
+
+ r := &Reader{
+ fi: fi,
+ reader: f,
+ cache: cache,
+ bpool: bpool,
+ o: o,
+ cmp: o.GetComparer(),
+ verifyChecksum: o.GetStrict(opt.StrictBlockChecksum),
+ }
+
if size < footerLen {
- r.err = errors.New("leveldb/table: Reader: invalid table (file size is too small)")
- return r
+ r.err = r.newErrCorrupted(0, size, "table", "too small")
+ return r, nil
}
+
+ footerPos := size - footerLen
var footer [footerLen]byte
- if _, err := r.reader.ReadAt(footer[:], size-footerLen); err != nil && err != io.EOF {
- r.err = fmt.Errorf("leveldb/table: Reader: invalid table (could not read footer): %v", err)
+ if _, err := r.reader.ReadAt(footer[:], footerPos); err != nil && err != io.EOF {
+ return nil, err
}
if string(footer[footerLen-len(magic):footerLen]) != magic {
- r.err = errors.New("leveldb/table: Reader: invalid table (bad magic number)")
- return r
+ r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad magic number")
+ return r, nil
}
+
+ var n int
// Decode the metaindex block handle.
- metaBH, n := decodeBlockHandle(footer[:])
+ r.metaBH, n = decodeBlockHandle(footer[:])
if n == 0 {
- r.err = errors.New("leveldb/table: Reader: invalid table (bad metaindex block handle)")
- return r
+ r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad metaindex block handle")
+ return r, nil
}
+
// Decode the index block handle.
- indexBH, n := decodeBlockHandle(footer[n:])
+ r.indexBH, n = decodeBlockHandle(footer[n:])
if n == 0 {
- r.err = errors.New("leveldb/table: Reader: invalid table (bad index block handle)")
- return r
- }
- // Read index block.
- r.indexBlock, r.err = r.readBlock(indexBH, true)
- if r.err != nil {
- return r
+ r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad index block handle")
+ return r, nil
}
+
// Read metaindex block.
- metaBlock, err := r.readBlock(metaBH, true)
+ metaBlock, err := r.readBlock(r.metaBH, true)
if err != nil {
- r.err = err
- return r
+ if errors.IsCorrupted(err) {
+ r.err = err
+ return r, nil
+ } else {
+ return nil, err
+ }
}
+
// Set data end.
- r.dataEnd = int64(metaBH.offset)
- metaIter := metaBlock.newIterator(nil, false, nil)
+ r.dataEnd = int64(r.metaBH.offset)
+
+ // Read metaindex.
+ metaIter := r.newBlockIter(metaBlock, nil, nil, true)
for metaIter.Next() {
key := string(metaIter.Key())
if !strings.HasPrefix(key, "filter.") {
continue
}
fn := key[7:]
- var filter filter.Filter
if f0 := o.GetFilter(); f0 != nil && f0.Name() == fn {
- filter = f0
+ r.filter = f0
} else {
for _, f0 := range o.GetAltFilters() {
if f0.Name() == fn {
- filter = f0
+ r.filter = f0
break
}
}
}
- if filter != nil {
+ if r.filter != nil {
filterBH, n := decodeBlockHandle(metaIter.Value())
if n == 0 {
continue
}
+ r.filterBH = filterBH
// Update data end.
r.dataEnd = int64(filterBH.offset)
- filterBlock, err := r.readFilterBlock(filterBH, filter)
- if err != nil {
- continue
- }
- r.filterBlock = filterBlock
break
}
}
metaIter.Release()
- return r
+ metaBlock.Release()
+
+ // Cache index and filter block locally, since we don't have global cache.
+ if cache == nil {
+ r.indexBlock, err = r.readBlock(r.indexBH, true)
+ if err != nil {
+ if errors.IsCorrupted(err) {
+ r.err = err
+ return r, nil
+ } else {
+ return nil, err
+ }
+ }
+ if r.filter != nil {
+ r.filterBlock, err = r.readFilterBlock(r.filterBH)
+ if err != nil {
+ if !errors.IsCorrupted(err) {
+ return nil, err
+ }
+
+ // Don't use filter then.
+ r.filter = nil
+ }
+ }
+ }
+
+ return r, nil
}