aboutsummaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
diff options
context:
space:
mode:
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go')
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go596
1 files changed, 371 insertions, 225 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
index 323353b2a..537addb62 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
@@ -36,14 +36,14 @@ type DB struct {
s *session
// MemDB.
- memMu sync.RWMutex
- memPool chan *memdb.DB
- mem, frozenMem *memDB
- journal *journal.Writer
- journalWriter storage.Writer
- journalFile storage.File
- frozenJournalFile storage.File
- frozenSeq uint64
+ memMu sync.RWMutex
+ memPool chan *memdb.DB
+ mem, frozenMem *memDB
+ journal *journal.Writer
+ journalWriter storage.Writer
+ journalFd storage.FileDesc
+ frozenJournalFd storage.FileDesc
+ frozenSeq uint64
// Snapshot.
snapsMu sync.Mutex
@@ -61,15 +61,19 @@ type DB struct {
writeDelayN int
journalC chan *Batch
journalAckC chan error
+ tr *Transaction
// Compaction.
- tcompCmdC chan cCmd
- tcompPauseC chan chan<- struct{}
- mcompCmdC chan cCmd
- compErrC chan error
- compPerErrC chan error
- compErrSetC chan error
- compStats []cStats
+ compCommitLk sync.Mutex
+ tcompCmdC chan cCmd
+ tcompPauseC chan chan<- struct{}
+ mcompCmdC chan cCmd
+ compErrC chan error
+ compPerErrC chan error
+ compErrSetC chan error
+ compWriteLocking bool
+ compStats cStats
+ memdbMaxLevel int // For testing.
// Close.
closeW sync.WaitGroup
@@ -103,33 +107,48 @@ func openDB(s *session) (*DB, error) {
compErrC: make(chan error),
compPerErrC: make(chan error),
compErrSetC: make(chan error),
- compStats: make([]cStats, s.o.GetNumLevel()),
// Close
closeC: make(chan struct{}),
}
- if err := db.recoverJournal(); err != nil {
- return nil, err
- }
+ // Read-only mode.
+ readOnly := s.o.GetReadOnly()
- // Remove any obsolete files.
- if err := db.checkAndCleanFiles(); err != nil {
- // Close journal.
- if db.journal != nil {
- db.journal.Close()
- db.journalWriter.Close()
+ if readOnly {
+ // Recover journals (read-only mode).
+ if err := db.recoverJournalRO(); err != nil {
+ return nil, err
}
- return nil, err
+ } else {
+ // Recover journals.
+ if err := db.recoverJournal(); err != nil {
+ return nil, err
+ }
+
+ // Remove any obsolete files.
+ if err := db.checkAndCleanFiles(); err != nil {
+ // Close journal.
+ if db.journal != nil {
+ db.journal.Close()
+ db.journalWriter.Close()
+ }
+ return nil, err
+ }
+
}
// Doesn't need to be included in the wait group.
go db.compactionError()
go db.mpoolDrain()
- db.closeW.Add(3)
- go db.tCompaction()
- go db.mCompaction()
- go db.jWriter()
+ if readOnly {
+ db.SetReadOnly()
+ } else {
+ db.closeW.Add(3)
+ go db.tCompaction()
+ go db.mCompaction()
+ go db.jWriter()
+ }
s.logf("db@open done T·%v", time.Since(start))
@@ -192,7 +211,7 @@ func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
// The returned DB instance is goroutine-safe.
// The DB must be closed after use, by calling Close method.
func OpenFile(path string, o *opt.Options) (db *DB, err error) {
- stor, err := storage.OpenFile(path)
+ stor, err := storage.OpenFile(path, o.GetReadOnly())
if err != nil {
return
}
@@ -242,7 +261,7 @@ func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
// The returned DB instance is goroutine-safe.
// The DB must be closed after use, by calling Close method.
func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
- stor, err := storage.OpenFile(path)
+ stor, err := storage.OpenFile(path, false)
if err != nil {
return
}
@@ -261,12 +280,11 @@ func recoverTable(s *session, o *opt.Options) error {
o.Strict &= ^opt.StrictReader
// Get all tables and sort it by file number.
- tableFiles_, err := s.getFiles(storage.TypeTable)
+ fds, err := s.stor.List(storage.TypeTable)
if err != nil {
return err
}
- tableFiles := files(tableFiles_)
- tableFiles.sort()
+ sortFds(fds)
var (
maxSeq uint64
@@ -274,21 +292,22 @@ func recoverTable(s *session, o *opt.Options) error {
// We will drop corrupted table.
strict = o.GetStrict(opt.StrictRecovery)
+ noSync = o.GetNoSync()
- rec = &sessionRecord{numLevel: o.GetNumLevel()}
+ rec = &sessionRecord{}
bpool = util.NewBufferPool(o.GetBlockSize() + 5)
)
- buildTable := func(iter iterator.Iterator) (tmp storage.File, size int64, err error) {
- tmp = s.newTemp()
- writer, err := tmp.Create()
+ buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) {
+ tmpFd = s.newTemp()
+ writer, err := s.stor.Create(tmpFd)
if err != nil {
return
}
defer func() {
writer.Close()
if err != nil {
- tmp.Remove()
- tmp = nil
+ s.stor.Remove(tmpFd)
+ tmpFd = storage.FileDesc{}
}
}()
@@ -311,16 +330,18 @@ func recoverTable(s *session, o *opt.Options) error {
if err != nil {
return
}
- err = writer.Sync()
- if err != nil {
- return
+ if !noSync {
+ err = writer.Sync()
+ if err != nil {
+ return
+ }
}
size = int64(tw.BytesLen())
return
}
- recoverTable := func(file storage.File) error {
- s.logf("table@recovery recovering @%d", file.Num())
- reader, err := file.Open()
+ recoverTable := func(fd storage.FileDesc) error {
+ s.logf("table@recovery recovering @%d", fd.Num)
+ reader, err := s.stor.Open(fd)
if err != nil {
return err
}
@@ -342,7 +363,7 @@ func recoverTable(s *session, o *opt.Options) error {
tgoodKey, tcorruptedKey, tcorruptedBlock int
imin, imax []byte
)
- tr, err := table.NewReader(reader, size, storage.NewFileInfo(file), nil, bpool, o)
+ tr, err := table.NewReader(reader, size, fd, nil, bpool, o)
if err != nil {
return err
}
@@ -350,7 +371,7 @@ func recoverTable(s *session, o *opt.Options) error {
if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok {
itererr.SetErrorCallback(func(err error) {
if errors.IsCorrupted(err) {
- s.logf("table@recovery block corruption @%d %q", file.Num(), err)
+ s.logf("table@recovery block corruption @%d %q", fd.Num, err)
tcorruptedBlock++
}
})
@@ -385,23 +406,23 @@ func recoverTable(s *session, o *opt.Options) error {
if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) {
droppedTable++
- s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", file.Num(), tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
+ s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
return nil
}
if tgoodKey > 0 {
if tcorruptedKey > 0 || tcorruptedBlock > 0 {
// Rebuild the table.
- s.logf("table@recovery rebuilding @%d", file.Num())
+ s.logf("table@recovery rebuilding @%d", fd.Num)
iter := tr.NewIterator(nil, nil)
- tmp, newSize, err := buildTable(iter)
+ tmpFd, newSize, err := buildTable(iter)
iter.Release()
if err != nil {
return err
}
closed = true
reader.Close()
- if err := file.Replace(tmp); err != nil {
+ if err := s.stor.Rename(tmpFd, fd); err != nil {
return err
}
size = newSize
@@ -411,30 +432,30 @@ func recoverTable(s *session, o *opt.Options) error {
}
recoveredKey += tgoodKey
// Add table to level 0.
- rec.addTable(0, file.Num(), uint64(size), imin, imax)
- s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", file.Num(), tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
+ rec.addTable(0, fd.Num, size, imin, imax)
+ s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
} else {
droppedTable++
- s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", file.Num(), tcorruptedKey, tcorruptedBlock, size)
+ s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", fd.Num, tcorruptedKey, tcorruptedBlock, size)
}
return nil
}
// Recover all tables.
- if len(tableFiles) > 0 {
- s.logf("table@recovery F·%d", len(tableFiles))
+ if len(fds) > 0 {
+ s.logf("table@recovery F·%d", len(fds))
// Mark file number as used.
- s.markFileNum(tableFiles[len(tableFiles)-1].Num())
+ s.markFileNum(fds[len(fds)-1].Num)
- for _, file := range tableFiles {
- if err := recoverTable(file); err != nil {
+ for _, fd := range fds {
+ if err := recoverTable(fd); err != nil {
return err
}
}
- s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(tableFiles), recoveredKey, goodKey, corruptedKey, maxSeq)
+ s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(fds), recoveredKey, goodKey, corruptedKey, maxSeq)
}
// Set sequence number.
@@ -450,132 +471,136 @@ func recoverTable(s *session, o *opt.Options) error {
}
func (db *DB) recoverJournal() error {
- // Get all tables and sort it by file number.
- journalFiles_, err := db.s.getFiles(storage.TypeJournal)
+ // Get all journals and sort it by file number.
+ fds_, err := db.s.stor.List(storage.TypeJournal)
if err != nil {
return err
}
- journalFiles := files(journalFiles_)
- journalFiles.sort()
+ sortFds(fds_)
- // Discard older journal.
- prev := -1
- for i, file := range journalFiles {
- if file.Num() >= db.s.stJournalNum {
- if prev >= 0 {
- i--
- journalFiles[i] = journalFiles[prev]
- }
- journalFiles = journalFiles[i:]
- break
- } else if file.Num() == db.s.stPrevJournalNum {
- prev = i
- }
- }
-
- var jr *journal.Reader
- var of storage.File
- var mem *memdb.DB
- batch := new(Batch)
- cm := newCMem(db.s)
- buf := new(util.Buffer)
- // Options.
- strict := db.s.o.GetStrict(opt.StrictJournal)
- checksum := db.s.o.GetStrict(opt.StrictJournalChecksum)
- writeBuffer := db.s.o.GetWriteBuffer()
- recoverJournal := func(file storage.File) error {
- db.logf("journal@recovery recovering @%d", file.Num())
- reader, err := file.Open()
- if err != nil {
- return err
+ // Journals that will be recovered.
+ var fds []storage.FileDesc
+ for _, fd := range fds_ {
+ if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
+ fds = append(fds, fd)
}
- defer reader.Close()
+ }
- // Create/reset journal reader instance.
- if jr == nil {
- jr = journal.NewReader(reader, dropper{db.s, file}, strict, checksum)
- } else {
- jr.Reset(reader, dropper{db.s, file}, strict, checksum)
- }
+ var (
+ ofd storage.FileDesc // Obsolete file.
+ rec = &sessionRecord{}
+ )
- // Flush memdb and remove obsolete journal file.
- if of != nil {
- if mem.Len() > 0 {
- if err := cm.flush(mem, 0); err != nil {
- return err
- }
- }
- if err := cm.commit(file.Num(), db.seq); err != nil {
+ // Recover journals.
+ if len(fds) > 0 {
+ db.logf("journal@recovery F·%d", len(fds))
+
+ // Mark file number as used.
+ db.s.markFileNum(fds[len(fds)-1].Num)
+
+ var (
+ // Options.
+ strict = db.s.o.GetStrict(opt.StrictJournal)
+ checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
+ writeBuffer = db.s.o.GetWriteBuffer()
+
+ jr *journal.Reader
+ mdb = memdb.New(db.s.icmp, writeBuffer)
+ buf = &util.Buffer{}
+ batch = &Batch{}
+ )
+
+ for _, fd := range fds {
+ db.logf("journal@recovery recovering @%d", fd.Num)
+
+ fr, err := db.s.stor.Open(fd)
+ if err != nil {
return err
}
- cm.reset()
- of.Remove()
- of = nil
- }
- // Replay journal to memdb.
- mem.Reset()
- for {
- r, err := jr.Next()
- if err != nil {
- if err == io.EOF {
- break
- }
- return errors.SetFile(err, file)
+ // Create or reset journal reader instance.
+ if jr == nil {
+ jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
+ } else {
+ jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
}
- buf.Reset()
- if _, err := buf.ReadFrom(r); err != nil {
- if err == io.ErrUnexpectedEOF {
- // This is error returned due to corruption, with strict == false.
- continue
- } else {
- return errors.SetFile(err, file)
+ // Flush memdb and remove obsolete journal file.
+ if !ofd.Nil() {
+ if mdb.Len() > 0 {
+ if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
+ fr.Close()
+ return err
+ }
}
- }
- if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mem); err != nil {
- if strict || !errors.IsCorrupted(err) {
- return errors.SetFile(err, file)
- } else {
- db.s.logf("journal error: %v (skipped)", err)
- // We won't apply sequence number as it might be corrupted.
- continue
+
+ rec.setJournalNum(fd.Num)
+ rec.setSeqNum(db.seq)
+ if err := db.s.commit(rec); err != nil {
+ fr.Close()
+ return err
}
+ rec.resetAddedTables()
+
+ db.s.stor.Remove(ofd)
+ ofd = storage.FileDesc{}
}
- // Save sequence number.
- db.seq = batch.seq + uint64(batch.Len())
+ // Replay journal to memdb.
+ mdb.Reset()
+ for {
+ r, err := jr.Next()
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
- // Flush it if large enough.
- if mem.Size() >= writeBuffer {
- if err := cm.flush(mem, 0); err != nil {
- return err
+ fr.Close()
+ return errors.SetFd(err, fd)
}
- mem.Reset()
- }
- }
- of = file
- return nil
- }
+ buf.Reset()
+ if _, err := buf.ReadFrom(r); err != nil {
+ if err == io.ErrUnexpectedEOF {
+ // This is error returned due to corruption, with strict == false.
+ continue
+ }
+
+ fr.Close()
+ return errors.SetFd(err, fd)
+ }
+ if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
+ if !strict && errors.IsCorrupted(err) {
+ db.s.logf("journal error: %v (skipped)", err)
+ // We won't apply sequence number as it might be corrupted.
+ continue
+ }
+
+ fr.Close()
+ return errors.SetFd(err, fd)
+ }
- // Recover all journals.
- if len(journalFiles) > 0 {
- db.logf("journal@recovery F·%d", len(journalFiles))
+ // Save sequence number.
+ db.seq = batch.seq + uint64(batch.Len())
- // Mark file number as used.
- db.s.markFileNum(journalFiles[len(journalFiles)-1].Num())
+ // Flush it if large enough.
+ if mdb.Size() >= writeBuffer {
+ if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
+ fr.Close()
+ return err
+ }
- mem = memdb.New(db.s.icmp, writeBuffer)
- for _, file := range journalFiles {
- if err := recoverJournal(file); err != nil {
- return err
+ mdb.Reset()
+ }
}
+
+ fr.Close()
+ ofd = fd
}
- // Flush the last journal.
- if mem.Len() > 0 {
- if err := cm.flush(mem, 0); err != nil {
+ // Flush the last memdb.
+ if mdb.Len() > 0 {
+ if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
return err
}
}
@@ -587,8 +612,10 @@ func (db *DB) recoverJournal() error {
}
// Commit.
- if err := cm.commit(db.journalFile.Num(), db.seq); err != nil {
- // Close journal.
+ rec.setJournalNum(db.journalFd.Num)
+ rec.setSeqNum(db.seq)
+ if err := db.s.commit(rec); err != nil {
+ // Close journal on error.
if db.journal != nil {
db.journal.Close()
db.journalWriter.Close()
@@ -597,15 +624,139 @@ func (db *DB) recoverJournal() error {
}
// Remove the last obsolete journal file.
- if of != nil {
- of.Remove()
+ if !ofd.Nil() {
+ db.s.stor.Remove(ofd)
+ }
+
+ return nil
+}
+
+func (db *DB) recoverJournalRO() error {
+ // Get all journals and sort it by file number.
+ fds_, err := db.s.stor.List(storage.TypeJournal)
+ if err != nil {
+ return err
+ }
+ sortFds(fds_)
+
+ // Journals that will be recovered.
+ var fds []storage.FileDesc
+ for _, fd := range fds_ {
+ if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
+ fds = append(fds, fd)
+ }
+ }
+
+ var (
+ // Options.
+ strict = db.s.o.GetStrict(opt.StrictJournal)
+ checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
+ writeBuffer = db.s.o.GetWriteBuffer()
+
+ mdb = memdb.New(db.s.icmp, writeBuffer)
+ )
+
+ // Recover journals.
+ if len(fds) > 0 {
+ db.logf("journal@recovery RO·Mode F·%d", len(fds))
+
+ var (
+ jr *journal.Reader
+ buf = &util.Buffer{}
+ batch = &Batch{}
+ )
+
+ for _, fd := range fds {
+ db.logf("journal@recovery recovering @%d", fd.Num)
+
+ fr, err := db.s.stor.Open(fd)
+ if err != nil {
+ return err
+ }
+
+ // Create or reset journal reader instance.
+ if jr == nil {
+ jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
+ } else {
+ jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
+ }
+
+ // Replay journal to memdb.
+ for {
+ r, err := jr.Next()
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+
+ fr.Close()
+ return errors.SetFd(err, fd)
+ }
+
+ buf.Reset()
+ if _, err := buf.ReadFrom(r); err != nil {
+ if err == io.ErrUnexpectedEOF {
+ // This is error returned due to corruption, with strict == false.
+ continue
+ }
+
+ fr.Close()
+ return errors.SetFd(err, fd)
+ }
+ if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
+ if !strict && errors.IsCorrupted(err) {
+ db.s.logf("journal error: %v (skipped)", err)
+ // We won't apply sequence number as it might be corrupted.
+ continue
+ }
+
+ fr.Close()
+ return errors.SetFd(err, fd)
+ }
+
+ // Save sequence number.
+ db.seq = batch.seq + uint64(batch.Len())
+ }
+
+ fr.Close()
+ }
}
+ // Set memDB.
+ db.mem = &memDB{db: db, DB: mdb, ref: 1}
+
return nil
}
-func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
- ikey := newIkey(key, seq, ktSeek)
+func memGet(mdb *memdb.DB, ikey iKey, icmp *iComparer) (ok bool, mv []byte, err error) {
+ mk, mv, err := mdb.Find(ikey)
+ if err == nil {
+ ukey, _, kt, kerr := parseIkey(mk)
+ if kerr != nil {
+ // Shouldn't have had happen.
+ panic(kerr)
+ }
+ if icmp.uCompare(ukey, ikey.ukey()) == 0 {
+ if kt == ktDel {
+ return true, nil, ErrNotFound
+ }
+ return true, mv, nil
+
+ }
+ } else if err != ErrNotFound {
+ return true, nil, err
+ }
+ return
+}
+
+func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
+ ikey := makeIkey(nil, key, seq, ktSeek)
+
+ if auxm != nil {
+ if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
+ return append([]byte{}, mv...), me
+ }
+ }
em, fm := db.getMems()
for _, m := range [...]*memDB{em, fm} {
@@ -614,36 +765,36 @@ func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, er
}
defer m.decref()
- mk, mv, me := m.mdb.Find(ikey)
- if me == nil {
- ukey, _, kt, kerr := parseIkey(mk)
- if kerr != nil {
- // Shouldn't have had happen.
- panic(kerr)
- }
- if db.s.icmp.uCompare(ukey, key) == 0 {
- if kt == ktDel {
- return nil, ErrNotFound
- }
- return append([]byte{}, mv...), nil
- }
- } else if me != ErrNotFound {
- return nil, me
+ if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
+ return append([]byte{}, mv...), me
}
}
v := db.s.version()
- value, cSched, err := v.get(ikey, ro, false)
+ value, cSched, err := v.get(auxt, ikey, ro, false)
v.release()
if cSched {
// Trigger table compaction.
- db.compSendTrigger(db.tcompCmdC)
+ db.compTrigger(db.tcompCmdC)
}
return
}
-func (db *DB) has(key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) {
- ikey := newIkey(key, seq, ktSeek)
+func nilIfNotFound(err error) error {
+ if err == ErrNotFound {
+ return nil
+ }
+ return err
+}
+
+func (db *DB) has(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) {
+ ikey := makeIkey(nil, key, seq, ktSeek)
+
+ if auxm != nil {
+ if ok, _, me := memGet(auxm, ikey, db.s.icmp); ok {
+ return me == nil, nilIfNotFound(me)
+ }
+ }
em, fm := db.getMems()
for _, m := range [...]*memDB{em, fm} {
@@ -652,30 +803,17 @@ func (db *DB) has(key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err er
}
defer m.decref()
- mk, _, me := m.mdb.Find(ikey)
- if me == nil {
- ukey, _, kt, kerr := parseIkey(mk)
- if kerr != nil {
- // Shouldn't have had happen.
- panic(kerr)
- }
- if db.s.icmp.uCompare(ukey, key) == 0 {
- if kt == ktDel {
- return false, nil
- }
- return true, nil
- }
- } else if me != ErrNotFound {
- return false, me
+ if ok, _, me := memGet(m.DB, ikey, db.s.icmp); ok {
+ return me == nil, nilIfNotFound(me)
}
}
v := db.s.version()
- _, cSched, err := v.get(ikey, ro, true)
+ _, cSched, err := v.get(auxt, ikey, ro, true)
v.release()
if cSched {
// Trigger table compaction.
- db.compSendTrigger(db.tcompCmdC)
+ db.compTrigger(db.tcompCmdC)
}
if err == nil {
ret = true
@@ -699,7 +837,7 @@ func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
se := db.acquireSnapshot()
defer db.releaseSnapshot(se)
- return db.get(key, se.seq, ro)
+ return db.get(nil, nil, key, se.seq, ro)
}
// Has returns true if the DB does contains the given key.
@@ -713,11 +851,11 @@ func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
se := db.acquireSnapshot()
defer db.releaseSnapshot(se)
- return db.has(key, se.seq, ro)
+ return db.has(nil, nil, key, se.seq, ro)
}
// NewIterator returns an iterator for the latest snapshot of the
-// uderlying DB.
+// underlying DB.
// The returned iterator is not goroutine-safe, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its
@@ -741,7 +879,7 @@ func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Itera
defer db.releaseSnapshot(se)
// Iterator holds 'version' lock, 'version' is immutable so snapshot
// can be released after iterator created.
- return db.newIterator(se.seq, slice, ro)
+ return db.newIterator(nil, nil, se.seq, slice, ro)
}
// GetSnapshot returns a latest snapshot of the underlying DB. A snapshot
@@ -784,7 +922,7 @@ func (db *DB) GetProperty(name string) (value string, err error) {
const prefix = "leveldb."
if !strings.HasPrefix(name, prefix) {
- return "", errors.New("leveldb: GetProperty: unknown property: " + name)
+ return "", ErrNotFound
}
p := name[len(prefix):]
@@ -797,8 +935,8 @@ func (db *DB) GetProperty(name string) (value string, err error) {
var level uint
var rest string
n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
- if n != 1 || int(level) >= db.s.o.GetNumLevel() {
- err = errors.New("leveldb: GetProperty: invalid property: " + name)
+ if n != 1 {
+ err = ErrNotFound
} else {
value = fmt.Sprint(v.tLen(int(level)))
}
@@ -806,8 +944,8 @@ func (db *DB) GetProperty(name string) (value string, err error) {
value = "Compactions\n" +
" Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" +
"-------+------------+---------------+---------------+---------------+---------------\n"
- for level, tables := range v.tables {
- duration, read, write := db.compStats[level].get()
+ for level, tables := range v.levels {
+ duration, read, write := db.compStats.getStat(level)
if len(tables) == 0 && duration == 0 {
continue
}
@@ -816,10 +954,10 @@ func (db *DB) GetProperty(name string) (value string, err error) {
float64(read)/1048576.0, float64(write)/1048576.0)
}
case p == "sstables":
- for level, tables := range v.tables {
+ for level, tables := range v.levels {
value += fmt.Sprintf("--- level %d ---\n", level)
for _, t := range tables {
- value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.file.Num(), t.size, t.imin, t.imax)
+ value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.fd.Num, t.size, t.imin, t.imax)
}
}
case p == "blockpool":
@@ -837,7 +975,7 @@ func (db *DB) GetProperty(name string) (value string, err error) {
case p == "aliveiters":
value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
default:
- err = errors.New("leveldb: GetProperty: unknown property: " + name)
+ err = ErrNotFound
}
return
@@ -859,8 +997,8 @@ func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
sizes := make(Sizes, 0, len(ranges))
for _, r := range ranges {
- imin := newIkey(r.Start, kMaxSeq, ktSeek)
- imax := newIkey(r.Limit, kMaxSeq, ktSeek)
+ imin := makeIkey(nil, r.Start, kMaxSeq, ktSeek)
+ imax := makeIkey(nil, r.Limit, kMaxSeq, ktSeek)
start, err := v.offsetOf(imin)
if err != nil {
return nil, err
@@ -879,8 +1017,8 @@ func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
return sizes, nil
}
-// Close closes the DB. This will also releases any outstanding snapshot and
-// abort any in-flight compaction.
+// Close closes the DB. This will also releases any outstanding snapshot,
+// abort any in-flight compaction and discard open transaction.
//
// It is not safe to close a DB until all outstanding iterators are released.
// It is valid to call Close multiple times. Other methods should not be
@@ -900,17 +1038,27 @@ func (db *DB) Close() error {
var err error
select {
case err = <-db.compErrC:
+ if err == ErrReadOnly {
+ err = nil
+ }
default:
}
// Signal all goroutines.
close(db.closeC)
+ // Discard open transaction.
+ if db.tr != nil {
+ db.tr.Discard()
+ }
+
+ // Acquire writer lock.
+ db.writeLockC <- struct{}{}
+
// Wait for all gorotines to exit.
db.closeW.Wait()
- // Lock writer and closes journal.
- db.writeLockC <- struct{}{}
+ // Closes journal.
if db.journal != nil {
db.journal.Close()
db.journalWriter.Close()
@@ -937,8 +1085,6 @@ func (db *DB) Close() error {
db.frozenMem = nil
db.journal = nil
db.journalWriter = nil
- db.journalFile = nil
- db.frozenJournalFile = nil
db.closer = nil
return err