feat: introduce a ttl cache for resources #482

Merged · 5 commits · May 29, 2024
20 changes: 14 additions & 6 deletions cachettl/cachettl.go
@@ -10,10 +10,11 @@ import (
 // the tail node (end) is the node with the highest expiration time
 // Cleanups are done on Get() calls so if Get() is never invoked then Nodes stay in-memory.
 type Cache[K comparable, V any] struct {
-	root *node[K, V]
-	mu   sync.Mutex
-	m    map[K]*node[K, V]
-	now  func() time.Time
+	root      *node[K, V]
+	mu        sync.Mutex
+	m         map[K]*node[K, V]
+	now       func() time.Time
+	onEvicted func(key K, value V)
 }

 type node[K comparable, V any] struct {
@@ -49,8 +50,11 @@ func (c *Cache[K, V]) Get(key K) (zero V) {
 	cn := c.root.next // start from head since we're sorting by expiration with the highest expiration at the tail
 	for cn != nil && cn != c.root {
 		if c.now().After(cn.expiration) {
-			cn.remove()         // removes a node from the linked list (leaves the map untouched)
-			delete(c.m, cn.key) // remove node from map too
+			cn.remove()             // removes a node from the linked list (leaves the map untouched)
+			delete(c.m, cn.key)     // remove node from map too
+			if c.onEvicted != nil { // call the OnEvicted callback if it's set
+				c.onEvicted(cn.key, cn.value)
+			}
 		} else { // there is nothing else to clean up, no need to iterate further
 			break
 		}
@@ -101,6 +105,10 @@ func (c *Cache[K, V]) Put(key K, value V, ttl time.Duration) {
 	c.add(n)
 }

+func (c *Cache[K, V]) OnEvicted(onEvicted func(key K, value V)) {
+	c.onEvicted = onEvicted
+}
+
 func (c *Cache[K, V]) add(n *node[K, V]) {
 	cn := c.root.prev // tail
 	for cn != nil { // iterate from tail to root because we have expiring nodes towards the tail
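
The cachettl change is deliberately small: a callback field, a setter, and an invocation during Get()'s cleanup pass. A minimal sketch of how a caller might wire it up, based only on the API visible in this diff (the key/value types and the printed message are illustrative):

```go
package main

import (
	"fmt"
	"time"

	"github.com/rudderlabs/rudder-go-kit/cachettl"
)

func main() {
	c := cachettl.New[string, string]()

	// register the callback before inserting entries; it fires for each
	// expired entry removed during a Get() cleanup pass
	c.OnEvicted(func(key, value string) {
		fmt.Printf("evicted %s=%s\n", key, value)
	})

	c.Put("session", "abc123", 10*time.Millisecond)
	time.Sleep(20 * time.Millisecond)

	// Get() walks the expiration-ordered list and removes stale nodes,
	// invoking the callback for "session" along the way
	_ = c.Get("session")
}
```

Note that, as the struct comment says, cleanups happen on Get() calls, so the callback only fires when the cache is read.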
178 changes: 178 additions & 0 deletions resourcettl/cache.go
@@ -0,0 +1,178 @@
package resourcettl

import (
"fmt"
"sync"
"time"

"github.com/google/uuid"

"github.com/rudderlabs/rudder-go-kit/cachettl"
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
)

// NewCache creates a new resource cache.
//
// - ttl - is the time after which the resource is considered expired and cleaned up.
//
// A resource's ttl is extended every time it is checked out.
//
// The cache keeps track of the resources' usage and makes sure that
// expired resources are not cleaned up while they are still in use
// and cleaned up only when they are no longer needed (zero checkouts).
//
// Resources with any of the following methods can be cleaned up:
// - Cleanup()
// - Cleanup() error
// - Close()
// - Close() error
// - Stop()
// - Stop() error
func NewCache[K comparable, R any](ttl time.Duration) *Cache[K, R] {
c := &Cache[K, R]{
keyMu: kitsync.NewPartitionLocker(),
resources: make(map[string]R),
checkouts: make(map[string]int),
expiries: make(map[string]struct{}),
ttl: ttl,
ttlcache: cachettl.New[K, string](),
}
c.ttlcache.OnEvicted(c.onEvicted)
return c
}
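
A resource only needs to expose one of the methods listed above to participate in cleanup; the cache discovers it via a type switch (see cleanup further down). A hedged sketch of a conforming resource — the worker type and its fields are hypothetical:

```go
package main

import (
	"time"

	"github.com/rudderlabs/rudder-go-kit/resourcettl"
)

// worker is a hypothetical resource; its Stop() error method matches one
// of the cleanup signatures the cache recognizes
type worker struct {
	done chan struct{}
}

func (w *worker) Stop() error {
	close(w.done)
	return nil
}

func main() {
	// each entry expires 5 minutes after its last checkout and is stopped
	// once it has expired and no checkouts remain
	cache := resourcettl.NewCache[string, *worker](5 * time.Minute)
	_ = cache
}
```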

// Cache is a cache for resources that need to be closed/cleaned-up after expiration.
//
// The cache keeps track of the resources' usage and makes sure that
// expired resources are not cleaned up while they are still in use
// and cleaned up only when they are no longer needed (zero checkouts).
//
// Resources with any of the following methods can be cleaned up:
// - Cleanup()
// - Cleanup() error
// - Close()
// - Close() error
// - Stop()
// - Stop() error
type Cache[K comparable, R any] struct {
// synchronizes access to the cache for a given key. This is to
// allow multiple go-routines to access the cache concurrently for different keys, but still
// avoid multiple go-routines creating multiple resources for the same key.
keyMu *kitsync.PartitionLocker

mu sync.RWMutex // protects the following maps
resources map[string]R // maps a resourceID to its resource
checkouts map[string]int // keeps track of how many checkouts are active for a given resourceID
expiries map[string]struct{} // keeps track of resources that are expired and need to be cleaned up after all checkouts are done

ttl time.Duration
ttlcache *cachettl.Cache[K, string]
}

// Checkout returns a resource for the given key. If the resource is not available, it creates a new one, using the new function.
// The caller must call the returned checkin function when the resource is no longer needed, to release the resource.
// Multiple checkouts for the same key are allowed and they can all share the same resource. The resource is cleaned up
// only when all checkouts are checked-in and the resource's ttl has expired (or its key has been invalidated through [Invalidate]).
func (c *Cache[K, R]) Checkout(key K, new func() (R, error)) (resource R, checkin func(), err error) {
defer c.lockKey(key)()

if resourceID := c.ttlcache.Get(key); resourceID != "" {
c.mu.Lock()
defer c.mu.Unlock()
r := c.resources[resourceID]
c.checkouts[resourceID]++
return r, c.checkinFunc(r, resourceID), nil
}
return c.newResource(key, new)
}
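
The checkout/checkin contract is easiest to see in code. A minimal sketch, assuming a hypothetical conn resource (the key, address, and constructor are illustrative):

```go
package example

import "github.com/rudderlabs/rudder-go-kit/resourcettl"

// conn is a hypothetical resource; Close() error makes it cleanable
type conn struct{ addr string }

func (c *conn) Close() error { return nil }

func use(cache *resourcettl.Cache[string, *conn]) error {
	r, checkin, err := cache.Checkout("service-a", func() (*conn, error) {
		return &conn{addr: "service-a:8080"}, nil // invoked only on a cache miss
	})
	if err != nil {
		return err
	}
	defer checkin() // guarded by sync.Once, so an extra call is a no-op

	_ = r // ... use the shared resource ...
	return nil
}
```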

// Invalidate invalidates the resource for the given key.
func (c *Cache[K, R]) Invalidate(key K) {
	defer c.lockKey(key)()
	if resourceID := c.ttlcache.Get(key); resourceID != "" {
		c.ttlcache.Put(key, "", -1)
		c.onEvicted(key, resourceID)
	}
}

// newResource creates a new resource for the given key.
func (c *Cache[K, R]) newResource(key K, new func() (R, error)) (R, func(), error) {
r, err := new()
if err != nil {
return r, nil, err
}

resourceID := uuid.NewString()
c.mu.Lock()
defer c.mu.Unlock()
c.resources[resourceID] = r
c.checkouts[resourceID]++
c.ttlcache.Put(key, resourceID, c.ttl)
return r, c.checkinFunc(r, resourceID), nil
}

// checkinFunc returns a function that decrements the checkout count and cleans up the resource if it is no longer needed.
func (c *Cache[K, R]) checkinFunc(r R, resourceID string) func() {
var once sync.Once
return func() {
once.Do(func() {
c.mu.Lock()
defer c.mu.Unlock()
c.checkouts[resourceID]--
if _, ok := c.expiries[resourceID]; ok && // resource is expired
c.checkouts[resourceID] == 0 { // no more checkouts
delete(c.expiries, resourceID)
go c.cleanup(r)
}
})
}
}

// onEvicted is called when a key is evicted from the cache. It cleans up the resource if it is not checked out.
func (c *Cache[K, R]) onEvicted(_ K, resourceID string) {
c.mu.Lock()
defer c.mu.Unlock()
checkouts, ok := c.checkouts[resourceID]
if !ok {
return // already cleaned up through Invalidate
}
if checkouts == 0 {
r := c.resources[resourceID]
delete(c.resources, resourceID)
delete(c.checkouts, resourceID)
go c.cleanup(r)

Collaborator: Could the lack of synchronization create issues here? Wdyt?

Author: Using another go-routine so that the main go-routine is not blocked. At this point no one should be using the resource, thus doing async cleanup should be safe.

} else { // mark the resource for cleanup
c.expiries[resourceID] = struct{}{}
}
}

// cleanup cleans up the resource if it implements one of the supported methods: Cleanup, Close or Stop, with or without an error return.
func (c *Cache[K, R]) cleanup(r R) {
cleanup := func() {}
var v any = r
switch v := v.(type) {
case interface{ Cleanup() }:
cleanup = v.Cleanup
case interface{ Cleanup() error }:
cleanup = func() { _ = v.Cleanup() }
case interface{ Close() }:
cleanup = v.Close
case interface{ Close() error }:
cleanup = func() { _ = v.Close() }
case interface{ Stop() }:
cleanup = v.Stop
case interface{ Stop() error }:
cleanup = func() { _ = v.Stop() }
}
cleanup()
}

// lockKey locks the key for exclusive access and returns a function to unlock it, which can be deferred.
func (c *Cache[K, R]) lockKey(key K) func() {
k := fmt.Sprintf("%v", key)
c.keyMu.Lock(k)
return func() {
c.keyMu.Unlock(k)
}
}
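
Because lockKey serializes access per key (via the PartitionLocker) while mu guards the shared maps, concurrent checkouts of the same key end up sharing one resource. A small sketch of that behavior, reusing the hypothetical conn type from the earlier example (the goroutine count is arbitrary):

```go
package example

import (
	"sync"

	"github.com/rudderlabs/rudder-go-kit/resourcettl"
)

func fanOut(cache *resourcettl.Cache[string, *conn]) {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// the per-key lock ensures only the first caller constructs the
			// resource; the other nine reuse it via checkout counting
			r, checkin, err := cache.Checkout("service-a", func() (*conn, error) {
				return &conn{addr: "service-a:8080"}, nil
			})
			if err != nil {
				return
			}
			defer checkin()
			_ = r
		}()
	}
	wg.Wait()
}
```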
174 changes: 174 additions & 0 deletions resourcettl/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package resourcettl_test

import (
"sync/atomic"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/resourcettl"
)

func TestCache(t *testing.T) {
const key = "key"
ttl := 10 * time.Millisecond
t.Run("checkout, checkin, then expire", func(t *testing.T) {
t.Run("using cleanup", func(t *testing.T) {
producer := &MockProducer{}
c := resourcettl.NewCache[string, *cleanuper](ttl)

r1, checkin1, err1 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err1, "it should be able to create a new resource")
require.NotNil(t, r1, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource")

r2, checkin2, err2 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err2, "it should be able to checkout the same resource")
require.NotNil(t, r2, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource")
require.Equal(t, r1.id, r2.id, "it should return the same resource")

time.Sleep(ttl + time.Millisecond)
checkin1()
checkin2()

r3, checkin3, err3 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err3, "it should be able to create a new resource")
require.NotNil(t, r3, "it should return a resource")
require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one expired")
require.NotEqual(t, r1.id, r3.id, "it should return a different resource")
time.Sleep(time.Millisecond) // wait for async cleanup
require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource")
checkin3()
})

t.Run("using closer", func(t *testing.T) {
producer := &MockProducer{}
c := resourcettl.NewCache[string, *closer](ttl)

r1, checkin1, err1 := c.Checkout(key, producer.NewCloser)
require.NoError(t, err1, "it should be able to create a new resource")
require.NotNil(t, r1, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource")

r2, checkin2, err2 := c.Checkout(key, producer.NewCloser)
require.NoError(t, err2, "it should be able to checkout the same resource")
require.NotNil(t, r2, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource")
require.Equal(t, r1.id, r2.id, "it should return the same resource")

time.Sleep(ttl + time.Millisecond)
checkin1()
checkin2()

r3, checkin3, err3 := c.Checkout(key, producer.NewCloser)
require.NoError(t, err3, "it should be able to create a new resource")
require.NotNil(t, r3, "it should return a resource")
require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one expired")
require.NotEqual(t, r1.id, r3.id, "it should return a different resource")
time.Sleep(time.Millisecond) // wait for async cleanup
require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource")
checkin3()
})
})

t.Run("expire while being used", func(t *testing.T) {
producer := &MockProducer{}
c := resourcettl.NewCache[string, *cleanuper](ttl)

r1, checkin1, err1 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err1, "it should be able to create a new resource")
require.NotNil(t, r1, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource")

r2, checkin2, err2 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err2, "it should be able to checkout the same resource")
require.NotNil(t, r2, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource")
require.Equal(t, r1.id, r2.id, "it should return the same resource")

time.Sleep(ttl + time.Millisecond) // wait for expiration

r3, checkin3, err3 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err3, "it should be able to return a resource")
require.NotNil(t, r3, "it should return a resource")
require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one expired")
require.NotEqual(t, r1.id, r3.id, "it should return a different resource")
require.EqualValues(t, 0, r1.cleanups.Load(), "it shouldn't cleanup the expired resource yet since it is being used by 2 clients")
checkin1()
time.Sleep(time.Millisecond) // wait for async cleanup
require.EqualValues(t, 0, r1.cleanups.Load(), "it shouldn't cleanup the expired resource yet since it is being used by 1 client")
checkin2()
time.Sleep(time.Millisecond) // wait for async cleanup
require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource since it is not being used by any client")
checkin3()
})

t.Run("invalidate", func(t *testing.T) {
producer := &MockProducer{}
c := resourcettl.NewCache[string, *cleanuper](ttl)

r1, checkin1, err1 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err1, "it should be able to create a new resource")
require.NotNil(t, r1, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource")

r2, checkin2, err2 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err2, "it should be able to checkout the same resource")
require.NotNil(t, r2, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource")
require.Equal(t, r1.id, r2.id, "it should return the same resource")

c.Invalidate(key)

r3, checkin3, err3 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err3, "it should be able to create a new resource")
require.NotNil(t, r3, "it should return a resource")
require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one was invalidated")
require.NotEqual(t, r1.id, r3.id, "it should return a different resource")
time.Sleep(time.Millisecond) // wait for async cleanup
require.EqualValues(t, 0, r1.cleanups.Load(), "it shouldn't cleanup the expired resource yet since it is being used by 2 clients")
checkin1()
time.Sleep(time.Millisecond) // wait for async cleanup
require.EqualValues(t, 0, r1.cleanups.Load(), "it shouldn't cleanup the expired resource yet since it is being used by 1 client")
checkin2()
time.Sleep(time.Millisecond) // wait for async cleanup
require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource")
checkin3()
})
}

type MockProducer struct {
instances atomic.Int32
}

func (m *MockProducer) NewCleanuper() (*cleanuper, error) {
m.instances.Add(1)
return &cleanuper{id: uuid.NewString()}, nil
}

func (m *MockProducer) NewCloser() (*closer, error) {
m.instances.Add(1)
return &closer{id: uuid.NewString()}, nil
}

type cleanuper struct {
id string
cleanups atomic.Int32
}

func (m *cleanuper) Cleanup() {
m.cleanups.Add(1)
}

type closer struct {
id string
cleanups atomic.Int32
}

func (m *closer) Close() error {
m.cleanups.Add(1)
return nil
}