// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> // All rights reserved. // // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. // Package cache provides interface and implementation of a cache algorithms. package cache import ( "sync" "sync/atomic" "unsafe" "github.com/syndtr/goleveldb/leveldb/util" ) // Cacher provides interface to implements a caching functionality. // An implementation must be safe for concurrent use. type Cacher interface { // Capacity returns cache capacity. Capacity() int // SetCapacity sets cache capacity. SetCapacity(capacity int) // Promote promotes the 'cache node'. Promote(n *Node) // Ban evicts the 'cache node' and prevent subsequent 'promote'. Ban(n *Node) // Evict evicts the 'cache node'. Evict(n *Node) // EvictNS evicts 'cache node' with the given namespace. EvictNS(ns uint64) // EvictAll evicts all 'cache node'. EvictAll() // Close closes the 'cache tree' Close() error } // Value is a 'cacheable object'. It may implements util.Releaser, if // so the the Release method will be called once object is released. type Value interface{} // NamespaceGetter provides convenient wrapper for namespace. type NamespaceGetter struct { Cache *Cache NS uint64 } // Get simply calls Cache.Get() method. func (g *NamespaceGetter) Get(key uint64, setFunc func() (size int, value Value)) *Handle { return g.Cache.Get(g.NS, key, setFunc) } // The hash tables implementation is based on: // "Dynamic-Sized Nonblocking Hash Tables", by Yujie Liu, // Kunlong Zhang, and Michael Spear. // ACM Symposium on Principles of Distributed Computing, Jul 2014. 
const ( mInitialSize = 1 << 4 mOverflowThreshold = 1 << 5 mOverflowGrowThreshold = 1 << 7 ) type mBucket struct { mu sync.Mutex node []*Node frozen bool } func (b *mBucket) freeze() []*Node { b.mu.Lock() defer b.mu.Unlock() if !b.frozen { b.frozen = true } return b.node } func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset bool) (done, added bool, n *Node) { b.mu.Lock() if b.frozen { b.mu.Unlock() return } // Scan the node. for _, n := range b.node { if n.hash == hash && n.ns == ns && n.key == key { atomic.AddInt32(&n.ref, 1) b.mu.Unlock() return true, false, n } } // Get only. if noset { b.mu.Unlock() return true, false, nil } // Create node. n = &Node{ r: r, hash: hash, ns: ns, key: key, ref: 1, } // Add node to bucket. b.node = append(b.node, n) bLen := len(b.node) b.mu.Unlock() // Update counter. grow := atomic.AddInt32(&r.nodes, 1) >= h.growThreshold if bLen > mOverflowThreshold { grow = grow || atomic.AddInt32(&h.overflow, 1) >= mOverflowGrowThreshold } // Grow. if grow && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) { nhLen := len(h.buckets) << 1 nh := &mNode{ buckets: make([]unsafe.Pointer, nhLen), mask: uint32(nhLen) - 1, pred: unsafe.Pointer(h), growThreshold: int32(nhLen * mOverflowThreshold), shrinkThreshold: int32(nhLen >> 1), } ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh)) if !ok { panic("BUG: failed swapping head") } go nh.initBuckets() } return true, true, n } func (b *mBucket) delete(r *Cache, h *mNode, hash uint32, ns, key uint64) (done, deleted bool) { b.mu.Lock() if b.frozen { b.mu.Unlock() return } // Scan the node. var ( n *Node bLen int ) for i := range b.node { n = b.node[i] if n.ns == ns && n.key == key { if atomic.LoadInt32(&n.ref) == 0 { deleted = true // Call releaser. if n.value != nil { if r, ok := n.value.(util.Releaser); ok { r.Release() } n.value = nil } // Remove node from bucket. b.node = append(b.node[:i], b.node[i+1:]...) 
bLen = len(b.node) } break } } b.mu.Unlock() if deleted { // Call OnDel. for _, f := range n.onDel { f() } // Update counter. atomic.AddInt32(&r.size, int32(n.size)*-1) shrink := atomic.AddInt32(&r.nodes, -1) < h.shrinkThreshold if bLen >= mOverflowThreshold { atomic.AddInt32(&h.overflow, -1) } // Shrink. if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) { nhLen := len(h.buckets) >> 1 nh := &mNode{ buckets: make([]unsafe.Pointer, nhLen), mask: uint32(nhLen) - 1, pred: unsafe.Pointer(h), growThreshold: int32(nhLen * mOverflowThreshold), shrinkThreshold: int32(nhLen >> 1), } ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh)) if !ok { panic("BUG: failed swapping head") } go nh.initBuckets() } } return true, deleted } type mNode struct { buckets []unsafe.Pointer // []*mBucket mask uint32 pred unsafe.Pointer // *mNode resizeInProgess int32 overflow int32 growThreshold int32 shrinkThreshold int32 } func (n *mNode) initBucket(i uint32) *mBucket { if b := (*mBucket)(atomic.LoadPointer(&n.buckets[i])); b != nil { return b } p := (*mNode)(atomic.LoadPointer(&n.pred)) if p != nil { var node []*Node if n.mask > p.mask { // Grow. pb := (*mBucket)(atomic.LoadPointer(&p.buckets[i&p.mask])) if pb == nil { pb = p.initBucket(i & p.mask) } m := pb.freeze() // Split nodes. for _, x := range m { if x.hash&n.mask == i { node = append(node, x) } } } else { // Shrink. pb0 := (*mBucket)(atomic.LoadPointer(&p.buckets[i])) if pb0 == nil { pb0 = p.initBucket(i) } pb1 := (*mBucket)(atomic.LoadPointer(&p.buckets[i+uint32(len(n.buckets))])) if pb1 == nil { pb1 = p.initBucket(i + uint32(len(n.buckets))) } m0 := pb0.freeze() m1 := pb1.freeze() // Merge nodes. node = make([]*Node, 0, len(m0)+len(m1)) node = append(node, m0...) node = append(node, m1...) 
} b := &mBucket{node: node} if atomic.CompareAndSwapPointer(&n.buckets[i], nil, unsafe.Pointer(b)) { if len(node) > mOverflowThreshold { atomic.AddInt32(&n.overflow, int32(len(node)-mOverflowThreshold)) } return b } } return (*mBucket)(atomic.LoadPointer(&n.buckets[i])) } func (n *mNode) initBuckets() { for i := range n.buckets { n.initBucket(uint32(i)) } atomic.StorePointer(&n.pred, nil) } // Cache is a 'cache map'. type Cache struct { mu sync.RWMutex mHead unsafe.Pointer // *mNode nodes int32 size int32 cacher Cacher closed bool } // NewCache creates a new 'cache map'. The cacher is optional and // may be nil. func NewCache(cacher Cacher) *Cache { h := &mNode{ buckets: make([]unsafe.Pointer, mInitialSize), mask: mInitialSize - 1, growThreshold: int32(mInitialSize * mOverflowThreshold), shrinkThreshold: 0, } for i := range h.buckets { h.buckets[i] = unsafe.Pointer(&mBucket{}) } r := &Cache{ mHead: unsafe.Pointer(h), cacher: cacher, } return r } func (r *Cache) getBucket(hash uint32) (*mNode, *mBucket) { h := (*mNode)(atomic.LoadPointer(&r.mHead)) i := hash & h.mask b := (*mBucket)(atomic.LoadPointer(&h.buckets[i])) if b == nil { b = h.initBucket(i) } return h, b } func (r *Cache) delete(n *Node) bool { for { h, b := r.getBucket(n.hash) done, deleted := b.delete(r, h, n.hash, n.ns, n.key) if done { return deleted } } } // Nodes returns number of 'cache node' in the map. func (r *Cache) Nodes() int { return int(atomic.LoadInt32(&r.nodes)) } // Size returns sums of 'cache node' size in the map. func (r *Cache) Size() int { return int(atomic.LoadInt32(&r.size)) } // Capacity returns cache capacity. func (r *Cache) Capacity() int { if r.cacher == nil { return 0 } return r.cacher.Capacity() } // SetCapacity sets cache capacity. func (r *Cache) SetCapacity(capacity int) { if r.cacher != nil { r.cacher.SetCapacity(capacity) } } // Get gets 'cache node' with the given namespace and key. 
// If cache node is not found and setFunc is not nil, Get will atomically creates // the 'cache node' by calling setFunc. Otherwise Get will returns nil. // // The returned 'cache handle' should be released after use by calling Release // method. func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Handle { r.mu.RLock() defer r.mu.RUnlock() if r.closed { return nil } hash := murmur32(ns, key, 0xf00) for { h, b := r.getBucket(hash) done, _, n := b.get(r, h, hash, ns, key, setFunc == nil) if done { if n != nil { n.mu.Lock() if n.value == nil { if setFunc == nil { n.mu.Unlock() n.unref() return nil } n.size, n.value = setFunc() if n.value == nil { n.size = 0 n.mu.Unlock() n.unref() return nil } atomic.AddInt32(&r.size, int32(n.size)) } n.mu.Unlock() if r.cacher != nil { r.cacher.Promote(n) } return &Handle{unsafe.Pointer(n)} } break } } return nil } // Delete removes and ban 'cache node' with the given namespace and key. // A banned 'cache node' will never inserted into the 'cache tree'. Ban // only attributed to the particular 'cache node', so when a 'cache node' // is recreated it will not be banned. // // If onDel is not nil, then it will be executed if such 'cache node' // doesn't exist or once the 'cache node' is released. // // Delete return true is such 'cache node' exist. func (r *Cache) Delete(ns, key uint64, onDel func()) bool { r.mu.RLock() defer r.mu.RUnlock() if r.closed { return false } hash := murmur32(ns, key, 0xf00) for { h, b := r.getBucket(hash) done, _, n := b.get(r, h, hash, ns, key, true) if done { if n != nil { if onDel != nil { n.mu.Lock() n.onDel = append(n.onDel, onDel) n.mu.Unlock() } if r.cacher != nil { r.cacher.Ban(n) } n.unref() return true } break } } if onDel != nil { onDel() } return false } // Evict evicts 'cache node' with the given namespace and key. This will // simply call Cacher.Evict. // // Evict return true is such 'cache node' exist. 
func (r *Cache) Evict(ns, key uint64) bool { r.mu.RLock() defer r.mu.RUnlock() if r.closed { return false } hash := murmur32(ns, key, 0xf00) for { h, b := r.getBucket(hash) done, _, n := b.get(r, h, hash, ns, key, true) if done { if n != nil { if r.cacher != nil { r.cacher.Evict(n) } n.unref() return true } break } } return false } // EvictNS evicts 'cache node' with the given namespace. This will // simply call Cacher.EvictNS. func (r *Cache) EvictNS(ns uint64) { r.mu.RLock() defer r.mu.RUnlock() if r.closed { return } if r.cacher != nil { r.cacher.EvictNS(ns) } } // EvictAll evicts all 'cache node'. This will simply call Cacher.EvictAll. func (r *Cache) EvictAll() { r.mu.RLock() defer r.mu.RUnlock() if r.closed { return } if r.cacher != nil { r.cacher.EvictAll() } } // Close closes the 'cache map' and forcefully releases all 'cache node'. func (r *Cache) Close() error { r.mu.Lock() if !r.closed { r.closed = true h := (*mNode)(r.mHead) h.initBuckets() for i := range h.buckets { b := (*mBucket)(h.buckets[i]) for _, n := range b.node { // Call releaser. if n.value != nil { if r, ok := n.value.(util.Releaser); ok { r.Release() } n.value = nil } // Call OnDel. for _, f := range n.onDel { f() } n.onDel = nil } } } r.mu.Unlock() // Avoid deadlock. if r.cacher != nil { if err := r.cacher.Close(); err != nil { return err } } return nil } // CloseWeak closes the 'cache map' and evict all 'cache node' from cacher, but // unlike Close it doesn't forcefully releases 'cache node'. func (r *Cache) CloseWeak() error { r.mu.Lock() if !r.closed { r.closed = true } r.mu.Unlock() // Avoid deadlock. if r.cacher != nil { r.cacher.EvictAll() if err := r.cacher.Close(); err != nil { return err } } return nil } // Node is a 'cache node'. type Node struct { r *Cache hash uint32 ns, key uint64 mu sync.Mutex size int value Value ref int32 onDel []func() CacheData unsafe.Pointer } // NS returns this 'cache node' namespace. 
func (n *Node) NS() uint64 {
	return n.ns
}

