diff options
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go')
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go | 306 |
1 files changed, 179 insertions, 127 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go index 6b2a61683..b3906f7fc 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go @@ -7,12 +7,13 @@ package leveldb import ( - "errors" + "fmt" "io" "os" "sync" "sync/atomic" + "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/journal" "github.com/syndtr/goleveldb/leveldb/opt" @@ -20,18 +21,31 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) +type ErrManifestCorrupted struct { + Field string + Reason string +} + +func (e *ErrManifestCorrupted) Error() string { + return fmt.Sprintf("leveldb: manifest corrupted (field '%s'): %s", e.Field, e.Reason) +} + +func newErrManifestCorrupted(f storage.File, field, reason string) error { + return errors.NewErrCorrupted(f, &ErrManifestCorrupted{field, reason}) +} + // session represent a persistent database session. type session struct { // Need 64-bit alignment. - stFileNum uint64 // current unused file number + stNextFileNum uint64 // current unused file number stJournalNum uint64 // current journal file number; need external synchronization stPrevJournalNum uint64 // prev journal file number; no longer used; for compatibility with older version of leveldb - stSeq uint64 // last mem compacted seq; need external synchronization + stSeqNum uint64 // last mem compacted seq; need external synchronization stTempFileNum uint64 stor storage.Storage storLock util.Releaser - o *opt.Options + o *cachedOptions icmp *iComparer tops *tOps @@ -39,11 +53,12 @@ type session struct { manifestWriter storage.Writer manifestFile storage.File - stCPtrs [kNumLevels]iKey // compact pointers; need external synchronization - stVersion *version // current version - vmu sync.Mutex + stCompPtrs []iKey // compaction pointers; need external synchronization + stVersion *version // current version + vmu sync.Mutex } +// Creates new initialized session instance. func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) { if stor == nil { return nil, os.ErrInvalid @@ -53,22 +68,20 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) { return } s = &session{ - stor: stor, - storLock: storLock, + stor: stor, + storLock: storLock, + stCompPtrs: make([]iKey, o.GetNumLevel()), } s.setOptions(o) - s.tops = newTableOps(s, s.o.GetMaxOpenFiles()) - s.setVersion(&version{s: s}) - s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock D·DeletedEntry L·Level Q·SeqNum T·TimeElapsed") + s.tops = newTableOps(s) + s.setVersion(newVersion(s)) + s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed") return } // Close session. func (s *session) close() { s.tops.close() - if bc := s.o.GetBlockCache(); bc != nil { - bc.Purge(nil) - } if s.manifest != nil { s.manifest.Close() } @@ -81,6 +94,7 @@ func (s *session) close() { s.stVersion = nil } +// Release session lock. func (s *session) release() { s.storLock.Release() } @@ -98,26 +112,26 @@ func (s *session) recover() (err error) { // Don't return os.ErrNotExist if the underlying storage contains // other files that belong to LevelDB. So the DB won't get trashed. if files, _ := s.stor.GetFiles(storage.TypeAll); len(files) > 0 { - err = ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: manifest file missing")} + err = &errors.ErrCorrupted{File: &storage.FileInfo{Type: storage.TypeManifest}, Err: &errors.ErrMissingFiles{}} } } }() - file, err := s.stor.GetManifest() + m, err := s.stor.GetManifest() if err != nil { return } - reader, err := file.Open() + reader, err := m.Open() if err != nil { return } defer reader.Close() strict := s.o.GetStrict(opt.StrictManifest) - jr := journal.NewReader(reader, dropper{s, file}, strict, true) + jr := journal.NewReader(reader, dropper{s, m}, strict, true) - staging := s.version_NB().newStaging() - rec := &sessionRecord{} + staging := s.stVersion.newStaging() + rec := &sessionRecord{numLevel: s.o.GetNumLevel()} for { var r io.Reader r, err = jr.Next() @@ -126,51 +140,57 @@ func (s *session) recover() (err error) { err = nil break } - return + return errors.SetFile(err, m) } err = rec.decode(r) if err == nil { // save compact pointers - for _, rp := range rec.compactionPointers { - s.stCPtrs[rp.level] = iKey(rp.key) + for _, r := range rec.compPtrs { + s.stCompPtrs[r.level] = iKey(r.ikey) } // commit record to version staging staging.commit(rec) - } else if strict { - return ErrCorrupted{Type: CorruptedManifest, Err: err} } else { - s.logf("manifest error: %v (skipped)", err) + err = errors.SetFile(err, m) + if strict || !errors.IsCorrupted(err) { + return + } else { + s.logf("manifest error: %v (skipped)", errors.SetFile(err, m)) + } } - rec.resetCompactionPointers() + rec.resetCompPtrs() rec.resetAddedTables() rec.resetDeletedTables() } switch { case !rec.has(recComparer): - return ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: manifest missing comparer name")} + return newErrManifestCorrupted(m, "comparer", "missing") case rec.comparer != s.icmp.uName(): - return ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: comparer mismatch, " + "want '" + s.icmp.uName() + "', " + "got '" + rec.comparer + "'")} - case !rec.has(recNextNum): - return ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: manifest missing next file number")} + return newErrManifestCorrupted(m, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer)) + case !rec.has(recNextFileNum): + return newErrManifestCorrupted(m, "next-file-num", "missing") case !rec.has(recJournalNum): - return ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: manifest missing journal file number")} - case !rec.has(recSeq): - return ErrCorrupted{Type: CorruptedManifest, Err: errors.New("leveldb: manifest missing seq number")} + return newErrManifestCorrupted(m, "journal-file-num", "missing") + case !rec.has(recSeqNum): + return newErrManifestCorrupted(m, "seq-num", "missing") } - s.manifestFile = file + s.manifestFile = m s.setVersion(staging.finish()) - s.setFileNum(rec.nextNum) + s.setNextFileNum(rec.nextFileNum) s.recordCommited(rec) return nil } // Commit session; need external synchronization. func (s *session) commit(r *sessionRecord) (err error) { + v := s.version() + defer v.release() + // spawn new version based on current version - nv := s.version_NB().spawn(r) + nv := v.spawn(r) if s.manifest == nil { // manifest journal writer not yet created, create one @@ -189,22 +209,22 @@ func (s *session) commit(r *sessionRecord) (err error) { // Pick a compaction based on current state; need external synchronization. func (s *session) pickCompaction() *compaction { - v := s.version_NB() + v := s.version() var level int var t0 tFiles if v.cScore >= 1 { level = v.cLevel - cp := s.stCPtrs[level] - tt := v.tables[level] - for _, t := range tt { - if cp == nil || s.icmp.Compare(t.max, cp) > 0 { + cptr := s.stCompPtrs[level] + tables := v.tables[level] + for _, t := range tables { + if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 { t0 = append(t0, t) break } } if len(t0) == 0 { - t0 = append(t0, tt[0]) + t0 = append(t0, tables[0]) } } else { if p := atomic.LoadPointer(&v.cSeek); p != nil { @@ -212,29 +232,21 @@ func (s *session) pickCompaction() *compaction { level = ts.level t0 = append(t0, ts.table) } else { + v.release() return nil } } - c := &compaction{s: s, version: v, level: level} - if level == 0 { - min, max := t0.getRange(s.icmp) - t0 = nil - v.tables[0].getOverlaps(min.ukey(), max.ukey(), &t0, false, s.icmp.ucmp) - } - - c.tables[0] = t0 - c.expand() - return c + return newCompaction(s, v, level, t0) } // Create compaction from given level and range; need external synchronization. -func (s *session) getCompactionRange(level int, min, max []byte) *compaction { - v := s.version_NB() +func (s *session) getCompactionRange(level int, umin, umax []byte) *compaction { + v := s.version() - var t0 tFiles - v.tables[level].getOverlaps(min, max, &t0, level != 0, s.icmp.ucmp) + t0 := v.tables[level].getOverlaps(nil, s.icmp, umin, umax, level == 0) if len(t0) == 0 { + v.release() return nil } @@ -243,7 +255,7 @@ func (s *session) getCompactionRange(level int, min, max []byte) *compaction { // and we must not pick one file and drop another older file if the // two files overlap. if level > 0 { - limit := uint64(kMaxTableSize) + limit := uint64(v.s.o.GetCompactionSourceLimit(level)) total := uint64(0) for i, t := range t0 { total += t.size @@ -255,90 +267,124 @@ func (s *session) getCompactionRange(level int, min, max []byte) *compaction { } } - c := &compaction{s: s, version: v, level: level} - c.tables[0] = t0 + return newCompaction(s, v, level, t0) +} + +func newCompaction(s *session, v *version, level int, t0 tFiles) *compaction { + c := &compaction{ + s: s, + v: v, + level: level, + tables: [2]tFiles{t0, nil}, + maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(level)), + tPtrs: make([]int, s.o.GetNumLevel()), + } c.expand() + c.save() return c } -// compaction represent a compaction state +// compaction represent a compaction state. type compaction struct { - s *session - version *version + s *session + v *version + + level int + tables [2]tFiles + maxGPOverlaps uint64 + + gp tFiles + gpi int + seenKey bool + gpOverlappedBytes uint64 + imin, imax iKey + tPtrs []int + released bool + + snapGPI int + snapSeenKey bool + snapGPOverlappedBytes uint64 + snapTPtrs []int +} - level int - tables [2]tFiles +func (c *compaction) save() { + c.snapGPI = c.gpi + c.snapSeenKey = c.seenKey + c.snapGPOverlappedBytes = c.gpOverlappedBytes + c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...) +} - gp tFiles - gpidx int - seenKey bool - overlappedBytes uint64 - min, max iKey +func (c *compaction) restore() { + c.gpi = c.snapGPI + c.seenKey = c.snapSeenKey + c.gpOverlappedBytes = c.snapGPOverlappedBytes + c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...) +} - tPtrs [kNumLevels]int +func (c *compaction) release() { + if !c.released { + c.released = true + c.v.release() + } } // Expand compacted tables; need external synchronization. func (c *compaction) expand() { - s := c.s - v := c.version - - level := c.level - vt0, vt1 := v.tables[level], v.tables[level+1] + limit := uint64(c.s.o.GetCompactionExpandLimit(c.level)) + vt0, vt1 := c.v.tables[c.level], c.v.tables[c.level+1] t0, t1 := c.tables[0], c.tables[1] - min, max := t0.getRange(s.icmp) - vt1.getOverlaps(min.ukey(), max.ukey(), &t1, true, s.icmp.ucmp) - - // Get entire range covered by compaction - amin, amax := append(t0, t1...).getRange(s.icmp) + imin, imax := t0.getRange(c.s.icmp) + // We expand t0 here just incase ukey hop across tables. + t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.level == 0) + if len(t0) != len(c.tables[0]) { + imin, imax = t0.getRange(c.s.icmp) + } + t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false) + // Get entire range covered by compaction. + amin, amax := append(t0, t1...).getRange(c.s.icmp) // See if we can grow the number of inputs in "level" without // changing the number of "level+1" files we pick up. if len(t1) > 0 { - var exp0 tFiles - vt0.getOverlaps(amin.ukey(), amax.ukey(), &exp0, level != 0, s.icmp.ucmp) - if len(exp0) > len(t0) && t1.size()+exp0.size() < kExpCompactionMaxBytes { - var exp1 tFiles - xmin, xmax := exp0.getRange(s.icmp) - vt1.getOverlaps(xmin.ukey(), xmax.ukey(), &exp1, true, s.icmp.ucmp) + exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.level == 0) + if len(exp0) > len(t0) && t1.size()+exp0.size() < limit { + xmin, xmax := exp0.getRange(c.s.icmp) + exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false) if len(exp1) == len(t1) { - s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)", - level, level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())), + c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)", + c.level, c.level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())), len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size()))) - min, max = xmin, xmax + imin, imax = xmin, xmax t0, t1 = exp0, exp1 - amin, amax = append(t0, t1...).getRange(s.icmp) + amin, amax = append(t0, t1...).getRange(c.s.icmp) } } } // Compute the set of grandparent files that overlap this compaction // (parent == level+1; grandparent == level+2) - if level+2 < kNumLevels { - v.tables[level+2].getOverlaps(amin.ukey(), amax.ukey(), &c.gp, true, s.icmp.ucmp) + if c.level+2 < c.s.o.GetNumLevel() { + c.gp = c.v.tables[c.level+2].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false) } c.tables[0], c.tables[1] = t0, t1 - c.min, c.max = min, max + c.imin, c.imax = imin, imax } // Check whether compaction is trivial. func (c *compaction) trivial() bool { - return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= kMaxGrandParentOverlapBytes + return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= c.maxGPOverlaps } -func (c *compaction) isBaseLevelForKey(key []byte) bool { - s := c.s - v := c.version - - for level, tt := range v.tables[c.level+2:] { - for c.tPtrs[level] < len(tt) { - t := tt[c.tPtrs[level]] - if s.icmp.uCompare(key, t.max.ukey()) <= 0 { - // We've advanced far enough - if s.icmp.uCompare(key, t.min.ukey()) >= 0 { - // Key falls in this file's range, so definitely not base level +func (c *compaction) baseLevelForKey(ukey []byte) bool { + for level, tables := range c.v.tables[c.level+2:] { + for c.tPtrs[level] < len(tables) { + t := tables[c.tPtrs[level]] + if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 { + // We've advanced far enough. + if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 { + // Key falls in this file's range, so definitely not base level. return false } break @@ -349,55 +395,61 @@ func (c *compaction) isBaseLevelForKey(key []byte) bool { return true } -func (c *compaction) shouldStopBefore(key iKey) bool { - for ; c.gpidx < len(c.gp); c.gpidx++ { - gp := c.gp[c.gpidx] - if c.s.icmp.Compare(key, gp.max) <= 0 { +func (c *compaction) shouldStopBefore(ikey iKey) bool { + for ; c.gpi < len(c.gp); c.gpi++ { + gp := c.gp[c.gpi] + if c.s.icmp.Compare(ikey, gp.imax) <= 0 { break } if c.seenKey { - c.overlappedBytes += gp.size + c.gpOverlappedBytes += gp.size } } c.seenKey = true - if c.overlappedBytes > kMaxGrandParentOverlapBytes { - // Too much overlap for current output; start new output - c.overlappedBytes = 0 + if c.gpOverlappedBytes > c.maxGPOverlaps { + // Too much overlap for current output; start new output. + c.gpOverlappedBytes = 0 return true } return false } +// Creates an iterator. func (c *compaction) newIterator() iterator.Iterator { - s := c.s - - level := c.level - icap := 2 + // Creates iterator slice. + icap := len(c.tables) if c.level == 0 { + // Special case for level-0 icap = len(c.tables[0]) + 1 } its := make([]iterator.Iterator, 0, icap) + // Options. ro := &opt.ReadOptions{ DontFillCache: true, + Strict: opt.StrictOverride, + } + strict := c.s.o.GetStrict(opt.StrictCompaction) + if strict { + ro.Strict |= opt.StrictReader } - strict := s.o.GetStrict(opt.StrictIterator) - for i, tt := range c.tables { - if len(tt) == 0 { + for i, tables := range c.tables { + if len(tables) == 0 { continue } - if level+i == 0 { - for _, t := range tt { - its = append(its, s.tops.newIterator(t, nil, ro)) + // Level-0 is not sorted and may overlaps each other. + if c.level+i == 0 { + for _, t := range tables { + its = append(its, c.s.tops.newIterator(t, nil, ro)) } } else { - it := iterator.NewIndexedIterator(tt.newIndexIterator(s.tops, s.icmp, nil, ro), strict, true) + it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict) its = append(its, it) } } - return iterator.NewMergedIterator(its, s.icmp, true) + return iterator.NewMergedIterator(its, c.s.icmp, strict) } |