aboutsummaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go
diff options
context:
space:
mode:
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go')
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go108
1 files changed, 74 insertions, 34 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go
index 9973a8fef..011a94a35 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go
@@ -8,7 +8,10 @@ package leveldb
import (
"errors"
+ "math/rand"
"runtime"
+ "sync"
+ "sync/atomic"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
@@ -19,50 +22,69 @@ var (
errInvalidIkey = errors.New("leveldb: Iterator: invalid internal key")
)
-func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
- s := db.s
+type memdbReleaser struct {
+ once sync.Once
+ m *memDB
+}
+
+func (mr *memdbReleaser) Release() {
+ mr.once.Do(func() {
+ mr.m.decref()
+ })
+}
+func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
em, fm := db.getMems()
- v := s.version()
+ v := db.s.version()
ti := v.getIterators(slice, ro)
n := len(ti) + 2
i := make([]iterator.Iterator, 0, n)
- i = append(i, em.NewIterator(slice))
+ emi := em.mdb.NewIterator(slice)
+ emi.SetReleaser(&memdbReleaser{m: em})
+ i = append(i, emi)
if fm != nil {
- i = append(i, fm.NewIterator(slice))
+ fmi := fm.mdb.NewIterator(slice)
+ fmi.SetReleaser(&memdbReleaser{m: fm})
+ i = append(i, fmi)
}
i = append(i, ti...)
- strict := s.o.GetStrict(opt.StrictIterator) || ro.GetStrict(opt.StrictIterator)
- mi := iterator.NewMergedIterator(i, s.icmp, strict)
+ strict := opt.GetStrict(db.s.o.Options, ro, opt.StrictReader)
+ mi := iterator.NewMergedIterator(i, db.s.icmp, strict)
mi.SetReleaser(&versionReleaser{v: v})
return mi
}
func (db *DB) newIterator(seq uint64, slice *util.Range, ro *opt.ReadOptions) *dbIter {
- var slice_ *util.Range
+ var islice *util.Range
if slice != nil {
- slice_ = &util.Range{}
+ islice = &util.Range{}
if slice.Start != nil {
- slice_.Start = newIKey(slice.Start, kMaxSeq, tSeek)
+ islice.Start = newIkey(slice.Start, kMaxSeq, ktSeek)
}
if slice.Limit != nil {
- slice_.Limit = newIKey(slice.Limit, kMaxSeq, tSeek)
+ islice.Limit = newIkey(slice.Limit, kMaxSeq, ktSeek)
}
}
- rawIter := db.newRawIterator(slice_, ro)
+ rawIter := db.newRawIterator(islice, ro)
iter := &dbIter{
+ db: db,
icmp: db.s.icmp,
iter: rawIter,
seq: seq,
- strict: db.s.o.GetStrict(opt.StrictIterator) || ro.GetStrict(opt.StrictIterator),
+ strict: opt.GetStrict(db.s.o.Options, ro, opt.StrictReader),
key: make([]byte, 0),
value: make([]byte, 0),
}
+ atomic.AddInt32(&db.aliveIters, 1)
runtime.SetFinalizer(iter, (*dbIter).Release)
return iter
}
+func (db *DB) iterSamplingRate() int {
+ return rand.Intn(2 * db.s.o.GetIteratorSamplingRate())
+}
+
type dir int
const (
@@ -75,16 +97,27 @@ const (
// dbIter represent an interator states over a database session.
type dbIter struct {
+ db *DB
icmp *iComparer
iter iterator.Iterator
seq uint64
strict bool
- dir dir
- key []byte
- value []byte
- err error
- releaser util.Releaser
+ smaplingGap int
+ dir dir
+ key []byte
+ value []byte
+ err error
+ releaser util.Releaser
+}
+
+func (i *dbIter) sampleSeek() {
+ ikey := i.iter.Key()
+ i.smaplingGap -= len(ikey) + len(i.iter.Value())
+ for i.smaplingGap < 0 {
+ i.smaplingGap += i.db.iterSamplingRate()
+ i.db.sampleSeek(ikey)
+ }
}
func (i *dbIter) setErr(err error) {
@@ -144,7 +177,7 @@ func (i *dbIter) Seek(key []byte) bool {
return false
}
- ikey := newIKey(key, i.seq, tSeek)
+ ikey := newIkey(key, i.seq, ktSeek)
if i.iter.Seek(ikey) {
i.dir = dirSOI
return i.next()
@@ -156,15 +189,15 @@ func (i *dbIter) Seek(key []byte) bool {
func (i *dbIter) next() bool {
for {
- ukey, seq, t, ok := parseIkey(i.iter.Key())
- if ok {
+ if ukey, seq, kt, kerr := parseIkey(i.iter.Key()); kerr == nil {
+ i.sampleSeek()
if seq <= i.seq {
- switch t {
- case tDel:
+ switch kt {
+ case ktDel:
// Skip deleted key.
i.key = append(i.key[:0], ukey...)
i.dir = dirForward
- case tVal:
+ case ktVal:
if i.dir == dirSOI || i.icmp.uCompare(ukey, i.key) > 0 {
i.key = append(i.key[:0], ukey...)
i.value = append(i.value[:0], i.iter.Value()...)
@@ -174,7 +207,7 @@ func (i *dbIter) next() bool {
}
}
} else if i.strict {
- i.setErr(errInvalidIkey)
+ i.setErr(kerr)
break
}
if !i.iter.Next() {
@@ -207,20 +240,20 @@ func (i *dbIter) prev() bool {
del := true
if i.iter.Valid() {
for {
- ukey, seq, t, ok := parseIkey(i.iter.Key())
- if ok {
+ if ukey, seq, kt, kerr := parseIkey(i.iter.Key()); kerr == nil {
+ i.sampleSeek()
if seq <= i.seq {
if !del && i.icmp.uCompare(ukey, i.key) < 0 {
return true
}
- del = (t == tDel)
+ del = (kt == ktDel)
if !del {
i.key = append(i.key[:0], ukey...)
i.value = append(i.value[:0], i.iter.Value()...)
}
}
} else if i.strict {
- i.setErr(errInvalidIkey)
+ i.setErr(kerr)
return false
}
if !i.iter.Prev() {
@@ -249,13 +282,13 @@ func (i *dbIter) Prev() bool {
return i.Last()
case dirForward:
for i.iter.Prev() {
- ukey, _, _, ok := parseIkey(i.iter.Key())
- if ok {
+ if ukey, _, _, kerr := parseIkey(i.iter.Key()); kerr == nil {
+ i.sampleSeek()
if i.icmp.uCompare(ukey, i.key) < 0 {
goto cont
}
} else if i.strict {
- i.setErr(errInvalidIkey)
+ i.setErr(kerr)
return false
}
}
@@ -289,6 +322,7 @@ func (i *dbIter) Release() {
if i.releaser != nil {
i.releaser.Release()
+ i.releaser = nil
}
i.dir = dirReleased
@@ -296,13 +330,19 @@ func (i *dbIter) Release() {
i.value = nil
i.iter.Release()
i.iter = nil
+ atomic.AddInt32(&i.db.aliveIters, -1)
+ i.db = nil
}
}
func (i *dbIter) SetReleaser(releaser util.Releaser) {
- if i.dir != dirReleased {
- i.releaser = releaser
+ if i.dir == dirReleased {
+ panic(util.ErrReleased)
+ }
+ if i.releaser != nil && releaser != nil {
+ panic(util.ErrHasReleaser)
}
+ i.releaser = releaser
}
func (i *dbIter) Error() error {