aboutsummaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
diff options
context:
space:
mode:
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go')
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go281
1 files changed, 137 insertions, 144 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
index 447407aba..a94cf4c84 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
@@ -11,109 +11,79 @@ import (
"time"
"github.com/syndtr/goleveldb/leveldb/errors"
- "github.com/syndtr/goleveldb/leveldb/memdb"
"github.com/syndtr/goleveldb/leveldb/opt"
+ "github.com/syndtr/goleveldb/leveldb/storage"
)
var (
errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting")
)
-type cStats struct {
- sync.Mutex
+type cStat struct {
duration time.Duration
- read uint64
- write uint64
+ read int64
+ write int64
}
-func (p *cStats) add(n *cStatsStaging) {
- p.Lock()
+func (p *cStat) add(n *cStatStaging) {
p.duration += n.duration
p.read += n.read
p.write += n.write
- p.Unlock()
}
-func (p *cStats) get() (duration time.Duration, read, write uint64) {
- p.Lock()
- defer p.Unlock()
+func (p *cStat) get() (duration time.Duration, read, write int64) {
return p.duration, p.read, p.write
}
-type cStatsStaging struct {
+type cStatStaging struct {
start time.Time
duration time.Duration
on bool
- read uint64
- write uint64
+ read int64
+ write int64
}
-func (p *cStatsStaging) startTimer() {
+func (p *cStatStaging) startTimer() {
if !p.on {
p.start = time.Now()
p.on = true
}
}
-func (p *cStatsStaging) stopTimer() {
+func (p *cStatStaging) stopTimer() {
if p.on {
p.duration += time.Since(p.start)
p.on = false
}
}
-type cMem struct {
- s *session
- level int
- rec *sessionRecord
-}
-
-func newCMem(s *session) *cMem {
- return &cMem{s: s, rec: &sessionRecord{numLevel: s.o.GetNumLevel()}}
+type cStats struct {
+ lk sync.Mutex
+ stats []cStat
}
-func (c *cMem) flush(mem *memdb.DB, level int) error {
- s := c.s
-
- // Write memdb to table.
- iter := mem.NewIterator(nil)
- defer iter.Release()
- t, n, err := s.tops.createFrom(iter)
- if err != nil {
- return err
- }
-
- // Pick level.
- if level < 0 {
- v := s.version()
- level = v.pickLevel(t.imin.ukey(), t.imax.ukey())
- v.release()
+func (p *cStats) addStat(level int, n *cStatStaging) {
+ p.lk.Lock()
+ if level >= len(p.stats) {
+ newStats := make([]cStat, level+1)
+ copy(newStats, p.stats)
+ p.stats = newStats
}
- c.rec.addTableFile(level, t)
-
- s.logf("mem@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax)
-
- c.level = level
- return nil
+ p.stats[level].add(n)
+ p.lk.Unlock()
}
-func (c *cMem) reset() {
- c.rec = &sessionRecord{numLevel: c.s.o.GetNumLevel()}
-}
-
-func (c *cMem) commit(journal, seq uint64) error {
- c.rec.setJournalNum(journal)
- c.rec.setSeqNum(seq)
-
- // Commit changes.
- return c.s.commit(c.rec)
+func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) {
+ p.lk.Lock()
+ defer p.lk.Unlock()
+ if level < len(p.stats) {
+ return p.stats[level].get()
+ }
+ return
}
func (db *DB) compactionError() {
- var (
- err error
- wlocked bool
- )
+ var err error
noerr:
// No error.
for {
@@ -121,7 +91,7 @@ noerr:
case err = <-db.compErrSetC:
switch {
case err == nil:
- case errors.IsCorrupted(err):
+ case err == ErrReadOnly, errors.IsCorrupted(err):
goto hasperr
default:
goto haserr
@@ -139,7 +109,7 @@ haserr:
switch {
case err == nil:
goto noerr
- case errors.IsCorrupted(err):
+ case err == ErrReadOnly, errors.IsCorrupted(err):
goto hasperr
default:
}
@@ -155,9 +125,9 @@ hasperr:
case db.compPerErrC <- err:
case db.writeLockC <- struct{}{}:
// Hold write lock, so that write won't pass-through.
- wlocked = true
+ db.compWriteLocking = true
case _, _ = <-db.closeC:
- if wlocked {
+ if db.compWriteLocking {
// We should release the lock or Close will hang.
<-db.writeLockC
}
@@ -286,22 +256,27 @@ func (db *DB) compactionExitTransact() {
panic(errCompactionTransactExiting)
}
+func (db *DB) compactionCommit(name string, rec *sessionRecord) {
+ db.compCommitLk.Lock()
+ defer db.compCommitLk.Unlock() // Defer is necessary.
+ db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error {
+ return db.s.commit(rec)
+ }, nil)
+}
+
func (db *DB) memCompaction() {
- mem := db.getFrozenMem()
- if mem == nil {
+ mdb := db.getFrozenMem()
+ if mdb == nil {
return
}
- defer mem.decref()
-
- c := newCMem(db.s)
- stats := new(cStatsStaging)
+ defer mdb.decref()
- db.logf("mem@flush N·%d S·%s", mem.mdb.Len(), shortenb(mem.mdb.Size()))
+ db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size()))
// Don't compact empty memdb.
- if mem.mdb.Len() == 0 {
- db.logf("mem@flush skipping")
- // drop frozen mem
+ if mdb.Len() == 0 {
+ db.logf("memdb@flush skipping")
+ // drop frozen memdb
db.dropFrozenMem()
return
}
@@ -317,35 +292,44 @@ func (db *DB) memCompaction() {
return
}
- db.compactionTransactFunc("mem@flush", func(cnt *compactionTransactCounter) (err error) {
+ var (
+ rec = &sessionRecord{}
+ stats = &cStatStaging{}
+ flushLevel int
+ )
+
+ // Generate tables.
+ db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer()
- defer stats.stopTimer()
- return c.flush(mem.mdb, -1)
+ flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
+ stats.stopTimer()
+ return
}, func() error {
- for _, r := range c.rec.addedTables {
- db.logf("mem@flush revert @%d", r.num)
- f := db.s.getTableFile(r.num)
- if err := f.Remove(); err != nil {
+ for _, r := range rec.addedTables {
+ db.logf("memdb@flush revert @%d", r.num)
+ if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
return err
}
}
return nil
})
- db.compactionTransactFunc("mem@commit", func(cnt *compactionTransactCounter) (err error) {
- stats.startTimer()
- defer stats.stopTimer()
- return c.commit(db.journalFile.Num(), db.frozenSeq)
- }, nil)
+ rec.setJournalNum(db.journalFd.Num)
+ rec.setSeqNum(db.frozenSeq)
+
+ // Commit.
+ stats.startTimer()
+ db.compactionCommit("memdb", rec)
+ stats.stopTimer()
- db.logf("mem@flush committed F·%d T·%v", len(c.rec.addedTables), stats.duration)
+ db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)
- for _, r := range c.rec.addedTables {
+ for _, r := range rec.addedTables {
stats.write += r.size
}
- db.compStats[c.level].add(stats)
+ db.compStats.addStat(flushLevel, stats)
- // Drop frozen mem.
+ // Drop frozen memdb.
db.dropFrozenMem()
// Resume table compaction.
@@ -359,7 +343,7 @@ func (db *DB) memCompaction() {
}
// Trigger table compaction.
- db.compSendTrigger(db.tcompCmdC)
+ db.compTrigger(db.tcompCmdC)
}
type tableCompactionBuilder struct {
@@ -367,7 +351,7 @@ type tableCompactionBuilder struct {
s *session
c *compaction
rec *sessionRecord
- stat0, stat1 *cStatsStaging
+ stat0, stat1 *cStatStaging
snapHasLastUkey bool
snapLastUkey []byte
@@ -421,9 +405,9 @@ func (b *tableCompactionBuilder) flush() error {
if err != nil {
return err
}
- b.rec.addTableFile(b.c.level+1, t)
+ b.rec.addTableFile(b.c.sourceLevel+1, t)
b.stat1.write += t.size
- b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.level+1, t.file.Num(), b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
+ b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
b.tw = nil
return nil
}
@@ -546,8 +530,7 @@ func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error {
func (b *tableCompactionBuilder) revert() error {
for _, at := range b.rec.addedTables {
b.s.logf("table@build revert @%d", at.num)
- f := b.s.getTableFile(at.num)
- if err := f.Remove(); err != nil {
+ if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil {
return err
}
}
@@ -557,31 +540,31 @@ func (b *tableCompactionBuilder) revert() error {
func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
defer c.release()
- rec := &sessionRecord{numLevel: db.s.o.GetNumLevel()}
- rec.addCompPtr(c.level, c.imax)
+ rec := &sessionRecord{}
+ rec.addCompPtr(c.sourceLevel, c.imax)
if !noTrivial && c.trivial() {
- t := c.tables[0][0]
- db.logf("table@move L%d@%d -> L%d", c.level, t.file.Num(), c.level+1)
- rec.delTable(c.level, t.file.Num())
- rec.addTableFile(c.level+1, t)
+ t := c.levels[0][0]
+ db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1)
+ rec.delTable(c.sourceLevel, t.fd.Num)
+ rec.addTableFile(c.sourceLevel+1, t)
db.compactionTransactFunc("table@move", func(cnt *compactionTransactCounter) (err error) {
return db.s.commit(rec)
}, nil)
return
}
- var stats [2]cStatsStaging
- for i, tables := range c.tables {
+ var stats [2]cStatStaging
+ for i, tables := range c.levels {
for _, t := range tables {
stats[i].read += t.size
// Insert deleted tables into record
- rec.delTable(c.level+i, t.file.Num())
+ rec.delTable(c.sourceLevel+i, t.fd.Num)
}
}
sourceSize := int(stats[0].read + stats[1].read)
minSeq := db.minSeq()
- db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.level, len(c.tables[0]), c.level+1, len(c.tables[1]), shortenb(sourceSize), minSeq)
+ db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)
b := &tableCompactionBuilder{
db: db,
@@ -591,49 +574,60 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
stat1: &stats[1],
minSeq: minSeq,
strict: db.s.o.GetStrict(opt.StrictCompaction),
- tableSize: db.s.o.GetCompactionTableSize(c.level + 1),
+ tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),
}
db.compactionTransact("table@build", b)
- // Commit changes
- db.compactionTransactFunc("table@commit", func(cnt *compactionTransactCounter) (err error) {
- stats[1].startTimer()
- defer stats[1].stopTimer()
- return db.s.commit(rec)
- }, nil)
+ // Commit.
+ stats[1].startTimer()
+ db.compactionCommit("table", rec)
+ stats[1].stopTimer()
resultSize := int(stats[1].write)
db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)
// Save compaction stats
for i := range stats {
- db.compStats[c.level+1].add(&stats[i])
+ db.compStats.addStat(c.sourceLevel+1, &stats[i])
}
}
-func (db *DB) tableRangeCompaction(level int, umin, umax []byte) {
+func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
db.logf("table@compaction range L%d %q:%q", level, umin, umax)
-
if level >= 0 {
- if c := db.s.getCompactionRange(level, umin, umax); c != nil {
+ if c := db.s.getCompactionRange(level, umin, umax, true); c != nil {
db.tableCompaction(c, true)
}
} else {
- v := db.s.version()
- m := 1
- for i, t := range v.tables[1:] {
- if t.overlaps(db.s.icmp, umin, umax, false) {
- m = i + 1
+ // Retry until nothing to compact.
+ for {
+ compacted := false
+
+ // Scan for maximum level with overlapped tables.
+ v := db.s.version()
+ m := 1
+ for i := m; i < len(v.levels); i++ {
+ tables := v.levels[i]
+ if tables.overlaps(db.s.icmp, umin, umax, false) {
+ m = i
+ }
}
- }
- v.release()
+ v.release()
- for level := 0; level < m; level++ {
- if c := db.s.getCompactionRange(level, umin, umax); c != nil {
- db.tableCompaction(c, true)
+ for level := 0; level < m; level++ {
+ if c := db.s.getCompactionRange(level, umin, umax, false); c != nil {
+ db.tableCompaction(c, true)
+ compacted = true
+ }
+ }
+
+ if !compacted {
+ break
}
}
}
+
+ return nil
}
func (db *DB) tableAutoCompaction() {
@@ -660,11 +654,11 @@ type cCmd interface {
ack(err error)
}
-type cIdle struct {
+type cAuto struct {
ackC chan<- error
}
-func (r cIdle) ack(err error) {
+func (r cAuto) ack(err error) {
if r.ackC != nil {
defer func() {
recover()
@@ -688,13 +682,21 @@ func (r cRange) ack(err error) {
}
}
+// This will trigger auto compaction but will not wait for it.
+func (db *DB) compTrigger(compC chan<- cCmd) {
+ select {
+ case compC <- cAuto{}:
+ default:
+ }
+}
+
// This will trigger auto compation and/or wait for all compaction to be done.
-func (db *DB) compSendIdle(compC chan<- cCmd) (err error) {
+func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
ch := make(chan error)
defer close(ch)
// Send cmd.
select {
- case compC <- cIdle{ch}:
+ case compC <- cAuto{ch}:
case err = <-db.compErrC:
return
case _, _ = <-db.closeC:
@@ -710,16 +712,8 @@ func (db *DB) compSendIdle(compC chan<- cCmd) (err error) {
return err
}
-// This will trigger auto compaction but will not wait for it.
-func (db *DB) compSendTrigger(compC chan<- cCmd) {
- select {
- case compC <- cIdle{}:
- default:
- }
-}
-
// Send range compaction request.
-func (db *DB) compSendRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
+func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
ch := make(chan error)
defer close(ch)
// Send cmd.
@@ -759,7 +753,7 @@ func (db *DB) mCompaction() {
select {
case x = <-db.mcompCmdC:
switch x.(type) {
- case cIdle:
+ case cAuto:
db.memCompaction()
x.ack(nil)
x = nil
@@ -820,11 +814,10 @@ func (db *DB) tCompaction() {
}
if x != nil {
switch cmd := x.(type) {
- case cIdle:
+ case cAuto:
ackQ = append(ackQ, x)
case cRange:
- db.tableRangeCompaction(cmd.level, cmd.min, cmd.max)
- x.ack(nil)
+ x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
default:
panic("leveldb: unknown command")
}