Skip to content

Commit

Permalink
feat: introduce a ttl cache for resources
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed May 24, 2024
1 parent b45393b commit 97f8068
Show file tree
Hide file tree
Showing 3 changed files with 375 additions and 6 deletions.
20 changes: 14 additions & 6 deletions cachettl/cachettl.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
// the tail node (end) is the node with the highest expiration time
// Cleanups are done on Get() calls so if Get() is never invoked then Nodes stay in-memory.
type Cache[K comparable, V any] struct {
root *node[K, V]
mu sync.Mutex
m map[K]*node[K, V]
now func() time.Time
root *node[K, V]
mu sync.Mutex
m map[K]*node[K, V]
now func() time.Time
onevicted func(key K, value V)
}

type node[K comparable, V any] struct {
Expand Down Expand Up @@ -49,8 +50,11 @@ func (c *Cache[K, V]) Get(key K) (zero V) {
cn := c.root.next // start from head since we're sorting by expiration with the highest expiration at the tail
for cn != nil && cn != c.root {
if c.now().After(cn.expiration) {
cn.remove() // removes a node from the linked list (leaves the map untouched)
delete(c.m, cn.key) // remove node from map too
cn.remove() // removes a node from the linked list (leaves the map untouched)
delete(c.m, cn.key) // remove node from map too
if c.onevicted != nil { // call the OnEvicted callback if it's set
c.onevicted(cn.key, cn.value)
}
} else { // there is nothing else to clean up, no need to iterate further
break
}
Expand Down Expand Up @@ -101,6 +105,10 @@ func (c *Cache[K, V]) Put(key K, value V, ttl time.Duration) {
c.add(n)
}

func (c *Cache[K, V]) OnEvicted(onevicted func(key K, value V)) {
c.onevicted = onevicted
}

func (c *Cache[K, V]) add(n *node[K, V]) {
cn := c.root.prev // tail
for cn != nil { // iterate from tail to root because we have expiring nodes towards the tail
Expand Down
187 changes: 187 additions & 0 deletions resourcettl/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package resourcettl

import (
"fmt"
"sync"
"time"

"github.com/google/uuid"

"github.com/rudderlabs/rudder-go-kit/cachettl"
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
)

// NewCache creates a new resource cache.
//
// - new - function is used to create a new resource when it is not available in the cache.
// - ttl - is the time after which the resource is considered expired and cleaned up.
//
// A resource's ttl is extended every time it is checked out.
//
// The cache keeps track of the resources' usage and makes sure that
// expired resources are not cleaned up while they are still in use
// and cleaned up only when they are no longer needed (zero checkouts).
//
// Resources with any of following methods can be cleaned up:
// - Cleanup()
// - Close()
// - Close() error
// - Stop()
// - Stop() error
func NewCache[K comparable, R any](new func(key K) (R, error), ttl time.Duration) *Cache[K, R] {
c := &Cache[K, R]{
new: new,
keyMu: kitsync.NewPartitionLocker(),
latestInstance: make(map[K]string),
resources: make(map[string]R),
checkouts: make(map[string]int),
expiries: make(map[string]struct{}),
ttl: ttl,
ttlcache: cachettl.New[K, string](),
}
c.ttlcache.OnEvicted(c.onEvicted)
return c
}

// Cache is a cache for resources that need to be closed/cleaned-up after expiration.
//
// The cache keeps track of the resources' usage and makes sure that
// expired resources are not cleaned up while they are still in use
// and cleaned up only when they are no longer needed (zero checkouts).
//
// Resources with any of following methods can be cleaned up:
// - Cleanup()
// - Close()
// - Close() error
// - Stop()
// - Stop() error
type Cache[K comparable, R any] struct {
new func(key K) (R, error) // creates a new resource

// synchronizes access to the cache for a given key. This is to
// allow multiple go-routines to access the cache concurrently for different keys, but still
// avoid multiple go-routines creating multiple resources for the same key.
keyMu *kitsync.PartitionLocker

mu sync.RWMutex // protects the following maps
latestInstance map[K]string // maps a key to its latest instanceID. Multiple instances can co-exist for the same key (due to expired resources which might still be checked out)
resources map[string]R // maps an instanceID to its resource
checkouts map[string]int // keeps track of how many checkouts are there for a given instanceID
expiries map[string]struct{} // keeps track of instances that are expired and need to be cleaned up after all checkouts are done

ttl time.Duration
ttlcache *cachettl.Cache[K, string]
}

// Checkout returns a resource for the given key. If the resource is not available, it creates a new one.
// The caller must call the returned checkin function when the resource is no longer needed, to release the resource.
// Multiple checkouts for the same key are allowed and they can all share the same resource. The resource is cleaned up
// only when all checkouts are checked-in and the resource's ttl has expired (or its key has been invalidated through [Invalidate]).
func (c *Cache[K, R]) Checkout(key K) (resource R, checkin func(), err error) {
defer c.lockKey(key)()

if instanceID := c.ttlcache.Get(key); instanceID != "" {
c.mu.Lock()
defer c.mu.Unlock()
r := c.resources[instanceID]
c.checkouts[instanceID]++
return r, c.checkinFunc(key, r, instanceID), nil
}
return c.newInstance(key)
}

// Invalidate invalidates the resource for the given key.
func (c *Cache[K, R]) Invalidate(key K) {
defer c.lockKey(key)()
c.mu.Lock()
instanceID, ok := c.latestInstance[key]
c.mu.Unlock()
if ok {
c.ttlcache.Put(key, "", 0)
c.onEvicted(key, instanceID)
}
}

// newInstance creates a new resource for the given key.
func (c *Cache[K, R]) newInstance(key K) (R, func(), error) {
r, err := c.new(key)
if err != nil {
return r, nil, err
}

instanceID := uuid.NewString()
c.mu.Lock()
defer c.mu.Unlock()
c.latestInstance[key] = instanceID
c.resources[instanceID] = r
c.checkouts[instanceID]++
c.ttlcache.Put(key, instanceID, c.ttl)
return r, c.checkinFunc(key, r, instanceID), nil
}

// checkinFunc returns a function that decrements the checkout count and cleans up the resource if it is no longer needed.
func (c *Cache[K, R]) checkinFunc(key K, r R, instanceID string) func() {
var once sync.Once
return func() {
once.Do(func() {
c.mu.Lock()
defer c.mu.Unlock()
c.checkouts[instanceID]--
if _, ok := c.expiries[instanceID]; ok && // instance is expired
c.checkouts[instanceID] == 0 { // no more checkouts
delete(c.expiries, instanceID)
delete(c.latestInstance, key)
// not deleting the latestInstance entry, as it might be used by another instance
go c.cleanup(r)
}
})
}
}

// onEvicted is called when a key is evicted from the cache. It cleans up the resource if it is not checked out.
func (c *Cache[K, R]) onEvicted(_ K, instanceID string) {
c.mu.Lock()
defer c.mu.Unlock()
checkouts, ok := c.checkouts[instanceID]
if !ok {
return // already cleaned up through invalidate
}
if checkouts == 0 {
r := c.resources[instanceID]
delete(c.resources, instanceID)
delete(c.checkouts, instanceID)
go c.cleanup(r)
} else { // mark the instance for cleanup
c.expiries[instanceID] = struct{}{}
}
}

// cleanup cleans up the resource if it implements the cleanup interface or io.Closer.
func (c *Cache[K, R]) cleanup(r R) {
cleanup := func() {}
var v any = r
switch v := v.(type) {
case interface{ Cleanup() }:
cleanup = v.Cleanup
case interface{ Cleanup() error }:
cleanup = func() { _ = v.Cleanup() }
case interface{ Close() }:
cleanup = v.Close
case interface{ Close() error }:
cleanup = func() { _ = v.Close() }
case interface{ Stop() }:
cleanup = v.Stop
case interface{ Stop() error }:
cleanup = func() { _ = v.Stop() }
}
cleanup()
}

// lockKey locks the key for exclusive access and returns a function to unlock it, which can be deferred.
func (c *Cache[K, R]) lockKey(key K) func() {
k := fmt.Sprintf("%v", key)
c.keyMu.Lock(k)
return func() {
c.keyMu.Unlock(k)
}
}
Loading

0 comments on commit 97f8068

Please sign in to comment.