aboutsummaryrefslogblamecommitdiffstats
path: root/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go
blob: 28e50906adb6cf15fa09a3a73dc46668357a42a6 (plain) (tree)
1
2
3
4
5
6
7
8
9








                                                                         


              
                                                    
                                                 
                                                     





                                                                                         
                   
                              

                      

 
                                      


                                

 
                                                                   


                                          
                          


                              

                      

 
                                     





                                    
                                    





                                                 


                        

 





                                                      
         

                             

 






                                                                                 

 
                                 
                     
      
                    

                        


                                            
                                                                         

                                            

                                           
                                 
                              


                 
                           

                        



                                            
                                          
                                                                         

                                            
                         
                                 










                                                                             
                                                  
                                 
                                                



                                                                                 









                                              





                                                                              

                                             


                                                                                




                                 
 




                                            






                                                                     
                           
                                                  


                                                   
                                 
                                                             



                                                   



                                                                    


                                               





                                                                                       
                                 

                                                   



                               


                                                                         

                 
















                                                                          
                                         

                                                           

                         


         




















                                                                                                                        


                                           







                                                                                              
                               

                                

                      
                          
 
                                                                           

                                     


                                               
                                  



                                  
                                      
                



                                                          
                         
                                           

         







                                                                                                   
                                  


                                                                                
                         


                                                                                                                       





                                          






                                           
 
                                                                                          
 
                                           

                                     
                                               
 
                             
                          

                                   



                                      
                                 
                                                   
                 


                                    
                                    

 




                                    
                                  
 





                              
 

                   
 














                                                                    
                                           

                                                             
                         

                 






                                             
 


                                      
 


                                                   
 




                                                
                                                
                               
                                                                                                                                                          


                  
 





                                            
 








                                                                                                
 
                         
 

                                 
 









                                         
 






                                           
                                                             




















                                                                                                
                                 
 

                                                                        
                                                   

                         



                                                                                      
                                                                                                  















                                                                                                 
                         



                                                     
                                           
                                   

                 

                                                                      
                 
         
 













                                                          
                                                                                                               
                                  
                 






                                                              

                                             

                                      



                                                                                             
                                                      
                      

         

                                         


                                                            
                                                               
                 


                                                        
                                                                                                                                                                    








                                                                  
                                                                            

                                               
 



                                         
 

                                                                                                                                                                                                      


                                
                                                                


         
                                                                        
                                                                      
                       
                                                                                    
                                                   

                 











                                                                                  
                         
                                   
 








                                                                                                     


                         

                  

 


                                                 


         



                                          

 









                                                                                                                  
                                                   

                              
                         
                                           






                      
                   
                                                                                                 


                         
                               





                                 








                                
                          


                                 



                             







                                                              
                                                                               
                                                              



                              
                                

                                 
                         


                                


                                 
                         


                                

 
                                 
                                                                                           




                                                  
                                  
                          
                         



                                
                        
                                 
                         
                                



                  
                             










                                                              
                                



                        

                                         
                                   





                                                                 
                                 
                              



                 
                             



                                  










                                                              



                                               


                                        
                                


             
                                             
                                


                                                      
                                        
                                         
                                      

                                







                                                                      





                                                




                                                 
                                


                                                      
                                        
                                         




                                                
                                   




                                                                
                                    
                                                                                           

                                                                 


                               
                                        

         
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package leveldb

import (
    "sync"
    "time"

    "github.com/syndtr/goleveldb/leveldb/errors"
    "github.com/syndtr/goleveldb/leveldb/opt"
    "github.com/syndtr/goleveldb/leveldb/storage"
)

var (
    // errCompactionTransactExiting is the panic value raised by
    // compactionExitTransact to unwind a running compaction. The compaction
    // goroutines (mCompaction, tCompaction) recover it and exit quietly.
    errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting")
)

// cStat holds accumulated compaction statistics: total time spent and the
// number of bytes read and written.
type cStat struct {
    duration time.Duration
    read     int64
    write    int64
}

// add folds the values of one staged measurement n into the totals.
func (p *cStat) add(n *cStatStaging) {
    p.read, p.write = p.read+n.read, p.write+n.write
    p.duration += n.duration
}

// get returns the accumulated duration, bytes read and bytes written.
func (p *cStat) get() (duration time.Duration, read, write int64) {
    duration = p.duration
    read, write = p.read, p.write
    return duration, read, write
}

