aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgary rong <garyrong0905@gmail.com>2019-03-18 16:28:47 +0800
committerPéter Szilágyi <peterke@gmail.com>2019-03-18 16:28:47 +0800
commitdef1b0d7e1eb3393f9a749a1b348610453d7b93c (patch)
treea639cbd7442ed6b3baed484f9eb475c681664085
parentacebccc3bf8198910a3f5848a7b60e5a14858a9e (diff)
downloadgo-tangerine-def1b0d7e1eb3393f9a749a1b348610453d7b93c.tar
go-tangerine-def1b0d7e1eb3393f9a749a1b348610453d7b93c.tar.gz
go-tangerine-def1b0d7e1eb3393f9a749a1b348610453d7b93c.tar.bz2
go-tangerine-def1b0d7e1eb3393f9a749a1b348610453d7b93c.tar.lz
go-tangerine-def1b0d7e1eb3393f9a749a1b348610453d7b93c.tar.xz
go-tangerine-def1b0d7e1eb3393f9a749a1b348610453d7b93c.tar.zst
go-tangerine-def1b0d7e1eb3393f9a749a1b348610453d7b93c.zip
vendor: update leveldb upstream (#19284)
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/session.go51
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go12
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/session_util.go236
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/table.go81
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/version.go31
-rw-r--r--vendor/vendor.json6
6 files changed, 362 insertions, 55 deletions
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session.go b/vendor/github.com/syndtr/goleveldb/leveldb/session.go
index 1bec34c4c..7310209ba 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/session.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/session.go
@@ -47,15 +47,24 @@ type session struct {
o *cachedOptions
icmp *iComparer
tops *tOps
- fileRef map[int64]int
manifest *journal.Writer
manifestWriter storage.Writer
manifestFd storage.FileDesc
- stCompPtrs []internalKey // compaction pointers; need external synchronization
- stVersion *version // current version
- vmu sync.Mutex
+ stCompPtrs []internalKey // compaction pointers; need external synchronization
+ stVersion *version // current version
+ ntVersionId int64 // next version id to assign
+ refCh chan *vTask
+ relCh chan *vTask
+ deltaCh chan *vDelta
+ abandon chan int64
+ closeC chan struct{}
+ closeW sync.WaitGroup
+ vmu sync.Mutex
+
+ // Testing fields
+ fileRefCh chan chan map[int64]int // channel used to pass current reference stat
}
// Creates new initialized session instance.
@@ -68,13 +77,21 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
return
}
s = &session{
- stor: newIStorage(stor),
- storLock: storLock,
- fileRef: make(map[int64]int),
+ stor: newIStorage(stor),
+ storLock: storLock,
+ refCh: make(chan *vTask),
+ relCh: make(chan *vTask),
+ deltaCh: make(chan *vDelta),
+ abandon: make(chan int64),
+ fileRefCh: make(chan chan map[int64]int),
+ closeC: make(chan struct{}),
}
s.setOptions(o)
s.tops = newTableOps(s)
- s.setVersion(newVersion(s))
+
+ s.closeW.Add(1)
+ go s.refLoop()
+ s.setVersion(nil, newVersion(s))
s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed")
return
}
@@ -90,7 +107,11 @@ func (s *session) close() {
}
s.manifest = nil
s.manifestWriter = nil
- s.setVersion(&version{s: s, closing: true})
+ s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionId})
+
+ // Close all background goroutines
+ close(s.closeC)
+ s.closeW.Wait()
}
// Release session lock.
@@ -180,7 +201,7 @@ func (s *session) recover() (err error) {
}
s.manifestFd = fd
- s.setVersion(staging.finish(false))
+ s.setVersion(rec, staging.finish(false))
s.setNextFileNum(rec.nextFileNum)
s.recordCommited(rec)
return nil
@@ -194,6 +215,14 @@ func (s *session) commit(r *sessionRecord, trivial bool) (err error) {
// spawn new version based on current version
nv := v.spawn(r, trivial)
+ // abandon useless version id to prevent blocking version processing loop.
+ defer func() {
+ if err != nil {
+ s.abandon <- nv.id
+ s.logf("commit@abandon useless vid D%d", nv.id)
+ }
+ }()
+
if s.manifest == nil {
// manifest journal writer not yet created, create one
err = s.newManifest(r, nv)
@@ -203,7 +232,7 @@ func (s *session) commit(r *sessionRecord, trivial bool) (err error) {
// finally, apply new version if no error rise
if err == nil {
- s.setVersion(nv)
+ s.setVersion(r, nv)
}
return
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go
index 089cd00b2..f6030022d 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go
@@ -181,10 +181,14 @@ func (c *compaction) expand() {
t0, t1 := c.levels[0], c.levels[1]
imin, imax := t0.getRange(c.s.icmp)
- // We expand t0 here just incase ukey hop across tables.
- t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0)
- if len(t0) != len(c.levels[0]) {
- imin, imax = t0.getRange(c.s.icmp)
+
+ // For non-zero levels, the ukey can't hop across tables at all.
+ if c.sourceLevel == 0 {
+ // We expand t0 here just in case a ukey hops across tables.
+ t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0)
+ if len(t0) != len(c.levels[0]) {
+ imin, imax = t0.getRange(c.s.icmp)
+ }
}
t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
// Get entire range covered by compaction.
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
index 40cb2cf95..67a6df979 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
@@ -9,6 +9,7 @@ package leveldb
import (
"fmt"
"sync/atomic"
+ "time"
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/storage"
@@ -39,19 +40,213 @@ func (s *session) newTemp() storage.FileDesc {
return storage.FileDesc{Type: storage.TypeTemp, Num: num}
}
-func (s *session) addFileRef(fd storage.FileDesc, ref int) int {
- ref += s.fileRef[fd.Num]
- if ref > 0 {
- s.fileRef[fd.Num] = ref
- } else if ref == 0 {
- delete(s.fileRef, fd.Num)
- } else {
- panic(fmt.Sprintf("negative ref: %v", fd))
- }
- return ref
+// Session state.
+
+const (
+ // maxCachedNumber represents the maximum number of version tasks
+ // that can be cached in the ref loop.
+ maxCachedNumber = 256
+
+ // maxCachedTime represents the maximum time for ref loop to cache
+ // a version task.
+ maxCachedTime = 5 * time.Minute
+)
+
+// vDelta describes the table files added and deleted between the version
+// identified by vid and the next version.
+type vDelta struct {
+ vid int64
+ added []int64
+ deleted []int64
}
-// Session state.
+// vTask defines a version task for either reference or release.
+type vTask struct {
+ vid int64
+ files []tFiles
+ created time.Time
+}
+
+func (s *session) refLoop() {
+ var (
+ fileRef = make(map[int64]int) // Table file reference counter
+ ref = make(map[int64]*vTask) // Current referencing version store
+ deltas = make(map[int64]*vDelta)
+ referenced = make(map[int64]struct{})
+ released = make(map[int64]*vDelta) // Released version that waiting for processing
+ abandoned = make(map[int64]struct{}) // Abandoned version id
+ next, last int64
+ )
+ // addFileRef adjusts the reference counter of the given file number by
+ // ref and returns the resulting count.
+ addFileRef := func(fnum int64, ref int) int {
+ ref += fileRef[fnum]
+ if ref > 0 {
+ fileRef[fnum] = ref
+ } else if ref == 0 {
+ delete(fileRef, fnum)
+ } else {
+ panic(fmt.Sprintf("negative ref: %v", fnum))
+ }
+ return ref
+ }
+ // skipAbandoned skips useless abandoned version id.
+ skipAbandoned := func() bool {
+ if _, exist := abandoned[next]; exist {
+ delete(abandoned, next)
+ return true
+ }
+ return false
+ }
+ // applyDelta applies version change to current file reference.
+ applyDelta := func(d *vDelta) {
+ for _, t := range d.added {
+ addFileRef(t, 1)
+ }
+ for _, t := range d.deleted {
+ if addFileRef(t, -1) == 0 {
+ s.tops.remove(storage.FileDesc{Type: storage.TypeTable, Num: t})
+ }
+ }
+ }
+
+ timer := time.NewTimer(0)
+ <-timer.C // discard the initial tick
+ defer timer.Stop()
+
+ // processTasks processes version tasks in strict order.
+ //
+ // If we want to use delta to reduce the cost of file references and dereferences,
+ // we must strictly follow the id of the version, otherwise some files that are
+ // being referenced will be deleted.
+ //
+ // In addition, some db operations (such as iterators) may cause a version to be
+ // referenced for a long time. In order to prevent such operations from blocking
+ // the entire processing queue, we will properly convert some of the version tasks
+ // into full file references and releases.
+ processTasks := func() {
+ timer.Reset(maxCachedTime)
+ // Make sure we don't cache too many version tasks.
+ for {
+ // Skip any abandoned version number to prevent blocking processing.
+ if skipAbandoned() {
+ next += 1
+ continue
+ }
+ // Don't bother with a version that has already been released.
+ if _, exist := released[next]; exist {
+ break
+ }
+ // Ensure the specified version has been referenced.
+ if _, exist := ref[next]; !exist {
+ break
+ }
+ if last-next < maxCachedNumber && time.Since(ref[next].created) < maxCachedTime {
+ break
+ }
+ // Convert version task into full file references and releases mode.
+ // Reference version(i+1) first and wait version(i) to release.
+ // FileRef(i+1) = FileRef(i) + Delta(i)
+ for _, tt := range ref[next].files {
+ for _, t := range tt {
+ addFileRef(t.fd.Num, 1)
+ }
+ }
+ // Note, if some compactions take a long time, even more than 5 minutes,
+ // we may miss the corresponding delta information here.
+ // Fortunately it will not affect the correctness of the file reference,
+ // and we can apply the delta once we receive it.
+ if d := deltas[next]; d != nil {
+ applyDelta(d)
+ }
+ referenced[next] = struct{}{}
+ delete(ref, next)
+ delete(deltas, next)
+ next += 1
+ }
+
+ // Use delta information to process all released versions.
+ for {
+ if skipAbandoned() {
+ next += 1
+ continue
+ }
+ if d, exist := released[next]; exist {
+ if d != nil {
+ applyDelta(d)
+ }
+ delete(released, next)
+ next += 1
+ continue
+ }
+ return
+ }
+ }
+
+ for {
+ processTasks()
+
+ select {
+ case t := <-s.refCh:
+ if _, exist := ref[t.vid]; exist {
+ panic("duplicate reference request")
+ }
+ ref[t.vid] = t
+ if t.vid > last {
+ last = t.vid
+ }
+
+ case d := <-s.deltaCh:
+ if _, exist := ref[d.vid]; !exist {
+ if _, exist2 := referenced[d.vid]; !exist2 {
+ panic("invalid release request")
+ }
+ // The reference task has already expired (it was converted into
+ // full file references), so apply the delta here.
+ applyDelta(d)
+ continue
+ }
+ deltas[d.vid] = d
+
+ case t := <-s.relCh:
+ if _, exist := referenced[t.vid]; exist {
+ for _, tt := range t.files {
+ for _, t := range tt {
+ if addFileRef(t.fd.Num, -1) == 0 {
+ s.tops.remove(t.fd)
+ }
+ }
+ }
+ delete(referenced, t.vid)
+ continue
+ }
+ if _, exist := ref[t.vid]; !exist {
+ panic("invalid release request")
+ }
+ released[t.vid] = deltas[t.vid]
+ delete(deltas, t.vid)
+ delete(ref, t.vid)
+
+ case id := <-s.abandon:
+ if id >= next {
+ abandoned[id] = struct{}{}
+ }
+
+ case <-timer.C:
+
+ case r := <-s.fileRefCh:
+ ref := make(map[int64]int)
+ for f, c := range fileRef {
+ ref[f] = c
+ }
+ r <- ref
+
+ case <-s.closeC:
+ s.closeW.Done()
+ return
+ }
+ }
+}
// Get current version. This will incr version ref, must call
// version.release (exactly once) after use.
@@ -69,13 +264,30 @@ func (s *session) tLen(level int) int {
}
// Set current version to v.
-func (s *session) setVersion(v *version) {
+func (s *session) setVersion(r *sessionRecord, v *version) {
s.vmu.Lock()
defer s.vmu.Unlock()
// Hold by session. It is important to call this first before releasing
// current version, otherwise the still used files might get released.
v.incref()
if s.stVersion != nil {
+ if r != nil {
+ var (
+ added = make([]int64, 0, len(r.addedTables))
+ deleted = make([]int64, 0, len(r.deletedTables))
+ )
+ for _, t := range r.addedTables {
+ added = append(added, t.num)
+ }
+ for _, t := range r.deletedTables {
+ deleted = append(deleted, t.num)
+ }
+ select {
+ case s.deltaCh <- &vDelta{vid: s.stVersion.id, added: added, deleted: deleted}:
+ case <-v.s.closeC:
+ s.log("reference loop already exist")
+ }
+ }
// Release current version.
s.stVersion.releaseNB()
}
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table.go b/vendor/github.com/syndtr/goleveldb/leveldb/table.go
index 518e1db1c..5ad1f8054 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/table.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/table.go
@@ -7,6 +7,7 @@
package leveldb
import (
+ "bytes"
"fmt"
"sort"
"sync/atomic"
@@ -158,6 +159,22 @@ func (tf tFiles) searchNumLess(num int64) int {
})
}
+// Searches for the smallest index of the table whose smallest
+// user key is after the given key.
+func (tf tFiles) searchMinUkey(icmp *iComparer, umin []byte) int {
+ return sort.Search(len(tf), func(i int) bool {
+ return icmp.ucmp.Compare(tf[i].imin.ukey(), umin) > 0
+ })
+}
+
+// Searches for the smallest index of the table whose largest
+// user key is after the given key.
+func (tf tFiles) searchMaxUkey(icmp *iComparer, umax []byte) int {
+ return sort.Search(len(tf), func(i int) bool {
+ return icmp.ucmp.Compare(tf[i].imax.ukey(), umax) > 0
+ })
+}
+
// Returns true if given key range overlaps with one or more
// tables key range. If unsorted is true then binary search will not be used.
func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool {
@@ -189,6 +206,50 @@ func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) boo
// expanded.
// The dst content will be overwritten.
func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles {
+ // Short circuit if tf is empty
+ if len(tf) == 0 {
+ return nil
+ }
+ // For non-zero levels, there is no ukey hop across at all.
+ // And what's more, the files in these levels are strictly sorted,
+ // so use binary search instead of heavy traverse.
+ if !overlapped {
+ var begin, end int
+ // Determine the begin index of the overlapped file
+ if umin != nil {
+ index := tf.searchMinUkey(icmp, umin)
+ if index == 0 {
+ begin = 0
+ } else if bytes.Compare(tf[index-1].imax.ukey(), umin) >= 0 {
+ // The min ukey overlaps with the index-1 file, expand it.
+ begin = index - 1
+ } else {
+ begin = index
+ }
+ }
+ // Determine the end index of the overlapped file
+ if umax != nil {
+ index := tf.searchMaxUkey(icmp, umax)
+ if index == len(tf) {
+ end = len(tf)
+ } else if bytes.Compare(tf[index].imin.ukey(), umax) <= 0 {
+ // The max ukey overlaps with the index file, expand it.
+ end = index + 1
+ } else {
+ end = index
+ }
+ } else {
+ end = len(tf)
+ }
+ // Ensure the overlapped file indexes are valid.
+ if begin >= end {
+ return nil
+ }
+ dst = make([]*tFile, end-begin)
+ copy(dst, tf[begin:end])
+ return dst
+ }
+
dst = dst[:0]
for i := 0; i < len(tf); {
t := tf[i]
@@ -201,11 +262,9 @@ func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, ove
} else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 {
umax = t.imax.ukey()
// Restart search if it is overlapped.
- if overlapped {
- dst = dst[:0]
- i = 0
- continue
- }
+ dst = dst[:0]
+ i = 0
+ continue
}
dst = append(dst, t)
@@ -424,15 +483,15 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite
// Removes table from persistent storage. It waits until
// no one use the the table.
-func (t *tOps) remove(f *tFile) {
- t.cache.Delete(0, uint64(f.fd.Num), func() {
- if err := t.s.stor.Remove(f.fd); err != nil {
- t.s.logf("table@remove removing @%d %q", f.fd.Num, err)
+func (t *tOps) remove(fd storage.FileDesc) {
+ t.cache.Delete(0, uint64(fd.Num), func() {
+ if err := t.s.stor.Remove(fd); err != nil {
+ t.s.logf("table@remove removing @%d %q", fd.Num, err)
} else {
- t.s.logf("table@remove removed @%d", f.fd.Num)
+ t.s.logf("table@remove removed @%d", fd.Num)
}
if t.evictRemoved && t.bcache != nil {
- t.bcache.EvictNS(uint64(f.fd.Num))
+ t.bcache.EvictNS(uint64(fd.Num))
}
})
}
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/version.go b/vendor/github.com/syndtr/goleveldb/leveldb/version.go
index 51361e5d8..2664560e1 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/version.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/version.go
@@ -9,6 +9,7 @@ package leveldb
import (
"fmt"
"sync/atomic"
+ "time"
"unsafe"
"github.com/syndtr/goleveldb/leveldb/iterator"
@@ -22,7 +23,8 @@ type tSet struct {
}
type version struct {
- s *session
+ id int64 // unique monotonous increasing version id
+ s *session
levels []tFiles
@@ -39,8 +41,11 @@ type version struct {
released bool
}
+// newVersion creates a new version with a unique, monotonically increasing id.
func newVersion(s *session) *version {
- return &version{s: s}
+ id := atomic.AddInt64(&s.ntVersionId, 1)
+ nv := &version{s: s, id: id - 1}
+ return nv
}
func (v *version) incref() {
@@ -50,11 +55,11 @@ func (v *version) incref() {
v.ref++
if v.ref == 1 {
- // Incr file ref.
- for _, tt := range v.levels {
- for _, t := range tt {
- v.s.addFileRef(t.fd, 1)
- }
+ select {
+ case v.s.refCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}:
+ // We can use v.levels directly here since it is immutable.
+ case <-v.s.closeC:
+ v.s.log("reference loop already exist")
}
}
}
@@ -66,13 +71,11 @@ func (v *version) releaseNB() {
} else if v.ref < 0 {
panic("negative version ref")
}
-
- for _, tt := range v.levels {
- for _, t := range tt {
- if v.s.addFileRef(t.fd, -1) == 0 {
- v.s.tops.remove(t)
- }
- }
+ select {
+ case v.s.relCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}:
+ // We can use v.levels directly here since it is immutable.
+ case <-v.s.closeC:
+ v.s.log("reference loop already exist")
}
v.released = true
diff --git a/vendor/vendor.json b/vendor/vendor.json
index 2c913d4a3..c2c7d7c3d 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -478,10 +478,10 @@
"revisionTime": "2017-07-05T02:17:15Z"
},
{
- "checksumSHA1": "4vxle8JfbPDO0ndiBUjMmRXGBQM=",
+ "checksumSHA1": "4NTmfUj7H5J59M2wCnp3/8FWt1I=",
"path": "github.com/syndtr/goleveldb/leveldb",
- "revision": "3a907f965fc16db5f7787e18d4434bbe46d47f6e",
- "revisionTime": "2019-03-04T06:08:05Z"
+ "revision": "c3a204f8e96543bb0cc090385c001078f184fc46",
+ "revisionTime": "2019-03-18T03:00:20Z"
},
{
"checksumSHA1": "mPNraL2edpk/2FYq26rSXfMHbJg=",