// Key returns this 'cache node' key.
func (n *Node) Key() uint64 {
	return n.key
}

// Size returns this 'cache node' size.
func (n *Node) Size() int {
	return n.size
}

// Value returns this 'cache node' value.
func (n *Node) Value() Value {
	return n.value
}

// Ref returns this 'cache node' ref counter.
func (n *Node) Ref() int32 {
	return atomic.LoadInt32(&n.ref)
}

// GetHandle returns a handle for this 'cache node' and increments its ref
// counter. It panics if the node's ref counter was zero (or below) before
// the call.
func (n *Node) GetHandle() *Handle {
	if atomic.AddInt32(&n.ref, 1) <= 1 {
		panic("BUG: Node.GetHandle on zero ref")
	}
	return &Handle{unsafe.Pointer(n)}
}

// unref decrements the ref counter and deletes the node from the cache
// when it reaches zero. It does not take the cache lock itself; within
// this file it is called from Get, Delete and Evict, which hold the
// cache's read lock.
func (n *Node) unref() {
	if atomic.AddInt32(&n.ref, -1) == 0 {
		n.r.delete(n)
	}
}

// unrefLocked decrements the ref counter and, when it reaches zero,
// acquires the cache's read lock and deletes the node from the cache
// unless the cache has been closed.
func (n *Node) unrefLocked() {
	if atomic.AddInt32(&n.ref, -1) == 0 {
		n.r.mu.RLock()
		if !n.r.closed {
			n.r.delete(n)
		}
		n.r.mu.RUnlock()
	}
}

