about summary refs log tree commit diff stats
path: root/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go
diff options
context:
space:
mode:
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go')
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go403
1 files changed, 403 insertions, 0 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go
new file mode 100644
index 000000000..6b2a61683
--- /dev/null
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go
@@ -0,0 +1,403 @@
+// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
+// All rights reserved.
+//
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package leveldb
+
+import (
+ "errors"
+ "io"
+ "os"
+ "sync"
+ "sync/atomic"
+
+ "github.com/syndtr/goleveldb/leveldb/iterator"
+ "github.com/syndtr/goleveldb/leveldb/journal"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+ "github.com/syndtr/goleveldb/leveldb/storage"
+ "github.com/syndtr/goleveldb/leveldb/util"
+)
+
+// session represent a persistent database session.
+type session struct {
+ // Need 64-bit alignment.
+ stFileNum uint64 // current unused file number
+ stJournalNum uint64 // current journal file number; need external synchronization
+ stPrevJournalNum uint64 // prev journal file number; no longer used; for compatibility with older version of leveldb
+ stSeq uint64 // last mem compacted seq; need external synchronization
+ stTempFileNum uint64 // temp file number counter; presumably advanced atomically — confirm in callers
+
+ stor storage.Storage // underlying storage backend
+ storLock util.Releaser // storage lock taken in newSession; released by release()
+ o *opt.Options // session options, installed via setOptions
+ icmp *iComparer // internal key comparer
+ tops *tOps // table operations (open-table cache etc.)
+
+ manifest *journal.Writer // journal writer over the manifest file
+ manifestWriter storage.Writer // raw writer backing the manifest journal
+ manifestFile storage.File // the manifest file itself
+
+ stCPtrs [kNumLevels]iKey // compact pointers; need external synchronization
+ stVersion *version // current version
+ vmu sync.Mutex // NOTE(review): presumably guards stVersion — confirm in setVersion
+}
+
+// newSession creates a new persistent database session on top of the
+// given storage. It acquires the storage lock, installs the options,
+// table operations and an empty initial version. Returns os.ErrInvalid
+// when stor is nil, or the storage lock error if locking fails.
+func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
+ if stor == nil {
+ return nil, os.ErrInvalid
+ }
+ lock, lockErr := stor.Lock()
+ if lockErr != nil {
+ return nil, lockErr
+ }
+ s = &session{stor: stor, storLock: lock}
+ s.setOptions(o)
+ s.tops = newTableOps(s, s.o.GetMaxOpenFiles())
+ s.setVersion(&version{s: s})
+ s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock D·DeletedEntry L·Level Q·SeqNum T·TimeElapsed")
+ return s, nil
+}
+
+// Close session. Shuts down table operations, purges the shared block
+// cache (if any), closes the manifest journal and its underlying writer,
+// and drops all manifest/version references.
+func (s *session) close() {
+ s.tops.close()
+ if bc := s.o.GetBlockCache(); bc != nil {
+ bc.Purge(nil)
+ }
+ if m := s.manifest; m != nil {
+ m.Close()
+ }
+ if w := s.manifestWriter; w != nil {
+ w.Close()
+ }
+ s.manifest, s.manifestWriter, s.manifestFile = nil, nil, nil
+ s.stVersion = nil
+}
+
+// Release the storage lock acquired in newSession; the session must not
+// be used afterwards.
+func (s *session) release() {
+ s.storLock.Release()
+}
+
+// Create a new database session; need external synchronization.
+func (s *session) create() error {
+ // create manifest
+ // Writing a fresh manifest (nil record, nil base version) is all that
+ // is needed to initialize a brand-new database.
+ return s.newManifest(nil, nil)
+}
+
+// Recover a database session; need external synchronization.
+//
+// Replays every record of the manifest journal into a version staging
+// area, validates the mandatory fields (comparer name, next file number,
+// journal number, sequence number), then installs the rebuilt version
+// and counters. Returns ErrCorrupted on a damaged or incomplete
+// manifest; in strict mode any undecodable record is fatal, otherwise
+// such records are logged and skipped.
+func (s *session) recover() (err error) {
+ defer func() {
+ if os.IsNotExist(err) {
+ // Don't return os.ErrNotExist if the underlying storage contains
+ // other files that belong to LevelDB. So the DB won't get trashed.
+ if files, _ := s.stor.GetFiles(storage.TypeAll); len(files) > 0 {
+ err = ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: manifest file missing")}
+ }
+ }
+ }()
+
+ file, err := s.stor.GetManifest()
+ if err != nil {
+ return
+ }
+
+ reader, err := file.Open()
+ if err != nil {
+ return
+ }
+ defer reader.Close()
+ strict := s.o.GetStrict(opt.StrictManifest)
+ jr := journal.NewReader(reader, dropper{s, file}, strict, true)
+
+ // Replay the manifest journal, accumulating table additions/deletions
+ // into a staging area on top of the current version.
+ staging := s.version_NB().newStaging()
+ rec := &sessionRecord{}
+ for {
+ var r io.Reader
+ r, err = jr.Next()
+ if err != nil {
+ if err == io.EOF {
+ err = nil
+ break
+ }
+ return
+ }
+
+ err = rec.decode(r)
+ if err == nil {
+ // save compact pointers
+ for _, rp := range rec.compactionPointers {
+ s.stCPtrs[rp.level] = iKey(rp.key)
+ }
+ // commit record to version staging
+ staging.commit(rec)
+ } else if strict {
+ return ErrCorrupted{Type: CorruptedManifest, Err: err}
+ } else {
+ s.logf("manifest error: %v (skipped)", err)
+ }
+ // Reset the per-record lists so the next iteration decodes into a
+ // clean record; scalar fields (comparer, seq, ...) carry over and
+ // are validated below against the last values seen.
+ rec.resetCompactionPointers()
+ rec.resetAddedTables()
+ rec.resetDeletedTables()
+ }
+
+ // The manifest must have supplied these fields at least once.
+ switch {
+ case !rec.has(recComparer):
+ return ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: manifest missing comparer name")}
+ case rec.comparer != s.icmp.uName():
+ return ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: comparer mismatch, " + "want '" + s.icmp.uName() + "', " + "got '" + rec.comparer + "'")}
+ case !rec.has(recNextNum):
+ return ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: manifest missing next file number")}
+ case !rec.has(recJournalNum):
+ return ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: manifest missing journal file number")}
+ case !rec.has(recSeq):
+ return ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: manifest missing seq number")}
+ }
+
+ // Install the recovered state.
+ s.manifestFile = file
+ s.setVersion(staging.finish())
+ s.setFileNum(rec.nextNum)
+ s.recordCommited(rec)
+ return nil
+}
+
+// Commit session; need external synchronization.
+//
+// Spawns a new version from the current one plus the given record,
+// persists the record to the manifest journal (creating the manifest on
+// first use), and installs the new version only when persisting
+// succeeded.
+func (s *session) commit(r *sessionRecord) (err error) {
+ // spawn new version based on current version
+ nv := s.version_NB().spawn(r)
+
+ if s.manifest != nil {
+ err = s.flushManifest(r)
+ } else {
+ // manifest journal writer not yet created, create one
+ err = s.newManifest(r, nv)
+ }
+ if err != nil {
+ return err
+ }
+
+ // finally, apply new version if no error rise
+ s.setVersion(nv)
+ return nil
+}
+
+// Pick a compaction based on current state; need external synchronization.
+//
+// Returns nil when neither a size-triggered nor a seek-triggered
+// compaction is due.
+func (s *session) pickCompaction() *compaction {
+ v := s.version_NB()
+
+ var level int
+ var t0 tFiles
+ if v.cScore >= 1 {
+ // Size-triggered compaction: pick the first table in the offending
+ // level whose upper bound lies past the saved compact pointer, so
+ // successive compactions rotate through the key space.
+ level = v.cLevel
+ cp := s.stCPtrs[level]
+ tt := v.tables[level]
+ for _, t := range tt {
+ if cp == nil || s.icmp.Compare(t.max, cp) > 0 {
+ t0 = append(t0, t)
+ break
+ }
+ }
+ // Pointer is past every table: wrap around to the first one.
+ if len(t0) == 0 {
+ t0 = append(t0, tt[0])
+ }
+ } else {
+ // Seek-triggered compaction: compact the table flagged by read
+ // sampling, if any; otherwise there is nothing to do.
+ if p := atomic.LoadPointer(&v.cSeek); p != nil {
+ ts := (*tSet)(p)
+ level = ts.level
+ t0 = append(t0, ts.table)
+ } else {
+ return nil
+ }
+ }
+
+ c := &compaction{s: s, version: v, level: level}
+ if level == 0 {
+ // Level-0 tables may overlap each other, so widen the input set to
+ // every level-0 table overlapping the chosen table's key range.
+ min, max := t0.getRange(s.icmp)
+ t0 = nil
+ v.tables[0].getOverlaps(min.ukey(), max.ukey(), &t0, false, s.icmp.ucmp)
+ }
+
+ c.tables[0] = t0
+ c.expand()
+ return c
+}
+
+// Create compaction from given level and range; need external synchronization.
+//
+// Returns nil when no table in the level overlaps [min, max].
+func (s *session) getCompactionRange(level int, min, max []byte) *compaction {
+ v := s.version_NB()
+
+ var t0 tFiles
+ v.tables[level].getOverlaps(min, max, &t0, level != 0, s.icmp.ucmp)
+ if len(t0) == 0 {
+ return nil
+ }
+
+ // Avoid compacting too much in one shot in case the range is large.
+ // But we cannot do this for level-0 since level-0 files can overlap
+ // and we must not pick one file and drop another older file if the
+ // two files overlap.
+ if level > 0 {
+ limit := uint64(kMaxTableSize)
+ total := uint64(0)
+ for i, t := range t0 {
+ total += t.size
+ if total >= limit {
+ // Truncate the input set at the first table that pushes the
+ // cumulative size over one table's worth of data.
+ s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1)
+ t0 = t0[:i+1]
+ break
+ }
+ }
+ }
+
+ c := &compaction{s: s, version: v, level: level}
+ c.tables[0] = t0
+ c.expand()
+ return c
+}
+
+// compaction represent a compaction state
+type compaction struct {
+ s *session
+ version *version // version this compaction operates on
+
+ level int // input level; outputs go to level+1
+ tables [2]tFiles // tables[0]: "level" inputs; tables[1]: "level+1" inputs
+
+ gp tFiles // grandparent (level+2) tables overlapping the compaction
+ gpidx int // cursor into gp, advanced by shouldStopBefore
+ seenKey bool // whether shouldStopBefore has processed any key yet
+ overlappedBytes uint64 // grandparent bytes overlapped by the current output
+ min, max iKey // key range covered by the "level" inputs (set by expand)
+
+ tPtrs [kNumLevels]int // per-level scan cursors used by isBaseLevelForKey
+}
+
+// Expand compacted tables; need external synchronization.
+//
+// Completes the input sets of the compaction: collects the "level+1"
+// tables overlapping the chosen "level" tables, optionally grows the
+// "level" side when that does not pull in any extra "level+1" table and
+// stays under the expansion size limit, and records the grandparent
+// (level+2) tables overlapping the final range.
+func (c *compaction) expand() {
+ s := c.s
+ v := c.version
+
+ level := c.level
+ vt0, vt1 := v.tables[level], v.tables[level+1]
+
+ t0, t1 := c.tables[0], c.tables[1]
+ min, max := t0.getRange(s.icmp)
+ vt1.getOverlaps(min.ukey(), max.ukey(), &t1, true, s.icmp.ucmp)
+
+ // Get entire range covered by compaction
+ amin, amax := append(t0, t1...).getRange(s.icmp)
+
+ // See if we can grow the number of inputs in "level" without
+ // changing the number of "level+1" files we pick up.
+ if len(t1) > 0 {
+ var exp0 tFiles
+ vt0.getOverlaps(amin.ukey(), amax.ukey(), &exp0, level != 0, s.icmp.ucmp)
+ if len(exp0) > len(t0) && t1.size()+exp0.size() < kExpCompactionMaxBytes {
+ var exp1 tFiles
+ xmin, xmax := exp0.getRange(s.icmp)
+ vt1.getOverlaps(xmin.ukey(), xmax.ukey(), &exp1, true, s.icmp.ucmp)
+ // Accept the expansion only if it picks up exactly the same
+ // "level+1" tables as before.
+ if len(exp1) == len(t1) {
+ s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
+ level, level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
+ len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size())))
+ min, max = xmin, xmax
+ t0, t1 = exp0, exp1
+ amin, amax = append(t0, t1...).getRange(s.icmp)
+ }
+ }
+ }
+
+ // Compute the set of grandparent files that overlap this compaction
+ // (parent == level+1; grandparent == level+2)
+ if level+2 < kNumLevels {
+ v.tables[level+2].getOverlaps(amin.ukey(), amax.ukey(), &c.gp, true, s.icmp.ucmp)
+ }
+
+ c.tables[0], c.tables[1] = t0, t1
+ c.min, c.max = min, max
+}
+
+// Check whether compaction is trivial: a single input table, no
+// "level+1" inputs, and a grandparent overlap small enough that the
+// table can simply be moved down a level instead of being rewritten.
+func (c *compaction) trivial() bool {
+ singleInput := len(c.tables[0]) == 1 && len(c.tables[1]) == 0
+ return singleInput && c.gp.size() <= kMaxGrandParentOverlapBytes
+}
+
+// isBaseLevelForKey reports whether no level below the compaction
+// output (i.e. level+2 and deeper) contains a table whose key range
+// covers key.
+//
+// It relies on keys being fed in ascending order: the per-level cursors
+// in c.tPtrs only ever move forward, so each table is examined at most
+// once across the whole compaction. Note the cursors are indexed by the
+// level offset relative to c.level+2, which is consistent because they
+// are used nowhere else.
+func (c *compaction) isBaseLevelForKey(key []byte) bool {
+ s := c.s
+ v := c.version
+
+ for level, tt := range v.tables[c.level+2:] {
+ for c.tPtrs[level] < len(tt) {
+ t := tt[c.tPtrs[level]]
+ if s.icmp.uCompare(key, t.max.ukey()) <= 0 {
+ // We've advanced far enough
+ if s.icmp.uCompare(key, t.min.ukey()) >= 0 {
+ // Key falls in this file's range, so definitely not base level
+ return false
+ }
+ break
+ }
+ c.tPtrs[level]++
+ }
+ }
+ return true
+}
+
+// shouldStopBefore reports whether the current output table should be
+// finished before writing key, so that no single output overlaps too
+// many grandparent (level+2) tables — which would make a later
+// compaction of that output expensive.
+//
+// Keys must be fed in ascending order; c.gpidx advances monotonically
+// over the grandparent tables and c.overlappedBytes accumulates the
+// sizes of those passed since the current output started.
+func (c *compaction) shouldStopBefore(key iKey) bool {
+ for ; c.gpidx < len(c.gp); c.gpidx++ {
+ gp := c.gp[c.gpidx]
+ if c.s.icmp.Compare(key, gp.max) <= 0 {
+ break
+ }
+ // Only count tables skipped after the first key; the distance the
+ // very first key travels does not represent output overlap.
+ if c.seenKey {
+ c.overlappedBytes += gp.size
+ }
+ }
+ c.seenKey = true
+
+ if c.overlappedBytes > kMaxGrandParentOverlapBytes {
+ // Too much overlap for current output; start new output
+ c.overlappedBytes = 0
+ return true
+ }
+ return false
+}
+
+// newIterator returns a merged iterator over every input table of the
+// compaction. Level-0 tables may overlap, so each gets its own table
+// iterator; tables of any higher level are disjoint and sorted, so a
+// single indexed iterator per level suffices. Reads bypass the block
+// cache (DontFillCache) since compaction data is touched only once.
+func (c *compaction) newIterator() iterator.Iterator {
+ s := c.s
+
+ level := c.level
+ icap := 2
+ if c.level == 0 {
+ // One iterator per level-0 table, plus one for the level-1 inputs.
+ icap = len(c.tables[0]) + 1
+ }
+ its := make([]iterator.Iterator, 0, icap)
+
+ ro := &opt.ReadOptions{
+ DontFillCache: true,
+ }
+ strict := s.o.GetStrict(opt.StrictIterator)
+
+ for i, tt := range c.tables {
+ if len(tt) == 0 {
+ continue
+ }
+
+ // level+i is the actual level of this input set (i==0: "level",
+ // i==1: "level+1"); only actual level 0 needs per-table iterators.
+ if level+i == 0 {
+ for _, t := range tt {
+ its = append(its, s.tops.newIterator(t, nil, ro))
+ }
+ } else {
+ it := iterator.NewIndexedIterator(tt.newIndexIterator(s.tops, s.icmp, nil, ro), strict, true)
+ its = append(its, it)
+ }
+ }
+
+ return iterator.NewMergedIterator(its, s.icmp, true)
+}