-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: introduce a ttl cache for resources #482
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
5788d07
feat: introduce a ttl cache for resources
atzoum e84ea02
fixup! feat: introduce a ttl cache for resources
atzoum 651ca1f
fixup! feat: introduce a ttl cache for resources
atzoum fafd33c
fixup! feat: introduce a ttl cache for resources
atzoum 0135630
fixup! feat: introduce a ttl cache for resources
atzoum File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
package resourcettl | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"github.com/google/uuid" | ||
|
||
"github.com/rudderlabs/rudder-go-kit/cachettl" | ||
kitsync "github.com/rudderlabs/rudder-go-kit/sync" | ||
) | ||
|
||
// NewCache creates a new resource cache. | ||
// | ||
// - ttl - is the time after which the resource is considered expired and cleaned up. | ||
// | ||
// A resource's ttl is extended every time it is checked out. | ||
// | ||
// The cache keeps track of the resources' usage and makes sure that | ||
// expired resources are not cleaned up while they are still in use | ||
// and cleaned up only when they are no longer needed (zero checkouts). | ||
// | ||
// Resources with any of following methods can be cleaned up: | ||
// - Cleanup() | ||
// - Close() | ||
// - Close() error | ||
// - Stop() | ||
// - Stop() error | ||
func NewCache[K comparable, R any](ttl time.Duration) *Cache[K, R] { | ||
c := &Cache[K, R]{ | ||
keyMu: kitsync.NewPartitionLocker(), | ||
resources: make(map[string]R), | ||
checkouts: make(map[string]int), | ||
expiries: make(map[string]struct{}), | ||
ttl: ttl, | ||
ttlcache: cachettl.New[K, string](), | ||
} | ||
c.ttlcache.OnEvicted(c.onEvicted) | ||
return c | ||
} | ||
|
||
// Cache is a cache for resources that need to be closed/cleaned-up after expiration. | ||
// | ||
// The cache keeps track of the resources' usage and makes sure that | ||
// expired resources are not cleaned up while they are still in use | ||
// and cleaned up only when they are no longer needed (zero checkouts). | ||
// | ||
// Resources with any of following methods can be cleaned up: | ||
// - Cleanup() | ||
// - Close() | ||
// - Close() error | ||
// - Stop() | ||
// - Stop() error | ||
type Cache[K comparable, R any] struct { | ||
// synchronizes access to the cache for a given key. This is to | ||
// allow multiple go-routines to access the cache concurrently for different keys, but still | ||
// avoid multiple go-routines creating multiple resources for the same key. | ||
keyMu *kitsync.PartitionLocker | ||
|
||
mu sync.RWMutex // protects the following maps | ||
resources map[string]R // maps an resourceID to its resource | ||
checkouts map[string]int // keeps track of how many checkouts are active for a given resourceID | ||
expiries map[string]struct{} // keeps track of resources that are expired and need to be cleaned up after all checkouts are done | ||
|
||
ttl time.Duration | ||
ttlcache *cachettl.Cache[K, string] | ||
} | ||
|
||
// Checkout returns a resource for the given key. If the resource is not available, it creates a new one, using the new function. | ||
// The caller must call the returned checkin function when the resource is no longer needed, to release the resource. | ||
// Multiple checkouts for the same key are allowed and they can all share the same resource. The resource is cleaned up | ||
// only when all checkouts are checked-in and the resource's ttl has expired (or its key has been invalidated through [Invalidate]). | ||
func (c *Cache[K, R]) Checkout(key K, new func() (R, error)) (resource R, checkin func(), err error) { | ||
defer c.lockKey(key)() | ||
|
||
if resourceID := c.ttlcache.Get(key); resourceID != "" { | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
r := c.resources[resourceID] | ||
c.checkouts[resourceID]++ | ||
return r, c.checkinFunc(r, resourceID), nil | ||
} | ||
return c.newResource(key, new) | ||
} | ||
|
||
// Invalidate invalidates the resource for the given key. | ||
func (c *Cache[K, R]) Invalidate(key K) { | ||
defer c.lockKey(key)() | ||
resourceID := c.ttlcache.Get(key) | ||
if resourceID != "" { | ||
c.ttlcache.Put(key, "", -1) | ||
} | ||
if resourceID != "" { | ||
c.onEvicted(key, resourceID) | ||
} | ||
} | ||
|
||
// newResource creates a new resource for the given key. | ||
func (c *Cache[K, R]) newResource(key K, new func() (R, error)) (R, func(), error) { | ||
r, err := new() | ||
if err != nil { | ||
return r, nil, err | ||
} | ||
|
||
resourceID := uuid.NewString() | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
c.resources[resourceID] = r | ||
c.checkouts[resourceID]++ | ||
c.ttlcache.Put(key, resourceID, c.ttl) | ||
return r, c.checkinFunc(r, resourceID), nil | ||
} | ||
|
||
// checkinFunc returns a function that decrements the checkout count and cleans up the resource if it is no longer needed. | ||
func (c *Cache[K, R]) checkinFunc(r R, resourceID string) func() { | ||
var once sync.Once | ||
return func() { | ||
once.Do(func() { | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
c.checkouts[resourceID]-- | ||
if _, ok := c.expiries[resourceID]; ok && // resource is expired | ||
c.checkouts[resourceID] == 0 { // no more checkouts | ||
delete(c.expiries, resourceID) | ||
go c.cleanup(r) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
// onEvicted is called when a key is evicted from the cache. It cleans up the resource if it is not checked out. | ||
func (c *Cache[K, R]) onEvicted(_ K, resourceID string) { | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
checkouts, ok := c.checkouts[resourceID] | ||
if !ok { | ||
return // already cleaned up through Invalidate | ||
} | ||
if checkouts == 0 { | ||
r := c.resources[resourceID] | ||
delete(c.resources, resourceID) | ||
delete(c.checkouts, resourceID) | ||
go c.cleanup(r) | ||
} else { // mark the resource for cleanup | ||
c.expiries[resourceID] = struct{}{} | ||
} | ||
} | ||
|
||
// cleanup cleans up the resource if it implements the cleanup interface or io.Closer. | ||
func (c *Cache[K, R]) cleanup(r R) { | ||
cleanup := func() {} | ||
var v any = r | ||
switch v := v.(type) { | ||
case interface{ Cleanup() }: | ||
cleanup = v.Cleanup | ||
case interface{ Cleanup() error }: | ||
cleanup = func() { _ = v.Cleanup() } | ||
case interface{ Close() }: | ||
cleanup = v.Close | ||
case interface{ Close() error }: | ||
cleanup = func() { _ = v.Close() } | ||
case interface{ Stop() }: | ||
cleanup = v.Stop | ||
case interface{ Stop() error }: | ||
cleanup = func() { _ = v.Stop() } | ||
} | ||
cleanup() | ||
} | ||
|
||
// lockKey locks the key for exclusive access and returns a function to unlock it, which can be deferred. | ||
func (c *Cache[K, R]) lockKey(key K) func() { | ||
k := fmt.Sprintf("%v", key) | ||
c.keyMu.Lock(k) | ||
return func() { | ||
c.keyMu.Unlock(k) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
package resourcettl_test | ||
|
||
import ( | ||
"sync/atomic" | ||
"testing" | ||
"time" | ||
|
||
"github.com/google/uuid" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/rudderlabs/rudder-go-kit/resourcettl" | ||
) | ||
|
||
func TestCache(t *testing.T) { | ||
const key = "key" | ||
ttl := 10 * time.Millisecond | ||
t.Run("checkout, checkin, then expire", func(t *testing.T) { | ||
t.Run("using cleanup", func(t *testing.T) { | ||
producer := &MockProducer{} | ||
c := resourcettl.NewCache[string, *cleanuper](ttl) | ||
|
||
r1, checkin1, err1 := c.Checkout(key, producer.NewCleanuper) | ||
require.NoError(t, err1, "it should be able to create a new resource") | ||
require.NotNil(t, r1, "it should return a resource") | ||
require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource") | ||
|
||
r2, checkin2, err2 := c.Checkout(key, producer.NewCleanuper) | ||
require.NoError(t, err2, "it should be able to checkout the same resource") | ||
require.NotNil(t, r2, "it should return a resource") | ||
require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource") | ||
require.Equal(t, r1.id, r2.id, "it should return the same resource") | ||
|
||
time.Sleep(ttl + time.Millisecond) | ||
checkin1() | ||
checkin2() | ||
|
||
r3, checkin3, err3 := c.Checkout(key, producer.NewCleanuper) | ||
require.NoError(t, err3, "it should be able to create a new resource") | ||
require.NotNil(t, r3, "it should return a resource") | ||
require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one expired") | ||
require.NotEqual(t, r1.id, r3.id, "it should return a different resource") | ||
time.Sleep(time.Millisecond) // wait for async cleanup | ||
require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource") | ||
checkin3() | ||
}) | ||
|
||
t.Run("using closer", func(t *testing.T) { | ||
producer := &MockProducer{} | ||
c := resourcettl.NewCache[string, *closer](ttl) | ||
|
||
r1, checkin1, err1 := c.Checkout(key, producer.NewCloser) | ||
require.NoError(t, err1, "it should be able to create a new resource") | ||
require.NotNil(t, r1, "it should return a resource") | ||
require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource") | ||
|
||
r2, checkin2, err2 := c.Checkout(key, producer.NewCloser) | ||
require.NoError(t, err2, "it should be able to checkout the same resource") | ||
require.NotNil(t, r2, "it should return a resource") | ||
require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource") | ||
require.Equal(t, r1.id, r2.id, "it should return the same resource") | ||
|
||
time.Sleep(ttl + time.Millisecond) | ||
checkin1() | ||
checkin2() | ||
|
||
r3, checkin3, err3 := c.Checkout(key, producer.NewCloser) | ||
require.NoError(t, err3, "it should be able to create a new resource") | ||
require.NotNil(t, r3, "it should return a resource") | ||
require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one expired") | ||
require.NotEqual(t, r1.id, r3.id, "it should return a different resource") | ||
time.Sleep(time.Millisecond) // wait for async cleanup | ||
require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource") | ||
checkin3() | ||
}) | ||
}) | ||
|
||
t.Run("expire while being used", func(t *testing.T) { | ||
producer := &MockProducer{} | ||
c := resourcettl.NewCache[string, *cleanuper](ttl) | ||
|
||
r1, checkin1, err1 := c.Checkout(key, producer.NewCleanuper) | ||
require.NoError(t, err1, "it should be able to create a new resource") | ||
require.NotNil(t, r1, "it should return a resource") | ||
require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource") | ||
|
||
r2, checkin2, err2 := c.Checkout(key, producer.NewCleanuper) | ||
require.NoError(t, err2, "it should be able to checkout the same resource") | ||
require.NotNil(t, r2, "it should return a resource") | ||
require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource") | ||
require.Equal(t, r1.id, r2.id, "it should return the same resource") | ||
|
||
time.Sleep(ttl + time.Millisecond) // wait for expiration | ||
|
||
r3, checkin3, err3 := c.Checkout(key, producer.NewCleanuper) | ||
require.NoError(t, err3, "it should be able to return a resource") | ||
require.NotNil(t, r3, "it should return a resource") | ||
require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one expired") | ||
require.NotEqual(t, r1.id, r3.id, "it should return a different resource") | ||
require.EqualValues(t, 0, r1.cleanups.Load(), "it shouldn't cleanup the expired resource yet since it is being used by 2 clients") | ||
checkin1() | ||
time.Sleep(time.Millisecond) // wait for async cleanup | ||
require.EqualValues(t, 0, r1.cleanups.Load(), "it shouldn't cleanup the expired resource yet since it is being used by 1 clients") | ||
checkin2() | ||
time.Sleep(time.Millisecond) // wait for async cleanup | ||
require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource since it is not being used by any client") | ||
checkin3() | ||
}) | ||
|
||
t.Run("invalidate", func(t *testing.T) { | ||
producer := &MockProducer{} | ||
c := resourcettl.NewCache[string, *cleanuper](ttl) | ||
|
||
r1, checkin1, err1 := c.Checkout(key, producer.NewCleanuper) | ||
require.NoError(t, err1, "it should be able to create a new resource") | ||
require.NotNil(t, r1, "it should return a resource") | ||
require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource") | ||
|
||
r2, checkin2, err2 := c.Checkout(key, producer.NewCleanuper) | ||
require.NoError(t, err2, "it should be able to checkout the same resource") | ||
require.NotNil(t, r2, "it should return a resource") | ||
require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource") | ||
require.Equal(t, r1.id, r2.id, "it should return the same resource") | ||
|
||
c.Invalidate(key) | ||
|
||
r3, checkin3, err3 := c.Checkout(key, producer.NewCleanuper) | ||
require.NoError(t, err3, "it should be able to create a new resource") | ||
require.NotNil(t, r3, "it should return a resource") | ||
require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one was invalidated") | ||
require.NotEqual(t, r1.id, r3.id, "it should return a different resource") | ||
time.Sleep(time.Millisecond) // wait for async cleanup | ||
require.EqualValues(t, 0, r1.cleanups.Load(), "it shouldn't cleanup the expired resource yet since it is being used by 2 clients") | ||
checkin1() | ||
time.Sleep(time.Millisecond) // wait for async cleanup | ||
require.EqualValues(t, 0, r1.cleanups.Load(), "it shouldn't cleanup the expired resource yet since it is being used by 1 client") | ||
checkin2() | ||
time.Sleep(time.Millisecond) // wait for async cleanup | ||
require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource") | ||
checkin3() | ||
}) | ||
} | ||
|
||
type MockProducer struct { | ||
instances atomic.Int32 | ||
} | ||
|
||
func (m *MockProducer) NewCleanuper() (*cleanuper, error) { | ||
m.instances.Add(1) | ||
return &cleanuper{id: uuid.NewString()}, nil | ||
} | ||
|
||
func (m *MockProducer) NewCloser() (*closer, error) { | ||
m.instances.Add(1) | ||
return &closer{id: uuid.NewString()}, nil | ||
} | ||
|
||
type cleanuper struct { | ||
id string | ||
cleanups atomic.Int32 | ||
} | ||
|
||
func (m *cleanuper) Cleanup() { | ||
m.cleanups.Add(1) | ||
} | ||
|
||
type closer struct { | ||
id string | ||
cleanups atomic.Int32 | ||
} | ||
|
||
func (m *closer) Close() error { | ||
m.cleanups.Add(1) | ||
return nil | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could the lack of synchronization create issues here? Wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using another go-routine so that the main go-routine is not blocked. At this point no one should be using the resource, thus doing async cleanup should be safe