// Handle is a 'cache handle' of a 'cache node'.
type Handle struct {
	n unsafe.Pointer // *Node
}

// Value returns the value of the 'cache node', or nil if the handle has
// already been released.
func (h *Handle) Value() Value {
	n := (*Node)(atomic.LoadPointer(&h.n))
	if n != nil {
		return n.value
	}
	return nil
}

// Release releases this 'cache handle'.
// It is safe to call release multiple times.
func (h *Handle) Release() {
	nPtr := atomic.LoadPointer(&h.n)
	// The compare-and-swap ensures only one caller drops the reference.
	if nPtr != nil && atomic.CompareAndSwapPointer(&h.n, nPtr, nil) {
		n := (*Node)(nPtr)
		n.unrefLocked()
	}
}

// murmur32 hashes the (ns, key) pair with the given seed into 32 bits.
// Each 32-bit half of ns and key is mixed individually and folded into the
// running hash in the order ns-high, ns-low, key-high, key-low, followed by
// a final avalanche step.
func murmur32(ns, key uint64, seed uint32) uint32 {
	const (
		m = uint32(0x5bd1e995)
		r = 24
	)

	h := seed
	for _, k := range [4]uint32{uint32(ns >> 32), uint32(ns), uint32(key >> 32), uint32(key)} {
		// Mix the word.
		k *= m
		k ^= k >> r
		k *= m

		// Fold it into the hash.
		h *= m
		h ^= k
	}

	// Final avalanche.
	h ^= h >> 13
	h *= m
	h ^= h >> 15

	return h
}