// cStatStaging collects the statistics of a single compaction before they
// are merged into a cStat. It also acts as a stopwatch: duration grows by
// the elapsed time of every startTimer/stopTimer pair.
type cStatStaging struct {
    start    time.Time
    duration time.Duration
    on       bool
    read     int64
    write    int64
}

// startTimer starts the stopwatch. It is a no-op when already running.
func (p *cStatStaging) startTimer() {
    if p.on {
        return
    }
    p.on = true
    p.start = time.Now()
}

// stopTimer stops the stopwatch and adds the time elapsed since the matching
// startTimer to duration. It is a no-op when the stopwatch is not running.
func (p *cStatStaging) stopTimer() {
    if !p.on {
        return
    }
    p.on = false
    p.duration += time.Since(p.start)
}

// cStats is a mutex-protected, per-level table of compaction statistics.
// The slice index is the level number.
type cStats struct {
    lk    sync.Mutex
    stats []cStat
}

// addStat merges the staged measurement n into the entry for level, growing
// the table first when level is not covered yet.
func (p *cStats) addStat(level int, n *cStatStaging) {
    p.lk.Lock()
    if missing := level + 1 - len(p.stats); missing > 0 {
        p.stats = append(p.stats, make([]cStat, missing)...)
    }
    p.stats[level].add(n)
    p.lk.Unlock()
}

// getStat returns the accumulated statistics of level, or all zeros when no
// statistics have been recorded at or above that level.
func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) {
    p.lk.Lock()
    defer p.lk.Unlock()
    if level >= len(p.stats) {
        return 0, 0, 0
    }
    return p.stats[level].get()
}

// compactionError tracks the most recent compaction status received on
// db.compErrSetC and publishes it to other goroutines. It is a three-state
// loop built from labels:
//
//   - noerr: nothing is published; waits for the next status.
//   - haserr: a non-persistent error is offered on db.compErrC until a nil
//     status clears it or a persistent error replaces it.
//   - hasperr: entered on ErrReadOnly or a corruption error. The error is
//     offered on db.compErrC and db.compPerErrC, and the write lock is taken
//     through db.writeLockC. This state is only left when db.closeC fires.
//
// The function returns as soon as a receive from db.closeC succeeds.
func (db *DB) compactionError() {
    var err error
noerr:
    // No error.
    for {
        select {
        case err = <-db.compErrSetC:
            switch {
            case err == nil:
                // Still no error; stay in this state.
            case err == ErrReadOnly, errors.IsCorrupted(err):
                goto hasperr
            default:
                goto haserr
            }
        case <-db.closeC:
            return
        }
    }
haserr:
    // Transient error.
    for {
        select {
        case db.compErrC <- err:
            // Delivered the current error to one reader; keep offering it.
        case err = <-db.compErrSetC:
            switch {
            case err == nil:
                goto noerr
            case err == ErrReadOnly, errors.IsCorrupted(err):
                goto hasperr
            default:
                // Another transient error: keep this state with the new value.
            }
        case <-db.closeC:
            return
        }
    }
hasperr:
    // Persistent error.
    for {
        select {
        case db.compErrC <- err:
        case db.compPerErrC <- err:
        case db.writeLockC <- struct{}{}:
            // Hold write lock, so that write won't pass-through.
            db.compWriteLocking = true
        case <-db.closeC:
            if db.compWriteLocking {
                // We should release the lock or Close will hang.
                <-db.writeLockC
            }
            return
        }
    }
}

// compactionTransactCounter counts the progress made by one attempt of a
// compaction transaction. compactionTransact compares it between attempts to
// decide whether the retry backoff should be reset.
type compactionTransactCounter int

// incr advances the counter by one.
func (cnt *compactionTransactCounter) incr() {
    *cnt += 1
}

// compactionTransactInterface is a unit of work executed by
// compactionTransact.
type compactionTransactInterface interface {
    // run performs one attempt of the work. cnt is a fresh counter for this
    // attempt which the implementation may advance to report progress. A
    // non-nil error makes compactionTransact retry.
    run(cnt *compactionTransactCounter) error
    // revert is called when the transaction is abandoned through
    // compactionExitTransact, to undo what run has produced so far.
    revert() error
}

