author | Péter Szilágyi <peterke@gmail.com> | 2015-04-28 17:18:01 +0800
---|---|---
committer | Péter Szilágyi <peterke@gmail.com> | 2015-04-28 17:18:01 +0800
commit | 7e3b080f8517731db774d5d2587b9ded4f9716e0 (patch) |
tree | c27488e8e84dacaece8b07458e187906b7940384 /Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go |
parent | 182d484aa70bcd5b22117f02333b1fd3b1535dcb (diff) |
godeps: update leveldb and snappy, dump serpent-go
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go')
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go | 713 |
1 file changed, 632 insertions(+), 81 deletions(-)
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go
index 9b6a74977..c9670de5d 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go
@@ -8,118 +8,669 @@
 package cache
 
 import (
+	"sync"
 	"sync/atomic"
+	"unsafe"
+
+	"github.com/syndtr/goleveldb/leveldb/util"
 )
 
-// SetFunc used by Namespace.Get method to create a cache object. SetFunc
-// may return ok false, in that case the cache object will not be created.
-type SetFunc func() (ok bool, value interface{}, charge int, fin SetFin)
+// Cacher provides an interface to implement caching functionality.
+// An implementation must be goroutine-safe.
+type Cacher interface {
+	// Capacity returns cache capacity.
+	Capacity() int
 
-// SetFin will be called when corresponding cache object are released.
-type SetFin func()
+	// SetCapacity sets cache capacity.
+	SetCapacity(capacity int)
 
-// DelFin will be called when corresponding cache object are released.
-// DelFin will be called after SetFin. The exist is true if the corresponding
-// cache object is actually exist in the cache tree.
-type DelFin func(exist bool)
+	// Promote promotes the 'cache node'.
+	Promote(n *Node)
 
-// PurgeFin will be called when corresponding cache object are released.
-// PurgeFin will be called after SetFin. If PurgeFin present DelFin will
-// not be executed but passed to the PurgeFin, it is up to the caller
-// to call it or not.
-type PurgeFin func(ns, key uint64, delfin DelFin)
+	// Ban evicts the 'cache node' and prevents subsequent 'promote'.
+	Ban(n *Node)
 
-// Cache is a cache tree.
-type Cache interface {
-	// SetCapacity sets cache capacity.
-	SetCapacity(capacity int)
+	// Evict evicts the 'cache node'.
+	Evict(n *Node)
 
-	// GetNamespace gets or creates a cache namespace for the given id.
-	GetNamespace(id uint64) Namespace
+	// EvictNS evicts 'cache nodes' with the given namespace.
+	EvictNS(ns uint64)
 
-	// Purge purges all cache namespaces, read Namespace.Purge method documentation.
-	Purge(fin PurgeFin)
+	// EvictAll evicts all 'cache nodes'.
+	EvictAll()
+
+	// Close closes the 'cache tree'.
+	Close() error
+}
+
+// Value is a 'cacheable object'. It may implement util.Releaser; if
+// so, the Release method will be called once the object is released.
+type Value interface{}
+
+type CacheGetter struct {
+	Cache *Cache
+	NS    uint64
+}
 
-	// Zap zaps all cache namespaces, read Namespace.Zap method documentation.
-	Zap(closed bool)
+func (g *CacheGetter) Get(key uint64, setFunc func() (size int, value Value)) *Handle {
+	return g.Cache.Get(g.NS, key, setFunc)
 }
 
-// Namespace is a cache namespace.
-type Namespace interface {
-	// Get gets cache object for the given key. The given SetFunc (if not nil) will
-	// be called if the given key does not exist.
-	// If the given key does not exist, SetFunc is nil or SetFunc return ok false, Get
-	// will return ok false.
-	Get(key uint64, setf SetFunc) (obj Object, ok bool)
+// The hash tables implementation is based on:
+// "Dynamic-Sized Nonblocking Hash Tables", by Yujie Liu,
+// Kunlong Zhang, and Michael Spear.
+// ACM Symposium on Principles of Distributed Computing, Jul 2014.
 
-	// Get deletes cache object for the given key. If exist the cache object will
-	// be deleted later when all of its handles have been released (i.e. no one use
-	// it anymore) and the given DelFin (if not nil) will finally be executed. If
-	// such cache object does not exist the given DelFin will be executed anyway.
-	//
-	// Delete returns true if such cache object exist.
-	Delete(key uint64, fin DelFin) bool
+const (
+	mInitialSize           = 1 << 4
+	mOverflowThreshold     = 1 << 5
+	mOverflowGrowThreshold = 1 << 7
+)
 
-	// Purge deletes all cache objects, read Delete method documentation.
-	Purge(fin PurgeFin)
+type mBucket struct {
+	mu     sync.Mutex
+	node   []*Node
+	frozen bool
+}
 
-	// Zap detaches the namespace from the cache tree and delete all its cache
-	// objects. The cache objects deletion and finalizers execution are happen
-	// immediately, even if its existing handles haven't yet been released.
-	// A zapped namespace can't never be filled again.
-	// If closed is false then the Get function will always call the given SetFunc
-	// if it is not nil, but resultant of the SetFunc will not be cached.
-	Zap(closed bool)
+func (b *mBucket) freeze() []*Node {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	if !b.frozen {
+		b.frozen = true
+	}
+	return b.node
 }
 
-// Object is a cache object.
-type Object interface {
-	// Release releases the cache object. Other methods should not be called
-	// after the cache object has been released.
-	Release()
+func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset bool) (done, added bool, n *Node) {
+	b.mu.Lock()
+
+	if b.frozen {
+		b.mu.Unlock()
+		return
+	}
+
+	// Scan the node.
+	for _, n := range b.node {
+		if n.hash == hash && n.ns == ns && n.key == key {
+			atomic.AddInt32(&n.ref, 1)
+			b.mu.Unlock()
+			return true, false, n
+		}
+	}
+
+	// Get only.
+	if noset {
+		b.mu.Unlock()
+		return true, false, nil
+	}
+
+	// Create node.
+	n = &Node{
+		r:    r,
+		hash: hash,
+		ns:   ns,
+		key:  key,
+		ref:  1,
+	}
+	// Add node to bucket.
+	b.node = append(b.node, n)
+	bLen := len(b.node)
+	b.mu.Unlock()
+
+	// Update counter.
+	grow := atomic.AddInt32(&r.nodes, 1) >= h.growThreshold
+	if bLen > mOverflowThreshold {
+		grow = grow || atomic.AddInt32(&h.overflow, 1) >= mOverflowGrowThreshold
+	}
 
-	// Value returns value of the cache object.
-	Value() interface{}
+	// Grow.
+	if grow && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
+		nhLen := len(h.buckets) << 1
+		nh := &mNode{
+			buckets:         make([]unsafe.Pointer, nhLen),
+			mask:            uint32(nhLen) - 1,
+			pred:            unsafe.Pointer(h),
+			growThreshold:   int32(nhLen * mOverflowThreshold),
+			shrinkThreshold: int32(nhLen >> 1),
+		}
+		ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
+		if !ok {
+			panic("BUG: failed swapping head")
+		}
+		go nh.initBuckets()
+	}
+
+	return true, true, n
 }
 
-// Namespace state.
-type nsState int
+func (b *mBucket) delete(r *Cache, h *mNode, hash uint32, ns, key uint64) (done, deleted bool) {
+	b.mu.Lock()
 
-const (
-	nsEffective nsState = iota
-	nsZapped
-	nsClosed
-)
+	if b.frozen {
+		b.mu.Unlock()
+		return
+	}
 
-// Node state.
-type nodeState int
+	// Scan the node.
+	var (
+		n    *Node
+		bLen int
+	)
+	for i := range b.node {
+		n = b.node[i]
+		if n.ns == ns && n.key == key {
+			if atomic.LoadInt32(&n.ref) == 0 {
+				deleted = true
 
-const (
-	nodeEffective nodeState = iota
-	nodeEvicted
-	nodeRemoved
-)
+				// Call releaser.
+				if n.value != nil {
+					if r, ok := n.value.(util.Releaser); ok {
+						r.Release()
+					}
+					n.value = nil
+				}
+
+				// Remove node from bucket.
+				b.node = append(b.node[:i], b.node[i+1:]...)
+				bLen = len(b.node)
+			}
+			break
+		}
+	}
+	b.mu.Unlock()
 
-// Fake object.
-type fakeObject struct {
-	value interface{}
-	fin   func()
-	once  uint32
+	if deleted {
+		// Call OnDel.
+		for _, f := range n.onDel {
+			f()
+		}
+
+		// Update counter.
+		atomic.AddInt32(&r.size, int32(n.size)*-1)
+		shrink := atomic.AddInt32(&r.nodes, -1) < h.shrinkThreshold
+		if bLen >= mOverflowThreshold {
+			atomic.AddInt32(&h.overflow, -1)
+		}
+
+		// Shrink.
+		if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
+			nhLen := len(h.buckets) >> 1
+			nh := &mNode{
+				buckets:         make([]unsafe.Pointer, nhLen),
+				mask:            uint32(nhLen) - 1,
+				pred:            unsafe.Pointer(h),
+				growThreshold:   int32(nhLen * mOverflowThreshold),
+				shrinkThreshold: int32(nhLen >> 1),
+			}
+			ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
+			if !ok {
+				panic("BUG: failed swapping head")
+			}
+			go nh.initBuckets()
+		}
+	}
+
+	return true, deleted
 }
 
-func (o *fakeObject) Value() interface{} {
-	if atomic.LoadUint32(&o.once) == 0 {
-		return o.value
+type mNode struct {
+	buckets         []unsafe.Pointer // []*mBucket
+	mask            uint32
+	pred            unsafe.Pointer // *mNode
+	resizeInProgess int32
+
+	overflow        int32
+	growThreshold   int32
+	shrinkThreshold int32
+}
+
+func (n *mNode) initBucket(i uint32) *mBucket {
+	if b := (*mBucket)(atomic.LoadPointer(&n.buckets[i])); b != nil {
+		return b
+	}
+
+	p := (*mNode)(atomic.LoadPointer(&n.pred))
+	if p != nil {
+		var node []*Node
+		if n.mask > p.mask {
+			// Grow.
+			pb := (*mBucket)(atomic.LoadPointer(&p.buckets[i&p.mask]))
+			if pb == nil {
+				pb = p.initBucket(i & p.mask)
+			}
+			m := pb.freeze()
+			// Split nodes.
+			for _, x := range m {
+				if x.hash&n.mask == i {
+					node = append(node, x)
+				}
+			}
+		} else {
+			// Shrink.
+			pb0 := (*mBucket)(atomic.LoadPointer(&p.buckets[i]))
+			if pb0 == nil {
+				pb0 = p.initBucket(i)
+			}
+			pb1 := (*mBucket)(atomic.LoadPointer(&p.buckets[i+uint32(len(n.buckets))]))
+			if pb1 == nil {
+				pb1 = p.initBucket(i + uint32(len(n.buckets)))
+			}
+			m0 := pb0.freeze()
+			m1 := pb1.freeze()
+			// Merge nodes.
+			node = make([]*Node, 0, len(m0)+len(m1))
+			node = append(node, m0...)
+			node = append(node, m1...)
+		}
+		b := &mBucket{node: node}
+		if atomic.CompareAndSwapPointer(&n.buckets[i], nil, unsafe.Pointer(b)) {
+			if len(node) > mOverflowThreshold {
+				atomic.AddInt32(&n.overflow, int32(len(node)-mOverflowThreshold))
+			}
+			return b
+		}
+	}
+
+	return (*mBucket)(atomic.LoadPointer(&n.buckets[i]))
+}
+
+func (n *mNode) initBuckets() {
+	for i := range n.buckets {
+		n.initBucket(uint32(i))
+	}
+	atomic.StorePointer(&n.pred, nil)
+}
+
+// Cache is a 'cache map'.
+type Cache struct {
+	mu     sync.RWMutex
+	mHead  unsafe.Pointer // *mNode
+	nodes  int32
+	size   int32
+	cacher Cacher
+	closed bool
+}
+
+// NewCache creates a new 'cache map'. The cacher is optional and
+// may be nil.
+func NewCache(cacher Cacher) *Cache {
+	h := &mNode{
+		buckets:         make([]unsafe.Pointer, mInitialSize),
+		mask:            mInitialSize - 1,
+		growThreshold:   int32(mInitialSize * mOverflowThreshold),
+		shrinkThreshold: 0,
+	}
+	for i := range h.buckets {
+		h.buckets[i] = unsafe.Pointer(&mBucket{})
+	}
+	r := &Cache{
+		mHead:  unsafe.Pointer(h),
+		cacher: cacher,
+	}
+	return r
+}
+
+func (r *Cache) getBucket(hash uint32) (*mNode, *mBucket) {
+	h := (*mNode)(atomic.LoadPointer(&r.mHead))
+	i := hash & h.mask
+	b := (*mBucket)(atomic.LoadPointer(&h.buckets[i]))
+	if b == nil {
+		b = h.initBucket(i)
+	}
+	return h, b
+}
+
+func (r *Cache) delete(n *Node) bool {
+	for {
+		h, b := r.getBucket(n.hash)
+		done, deleted := b.delete(r, h, n.hash, n.ns, n.key)
+		if done {
+			return deleted
+		}
+	}
+	return false
+}
+
+// Nodes returns the number of 'cache nodes' in the map.
+func (r *Cache) Nodes() int {
+	return int(atomic.LoadInt32(&r.nodes))
+}
+
+// Size returns the sum of 'cache node' sizes in the map.
+func (r *Cache) Size() int {
+	return int(atomic.LoadInt32(&r.size))
+}
+
+// Capacity returns cache capacity.
+func (r *Cache) Capacity() int {
+	if r.cacher == nil {
+		return 0
+	}
+	return r.cacher.Capacity()
+}
+
+// SetCapacity sets cache capacity.
+func (r *Cache) SetCapacity(capacity int) {
+	if r.cacher != nil {
+		r.cacher.SetCapacity(capacity)
+	}
+}
+
+// Get gets the 'cache node' with the given namespace and key.
+// If the 'cache node' is not found and setFunc is not nil, Get will atomically
+// create the 'cache node' by calling setFunc. Otherwise Get returns nil.
+//
+// The returned 'cache handle' should be released after use by calling the
+// Release method.
+func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Handle {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	if r.closed {
+		return nil
+	}
+
+	hash := murmur32(ns, key, 0xf00)
+	for {
+		h, b := r.getBucket(hash)
+		done, _, n := b.get(r, h, hash, ns, key, setFunc == nil)
+		if done {
+			if n != nil {
+				n.mu.Lock()
+				if n.value == nil {
+					if setFunc == nil {
+						n.mu.Unlock()
+						n.unref()
+						return nil
+					}
+
+					n.size, n.value = setFunc()
+					if n.value == nil {
+						n.size = 0
+						n.mu.Unlock()
+						n.unref()
+						return nil
+					}
+					atomic.AddInt32(&r.size, int32(n.size))
+				}
+				n.mu.Unlock()
+				if r.cacher != nil {
+					r.cacher.Promote(n)
+				}
+				return &Handle{unsafe.Pointer(n)}
+			}
+
+			break
+		}
 	}
 	return nil
 }
 
-func (o *fakeObject) Release() {
-	if !atomic.CompareAndSwapUint32(&o.once, 0, 1) {
+// Delete removes and bans the 'cache node' with the given namespace and key.
+// A banned 'cache node' will never be inserted into the 'cache tree'. Ban
+// is only attributed to the particular 'cache node', so when a 'cache node'
+// is recreated it will not be banned.
+//
+// If onDel is not nil, then it will be executed if such 'cache node'
+// doesn't exist or once the 'cache node' is released.
+//
+// Delete returns true if such 'cache node' exists.
+func (r *Cache) Delete(ns, key uint64, onDel func()) bool {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	if r.closed {
+		return false
+	}
+
+	hash := murmur32(ns, key, 0xf00)
+	for {
+		h, b := r.getBucket(hash)
+		done, _, n := b.get(r, h, hash, ns, key, true)
+		if done {
+			if n != nil {
+				if onDel != nil {
+					n.mu.Lock()
+					n.onDel = append(n.onDel, onDel)
+					n.mu.Unlock()
+				}
+				if r.cacher != nil {
+					r.cacher.Ban(n)
+				}
+				n.unref()
+				return true
+			}
+
+			break
+		}
+	}
+
+	if onDel != nil {
+		onDel()
+	}
+
+	return false
+}
+
+// Evict evicts the 'cache node' with the given namespace and key. This will
+// simply call Cacher.Evict.
+//
+// Evict returns true if such 'cache node' exists.
+func (r *Cache) Evict(ns, key uint64) bool {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	if r.closed {
+		return false
+	}
+
+	hash := murmur32(ns, key, 0xf00)
+	for {
+		h, b := r.getBucket(hash)
+		done, _, n := b.get(r, h, hash, ns, key, true)
+		if done {
+			if n != nil {
+				if r.cacher != nil {
+					r.cacher.Evict(n)
+				}
+				n.unref()
+				return true
+			}
+
+			break
+		}
+	}
+
+	return false
+}
+
+// EvictNS evicts 'cache nodes' with the given namespace. This will
+// simply call Cacher.EvictNS.
+func (r *Cache) EvictNS(ns uint64) {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	if r.closed {
+		return
+	}
+
+	if r.cacher != nil {
+		r.cacher.EvictNS(ns)
+	}
+}
+
+// EvictAll evicts all 'cache nodes'. This will simply call Cacher.EvictAll.
+func (r *Cache) EvictAll() {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	if r.closed {
 		return
 	}
-	if o.fin != nil {
-		o.fin()
-		o.fin = nil
+
+	if r.cacher != nil {
+		r.cacher.EvictAll()
 	}
 }
+
+// Close closes the 'cache map' and releases all 'cache nodes'.
+func (r *Cache) Close() error {
+	r.mu.Lock()
+	if !r.closed {
+		r.closed = true
+
+		if r.cacher != nil {
+			if err := r.cacher.Close(); err != nil {
+				return err
+			}
+		}
+
+		h := (*mNode)(r.mHead)
+		h.initBuckets()
+
+		for i := range h.buckets {
+			b := (*mBucket)(h.buckets[i])
+			for _, n := range b.node {
+				// Call releaser.
+				if n.value != nil {
+					if r, ok := n.value.(util.Releaser); ok {
+						r.Release()
+					}
+					n.value = nil
+				}
+
+				// Call OnDel.
+				for _, f := range n.onDel {
+					f()
+				}
+			}
+		}
+	}
+	r.mu.Unlock()
+	return nil
+}
+
+// Node is a 'cache node'.
+type Node struct {
+	r *Cache
+
+	hash    uint32
+	ns, key uint64
+
+	mu    sync.Mutex
+	size  int
+	value Value
+
+	ref   int32
+	onDel []func()
+
+	CacheData unsafe.Pointer
+}
+
+// NS returns this 'cache node' namespace.
+func (n *Node) NS() uint64 {
+	return n.ns
+}
+
+// Key returns this 'cache node' key.
+func (n *Node) Key() uint64 {
+	return n.key
+}
+
+// Size returns this 'cache node' size.
+func (n *Node) Size() int {
+	return n.size
+}
+
+// Value returns this 'cache node' value.
+func (n *Node) Value() Value {
+	return n.value
+}
+
+// Ref returns this 'cache node' ref counter.
+func (n *Node) Ref() int32 {
+	return atomic.LoadInt32(&n.ref)
+}
+
+// GetHandle returns a handle for this 'cache node'.
+func (n *Node) GetHandle() *Handle {
+	if atomic.AddInt32(&n.ref, 1) <= 1 {
+		panic("BUG: Node.GetHandle on zero ref")
+	}
+	return &Handle{unsafe.Pointer(n)}
+}
+
+func (n *Node) unref() {
+	if atomic.AddInt32(&n.ref, -1) == 0 {
+		n.r.delete(n)
+	}
+}
+
+func (n *Node) unrefLocked() {
+	if atomic.AddInt32(&n.ref, -1) == 0 {
+		n.r.mu.RLock()
+		if !n.r.closed {
+			n.r.delete(n)
+		}
+		n.r.mu.RUnlock()
+	}
+}
+
+type Handle struct {
+	n unsafe.Pointer // *Node
+}
+
+func (h *Handle) Value() Value {
+	n := (*Node)(atomic.LoadPointer(&h.n))
+	if n != nil {
+		return n.value
+	}
+	return nil
+}
+
+func (h *Handle) Release() {
+	nPtr := atomic.LoadPointer(&h.n)
+	if nPtr != nil && atomic.CompareAndSwapPointer(&h.n, nPtr, nil) {
+		n := (*Node)(nPtr)
+		n.unrefLocked()
+	}
+}
+
+func murmur32(ns, key uint64, seed uint32) uint32 {
+	const (
+		m = uint32(0x5bd1e995)
+		r = 24
+	)
+
+	k1 := uint32(ns >> 32)
+	k2 := uint32(ns)
+	k3 := uint32(key >> 32)
+	k4 := uint32(key)
+
+	k1 *= m
+	k1 ^= k1 >> r
+	k1 *= m
+
+	k2 *= m
+	k2 ^= k2 >> r
+	k2 *= m
+
+	k3 *= m
+	k3 ^= k3 >> r
+	k3 *= m
+
+	k4 *= m
+	k4 ^= k4 >> r
+	k4 *= m
+
+	h := seed
+
+	h *= m
+	h ^= k1
+	h *= m
+	h ^= k2
+	h *= m
+	h ^= k3
+	h *= m
+	h ^= k4
+
+	h ^= h >> 13
+	h *= m
+	h ^= h >> 15
+
+	return h
+}
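The updated vendored code replaces the old Namespace/Object API with a lock-free, dynamically sized hash map (Cache) fronted by reference-counted handles. A minimal usage sketch of the new API follows; it assumes the upstream import path github.com/syndtr/goleveldb/leveldb/cache rather than the vendored Godeps copy, and the namespace/key values are arbitrary:

```go
package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb/cache"
)

func main() {
	// The Cacher argument is optional (NewCache documents it as such);
	// with nil, the map stores nodes but never evicts on its own.
	c := cache.NewCache(nil)
	defer c.Close()

	// Get either finds the node for (namespace, key) or atomically
	// creates it by calling setFunc. The returned handle pins the node.
	h := c.Get(1, 42, func() (size int, value cache.Value) {
		return 8, "payload" // the size feeds into Cache.Size()
	})
	fmt.Println(h.Value()) // payload

	// A nil setFunc turns Get into a pure lookup.
	if h2 := c.Get(1, 42, nil); h2 != nil {
		fmt.Println(c.Nodes(), c.Size()) // 1 8
		h2.Release()
	}

	// Dropping the last handle lets the node be removed from the map;
	// Delete then finds nothing and runs onDel immediately.
	h.Release()
	c.Delete(1, 42, func() { fmt.Println("onDel fired") })
}
```

Note the division of labor this diff introduces: Cache owns storage and reference counting, while eviction policy lives entirely behind the Cacher interface (Promote/Ban/Evict), which is how the separate LRU implementation plugs in.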