diff options
Diffstat (limited to 'vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go')
-rw-r--r-- | vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go | 40 |
1 files changed, 37 insertions, 3 deletions
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go index b6563e87e..28e50906a 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -640,6 +640,16 @@ func (db *DB) tableNeedCompaction() bool { return v.needCompaction() } +// resumeWrite returns an indicator whether we should resume write operation if enough level0 files are compacted. +func (db *DB) resumeWrite() bool { + v := db.s.version() + defer v.release() + if v.tLen(0) < db.s.o.GetWriteL0PauseTrigger() { + return true + } + return false +} + func (db *DB) pauseCompaction(ch chan<- struct{}) { select { case ch <- struct{}{}: @@ -653,6 +663,7 @@ type cCmd interface { } type cAuto struct { + // Note for table compaction, an empty ackC represents it's a compaction waiting command. ackC chan<- error } @@ -765,8 +776,10 @@ func (db *DB) mCompaction() { } func (db *DB) tCompaction() { - var x cCmd - var ackQ []cCmd + var ( + x cCmd + ackQ, waitQ []cCmd + ) defer func() { if x := recover(); x != nil { @@ -778,6 +791,10 @@ func (db *DB) tCompaction() { ackQ[i].ack(ErrClosed) ackQ[i] = nil } + for i := range waitQ { + waitQ[i].ack(ErrClosed) + waitQ[i] = nil + } if x != nil { x.ack(ErrClosed) } @@ -795,12 +812,25 @@ func (db *DB) tCompaction() { return default: } + // Resume write operation as soon as possible. + if len(waitQ) > 0 && db.resumeWrite() { + for i := range waitQ { + waitQ[i].ack(nil) + waitQ[i] = nil + } + waitQ = waitQ[:0] + } } else { for i := range ackQ { ackQ[i].ack(nil) ackQ[i] = nil } ackQ = ackQ[:0] + for i := range waitQ { + waitQ[i].ack(nil) + waitQ[i] = nil + } + waitQ = waitQ[:0] select { case x = <-db.tcompCmdC: case ch := <-db.tcompPauseC: @@ -813,7 +843,11 @@ func (db *DB) tCompaction() { if x != nil { switch cmd := x.(type) { case cAuto: - ackQ = append(ackQ, x) + if cmd.ackC != nil { + waitQ = append(waitQ, x) + } else { + ackQ = append(ackQ, x) + } case cRange: x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max)) default: |