// compactionTransact runs t.run until it succeeds, sleeping between failed
// attempts with an exponential backoff (1s, 2s, 4s, then 8s) unless backoff
// is disabled in the options. name is used as a prefix for log messages.
//
// The function does not return an error. It returns normally once t.run
// succeeds, and otherwise leaves by panicking with
// errCompactionTransactExiting (via compactionExitTransact) when the DB is
// closed, a persistent compaction error is pending, or t.run reported a
// corruption error. On that panic t.revert is called before the panic is
// propagated to the caller.
func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
    defer func() {
        if x := recover(); x != nil {
            if x == errCompactionTransactExiting {
                if err := t.revert(); err != nil {
                    db.logf("%s revert error %q", name, err)
                }
            }
            panic(x)
        }
    }()

    const (
        backoffMin = 1 * time.Second
        backoffMax = 8 * time.Second
        // backoffMul must be a plain number: multiplying two Durations
        // multiplies nanosecond counts, which would jump straight to
        // backoffMax after the first retry.
        backoffMul = 2
    )
    var (
        backoff = backoffMin
        // backoffT is created lazily on the first backoff so that no stale
        // tick can be pending in its channel when we wait on it.
        backoffT *time.Timer
        lastCnt  = compactionTransactCounter(0)

        disableBackoff = db.s.o.GetDisableCompactionBackoff()
    )
    // Release the timer on every exit path, including panics.
    defer func() {
        if backoffT != nil {
            backoffT.Stop()
        }
    }()

    for n := 0; ; n++ {
        // Check whether the DB is closed.
        if db.isClosed() {
            db.logf("%s exiting", name)
            db.compactionExitTransact()
        } else if n > 0 {
            db.logf("%s retrying N·%d", name, n)
        }

        // Execute.
        cnt := compactionTransactCounter(0)
        err := t.run(&cnt)
        if err != nil {
            db.logf("%s error I·%d %q", name, cnt, err)
        }

        // Set compaction error status.
        select {
        case db.compErrSetC <- err:
        case perr := <-db.compPerErrC:
            // A persistent error is pending; give up only if this attempt
            // failed as well.
            if err != nil {
                db.logf("%s exiting (persistent error %q)", name, perr)
                db.compactionExitTransact()
            }
        case <-db.closeC:
            db.logf("%s exiting", name)
            db.compactionExitTransact()
        }
        if err == nil {
            return
        }
        if errors.IsCorrupted(err) {
            db.logf("%s exiting (corruption detected)", name)
            db.compactionExitTransact()
        }

        if !disableBackoff {
            // Reset backoff duration if counter is advancing.
            if cnt > lastCnt {
                backoff = backoffMin
                lastCnt = cnt
            }

            // Backoff. After the first use the timer has always fired and
            // been drained by the select below, so Reset is safe here.
            if backoffT == nil {
                backoffT = time.NewTimer(backoff)
            } else {
                backoffT.Reset(backoff)
            }
            if backoff < backoffMax {
                backoff *= backoffMul
                if backoff > backoffMax {
                    backoff = backoffMax
                }
            }
            select {
            case <-backoffT.C:
            case <-db.closeC:
                db.logf("%s exiting", name)
                db.compactionExitTransact()
            }
        }
    }
}

// compactionTransactFunc adapts a pair of plain functions to
// compactionTransactInterface. revertFunc is optional.
type compactionTransactFunc struct {
    runFunc    func(cnt *compactionTransactCounter) error
    revertFunc func() error
}

// run delegates to runFunc.
func (t *compactionTransactFunc) run(cnt *compactionTransactCounter) error {
    return t.runFunc(cnt)
}

// revert delegates to revertFunc, or reports success when none was given.
func (t *compactionTransactFunc) revert() error {
    if t.revertFunc == nil {
        return nil
    }
    return t.revertFunc()
}

// compactionTransactFunc is a convenience wrapper around compactionTransact
// for callers that have run/revert as plain functions. revert may be nil.
func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) {
    t := &compactionTransactFunc{
        runFunc:    run,
        revertFunc: revert,
    }
    db.compactionTransact(name, t)
}

// compactionExitTransact aborts the current compaction by panicking with
// errCompactionTransactExiting. It never returns; the panic is expected to be
// recovered by the deferred handlers in mCompaction and tCompaction.
func (db *DB) compactionExitTransact() {
    panic(errCompactionTransactExiting)
}

