about | summary | refs | log | tree | commit | diff | stats
path: root/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
diff options
context:
space:
mode:
author Péter Szilágyi <peterke@gmail.com> 2015-04-28 17:18:01 +0800
committer Péter Szilágyi <peterke@gmail.com> 2015-04-28 17:18:01 +0800
commit 7e3b080f8517731db774d5d2587b9ded4f9716e0 (patch)
tree c27488e8e84dacaece8b07458e187906b7940384 /Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
parent 182d484aa70bcd5b22117f02333b1fd3b1535dcb (diff)
download go-tangerine-7e3b080f8517731db774d5d2587b9ded4f9716e0.tar
go-tangerine-7e3b080f8517731db774d5d2587b9ded4f9716e0.tar.gz
go-tangerine-7e3b080f8517731db774d5d2587b9ded4f9716e0.tar.bz2
go-tangerine-7e3b080f8517731db774d5d2587b9ded4f9716e0.tar.lz
go-tangerine-7e3b080f8517731db774d5d2587b9ded4f9716e0.tar.xz
go-tangerine-7e3b080f8517731db774d5d2587b9ded4f9716e0.tar.zst
go-tangerine-7e3b080f8517731db774d5d2587b9ded4f9716e0.zip
godeps: update leveldb and snappy, dump serpent-go
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go')
-rw-r--r-- Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go 767
1 files changed, 457 insertions, 310 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
index c82bd9f28..447407aba 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
@@ -7,11 +7,12 @@
package leveldb
import (
- "errors"
"sync"
"time"
+ "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/memdb"
+ "github.com/syndtr/goleveldb/leveldb/opt"
)
var (
@@ -68,13 +69,13 @@ type cMem struct {
}
func newCMem(s *session) *cMem {
- return &cMem{s: s, rec: new(sessionRecord)}
+ return &cMem{s: s, rec: &sessionRecord{numLevel: s.o.GetNumLevel()}}
}
func (c *cMem) flush(mem *memdb.DB, level int) error {
s := c.s
- // Write memdb to table
+ // Write memdb to table.
iter := mem.NewIterator(nil)
defer iter.Release()
t, n, err := s.tops.createFrom(iter)
@@ -82,51 +83,85 @@ func (c *cMem) flush(mem *memdb.DB, level int) error {
return err
}
+ // Pick level.
if level < 0 {
- level = s.version_NB().pickLevel(t.min.ukey(), t.max.ukey())
+ v := s.version()
+ level = v.pickLevel(t.imin.ukey(), t.imax.ukey())
+ v.release()
}
c.rec.addTableFile(level, t)
- s.logf("mem@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.min, t.max)
+ s.logf("mem@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax)
c.level = level
return nil
}
func (c *cMem) reset() {
- c.rec = new(sessionRecord)
+ c.rec = &sessionRecord{numLevel: c.s.o.GetNumLevel()}
}
func (c *cMem) commit(journal, seq uint64) error {
c.rec.setJournalNum(journal)
- c.rec.setSeq(seq)
- // Commit changes
+ c.rec.setSeqNum(seq)
+
+ // Commit changes.
return c.s.commit(c.rec)
}
-func (d *DB) compactionError() {
- var err error
+func (db *DB) compactionError() {
+ var (
+ err error
+ wlocked bool
+ )
noerr:
+ // No error.
for {
select {
- case _, _ = <-d.closeC:
- return
- case err = <-d.compErrSetC:
- if err != nil {
+ case err = <-db.compErrSetC:
+ switch {
+ case err == nil:
+ case errors.IsCorrupted(err):
+ goto hasperr
+ default:
goto haserr
}
+ case _, _ = <-db.closeC:
+ return
}
}
haserr:
+ // Transient error.
for {
select {
- case _, _ = <-d.closeC:
- return
- case err = <-d.compErrSetC:
- if err == nil {
+ case db.compErrC <- err:
+ case err = <-db.compErrSetC:
+ switch {
+ case err == nil:
goto noerr
+ case errors.IsCorrupted(err):
+ goto hasperr
+ default:
}
- case d.compErrC <- err:
+ case _, _ = <-db.closeC:
+ return
+ }
+ }
+hasperr:
+ // Persistent error.
+ for {
+ select {
+ case db.compErrC <- err:
+ case db.compPerErrC <- err:
+ case db.writeLockC <- struct{}{}:
+ // Hold write lock, so that write won't pass-through.
+ wlocked = true
+ case _, _ = <-db.closeC:
+ if wlocked {
+ // We should release the lock or Close will hang.
+ <-db.writeLockC
+ }
+ return
}
}
}
@@ -137,114 +172,159 @@ func (cnt *compactionTransactCounter) incr() {
*cnt++
}
-func (d *DB) compactionTransact(name string, exec func(cnt *compactionTransactCounter) error, rollback func() error) {
- s := d.s
+type compactionTransactInterface interface {
+ run(cnt *compactionTransactCounter) error
+ revert() error
+}
+
+func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
defer func() {
if x := recover(); x != nil {
- if x == errCompactionTransactExiting && rollback != nil {
- if err := rollback(); err != nil {
- s.logf("%s rollback error %q", name, err)
+ if x == errCompactionTransactExiting {
+ if err := t.revert(); err != nil {
+ db.logf("%s revert error %q", name, err)
}
}
panic(x)
}
}()
+
const (
backoffMin = 1 * time.Second
backoffMax = 8 * time.Second
backoffMul = 2 * time.Second
)
- backoff := backoffMin
- backoffT := time.NewTimer(backoff)
- lastCnt := compactionTransactCounter(0)
+ var (
+ backoff = backoffMin
+ backoffT = time.NewTimer(backoff)
+ lastCnt = compactionTransactCounter(0)
+
+ disableBackoff = db.s.o.GetDisableCompactionBackoff()
+ )
for n := 0; ; n++ {
// Check wether the DB is closed.
- if d.isClosed() {
- s.logf("%s exiting", name)
- d.compactionExitTransact()
+ if db.isClosed() {
+ db.logf("%s exiting", name)
+ db.compactionExitTransact()
} else if n > 0 {
- s.logf("%s retrying N·%d", name, n)
+ db.logf("%s retrying N·%d", name, n)
}
// Execute.
cnt := compactionTransactCounter(0)
- err := exec(&cnt)
+ err := t.run(&cnt)
+ if err != nil {
+ db.logf("%s error I·%d %q", name, cnt, err)
+ }
// Set compaction error status.
select {
- case d.compErrSetC <- err:
- case _, _ = <-d.closeC:
- s.logf("%s exiting", name)
- d.compactionExitTransact()
+ case db.compErrSetC <- err:
+ case perr := <-db.compPerErrC:
+ if err != nil {
+ db.logf("%s exiting (persistent error %q)", name, perr)
+ db.compactionExitTransact()
+ }
+ case _, _ = <-db.closeC:
+ db.logf("%s exiting", name)
+ db.compactionExitTransact()
}
if err == nil {
return
}
- s.logf("%s error I·%d %q", name, cnt, err)
-
- // Reset backoff duration if counter is advancing.
- if cnt > lastCnt {
- backoff = backoffMin
- lastCnt = cnt
+ if errors.IsCorrupted(err) {
+ db.logf("%s exiting (corruption detected)", name)
+ db.compactionExitTransact()
}
- // Backoff.
- backoffT.Reset(backoff)
- if backoff < backoffMax {
- backoff *= backoffMul
- if backoff > backoffMax {
- backoff = backoffMax
+ if !disableBackoff {
+ // Reset backoff duration if counter is advancing.
+ if cnt > lastCnt {
+ backoff = backoffMin
+ lastCnt = cnt
+ }
+
+ // Backoff.
+ backoffT.Reset(backoff)
+ if backoff < backoffMax {
+ backoff *= backoffMul
+ if backoff > backoffMax {
+ backoff = backoffMax
+ }
+ }
+ select {
+ case <-backoffT.C:
+ case _, _ = <-db.closeC:
+ db.logf("%s exiting", name)
+ db.compactionExitTransact()
}
}
- select {
- case <-backoffT.C:
- case _, _ = <-d.closeC:
- s.logf("%s exiting", name)
- d.compactionExitTransact()
- }
}
}
-func (d *DB) compactionExitTransact() {
+type compactionTransactFunc struct {
+ runFunc func(cnt *compactionTransactCounter) error
+ revertFunc func() error
+}
+
+func (t *compactionTransactFunc) run(cnt *compactionTransactCounter) error {
+ return t.runFunc(cnt)
+}
+
+func (t *compactionTransactFunc) revert() error {
+ if t.revertFunc != nil {
+ return t.revertFunc()
+ }
+ return nil
+}
+
+func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) {
+ db.compactionTransact(name, &compactionTransactFunc{run, revert})
+}
+
+func (db *DB) compactionExitTransact() {
panic(errCompactionTransactExiting)
}
-func (d *DB) memCompaction() {
- mem := d.getFrozenMem()
+func (db *DB) memCompaction() {
+ mem := db.getFrozenMem()
if mem == nil {
return
}
+ defer mem.decref()
- s := d.s
- c := newCMem(s)
+ c := newCMem(db.s)
stats := new(cStatsStaging)
- s.logf("mem@flush N·%d S·%s", mem.Len(), shortenb(mem.Size()))
+ db.logf("mem@flush N·%d S·%s", mem.mdb.Len(), shortenb(mem.mdb.Size()))
// Don't compact empty memdb.
- if mem.Len() == 0 {
- s.logf("mem@flush skipping")
+ if mem.mdb.Len() == 0 {
+ db.logf("mem@flush skipping")
// drop frozen mem
- d.dropFrozenMem()
+ db.dropFrozenMem()
return
}
// Pause table compaction.
- ch := make(chan struct{})
+ resumeC := make(chan struct{})
select {
- case d.tcompPauseC <- (chan<- struct{})(ch):
- case _, _ = <-d.closeC:
+ case db.tcompPauseC <- (chan<- struct{})(resumeC):
+ case <-db.compPerErrC:
+ close(resumeC)
+ resumeC = nil
+ case _, _ = <-db.closeC:
return
}
- d.compactionTransact("mem@flush", func(cnt *compactionTransactCounter) (err error) {
+ db.compactionTransactFunc("mem@flush", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer()
defer stats.stopTimer()
- return c.flush(mem, -1)
+ return c.flush(mem.mdb, -1)
}, func() error {
for _, r := range c.rec.addedTables {
- s.logf("mem@flush rollback @%d", r.num)
- f := s.getTableFile(r.num)
+ db.logf("mem@flush revert @%d", r.num)
+ f := db.s.getTableFile(r.num)
if err := f.Remove(); err != nil {
return err
}
@@ -252,279 +332,327 @@ func (d *DB) memCompaction() {
return nil
})
- d.compactionTransact("mem@commit", func(cnt *compactionTransactCounter) (err error) {
+ db.compactionTransactFunc("mem@commit", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer()
defer stats.stopTimer()
- return c.commit(d.journalFile.Num(), d.frozenSeq)
+ return c.commit(db.journalFile.Num(), db.frozenSeq)
}, nil)
- s.logf("mem@flush commited F·%d T·%v", len(c.rec.addedTables), stats.duration)
+ db.logf("mem@flush committed F·%d T·%v", len(c.rec.addedTables), stats.duration)
for _, r := range c.rec.addedTables {
stats.write += r.size
}
- d.compStats[c.level].add(stats)
+ db.compStats[c.level].add(stats)
// Drop frozen mem.
- d.dropFrozenMem()
+ db.dropFrozenMem()
// Resume table compaction.
- select {
- case <-ch:
- case _, _ = <-d.closeC:
- return
+ if resumeC != nil {
+ select {
+ case <-resumeC:
+ close(resumeC)
+ case _, _ = <-db.closeC:
+ return
+ }
}
// Trigger table compaction.
- d.compTrigger(d.mcompTriggerC)
+ db.compSendTrigger(db.tcompCmdC)
}
-func (d *DB) tableCompaction(c *compaction, noTrivial bool) {
- s := d.s
+type tableCompactionBuilder struct {
+ db *DB
+ s *session
+ c *compaction
+ rec *sessionRecord
+ stat0, stat1 *cStatsStaging
- rec := new(sessionRecord)
- rec.addCompactionPointer(c.level, c.max)
+ snapHasLastUkey bool
+ snapLastUkey []byte
+ snapLastSeq uint64
+ snapIter int
+ snapKerrCnt int
+ snapDropCnt int
- if !noTrivial && c.trivial() {
- t := c.tables[0][0]
- s.logf("table@move L%d@%d -> L%d", c.level, t.file.Num(), c.level+1)
- rec.deleteTable(c.level, t.file.Num())
- rec.addTableFile(c.level+1, t)
- d.compactionTransact("table@move", func(cnt *compactionTransactCounter) (err error) {
- return s.commit(rec)
- }, nil)
- return
- }
+ kerrCnt int
+ dropCnt int
- var stats [2]cStatsStaging
- for i, tt := range c.tables {
- for _, t := range tt {
- stats[i].read += t.size
- // Insert deleted tables into record
- rec.deleteTable(c.level+i, t.file.Num())
- }
- }
- sourceSize := int(stats[0].read + stats[1].read)
- minSeq := d.minSeq()
- s.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.level, len(c.tables[0]), c.level+1, len(c.tables[1]), shortenb(sourceSize), minSeq)
-
- var snapUkey []byte
- var snapHasUkey bool
- var snapSeq uint64
- var snapIter int
- var snapDropCnt int
- var dropCnt int
- d.compactionTransact("table@build", func(cnt *compactionTransactCounter) (err error) {
- ukey := append([]byte{}, snapUkey...)
- hasUkey := snapHasUkey
- lseq := snapSeq
- dropCnt = snapDropCnt
- snapSched := snapIter == 0
-
- var tw *tWriter
- finish := func() error {
- t, err := tw.finish()
- if err != nil {
- return err
+ minSeq uint64
+ strict bool
+ tableSize int
+
+ tw *tWriter
+}
+
+func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
+ // Create new table if not already.
+ if b.tw == nil {
+ // Check for pause event.
+ if b.db != nil {
+ select {
+ case ch := <-b.db.tcompPauseC:
+ b.db.pauseCompaction(ch)
+ case _, _ = <-b.db.closeC:
+ b.db.compactionExitTransact()
+ default:
}
- rec.addTableFile(c.level+1, t)
- stats[1].write += t.size
- s.logf("table@build created L%d@%d N·%d S·%s %q:%q", c.level+1, t.file.Num(), tw.tw.EntriesLen(), shortenb(int(t.size)), t.min, t.max)
- return nil
}
- defer func() {
- stats[1].stopTimer()
- if tw != nil {
- tw.drop()
- tw = nil
- }
- }()
+ // Create new table.
+ var err error
+ b.tw, err = b.s.tops.create()
+ if err != nil {
+ return err
+ }
+ }
- stats[1].startTimer()
- iter := c.newIterator()
- defer iter.Release()
- for i := 0; iter.Next(); i++ {
- // Incr transact counter.
- cnt.incr()
-
- // Skip until last state.
- if i < snapIter {
- continue
- }
+ // Write key/value into table.
+ return b.tw.append(key, value)
+}
- key := iKey(iter.Key())
+func (b *tableCompactionBuilder) needFlush() bool {
+ return b.tw.tw.BytesLen() >= b.tableSize
+}
- if c.shouldStopBefore(key) && tw != nil {
- err = finish()
- if err != nil {
- return
- }
- snapSched = true
- tw = nil
- }
+func (b *tableCompactionBuilder) flush() error {
+ t, err := b.tw.finish()
+ if err != nil {
+ return err
+ }
+ b.rec.addTableFile(b.c.level+1, t)
+ b.stat1.write += t.size
+ b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.level+1, t.file.Num(), b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
+ b.tw = nil
+ return nil
+}
- // Scheduled for snapshot, snapshot will used to retry compaction
- // if error occured.
- if snapSched {
- snapUkey = append(snapUkey[:0], ukey...)
- snapHasUkey = hasUkey
- snapSeq = lseq
- snapIter = i
- snapDropCnt = dropCnt
- snapSched = false
- }
+func (b *tableCompactionBuilder) cleanup() {
+ if b.tw != nil {
+ b.tw.drop()
+ b.tw = nil
+ }
+}
- if seq, t, ok := key.parseNum(); !ok {
- // Don't drop error keys
- ukey = ukey[:0]
- hasUkey = false
- lseq = kMaxSeq
- } else {
- if !hasUkey || s.icmp.uCompare(key.ukey(), ukey) != 0 {
- // First occurrence of this user key
- ukey = append(ukey[:0], key.ukey()...)
- hasUkey = true
- lseq = kMaxSeq
- }
+func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error {
+ snapResumed := b.snapIter > 0
+ hasLastUkey := b.snapHasLastUkey // The key might has zero length, so this is necessary.
+ lastUkey := append([]byte{}, b.snapLastUkey...)
+ lastSeq := b.snapLastSeq
+ b.kerrCnt = b.snapKerrCnt
+ b.dropCnt = b.snapDropCnt
+ // Restore compaction state.
+ b.c.restore()
- drop := false
- if lseq <= minSeq {
- // Dropped because newer entry for same user key exist
- drop = true // (A)
- } else if t == tDel && seq <= minSeq && c.isBaseLevelForKey(ukey) {
- // For this user key:
- // (1) there is no data in higher levels
- // (2) data in lower levels will have larger seq numbers
- // (3) data in layers that are being compacted here and have
- // smaller seq numbers will be dropped in the next
- // few iterations of this loop (by rule (A) above).
- // Therefore this deletion marker is obsolete and can be dropped.
- drop = true
- }
+ defer b.cleanup()
- lseq = seq
- if drop {
- dropCnt++
- continue
- }
- }
+ b.stat1.startTimer()
+ defer b.stat1.stopTimer()
- // Create new table if not already
- if tw == nil {
- // Check for pause event.
- select {
- case ch := <-d.tcompPauseC:
- d.pauseCompaction(ch)
- case _, _ = <-d.closeC:
- d.compactionExitTransact()
- default:
- }
+ iter := b.c.newIterator()
+ defer iter.Release()
+ for i := 0; iter.Next(); i++ {
+ // Incr transact counter.
+ cnt.incr()
+
+ // Skip until last state.
+ if i < b.snapIter {
+ continue
+ }
- // Create new table.
- tw, err = s.tops.create()
- if err != nil {
- return
+ resumed := false
+ if snapResumed {
+ resumed = true
+ snapResumed = false
+ }
+
+ ikey := iter.Key()
+ ukey, seq, kt, kerr := parseIkey(ikey)
+
+ if kerr == nil {
+ shouldStop := !resumed && b.c.shouldStopBefore(ikey)
+
+ if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 {
+ // First occurrence of this user key.
+
+ // Only rotate tables if ukey doesn't hop across.
+ if b.tw != nil && (shouldStop || b.needFlush()) {
+ if err := b.flush(); err != nil {
+ return err
+ }
+
+ // Creates snapshot of the state.
+ b.c.save()
+ b.snapHasLastUkey = hasLastUkey
+ b.snapLastUkey = append(b.snapLastUkey[:0], lastUkey...)
+ b.snapLastSeq = lastSeq
+ b.snapIter = i
+ b.snapKerrCnt = b.kerrCnt
+ b.snapDropCnt = b.dropCnt
}
- }
- // Write key/value into table
- err = tw.add(key, iter.Value())
- if err != nil {
- return
+ hasLastUkey = true
+ lastUkey = append(lastUkey[:0], ukey...)
+ lastSeq = kMaxSeq
}
- // Finish table if it is big enough
- if tw.tw.BytesLen() >= kMaxTableSize {
- err = finish()
- if err != nil {
- return
- }
- snapSched = true
- tw = nil
+ switch {
+ case lastSeq <= b.minSeq:
+ // Dropped because newer entry for same user key exist
+ fallthrough // (A)
+ case kt == ktDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey):
+ // For this user key:
+ // (1) there is no data in higher levels
+ // (2) data in lower levels will have larger seq numbers
+ // (3) data in layers that are being compacted here and have
+ // smaller seq numbers will be dropped in the next
+ // few iterations of this loop (by rule (A) above).
+ // Therefore this deletion marker is obsolete and can be dropped.
+ lastSeq = seq
+ b.dropCnt++
+ continue
+ default:
+ lastSeq = seq
+ }
+ } else {
+ if b.strict {
+ return kerr
}
+
+ // Don't drop corrupted keys.
+ hasLastUkey = false
+ lastUkey = lastUkey[:0]
+ lastSeq = kMaxSeq
+ b.kerrCnt++
}
- err = iter.Error()
- if err != nil {
- return
+ if err := b.appendKV(ikey, iter.Value()); err != nil {
+ return err
}
+ }
- // Finish last table
- if tw != nil && !tw.empty() {
- err = finish()
- if err != nil {
- return
- }
- tw = nil
+ if err := iter.Error(); err != nil {
+ return err
+ }
+
+ // Finish last table.
+ if b.tw != nil && !b.tw.empty() {
+ return b.flush()
+ }
+ return nil
+}
+
+func (b *tableCompactionBuilder) revert() error {
+ for _, at := range b.rec.addedTables {
+ b.s.logf("table@build revert @%d", at.num)
+ f := b.s.getTableFile(at.num)
+ if err := f.Remove(); err != nil {
+ return err
}
+ }
+ return nil
+}
+
+func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
+ defer c.release()
+
+ rec := &sessionRecord{numLevel: db.s.o.GetNumLevel()}
+ rec.addCompPtr(c.level, c.imax)
+
+ if !noTrivial && c.trivial() {
+ t := c.tables[0][0]
+ db.logf("table@move L%d@%d -> L%d", c.level, t.file.Num(), c.level+1)
+ rec.delTable(c.level, t.file.Num())
+ rec.addTableFile(c.level+1, t)
+ db.compactionTransactFunc("table@move", func(cnt *compactionTransactCounter) (err error) {
+ return db.s.commit(rec)
+ }, nil)
return
- }, func() error {
- for _, r := range rec.addedTables {
- s.logf("table@build rollback @%d", r.num)
- f := s.getTableFile(r.num)
- if err := f.Remove(); err != nil {
- return err
- }
+ }
+
+ var stats [2]cStatsStaging
+ for i, tables := range c.tables {
+ for _, t := range tables {
+ stats[i].read += t.size
+ // Insert deleted tables into record
+ rec.delTable(c.level+i, t.file.Num())
}
- return nil
- })
+ }
+ sourceSize := int(stats[0].read + stats[1].read)
+ minSeq := db.minSeq()
+ db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.level, len(c.tables[0]), c.level+1, len(c.tables[1]), shortenb(sourceSize), minSeq)
+
+ b := &tableCompactionBuilder{
+ db: db,
+ s: db.s,
+ c: c,
+ rec: rec,
+ stat1: &stats[1],
+ minSeq: minSeq,
+ strict: db.s.o.GetStrict(opt.StrictCompaction),
+ tableSize: db.s.o.GetCompactionTableSize(c.level + 1),
+ }
+ db.compactionTransact("table@build", b)
// Commit changes
- d.compactionTransact("table@commit", func(cnt *compactionTransactCounter) (err error) {
+ db.compactionTransactFunc("table@commit", func(cnt *compactionTransactCounter) (err error) {
stats[1].startTimer()
defer stats[1].stopTimer()
- return s.commit(rec)
+ return db.s.commit(rec)
}, nil)
- resultSize := int(int(stats[1].write))
- s.logf("table@compaction commited F%s S%s D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), dropCnt, stats[1].duration)
+ resultSize := int(stats[1].write)
+ db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)
// Save compaction stats
for i := range stats {
- d.compStats[c.level+1].add(&stats[i])
+ db.compStats[c.level+1].add(&stats[i])
}
}
-func (d *DB) tableRangeCompaction(level int, min, max []byte) {
- s := d.s
- s.logf("table@compaction range L%d %q:%q", level, min, max)
+func (db *DB) tableRangeCompaction(level int, umin, umax []byte) {
+ db.logf("table@compaction range L%d %q:%q", level, umin, umax)
if level >= 0 {
- if c := s.getCompactionRange(level, min, max); c != nil {
- d.tableCompaction(c, true)
+ if c := db.s.getCompactionRange(level, umin, umax); c != nil {
+ db.tableCompaction(c, true)
}
} else {
- v := s.version_NB()
+ v := db.s.version()
m := 1
for i, t := range v.tables[1:] {
- if t.isOverlaps(min, max, true, s.icmp) {
+ if t.overlaps(db.s.icmp, umin, umax, false) {
m = i + 1
}
}
+ v.release()
+
for level := 0; level < m; level++ {
- if c := s.getCompactionRange(level, min, max); c != nil {
- d.tableCompaction(c, true)
+ if c := db.s.getCompactionRange(level, umin, umax); c != nil {
+ db.tableCompaction(c, true)
}
}
}
}
-func (d *DB) tableAutoCompaction() {
- if c := d.s.pickCompaction(); c != nil {
- d.tableCompaction(c, false)
+func (db *DB) tableAutoCompaction() {
+ if c := db.s.pickCompaction(); c != nil {
+ db.tableCompaction(c, false)
}
}
-func (d *DB) tableNeedCompaction() bool {
- return d.s.version_NB().needCompaction()
+func (db *DB) tableNeedCompaction() bool {
+ v := db.s.version()
+ defer v.release()
+ return v.needCompaction()
}
-func (d *DB) pauseCompaction(ch chan<- struct{}) {
+func (db *DB) pauseCompaction(ch chan<- struct{}) {
select {
case ch <- struct{}{}:
- case _, _ = <-d.closeC:
- d.compactionExitTransact()
+ case _, _ = <-db.closeC:
+ db.compactionExitTransact()
}
}
@@ -537,7 +665,12 @@ type cIdle struct {
}
func (r cIdle) ack(err error) {
- r.ackC <- err
+ if r.ackC != nil {
+ defer func() {
+ recover()
+ }()
+ r.ackC <- err
+ }
}
type cRange struct {
@@ -547,56 +680,67 @@ type cRange struct {
}
func (r cRange) ack(err error) {
- defer func() {
- recover()
- }()
if r.ackC != nil {
+ defer func() {
+ recover()
+ }()
r.ackC <- err
}
}
-func (d *DB) compSendIdle(compC chan<- cCmd) error {
+// This will trigger auto compation and/or wait for all compaction to be done.
+func (db *DB) compSendIdle(compC chan<- cCmd) (err error) {
ch := make(chan error)
defer close(ch)
// Send cmd.
select {
case compC <- cIdle{ch}:
- case err := <-d.compErrC:
- return err
- case _, _ = <-d.closeC:
+ case err = <-db.compErrC:
+ return
+ case _, _ = <-db.closeC:
return ErrClosed
}
// Wait cmd.
- return <-ch
+ select {
+ case err = <-ch:
+ case err = <-db.compErrC:
+ case _, _ = <-db.closeC:
+ return ErrClosed
+ }
+ return err
}
-func (d *DB) compSendRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
+// This will trigger auto compaction but will not wait for it.
+func (db *DB) compSendTrigger(compC chan<- cCmd) {
+ select {
+ case compC <- cIdle{}:
+ default:
+ }
+}
+
+// Send range compaction request.
+func (db *DB) compSendRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
ch := make(chan error)
defer close(ch)
// Send cmd.
select {
case compC <- cRange{level, min, max, ch}:
- case err := <-d.compErrC:
+ case err := <-db.compErrC:
return err
- case _, _ = <-d.closeC:
+ case _, _ = <-db.closeC:
return ErrClosed
}
// Wait cmd.
select {
- case err = <-d.compErrC:
case err = <-ch:
+ case err = <-db.compErrC:
+ case _, _ = <-db.closeC:
+ return ErrClosed
}
return err
}
-func (d *DB) compTrigger(compTriggerC chan struct{}) {
- select {
- case compTriggerC <- struct{}{}:
- default:
- }
-}
-
-func (d *DB) mCompaction() {
+func (db *DB) mCompaction() {
var x cCmd
defer func() {
@@ -608,24 +752,27 @@ func (d *DB) mCompaction() {
if x != nil {
x.ack(ErrClosed)
}
- d.closeW.Done()
+ db.closeW.Done()
}()
for {
select {
- case _, _ = <-d.closeC:
+ case x = <-db.mcompCmdC:
+ switch x.(type) {
+ case cIdle:
+ db.memCompaction()
+ x.ack(nil)
+ x = nil
+ default:
+ panic("leveldb: unknown command")
+ }
+ case _, _ = <-db.closeC:
return
- case x = <-d.mcompCmdC:
- d.memCompaction()
- x.ack(nil)
- x = nil
- case <-d.mcompTriggerC:
- d.memCompaction()
}
}
}
-func (d *DB) tCompaction() {
+func (db *DB) tCompaction() {
var x cCmd
var ackQ []cCmd
@@ -642,19 +789,18 @@ func (d *DB) tCompaction() {
if x != nil {
x.ack(ErrClosed)
}
- d.closeW.Done()
+ db.closeW.Done()
}()
for {
- if d.tableNeedCompaction() {
+ if db.tableNeedCompaction() {
select {
- case x = <-d.tcompCmdC:
- case <-d.tcompTriggerC:
- case _, _ = <-d.closeC:
- return
- case ch := <-d.tcompPauseC:
- d.pauseCompaction(ch)
+ case x = <-db.tcompCmdC:
+ case ch := <-db.tcompPauseC:
+ db.pauseCompaction(ch)
continue
+ case _, _ = <-db.closeC:
+ return
default:
}
} else {
@@ -664,12 +810,11 @@ func (d *DB) tCompaction() {
}
ackQ = ackQ[:0]
select {
- case x = <-d.tcompCmdC:
- case <-d.tcompTriggerC:
- case ch := <-d.tcompPauseC:
- d.pauseCompaction(ch)
+ case x = <-db.tcompCmdC:
+ case ch := <-db.tcompPauseC:
+ db.pauseCompaction(ch)
continue
- case _, _ = <-d.closeC:
+ case _, _ = <-db.closeC:
return
}
}
@@ -678,11 +823,13 @@ func (d *DB) tCompaction() {
case cIdle:
ackQ = append(ackQ, x)
case cRange:
- d.tableRangeCompaction(cmd.level, cmd.min, cmd.max)
+ db.tableRangeCompaction(cmd.level, cmd.min, cmd.max)
x.ack(nil)
+ default:
+ panic("leveldb: unknown command")
}
x = nil
}
- d.tableAutoCompaction()
+ db.tableAutoCompaction()
}
}