Next, we'll start to look at some of the more _advanced features_.

# Get or fetch

I have designed the API in a way that should make the process of integrating
`sturdyc` with any data source as straightforward as possible. While it
provides the basic get/set methods you would expect from a cache, the advanced
functionality is accessed through just two core functions: `GetOrFetch` and
`GetOrFetchBatch`.

As an example, let's say that we had the following code for fetching orders
from an API:

```go
func (c *Client) Order(ctx context.Context, id string) (Order, error) {
	// ...
}
```

The cache is going to return the value from memory if it's available, and
otherwise it will call the `fetchFn` to retrieve the data from the underlying
data source.

Most of our examples are going to be retrieving data from HTTP APIs, but it's
just as easy to wrap a database query, a remote procedure call, a disk read, or
any other I/O operation.
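
For instance, here is a minimal sketch of wrapping a database query. The
`Repository` type, its `*sql.DB` handle, and the `users` table are assumptions
made up for this example:

```go
func (r *Repository) User(ctx context.Context, id string) (User, error) {
	fetchFn := func(ctx context.Context) (User, error) {
		// Only runs on a cache miss; the result is stored under the key below.
		var user User
		err := r.db.QueryRowContext(ctx,
			"SELECT id, name FROM users WHERE id = $1", id,
		).Scan(&user.ID, &user.Name)
		return user, err
	}
	return r.cache.GetOrFetch(ctx, "users-"+id, fetchFn)
}
```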

The `fetchFn` that we pass to `GetOrFetch` has the following function
signature:

```go
type FetchFn[T any] func(ctx context.Context) (T, error)
```

For data sources capable of handling requests for multiple records at once,
we'll use `GetOrFetchBatch`:

```go
type KeyFn func(id string) string

type BatchFetchFn[T any] func(ctx context.Context, ids []string) (map[string]T, error)

func (c *Client[T]) GetOrFetchBatch(ctx context.Context, ids []string, keyFn KeyFn, fetchFn BatchFetchFn[T]) (map[string]T, error) {
// ...
}
```

There are a few things to unpack here, so let's start with the `KeyFn`. When
adding an in-memory cache to an API client capable of calling multiple
endpoints, it's highly unlikely that an ID alone is going to be enough to
uniquely identify a record.

To illustrate, let's say that we're building a Github client and want to use
this package to get around their rate limit. The username itself wouldn't make
for a good cache key because we could use it to fetch gists, commits,
repositories, etc. Therefore, `GetOrFetchBatch` takes a `KeyFn` that prefixes
each ID with something to identify the data source so that we don't get cache
key collisions:

```go
gistPrefixFn := cacheClient.BatchKeyFn("gists")
commitPrefixFn := cacheClient.BatchKeyFn("commits")
gists, err := cacheClient.GetOrFetchBatch(ctx, userIDs, gistPrefixFn, fetchGists)
commits, err := cacheClient.GetOrFetchBatch(ctx, userIDs, commitPrefixFn, fetchCommits)
```

We're now able to use the same cache for multiple data sources, and internally
we'd get cache keys of this format:

```
gists-ID-viccon
gists-ID-some-other-user
commits-ID-viccon
commits-ID-some-other-user
```
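
A `KeyFn` is just a function from an ID to a cache key, so you could also write
one by hand if you ever need a custom scheme. The sketch below roughly mirrors
what `BatchKeyFn("gists")` produces, based on the key format shown above:

```go
// Prefix each username so that gists and commits for the same user
// end up under different cache keys.
gistKeyFn := func(id string) string {
	return "gists-ID-" + id
}
```

In practice you would normally let `BatchKeyFn` build these keys for you.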

Now, let's use a bit of our imagination because Github doesn't actually allow
us to fetch gists from multiple users at once. However, if they did, our client
would probably look something like this:

```go
func (client *GithubClient) Gists(ctx context.Context, usernames []string) (map[string]Gist, error) {
	cacheKeyFn := client.cache.BatchKeyFn("gists")
	fetchFunc := func(ctx context.Context, cacheMisses []string) (map[string]Gist, error) {
		timeoutCtx, cancel := context.WithTimeout(ctx, client.timeout)
		defer cancel()

		var response map[string]Gist
		err := requests.URL(client.baseURL).
			Path("/gists").
			Param("usernames", strings.Join(cacheMisses, ",")).
			ToJSON(&response).
			Fetch(timeoutCtx)
		return response, err
	}
	return sturdyc.GetOrFetchBatch(ctx, client.cache, usernames, cacheKeyFn, fetchFunc)
}
```

In the example above, the `fetchFunc` would get called for the users who don't
have their gists in our cache, and the `cacheMisses` slice would contain their
actual usernames (without the prefix from the `KeyFn`).

The map that we return from our `fetchFunc` should have the IDs (in this case the
usernames) as keys, and the actual data that we want to cache (the gist) as the
value.
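
To make that contract a bit more concrete, here is a small hedged sketch where
a plain map stands in for the real data source:

```go
// gistStore stands in for the real data source in this sketch.
gistStore := map[string]Gist{"viccon": {}, "some-other-user": {}}

fetchFunc := func(_ context.Context, cacheMisses []string) (map[string]Gist, error) {
	// The keys of the returned map are the plain usernames, not the
	// prefixed cache keys that the KeyFn produces internally.
	response := make(map[string]Gist, len(cacheMisses))
	for _, username := range cacheMisses {
		if gist, ok := gistStore[username]; ok {
			response[username] = gist
		}
	}
	return response, nil
}
```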

Later, we'll see how we can use closures to pass query parameters and options
to our fetch functions, as well as how to use the `PermutatedBatchKeyFn` to
create unique cache keys for each permutation of them.
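
As a small preview of the closure part, the fetch function can simply capture
whatever parameters the request needs. The `visibility` parameter and endpoint
below are made up for illustration:

```go
func (client *GithubClient) GistsByVisibility(ctx context.Context, usernames []string, visibility string) (map[string]Gist, error) {
	// Including the parameter in the prefix keeps different permutations
	// from colliding under the same cache keys.
	cacheKeyFn := client.cache.BatchKeyFn("gists-" + visibility)
	fetchFunc := func(ctx context.Context, cacheMisses []string) (map[string]Gist, error) {
		var response map[string]Gist
		err := requests.URL(client.baseURL).
			Path("/gists").
			Param("usernames", strings.Join(cacheMisses, ",")).
			Param("visibility", visibility). // captured by the closure
			ToJSON(&response).
			Fetch(ctx)
		return response, err
	}
	return sturdyc.GetOrFetchBatch(ctx, client.cache, usernames, cacheKeyFn, fetchFunc)
}
```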

# Stampede protection

When we're consuming data through `sturdyc` we'll get automatic protection
against cache stampedes. If you're not familiar with the term, a cache stampede
(also known as a thundering herd) occurs when many requests for a particular
piece of data, which has just expired or been evicted from the cache, come in
at once.

Preventing this has been one of the key objectives. We do not want to cause a
significant load on an underlying data source every time one of our keys
expires. To address this, `sturdyc` performs _in-flight_ tracking for every key.
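
The idea is similar to the `golang.org/x/sync/singleflight` pattern. The code
below is a simplified conceptual sketch of per-key in-flight tracking, not
`sturdyc`'s actual implementation:

```go
type call[T any] struct {
	wg  sync.WaitGroup
	val T
	err error
}

type inFlightTracker[T any] struct {
	mu    sync.Mutex
	calls map[string]*call[T]
}

func (t *inFlightTracker[T]) do(key string, fetch func() (T, error)) (T, error) {
	t.mu.Lock()
	if t.calls == nil {
		t.calls = make(map[string]*call[T])
	}
	if c, ok := t.calls[key]; ok {
		// Someone is already fetching this key; wait for their result.
		t.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}
	c := &call[T]{}
	c.wg.Add(1)
	t.calls[key] = c
	t.mu.Unlock()

	// We're the first caller, so we perform the fetch and share the result.
	c.val, c.err = fetch()
	c.wg.Done()

	t.mu.Lock()
	delete(t.calls, key)
	t.mu.Unlock()
	return c.val, c.err
}
```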

We can demonstrate this using the `GetOrFetch` function which, as I mentioned
earlier, takes a key, and a function for retrieving the data if it's not in the
cache. Internally, the cache makes sure that there is never more than a single
in-flight request per key:
```go
var count atomic.Int32
fetchFn := func(_ context.Context) (int, error) {
	// Increment the count so that we can assert how many times this function was called.
	count.Add(1)
	time.Sleep(time.Second)
	return 1337, nil
}

// Fetch the same key from 5 goroutines.
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
	wg.Add(1)
	go func() {
		// We'll ignore the error here for brevity.
		val, _ := cacheClient.GetOrFetch(context.Background(), "key2", fetchFn)
		log.Printf("got value: %d\n", val)
		wg.Done()
	}()
}

wg.Wait()
```

Running this program we can see that we were able to retrieve the value for all
5 goroutines, and that the `fetchFn` only got called once:

```sh
❯ go run .
...
2024/05/21 08:06:29 1337 true
```

The in-flight tracking works for batch operations too, where the cache is able
to deduplicate a batch of cache misses and then assemble the response by
picking records from **multiple** in-flight requests.

To demonstrate this, we'll use the `GetOrFetchBatch` function which, as
mentioned earlier, can be used to retrieve data from a data source capable of
handling requests for multiple records at once.

We'll start by creating a mock function that sleeps for `5` seconds, and then
returns a map with a numerical value for every ID:

```go
var count atomic.Int32
fetchFn := func(_ context.Context, ids []string) (map[string]int, error) {
	// Increment the counter so that we can assert how many times this function was called.
	count.Add(1)
	time.Sleep(time.Second * 5)

	response := make(map[string]int, len(ids))
	for _, id := range ids {
		num, _ := strconv.Atoi(id)
		response[id] = num
	}

	return response, nil
}
```

Next, we'll need some batches to test with, so here I've created three batches
with 5 IDs each:

```go
batches := [][]string{
	{"1", "2", "3", "4", "5"},
	{"6", "7", "8", "9", "10"},
	{"11", "12", "13", "14", "15"},
}
```

We'll use the simple `BatchKeyFn` from earlier to prefix every ID, and we can
now request each batch in a separate goroutine:

```go
keyPrefixFn := cacheClient.BatchKeyFn("my-data-source")

for _, batch := range batches {
	go func() {
		res, _ := cacheClient.GetOrFetchBatch(context.Background(), batch, keyPrefixFn, fetchFn)
		log.Printf("got batch: %v\n", res)
	}()
}

// Just to ensure that these batches are in fact in-flight, we'll sleep to give the goroutines a chance to run.
time.Sleep(time.Second * 2)
```

At this point, the cache should have 3 in-flight requests for IDs 1-15:

```sh
[1,2,3,4,5]      => REQUEST 1 (IN-FLIGHT)
[6,7,8,9,10]     => REQUEST 2 (IN-FLIGHT)
[11,12,13,14,15] => REQUEST 3 (IN-FLIGHT)
```

Knowing this, let's test the stampede protection by launching another five
goroutines. Each of these goroutines will request two random IDs from our
previous batches. For example, they could request one ID from the first
request, and another from the second or third.

```go
// Launch another 5 goroutines that are going to pick two random IDs from any of our in-flight batches.
// e.g:
// [1,8]
// [4,11]
// [14,2]
// [6,15]
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
	wg.Add(1)
	go func() {
		ids := []string{batches[rand.IntN(3)][rand.IntN(5)], batches[rand.IntN(3)][rand.IntN(5)]}
		res, _ := cacheClient.GetOrFetchBatch(context.Background(), ids, keyPrefixFn, fetchFn)
		log.Printf("got batch: %v\n", res)
		wg.Done()
	}()
}

wg.Wait()
log.Printf("fetchFn was called %d times\n", count.Load())
```

Running this program, and looking at the logs, we'll see that the cache is able
to assemble the responses for these last five goroutines from the requests that
were already in-flight, and that the `fetchFn` was only called three times, once
per original batch.