// compactionCommit commits rec to the session inside a compaction
// transaction named name+"@commit". Commits are serialized by
// db.compCommitLk.
func (db *DB) compactionCommit(name string, rec *sessionRecord) {
    commit := func(*compactionTransactCounter) error {
        return db.s.commit(rec)
    }

    db.compCommitLk.Lock()
    // The unlock has to be deferred: the transaction below can leave by
    // panicking instead of returning.
    defer db.compCommitLk.Unlock()
    db.compactionTransactFunc(name+"@commit", commit, nil)
}

// memCompaction flushes the frozen memdb, if any, into table files and
// commits the result. Table compaction is paused while the flush runs and
// triggered again afterwards. If the DB is closed during the flush the
// function leaves by panicking through compactionExitTransact.
func (db *DB) memCompaction() {
    mdb := db.getFrozenMem()
    if mdb == nil {
        // Nothing is frozen; nothing to flush.
        return
    }
    defer mdb.decref()

    db.logf("memdb@flush N·%d %s", mdb.Len(), shortenb(mdb.Size()))

    // Don't compact empty memdb.
    if mdb.Len() == 0 {
        db.logf("memdb@flush skipping")
        // drop frozen memdb
        db.dropFrozenMem()
        return
    }

    // Pause table compaction. The receiver of resumeC (see pauseCompaction)
    // blocks sending on it until we receive from it further below.
    resumeC := make(chan struct{})
    select {
    case db.tcompPauseC <- (chan<- struct{})(resumeC):
    case <-db.compPerErrC:
        // A persistent error is pending: nobody was paused, so there is
        // nothing to resume later.
        close(resumeC)
        resumeC = nil
    case <-db.closeC:
        db.compactionExitTransact()
    }

    var (
        rec        = &sessionRecord{}
        stats      = &cStatStaging{}
        flushLevel int
    )

    // Generate tables.
    db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
        stats.startTimer()
        flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
        stats.stopTimer()
        return
    }, func() error {
        // Revert: remove every table file recorded as added by the flush.
        for _, r := range rec.addedTables {
            db.logf("memdb@flush revert @%d", r.num)
            if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
                return err
            }
        }
        return nil
    })

    rec.setJournalNum(db.journalFd.Num)
    rec.setSeqNum(db.frozenSeq)

    // Commit.
    stats.startTimer()
    db.compactionCommit("memdb", rec)
    stats.stopTimer()

    db.logf("memdb@flush committed F·%d %v", len(rec.addedTables), stats.duration)

    // Account the bytes written and record the stats at the flushed level.
    for _, r := range rec.addedTables {
        stats.write += r.size
    }
    db.compStats.addStat(flushLevel, stats)

    // Drop frozen memdb.
    db.dropFrozenMem()

    // Resume table compaction.
    if resumeC != nil {
        select {
        case <-resumeC:
            // The paused side has been released; the channel is done.
            close(resumeC)
        case <-db.closeC:
            db.compactionExitTransact()
        }
    }

    // Trigger table compaction.
    db.compTrigger(db.tcompCmdC)
}

// tableCompactionBuilder is the compaction transaction that merges the input
// tables of a compaction into new tables of the next level. It implements
// compactionTransactInterface and can resume a retried run from the last
// snapshot taken after a successful table flush.
type tableCompactionBuilder struct {
    db           *DB             // may be nil; then pause/close events are not checked
    s            *session        // session used to create tables, compare keys and log
    c            *compaction     // the compaction being executed
    rec          *sessionRecord  // receives the tables added by flush
    stat0, stat1 *cStatStaging   // stat1 collects timing and bytes written

    // Snapshot of the run state, saved after each table flush and restored
    // at the start of run.
    snapHasLastUkey bool
    snapLastUkey    []byte
    snapLastSeq     uint64
    snapIter        int // number of iterator entries to skip when resuming
    snapKerrCnt     int
    snapDropCnt     int

    kerrCnt int // entries whose internal key failed to parse
    dropCnt int // entries dropped by the compaction

    minSeq    uint64 // sequence threshold used by run when deciding what to drop
    strict    bool   // when set, a key parse error aborts run
    tableSize int    // output table size at which needFlush reports true

    tw *tWriter // table currently being written; nil when none is open
}

