aboutsummaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-04-28 17:18:01 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-04-28 17:18:01 +0800
commit7e3b080f8517731db774d5d2587b9ded4f9716e0 (patch)
treec27488e8e84dacaece8b07458e187906b7940384 /Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go
parent182d484aa70bcd5b22117f02333b1fd3b1535dcb (diff)
downloadgo-tangerine-7e3b080f8517731db774d5d2587b9ded4f9716e0.tar
go-tangerine-7e3b080f8517731db774d5d2587b9ded4f9716e0.tar.gz
go-tangerine-7e3b080f8517731db774d5d2587b9ded4f9716e0.tar.bz2
go-tangerine-7e3b080f8517731db774d5d2587b9ded4f9716e0.tar.lz
go-tangerine-7e3b080f8517731db774d5d2587b9ded4f9716e0.tar.xz
go-tangerine-7e3b080f8517731db774d5d2587b9ded4f9716e0.tar.zst
go-tangerine-7e3b080f8517731db774d5d2587b9ded4f9716e0.zip
godeps: update leveldb and snappy, dump serpent-go
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go')
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go306
1 files changed, 179 insertions, 127 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go
index 6b2a61683..b3906f7fc 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go
@@ -7,12 +7,13 @@
package leveldb
import (
- "errors"
+ "fmt"
"io"
"os"
"sync"
"sync/atomic"
+ "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/opt"
@@ -20,18 +21,31 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)
+type ErrManifestCorrupted struct {
+ Field string
+ Reason string
+}
+
+func (e *ErrManifestCorrupted) Error() string {
+ return fmt.Sprintf("leveldb: manifest corrupted (field '%s'): %s", e.Field, e.Reason)
+}
+
+func newErrManifestCorrupted(f storage.File, field, reason string) error {
+ return errors.NewErrCorrupted(f, &ErrManifestCorrupted{field, reason})
+}
+
// session represents a persistent database session.
type session struct {
// Need 64-bit alignment.
- stFileNum uint64 // current unused file number
+ stNextFileNum uint64 // current unused file number
stJournalNum uint64 // current journal file number; need external synchronization
stPrevJournalNum uint64 // prev journal file number; no longer used; for compatibility with older version of leveldb
- stSeq uint64 // last mem compacted seq; need external synchronization
+ stSeqNum uint64 // last mem compacted seq; need external synchronization
stTempFileNum uint64
stor storage.Storage
storLock util.Releaser
- o *opt.Options
+ o *cachedOptions
icmp *iComparer
tops *tOps
@@ -39,11 +53,12 @@ type session struct {
manifestWriter storage.Writer
manifestFile storage.File
- stCPtrs [kNumLevels]iKey // compact pointers; need external synchronization
- stVersion *version // current version
- vmu sync.Mutex
+ stCompPtrs []iKey // compaction pointers; need external synchronization
+ stVersion *version // current version
+ vmu sync.Mutex
}
+// Creates new initialized session instance.
func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
if stor == nil {
return nil, os.ErrInvalid
@@ -53,22 +68,20 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
return
}
s = &session{
- stor: stor,
- storLock: storLock,
+ stor: stor,
+ storLock: storLock,
+ stCompPtrs: make([]iKey, o.GetNumLevel()),
}
s.setOptions(o)
- s.tops = newTableOps(s, s.o.GetMaxOpenFiles())
- s.setVersion(&version{s: s})
- s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock D·DeletedEntry L·Level Q·SeqNum T·TimeElapsed")
+ s.tops = newTableOps(s)
+ s.setVersion(newVersion(s))
+ s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed")
return
}
// Close session.
func (s *session) close() {
s.tops.close()
- if bc := s.o.GetBlockCache(); bc != nil {
- bc.Purge(nil)
- }
if s.manifest != nil {
s.manifest.Close()
}
@@ -81,6 +94,7 @@ func (s *session) close() {
s.stVersion = nil
}
+// Release session lock.
func (s *session) release() {
s.storLock.Release()
}
@@ -98,26 +112,26 @@ func (s *session) recover() (err error) {
// Don't return os.ErrNotExist if the underlying storage contains
// other files that belong to LevelDB. So the DB won't get trashed.
if files, _ := s.stor.GetFiles(storage.TypeAll); len(files) > 0 {
- err = ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: manifest file missing")}
+ err = &errors.ErrCorrupted{File: &storage.FileInfo{Type: storage.TypeManifest}, Err: &errors.ErrMissingFiles{}}
}
}
}()
- file, err := s.stor.GetManifest()
+ m, err := s.stor.GetManifest()
if err != nil {
return
}
- reader, err := file.Open()
+ reader, err := m.Open()
if err != nil {
return
}
defer reader.Close()
strict := s.o.GetStrict(opt.StrictManifest)
- jr := journal.NewReader(reader, dropper{s, file}, strict, true)
+ jr := journal.NewReader(reader, dropper{s, m}, strict, true)
- staging := s.version_NB().newStaging()
- rec := &sessionRecord{}
+ staging := s.stVersion.newStaging()
+ rec := &sessionRecord{numLevel: s.o.GetNumLevel()}
for {
var r io.Reader
r, err = jr.Next()
@@ -126,51 +140,57 @@ func (s *session) recover() (err error) {
err = nil
break
}
- return
+ return errors.SetFile(err, m)
}
err = rec.decode(r)
if err == nil {
// save compact pointers
- for _, rp := range rec.compactionPointers {
- s.stCPtrs[rp.level] = iKey(rp.key)
+ for _, r := range rec.compPtrs {
+ s.stCompPtrs[r.level] = iKey(r.ikey)
}
// commit record to version staging
staging.commit(rec)
- } else if strict {
- return ErrCorrupted{Type: CorruptedManifest, Err: err}
} else {
- s.logf("manifest error: %v (skipped)", err)
+ err = errors.SetFile(err, m)
+ if strict || !errors.IsCorrupted(err) {
+ return
+ } else {
+ s.logf("manifest error: %v (skipped)", errors.SetFile(err, m))
+ }
}
- rec.resetCompactionPointers()
+ rec.resetCompPtrs()
rec.resetAddedTables()
rec.resetDeletedTables()
}
switch {
case !rec.has(recComparer):
- return ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: manifest missing comparer name")}
+ return newErrManifestCorrupted(m, "comparer", "missing")
case rec.comparer != s.icmp.uName():
- return ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: comparer mismatch, " + "want '" + s.icmp.uName() + "', " + "got '" + rec.comparer + "'")}
- case !rec.has(recNextNum):
- return ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: manifest missing next file number")}
+ return newErrManifestCorrupted(m, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer))
+ case !rec.has(recNextFileNum):
+ return newErrManifestCorrupted(m, "next-file-num", "missing")
case !rec.has(recJournalNum):
- return ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: manifest missing journal file number")}
- case !rec.has(recSeq):
- return ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: manifest missing seq number")}
+ return newErrManifestCorrupted(m, "journal-file-num", "missing")
+ case !rec.has(recSeqNum):
+ return newErrManifestCorrupted(m, "seq-num", "missing")
}
- s.manifestFile = file
+ s.manifestFile = m
s.setVersion(staging.finish())
- s.setFileNum(rec.nextNum)
+ s.setNextFileNum(rec.nextFileNum)
s.recordCommited(rec)
return nil
}
// Commit session; need external synchronization.
func (s *session) commit(r *sessionRecord) (err error) {
+ v := s.version()
+ defer v.release()
+
// spawn new version based on current version
- nv := s.version_NB().spawn(r)
+ nv := v.spawn(r)
if s.manifest == nil {
// manifest journal writer not yet created, create one
@@ -189,22 +209,22 @@ func (s *session) commit(r *sessionRecord) (err error) {
// Pick a compaction based on current state; need external synchronization.
func (s *session) pickCompaction() *compaction {
- v := s.version_NB()
+ v := s.version()
var level int
var t0 tFiles
if v.cScore >= 1 {
level = v.cLevel
- cp := s.stCPtrs[level]
- tt := v.tables[level]
- for _, t := range tt {
- if cp == nil || s.icmp.Compare(t.max, cp) > 0 {
+ cptr := s.stCompPtrs[level]
+ tables := v.tables[level]
+ for _, t := range tables {
+ if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 {
t0 = append(t0, t)
break
}
}
if len(t0) == 0 {
- t0 = append(t0, tt[0])
+ t0 = append(t0, tables[0])
}
} else {
if p := atomic.LoadPointer(&v.cSeek); p != nil {
@@ -212,29 +232,21 @@ func (s *session) pickCompaction() *compaction {
level = ts.level
t0 = append(t0, ts.table)
} else {
+ v.release()
return nil
}
}
- c := &compaction{s: s, version: v, level: level}
- if level == 0 {
- min, max := t0.getRange(s.icmp)
- t0 = nil
- v.tables[0].getOverlaps(min.ukey(), max.ukey(), &t0, false, s.icmp.ucmp)
- }
-
- c.tables[0] = t0
- c.expand()
- return c
+ return newCompaction(s, v, level, t0)
}
// Create compaction from given level and range; need external synchronization.
-func (s *session) getCompactionRange(level int, min, max []byte) *compaction {
- v := s.version_NB()
+func (s *session) getCompactionRange(level int, umin, umax []byte) *compaction {
+ v := s.version()
- var t0 tFiles
- v.tables[level].getOverlaps(min, max, &t0, level != 0, s.icmp.ucmp)
+ t0 := v.tables[level].getOverlaps(nil, s.icmp, umin, umax, level == 0)
if len(t0) == 0 {
+ v.release()
return nil
}
@@ -243,7 +255,7 @@ func (s *session) getCompactionRange(level int, min, max []byte) *compaction {
// and we must not pick one file and drop another older file if the
// two files overlap.
if level > 0 {
- limit := uint64(kMaxTableSize)
+ limit := uint64(v.s.o.GetCompactionSourceLimit(level))
total := uint64(0)
for i, t := range t0 {
total += t.size
@@ -255,90 +267,124 @@ func (s *session) getCompactionRange(level int, min, max []byte) *compaction {
}
}
- c := &compaction{s: s, version: v, level: level}
- c.tables[0] = t0
+ return newCompaction(s, v, level, t0)
+}
+
+func newCompaction(s *session, v *version, level int, t0 tFiles) *compaction {
+ c := &compaction{
+ s: s,
+ v: v,
+ level: level,
+ tables: [2]tFiles{t0, nil},
+ maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(level)),
+ tPtrs: make([]int, s.o.GetNumLevel()),
+ }
c.expand()
+ c.save()
return c
}
-// compaction represent a compaction state
+// compaction represents a compaction state.
type compaction struct {
- s *session
- version *version
+ s *session
+ v *version
+
+ level int
+ tables [2]tFiles
+ maxGPOverlaps uint64
+
+ gp tFiles
+ gpi int
+ seenKey bool
+ gpOverlappedBytes uint64
+ imin, imax iKey
+ tPtrs []int
+ released bool
+
+ snapGPI int
+ snapSeenKey bool
+ snapGPOverlappedBytes uint64
+ snapTPtrs []int
+}
- level int
- tables [2]tFiles
+func (c *compaction) save() {
+ c.snapGPI = c.gpi
+ c.snapSeenKey = c.seenKey
+ c.snapGPOverlappedBytes = c.gpOverlappedBytes
+ c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...)
+}
- gp tFiles
- gpidx int
- seenKey bool
- overlappedBytes uint64
- min, max iKey
+func (c *compaction) restore() {
+ c.gpi = c.snapGPI
+ c.seenKey = c.snapSeenKey
+ c.gpOverlappedBytes = c.snapGPOverlappedBytes
+ c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...)
+}
- tPtrs [kNumLevels]int
+func (c *compaction) release() {
+ if !c.released {
+ c.released = true
+ c.v.release()
+ }
}
// Expand compacted tables; need external synchronization.
func (c *compaction) expand() {
- s := c.s
- v := c.version
-
- level := c.level
- vt0, vt1 := v.tables[level], v.tables[level+1]
+ limit := uint64(c.s.o.GetCompactionExpandLimit(c.level))
+ vt0, vt1 := c.v.tables[c.level], c.v.tables[c.level+1]
t0, t1 := c.tables[0], c.tables[1]
- min, max := t0.getRange(s.icmp)
- vt1.getOverlaps(min.ukey(), max.ukey(), &t1, true, s.icmp.ucmp)
-
- // Get entire range covered by compaction
- amin, amax := append(t0, t1...).getRange(s.icmp)
+ imin, imax := t0.getRange(c.s.icmp)
+ // We expand t0 here just in case a ukey hops across tables.
+ t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.level == 0)
+ if len(t0) != len(c.tables[0]) {
+ imin, imax = t0.getRange(c.s.icmp)
+ }
+ t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
+ // Get entire range covered by compaction.
+ amin, amax := append(t0, t1...).getRange(c.s.icmp)
// See if we can grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up.
if len(t1) > 0 {
- var exp0 tFiles
- vt0.getOverlaps(amin.ukey(), amax.ukey(), &exp0, level != 0, s.icmp.ucmp)
- if len(exp0) > len(t0) && t1.size()+exp0.size() < kExpCompactionMaxBytes {
- var exp1 tFiles
- xmin, xmax := exp0.getRange(s.icmp)
- vt1.getOverlaps(xmin.ukey(), xmax.ukey(), &exp1, true, s.icmp.ucmp)
+ exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.level == 0)
+ if len(exp0) > len(t0) && t1.size()+exp0.size() < limit {
+ xmin, xmax := exp0.getRange(c.s.icmp)
+ exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
if len(exp1) == len(t1) {
- s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
- level, level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
+ c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
+ c.level, c.level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size())))
- min, max = xmin, xmax
+ imin, imax = xmin, xmax
t0, t1 = exp0, exp1
- amin, amax = append(t0, t1...).getRange(s.icmp)
+ amin, amax = append(t0, t1...).getRange(c.s.icmp)
}
}
}
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
- if level+2 < kNumLevels {
- v.tables[level+2].getOverlaps(amin.ukey(), amax.ukey(), &c.gp, true, s.icmp.ucmp)
+ if c.level+2 < c.s.o.GetNumLevel() {
+ c.gp = c.v.tables[c.level+2].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
}
c.tables[0], c.tables[1] = t0, t1
- c.min, c.max = min, max
+ c.imin, c.imax = imin, imax
}
// Check whether compaction is trivial.
func (c *compaction) trivial() bool {
- return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= kMaxGrandParentOverlapBytes
+ return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= c.maxGPOverlaps
}
-func (c *compaction) isBaseLevelForKey(key []byte) bool {
- s := c.s
- v := c.version
-
- for level, tt := range v.tables[c.level+2:] {
- for c.tPtrs[level] < len(tt) {
- t := tt[c.tPtrs[level]]
- if s.icmp.uCompare(key, t.max.ukey()) <= 0 {
- // We've advanced far enough
- if s.icmp.uCompare(key, t.min.ukey()) >= 0 {
- // Key falls in this file's range, so definitely not base level
+func (c *compaction) baseLevelForKey(ukey []byte) bool {
+ for level, tables := range c.v.tables[c.level+2:] {
+ for c.tPtrs[level] < len(tables) {
+ t := tables[c.tPtrs[level]]
+ if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
+ // We've advanced far enough.
+ if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
+ // Key falls in this file's range, so definitely not base level.
return false
}
break
@@ -349,55 +395,61 @@ func (c *compaction) isBaseLevelForKey(key []byte) bool {
return true
}
-func (c *compaction) shouldStopBefore(key iKey) bool {
- for ; c.gpidx < len(c.gp); c.gpidx++ {
- gp := c.gp[c.gpidx]
- if c.s.icmp.Compare(key, gp.max) <= 0 {
+func (c *compaction) shouldStopBefore(ikey iKey) bool {
+ for ; c.gpi < len(c.gp); c.gpi++ {
+ gp := c.gp[c.gpi]
+ if c.s.icmp.Compare(ikey, gp.imax) <= 0 {
break
}
if c.seenKey {
- c.overlappedBytes += gp.size
+ c.gpOverlappedBytes += gp.size
}
}
c.seenKey = true
- if c.overlappedBytes > kMaxGrandParentOverlapBytes {
- // Too much overlap for current output; start new output
- c.overlappedBytes = 0
+ if c.gpOverlappedBytes > c.maxGPOverlaps {
+ // Too much overlap for current output; start new output.
+ c.gpOverlappedBytes = 0
return true
}
return false
}
+// Creates an iterator.
func (c *compaction) newIterator() iterator.Iterator {
- s := c.s
-
- level := c.level
- icap := 2
+ // Creates iterator slice.
+ icap := len(c.tables)
if c.level == 0 {
+ // Special case for level-0
icap = len(c.tables[0]) + 1
}
its := make([]iterator.Iterator, 0, icap)
+ // Options.
ro := &opt.ReadOptions{
DontFillCache: true,
+ Strict: opt.StrictOverride,
+ }
+ strict := c.s.o.GetStrict(opt.StrictCompaction)
+ if strict {
+ ro.Strict |= opt.StrictReader
}
- strict := s.o.GetStrict(opt.StrictIterator)
- for i, tt := range c.tables {
- if len(tt) == 0 {
+ for i, tables := range c.tables {
+ if len(tables) == 0 {
continue
}
- if level+i == 0 {
- for _, t := range tt {
- its = append(its, s.tops.newIterator(t, nil, ro))
+ // Level-0 tables are not sorted and may overlap each other.
+ if c.level+i == 0 {
+ for _, t := range tables {
+ its = append(its, c.s.tops.newIterator(t, nil, ro))
}
} else {
- it := iterator.NewIndexedIterator(tt.newIndexIterator(s.tops, s.icmp, nil, ro), strict, true)
+ it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict)
its = append(its, it)
}
}
- return iterator.NewMergedIterator(its, s.icmp, true)
+ return iterator.NewMergedIterator(its, c.s.icmp, strict)
}