// appendKV writes one key/value pair to the current output table, opening a
// new table first when none is open. Before a new table is created, pending
// pause requests and DB close are checked (only when b.db is set).
func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
    // Fast path: a table is already open.
    if b.tw != nil {
        return b.tw.append(key, value)
    }

    // Honor a pending pause or close without blocking.
    if b.db != nil {
        select {
        case ch := <-b.db.tcompPauseC:
            b.db.pauseCompaction(ch)
        case <-b.db.closeC:
            b.db.compactionExitTransact()
        default:
        }
    }

    // Open the next output table and write into it.
    var err error
    if b.tw, err = b.s.tops.create(); err != nil {
        return err
    }
    return b.tw.append(key, value)
}

// needFlush reports whether the open output table has reached the configured
// table size. It must only be called while a table is open.
func (b *tableCompactionBuilder) needFlush() bool {
    written := b.tw.tw.BytesLen()
    return written >= b.tableSize
}

// flush finishes the open output table, records it in the session record as
// a new table of the level below the source level, adds its size to the
// write statistics and clears b.tw.
func (b *tableCompactionBuilder) flush() error {
    t, err := b.tw.finish()
    if err != nil {
        return err
    }

    outLevel := b.c.sourceLevel + 1
    b.rec.addTableFile(outLevel, t)
    b.stat1.write += t.size
    b.s.logf("table@build created L%d@%d %d %s %q:%q", outLevel, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)

    b.tw = nil
    return nil
}

// cleanup drops the output table that is still open, if any.
func (b *tableCompactionBuilder) cleanup() {
    if b.tw == nil {
        return
    }
    b.tw.drop()
    b.tw = nil
}

// run performs one attempt of the table compaction. It iterates over the
// merged input, drops entries that are no longer needed, and writes the rest
// into output tables, rotating to a new table only at user-key boundaries.
//
// After every flushed table the state is snapshotted, so a retried run skips
// the entries already processed (the first b.snapIter iterator entries) and
// continues from there. cnt is advanced once per iterator entry, including
// skipped ones. A still-open table is dropped when run returns.
func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error {
    snapResumed := b.snapIter > 0
    hasLastUkey := b.snapHasLastUkey // The key might have zero length, so this is necessary.
    lastUkey := append([]byte{}, b.snapLastUkey...)
    lastSeq := b.snapLastSeq
    b.kerrCnt = b.snapKerrCnt
    b.dropCnt = b.snapDropCnt
    // Restore compaction state.
    b.c.restore()

    defer b.cleanup()

    b.stat1.startTimer()
    defer b.stat1.stopTimer()

    iter := b.c.newIterator()
    defer iter.Release()
    for i := 0; iter.Next(); i++ {
        // Incr transact counter.
        cnt.incr()

        // Skip until last state.
        if i < b.snapIter {
            continue
        }

        // resumed is true only for the first entry processed after a resume.
        resumed := false
        if snapResumed {
            resumed = true
            snapResumed = false
        }

        ikey := iter.Key()
        ukey, seq, kt, kerr := parseInternalKey(ikey)

        if kerr == nil {
            // Not consulted for the first entry after a resume.
            shouldStop := !resumed && b.c.shouldStopBefore(ikey)

            if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 {
                // First occurrence of this user key.

                // Only rotate tables if ukey doesn't hop across.
                if b.tw != nil && (shouldStop || b.needFlush()) {
                    if err := b.flush(); err != nil {
                        return err
                    }

                    // Creates snapshot of the state.
                    b.c.save()
                    b.snapHasLastUkey = hasLastUkey
                    b.snapLastUkey = append(b.snapLastUkey[:0], lastUkey...)
                    b.snapLastSeq = lastSeq
                    b.snapIter = i
                    b.snapKerrCnt = b.kerrCnt
                    b.snapDropCnt = b.dropCnt
                }

                hasLastUkey = true
                lastUkey = append(lastUkey[:0], ukey...)
                lastSeq = keyMaxSeq
            }

            switch {
            case lastSeq <= b.minSeq:
                // Dropped because newer entry for same user key exist
                fallthrough // (A)
            case kt == keyTypeDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey):
                // For this user key:
                // (1) there is no data in higher levels
                // (2) data in lower levels will have larger seq numbers
                // (3) data in layers that are being compacted here and have
                //     smaller seq numbers will be dropped in the next
                //     few iterations of this loop (by rule (A) above).
                // Therefore this deletion marker is obsolete and can be dropped.
                lastSeq = seq
                b.dropCnt++
                continue
            default:
                lastSeq = seq
            }
        } else {
            // The internal key could not be parsed.
            if b.strict {
                return kerr
            }

            // Don't drop corrupted keys.
            hasLastUkey = false
            lastUkey = lastUkey[:0]
            lastSeq = keyMaxSeq
            b.kerrCnt++
        }

        if err := b.appendKV(ikey, iter.Value()); err != nil {
            return err
        }
    }

    if err := iter.Error(); err != nil {
        return err
    }

    // Finish last table.
    if b.tw != nil && !b.tw.empty() {
        return b.flush()
    }
    return nil
}

// revert removes from storage every table file that this builder has
// recorded as added. It stops at, and returns, the first removal error.
func (b *tableCompactionBuilder) revert() error {
    for _, added := range b.rec.addedTables {
        b.s.logf("table@build revert @%d", added.num)
        fd := storage.FileDesc{Type: storage.TypeTable, Num: added.num}
        if err := b.s.stor.Remove(fd); err != nil {
            return err
        }
    }
    return nil
}

// tableCompaction executes compaction c and commits the result. c is
// released on return.
//
// Unless noTrivial is set, a compaction that c.trivial() reports as trivial
// is done by moving the single source table to the next level without
// rewriting it. Otherwise the inputs are merged by a tableCompactionBuilder.
func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
    defer c.release()

    rec := &sessionRecord{}
    rec.addCompPtr(c.sourceLevel, c.imax)

    if !noTrivial && c.trivial() {
        // Trivial move: re-register the table one level down.
        t := c.levels[0][0]
        db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1)
        rec.delTable(c.sourceLevel, t.fd.Num)
        rec.addTableFile(c.sourceLevel+1, t)
        db.compactionCommit("table-move", rec)
        return
    }

    // stats[0] and stats[1] collect the bytes read from c.levels[0] and
    // c.levels[1]; stats[1] also receives timing and bytes written.
    var stats [2]cStatStaging
    for i, tables := range c.levels {
        for _, t := range tables {
            stats[i].read += t.size
            // Insert deleted tables into record
            rec.delTable(c.sourceLevel+i, t.fd.Num)
        }
    }
    sourceSize := int(stats[0].read + stats[1].read)
    minSeq := db.minSeq()
    db.logf("table@compaction L%d·%d -> L%d·%d %s %d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)

    b := &tableCompactionBuilder{
        db:        db,
        s:         db.s,
        c:         c,
        rec:       rec,
        stat1:     &stats[1],
        minSeq:    minSeq,
        strict:    db.s.o.GetStrict(opt.StrictCompaction),
        tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),
    }
    db.compactionTransact("table@build", b)

    // Commit.
    stats[1].startTimer()
    db.compactionCommit("table", rec)
    stats[1].stopTimer()

    resultSize := int(stats[1].write)
    db.logf("table@compaction committed F%s S%s Ke·%d %d %v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)

    // Save compaction stats. Both entries are accounted to the output level.
    for i := range stats {
        db.compStats.addStat(c.sourceLevel+1, &stats[i])
    }
}

// tableRangeCompaction compacts the tables overlapping the user-key range
// [umin, umax].
//
// With level >= 0 a single compaction of that level is run, if the session
// finds one for the range. With a negative level, every level from 0 up to
// (but excluding) the deepest level that overlaps the range is compacted,
// and the whole pass is repeated until a pass compacts nothing.
//
// The returned error is always nil in the visible code.
func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
    db.logf("table@compaction range L%d %q:%q", level, umin, umax)
    if level >= 0 {
        if c := db.s.getCompactionRange(level, umin, umax, true); c != nil {
            db.tableCompaction(c, true)
        }
    } else {
        // Retry until nothing to compact.
        for {
            compacted := false

            // Scan for maximum level with overlapped tables.
            // m starts at 1, so level 0 is always considered below.
            v := db.s.version()
            m := 1
            for i := m; i < len(v.levels); i++ {
                tables := v.levels[i]
                if tables.overlaps(db.s.icmp, umin, umax, false) {
                    m = i
                }
            }
            v.release()

            // Compact each level above m that has something in the range.
            for level := 0; level < m; level++ {
                if c := db.s.getCompactionRange(level, umin, umax, false); c != nil {
                    db.tableCompaction(c, true)
                    compacted = true
                }
            }

            if !compacted {
                break
            }
        }
    }

    return nil
}

// tableAutoCompaction runs the compaction picked by the session, if there is
// one. Trivial moves are allowed.
func (db *DB) tableAutoCompaction() {
    c := db.s.pickCompaction()
    if c == nil {
        return
    }
    db.tableCompaction(c, false)
}

// tableNeedCompaction reports whether the current version says a table
// compaction is needed. The version is released before returning.
func (db *DB) tableNeedCompaction() bool {
    v := db.s.version()
    defer v.release()
    return v.needCompaction()
}

// resumeWrite reports whether paused writes may continue, which is the case
// once the number of level-0 tables in the current version has dropped below
// the configured write pause trigger.
func (db *DB) resumeWrite() bool {
    v := db.s.version()
    defer v.release()
    return v.tLen(0) < db.s.o.GetWriteL0PauseTrigger()
}

// pauseCompaction blocks the calling compaction goroutine until the
// requester of the pause receives from ch. If the DB is closed first, it
// leaves by panicking through compactionExitTransact.
func (db *DB) pauseCompaction(ch chan<- struct{}) {
    select {
    case ch <- struct{}{}:
    case <-db.closeC:
        db.compactionExitTransact()
    }
}

// cCmd is a command sent to a compaction goroutine.
type cCmd interface {
    // ack reports the outcome of the command to its sender, if the sender
    // asked for one.
    ack(err error)
}

// cAuto is the command that requests an automatic compaction.
type cAuto struct {
    // ackC receives the result of the command; nil means nobody waits for it.
    // For table compaction, a non-nil ackC marks a command whose sender is
    // waiting for the compaction (tCompaction puts it on its wait queue).
    ackC chan<- error
}

// ack sends err to the waiting sender, if any. The sender closes its channel
// when it stops waiting, so a panic from sending on a closed channel is
// recovered and ignored.
func (r cAuto) ack(err error) {
    if r.ackC == nil {
        return
    }
    defer func() {
        _ = recover()
    }()
    r.ackC <- err
}

// cRange is the command that requests a compaction of the key range
// [min, max] at level. tCompaction passes these values straight to
// tableRangeCompaction.
type cRange struct {
    level    int
    min, max []byte
    ackC     chan<- error
}

// ack sends err to the waiting sender, if any. The sender closes its channel
// when it stops waiting, so a panic from sending on a closed channel is
// recovered and ignored.
func (r cRange) ack(err error) {
    if r.ackC == nil {
        return
    }
    defer func() {
        _ = recover()
    }()
    r.ackC <- err
}

// compTrigger requests an automatic compaction on compC without waiting for
// it. The send is non-blocking: if the compaction goroutine is not ready to
// receive, the request is dropped.
func (db *DB) compTrigger(compC chan<- cCmd) {
    select {
    case compC <- cAuto{}:
    default:
    }
}

// compTriggerWait requests an automatic compaction on compC and waits for it
// to be acknowledged.
//
// It returns the acknowledged result, or a compaction error received from
// db.compErrC while sending or waiting, or ErrClosed if the DB is closed
// first.
func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
    ch := make(chan error)
    // Closing ch on return makes a late ack fail its send; ack recovers from
    // that panic instead of blocking forever.
    defer close(ch)
    // Send cmd.
    select {
    case compC <- cAuto{ch}:
    case err = <-db.compErrC:
        return
    case <-db.closeC:
        return ErrClosed
    }
    // Wait cmd.
    select {
    case err = <-ch:
    case err = <-db.compErrC:
    case <-db.closeC:
        return ErrClosed
    }
    return err
}

// compTriggerRange requests a compaction of the key range [min, max] at
// level on compC and waits for it to be acknowledged.
//
// It returns the acknowledged result, or a compaction error received from
// db.compErrC while sending or waiting, or ErrClosed if the DB is closed
// first.
func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
    ackC := make(chan error)
    // Closing the channel on return makes a late ack fail its send instead
    // of blocking; ack recovers from that panic.
    defer close(ackC)

    // Hand the command over to the compaction goroutine.
    cmd := cRange{level: level, min: min, max: max, ackC: ackC}
    select {
    case compC <- cmd:
    case err = <-db.compErrC:
        return err
    case <-db.closeC:
        return ErrClosed
    }

    // Wait for the outcome.
    select {
    case err = <-ackC:
    case err = <-db.compErrC:
    case <-db.closeC:
        return ErrClosed
    }
    return err
}

// mCompaction is the memdb compaction goroutine. It serves cAuto commands
// from db.mcompCmdC by running memCompaction, and exits when db.closeC fires
// or a compaction aborts through compactionExitTransact. On exit a command
// that is still in flight is acknowledged with ErrClosed and db.closeW is
// signalled.
func (db *DB) mCompaction() {
    var x cCmd

    defer func() {
        // Swallow only the compaction-exit panic; anything else is re-raised.
        // Note that this x shadows the command variable inside the if.
        if x := recover(); x != nil {
            if x != errCompactionTransactExiting {
                panic(x)
            }
        }
        // Here x is the command again: non-nil means it was not acked yet.
        if x != nil {
            x.ack(ErrClosed)
        }
        db.closeW.Done()
    }()

    for {
        select {
        case x = <-db.mcompCmdC:
            switch x.(type) {
            case cAuto:
                db.memCompaction()
                x.ack(nil)
                x = nil
            default:
                panic("leveldb: unknown command")
            }
        case <-db.closeC:
            return
        }
    }
}

// tCompaction is the table compaction goroutine. Each loop iteration
// collects at most one command, handles pause requests, and then runs one
// automatic table compaction.
//
// cAuto commands are queued until they can be acknowledged: those with an
// ack channel go to waitQ, the others to ackQ. waitQ is acknowledged as soon
// as resumeWrite reports true or no compaction is needed; ackQ only once no
// compaction is needed. cRange commands are executed and acknowledged
// immediately.
//
// The goroutine exits when db.closeC fires or a compaction aborts through
// compactionExitTransact. On exit all queued and in-flight commands are
// acknowledged with ErrClosed and db.closeW is signalled.
func (db *DB) tCompaction() {
    var (
        x           cCmd
        ackQ, waitQ []cCmd
    )

    defer func() {
        // Swallow only the compaction-exit panic; anything else is re-raised.
        // Note that this x shadows the command variable inside the if.
        if x := recover(); x != nil {
            if x != errCompactionTransactExiting {
                panic(x)
            }
        }
        for i := range ackQ {
            ackQ[i].ack(ErrClosed)
            ackQ[i] = nil
        }
        for i := range waitQ {
            waitQ[i].ack(ErrClosed)
            waitQ[i] = nil
        }
        // Here x is the command again: non-nil means it was not handled yet.
        if x != nil {
            x.ack(ErrClosed)
        }
        db.closeW.Done()
    }()

    for {
        if db.tableNeedCompaction() {
            // Work is pending: poll for a command without blocking.
            select {
            case x = <-db.tcompCmdC:
            case ch := <-db.tcompPauseC:
                db.pauseCompaction(ch)
                continue
            case <-db.closeC:
                return
            default:
            }
            // Resume write operation as soon as possible.
            if len(waitQ) > 0 && db.resumeWrite() {
                for i := range waitQ {
                    waitQ[i].ack(nil)
                    waitQ[i] = nil
                }
                waitQ = waitQ[:0]
            }
        } else {
            // Nothing left to compact: acknowledge everything queued, then
            // block until the next command, pause request or close.
            for i := range ackQ {
                ackQ[i].ack(nil)
                ackQ[i] = nil
            }
            ackQ = ackQ[:0]
            for i := range waitQ {
                waitQ[i].ack(nil)
                waitQ[i] = nil
            }
            waitQ = waitQ[:0]
            select {
            case x = <-db.tcompCmdC:
            case ch := <-db.tcompPauseC:
                db.pauseCompaction(ch)
                continue
            case <-db.closeC:
                return
            }
        }
        if x != nil {
            switch cmd := x.(type) {
            case cAuto:
                if cmd.ackC != nil {
                    waitQ = append(waitQ, x)
                } else {
                    ackQ = append(ackQ, x)
                }
            case cRange:
                x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
            default:
                panic("leveldb: unknown command")
            }
            x = nil
        }
        db.tableAutoCompaction()
    }
}