Skip to content

Commit

Permalink
feat(redis): enable tuning of caching parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Feb 13, 2025
1 parent c755b6f commit fd6a513
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 65 deletions.
3 changes: 3 additions & 0 deletions deploy/app/lambda.tf
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,13 @@ resource "aws_lambda_function" "lambda" {
variables = {
PROVIDERS_REDIS_URL = aws_elasticache_serverless_cache.cache["providers"].endpoint[0].address
PROVIDERS_REDIS_CACHE = aws_elasticache_serverless_cache.cache["providers"].name
PROVIDERS_CACHE_EXPIRATION_SECONDS = "${terraform.workspace == "prod" ? 30 * 24 * 60 * 60 : 24 * 60 * 60}"
INDEXES_REDIS_URL = aws_elasticache_serverless_cache.cache["indexes"].endpoint[0].address
INDEXES_REDIS_CACHE = aws_elasticache_serverless_cache.cache["indexes"].name
INDEXES_CACHE_EXPIRATION_SECONDS = "${terraform.workspace == "prod" ? 24 * 60 * 60 : 60 * 60}"
CLAIMS_REDIS_URL = aws_elasticache_serverless_cache.cache["claims"].endpoint[0].address
CLAIMS_REDIS_CACHE = aws_elasticache_serverless_cache.cache["claims"].name
CLAIMS_CACHE_EXPIRATION_SECONDS = "${terraform.workspace == "prod" ? 7 * 24 * 60 * 60 : 24 * 60 * 60}"
REDIS_USER_ID = aws_elasticache_user.cache_iam_user.user_id
IPNI_ENDPOINT = "https://cid.contact"
PROVIDER_CACHING_QUEUE_URL = aws_sqs_queue.caching.id
Expand Down
106 changes: 63 additions & 43 deletions pkg/aws/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"maps"
"net/url"
"os"
"strconv"
"strings"
"time"

Expand All @@ -17,7 +18,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/ssm"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/redis/go-redis/v9"
goredis "github.com/redis/go-redis/v9"
"github.com/storacha/go-metadata"
"github.com/storacha/go-ucanto/core/delegation"
"github.com/storacha/go-ucanto/did"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/storacha/go-ucanto/principal/signer"
"github.com/storacha/indexing-service/pkg/construct"
"github.com/storacha/indexing-service/pkg/presets"
"github.com/storacha/indexing-service/pkg/redis"
"github.com/storacha/indexing-service/pkg/service/contentclaims"
"github.com/storacha/indexing-service/pkg/service/legacy"
"github.com/storacha/indexing-service/pkg/service/providerindex"
Expand All @@ -45,28 +47,40 @@ func mustGetEnv(envVar string) string {
return value
}

func mustGetInt(envVar string) int64 {
stringValue := mustGetEnv(envVar)
value, err := strconv.ParseInt(stringValue, 10, 64)
if err != nil {
panic(fmt.Errorf("parsing env var %s to int: %w", envVar, err))

Check warning on line 54 in pkg/aws/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/aws/service.go#L50-L54

Added lines #L50 - L54 were not covered by tests
}
return value

Check warning on line 56 in pkg/aws/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/aws/service.go#L56

Added line #L56 was not covered by tests
}

// Config describes all the values required to setup AWS from the environment
type Config struct {
construct.ServiceConfig
aws.Config
SQSCachingQueueURL string
CachingBucket string
ChunkLinksTableName string
MetadataTableName string
IPNIStoreBucket string
IPNIStorePrefix string
NotifierHeadBucket string
NotifierTopicArn string
ClaimStoreBucket string
ClaimStorePrefix string
LegacyClaimsTableName string
LegacyClaimsTableRegion string
LegacyClaimsBucket string
LegacyBlockIndexTableName string
LegacyBlockIndexTableRegion string
LegacyDataBucketURL string
HoneycombAPIKey string
PrincipalMapping map[string]string
ProvidersCacheExpirationSeconds int64
ClaimsCacheExpirationSeconds int64
IndexesCacheExpirationSeconds int64
SQSCachingQueueURL string
CachingBucket string
ChunkLinksTableName string
MetadataTableName string
IPNIStoreBucket string
IPNIStorePrefix string
NotifierHeadBucket string
NotifierTopicArn string
ClaimStoreBucket string
ClaimStorePrefix string
LegacyClaimsTableName string
LegacyClaimsTableRegion string
LegacyClaimsBucket string
LegacyBlockIndexTableName string
LegacyBlockIndexTableRegion string
LegacyDataBucketURL string
HoneycombAPIKey string
PrincipalMapping map[string]string
principal.Signer
}

Expand Down Expand Up @@ -135,21 +149,21 @@ func FromEnv(ctx context.Context) Config {
ServiceConfig: construct.ServiceConfig{
PrivateKey: cryptoPrivKey,
PublicURL: strings.Split(mustGetEnv("PUBLIC_URL"), ","),
ProvidersRedis: redis.Options{
ProvidersRedis: goredis.Options{

Check warning on line 152 in pkg/aws/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/aws/service.go#L152

Added line #L152 was not covered by tests
Addr: mustGetEnv("PROVIDERS_REDIS_URL") + ":6379",
CredentialsProviderContext: redisCredentialVerifier(awsConfig, mustGetEnv("REDIS_USER_ID"), mustGetEnv("PROVIDERS_REDIS_CACHE")),
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
},
ClaimsRedis: redis.Options{
ClaimsRedis: goredis.Options{

Check warning on line 159 in pkg/aws/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/aws/service.go#L159

Added line #L159 was not covered by tests
Addr: mustGetEnv("CLAIMS_REDIS_URL") + ":6379",
CredentialsProviderContext: redisCredentialVerifier(awsConfig, mustGetEnv("REDIS_USER_ID"), mustGetEnv("CLAIMS_REDIS_CACHE")),
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
},
IndexesRedis: redis.Options{
IndexesRedis: goredis.Options{

Check warning on line 166 in pkg/aws/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/aws/service.go#L166

Added line #L166 was not covered by tests
Addr: mustGetEnv("INDEXES_REDIS_URL") + ":6379",
CredentialsProviderContext: redisCredentialVerifier(awsConfig, mustGetEnv("REDIS_USER_ID"), mustGetEnv("INDEXES_REDIS_CACHE")),
TLSConfig: &tls.Config{
Expand All @@ -159,33 +173,36 @@ func FromEnv(ctx context.Context) Config {
IndexerURL: mustGetEnv("IPNI_ENDPOINT"),
PublisherAnnounceAddrs: []string{ipniPublisherAnnounceAddress},
},
SQSCachingQueueURL: mustGetEnv("PROVIDER_CACHING_QUEUE_URL"),
CachingBucket: mustGetEnv("PROVIDER_CACHING_BUCKET_NAME"),
ChunkLinksTableName: mustGetEnv("CHUNK_LINKS_TABLE_NAME"),
MetadataTableName: mustGetEnv("METADATA_TABLE_NAME"),
IPNIStoreBucket: mustGetEnv("IPNI_STORE_BUCKET_NAME"),
IPNIStorePrefix: ipniStoreKeyPrefix,
NotifierHeadBucket: mustGetEnv("NOTIFIER_HEAD_BUCKET_NAME"),
NotifierTopicArn: mustGetEnv("NOTIFIER_SNS_TOPIC_ARN"),
ClaimStoreBucket: mustGetEnv("CLAIM_STORE_BUCKET_NAME"),
ClaimStorePrefix: os.Getenv("CLAIM_STORE_KEY_REFIX"),
LegacyClaimsTableName: mustGetEnv("LEGACY_CLAIMS_TABLE_NAME"),
LegacyClaimsTableRegion: mustGetEnv("LEGACY_CLAIMS_TABLE_REGION"),
LegacyClaimsBucket: mustGetEnv("LEGACY_CLAIMS_BUCKET_NAME"),
LegacyBlockIndexTableName: mustGetEnv("LEGACY_BLOCK_INDEX_TABLE_NAME"),
LegacyBlockIndexTableRegion: mustGetEnv("LEGACY_BLOCK_INDEX_TABLE_REGION"),
LegacyDataBucketURL: mustGetEnv("LEGACY_DATA_BUCKET_URL"),
HoneycombAPIKey: os.Getenv("HONEYCOMB_API_KEY"),
PrincipalMapping: principalMapping,
ProvidersCacheExpirationSeconds: mustGetInt("PROVIDER_CACHE_EXPIRATION_SECONDS"),
ClaimsCacheExpirationSeconds: mustGetInt("CLAIMS_CACHE_EXPIRATION_SECONDS"),
IndexesCacheExpirationSeconds: mustGetInt("INDEXES_CACHE_EXPIRATION_SECONDS"),
SQSCachingQueueURL: mustGetEnv("PROVIDER_CACHING_QUEUE_URL"),
CachingBucket: mustGetEnv("PROVIDER_CACHING_BUCKET_NAME"),
ChunkLinksTableName: mustGetEnv("CHUNK_LINKS_TABLE_NAME"),
MetadataTableName: mustGetEnv("METADATA_TABLE_NAME"),
IPNIStoreBucket: mustGetEnv("IPNI_STORE_BUCKET_NAME"),
IPNIStorePrefix: ipniStoreKeyPrefix,
NotifierHeadBucket: mustGetEnv("NOTIFIER_HEAD_BUCKET_NAME"),
NotifierTopicArn: mustGetEnv("NOTIFIER_SNS_TOPIC_ARN"),
ClaimStoreBucket: mustGetEnv("CLAIM_STORE_BUCKET_NAME"),
ClaimStorePrefix: os.Getenv("CLAIM_STORE_KEY_REFIX"),
LegacyClaimsTableName: mustGetEnv("LEGACY_CLAIMS_TABLE_NAME"),
LegacyClaimsTableRegion: mustGetEnv("LEGACY_CLAIMS_TABLE_REGION"),
LegacyClaimsBucket: mustGetEnv("LEGACY_CLAIMS_BUCKET_NAME"),
LegacyBlockIndexTableName: mustGetEnv("LEGACY_BLOCK_INDEX_TABLE_NAME"),
LegacyBlockIndexTableRegion: mustGetEnv("LEGACY_BLOCK_INDEX_TABLE_REGION"),
LegacyDataBucketURL: mustGetEnv("LEGACY_DATA_BUCKET_URL"),
HoneycombAPIKey: os.Getenv("HONEYCOMB_API_KEY"),
PrincipalMapping: principalMapping,

Check warning on line 196 in pkg/aws/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/aws/service.go#L176-L196

Added lines #L176 - L196 were not covered by tests
}
}

// Construct constructs types.Service from AWS deps for Lamda functions
func Construct(cfg Config) (types.Service, error) {
httpClient := construct.DefaultHTTPClient()
providersClient := redis.NewClient(&cfg.ProvidersRedis)
claimsClient := redis.NewClient(&cfg.ClaimsRedis)
indexesClient := redis.NewClient(&cfg.IndexesRedis)
providersClient := goredis.NewClient(&cfg.ProvidersRedis)
claimsClient := goredis.NewClient(&cfg.ClaimsRedis)
indexesClient := goredis.NewClient(&cfg.IndexesRedis)

Check warning on line 205 in pkg/aws/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/aws/service.go#L203-L205

Added lines #L203 - L205 were not covered by tests

// instrument HTTP and redis clients if telemetry is enabled
if cfg.HoneycombAPIKey != "" {
Expand Down Expand Up @@ -231,6 +248,9 @@ func Construct(cfg Config) (types.Service, error) {
construct.WithProvidersClient(providersClient),
construct.WithClaimsClient(claimsClient),
construct.WithIndexesClient(indexesClient),
construct.WithProvidersCacheOptions(redis.ExpirationTime(time.Duration(cfg.ProvidersCacheExpirationSeconds)*time.Second)),
construct.WithClaimsCacheOptions(redis.ExpirationTime(time.Duration(cfg.ClaimsCacheExpirationSeconds)*time.Second)),
construct.WithIndexesCacheOptions(redis.ExpirationTime(time.Duration(cfg.IndexesCacheExpirationSeconds)*time.Second)),

Check warning on line 253 in pkg/aws/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/aws/service.go#L251-L253

Added lines #L251 - L253 were not covered by tests
)
if err != nil {
return nil, err
Expand Down
54 changes: 41 additions & 13 deletions pkg/construct/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,20 @@ type ServiceConfig struct {
}

type config struct {
cachingQueue blobindexlookup.CachingQueue
opts []service.Option
ds datastore.Batching
skipNotification bool
startIPNIServer bool
publisherStore store.PublisherStore
claimsStore types.ContentClaimsStore
providersClient redis.Client
claimsClient redis.Client
indexesClient redis.Client
cachingQueue blobindexlookup.CachingQueue
opts []service.Option
ds datastore.Batching
skipNotification bool
startIPNIServer bool
publisherStore store.PublisherStore
claimsStore types.ContentClaimsStore
providersClient redis.Client
claimsClient redis.Client
indexesClient redis.Client
providersCacheOpts []redis.Option
claimsCacheOpts []redis.Option
indexesCacheOpts []redis.Option

legacyClaimsMappers []providerindex.ContentToClaimsMapper
legacyClaimsBucket types.ContentClaimsStore
legacyClaimsUrl string
Expand Down Expand Up @@ -216,6 +220,30 @@ func WithHTTPClient(httpClient *http.Client) Option {
}
}

// WithProvidersCacheOptions passes configuration to the providers cache
func WithProvidersCacheOptions(opts ...redis.Option) Option {
return func(cfg *config) error {
cfg.providersCacheOpts = opts
return nil
}

Check warning on line 228 in pkg/construct/construct.go

View check run for this annotation

Codecov / codecov/patch

pkg/construct/construct.go#L224-L228

Added lines #L224 - L228 were not covered by tests
}

// WithClaimsCacheOptions passes configuration to the providers cache
func WithClaimsCacheOptions(opts ...redis.Option) Option {
return func(cfg *config) error {
cfg.claimsCacheOpts = opts
return nil
}

Check warning on line 236 in pkg/construct/construct.go

View check run for this annotation

Codecov / codecov/patch

pkg/construct/construct.go#L232-L236

Added lines #L232 - L236 were not covered by tests
}

// WithIndexesCacheOptions passes configuration to the providers cache
func WithIndexesCacheOptions(opts ...redis.Option) Option {
return func(cfg *config) error {
cfg.indexesCacheOpts = opts
return nil
}

Check warning on line 244 in pkg/construct/construct.go

View check run for this annotation

Codecov / codecov/patch

pkg/construct/construct.go#L240-L244

Added lines #L240 - L244 were not covered by tests
}

// Service is the core methods of the indexing service but with additional
// lifecycle methods.
type Service interface {
Expand Down Expand Up @@ -276,9 +304,9 @@ func Construct(sc ServiceConfig, opts ...Option) (Service, error) {
}

// build caches
providersCache := redis.NewProviderStore(providersClient)
claimsCache := redis.NewContentClaimsStore(claimsClient)
shardDagIndexesCache := redis.NewShardedDagIndexStore(indexesClient)
providersCache := redis.NewProviderStore(providersClient, cfg.providersCacheOpts...)
claimsCache := redis.NewContentClaimsStore(claimsClient, cfg.claimsCacheOpts...)
shardDagIndexesCache := redis.NewShardedDagIndexStore(indexesClient, cfg.indexesCacheOpts...)

Check warning on line 309 in pkg/construct/construct.go

View check run for this annotation

Codecov / codecov/patch

pkg/construct/construct.go#L307-L309

Added lines #L307 - L309 were not covered by tests

cachingQueue := cfg.cachingQueue
if cachingQueue == nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/redis/contentclaimsstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ var _ types.ContentClaimsCache = (*ContentClaimsStore)(nil)
type ContentClaimsStore = Store[cid.Cid, delegation.Delegation]

// NewContentClaimsStore returns a new instance of a Content Claims Store using the given redis client
func NewContentClaimsStore(client Client) *ContentClaimsStore {
return &Store[cid.Cid, delegation.Delegation]{delegationFromRedis, delegationToRedis, cidKeyString, client}
func NewContentClaimsStore(client Client, opts ...Option) *ContentClaimsStore {
return NewStore(delegationFromRedis, delegationToRedis, cidKeyString, client, opts...)
}

func delegationFromRedis(data string) (delegation.Delegation, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/redis/providerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ var (
type ProviderStore = Store[multihash.Multihash, model.ProviderResult]

// NewProviderStore returns a new instance of an IPNI store using the given redis client
func NewProviderStore(client Client) *ProviderStore {
return NewStore(providerResultFromRedis, providerResultToRedis, multihashKeyString, client)
func NewProviderStore(client Client, opts ...Option) *ProviderStore {
return NewStore(providerResultFromRedis, providerResultToRedis, multihashKeyString, client, opts...)
}

func providerResultFromRedis(data string) (model.ProviderResult, error) {
Expand Down
30 changes: 27 additions & 3 deletions pkg/redis/redisstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,44 @@ type Store[Key, Value any] struct {
toRedis func(Value) (string, error)
keyString func(Key) string
client Client
config config
}

var (
_ Client = (*redis.Client)(nil)
_ types.Cache[any, any] = (*Store[any, any])(nil)
)

type config struct {
expirationTime time.Duration
}

func newConfig(opts []Option) config {
c := config{
expirationTime: DefaultExpire,
}
for _, opt := range opts {
opt(&c)
}

Check warning on line 50 in pkg/redis/redisstore.go

View check run for this annotation

Codecov / codecov/patch

pkg/redis/redisstore.go#L49-L50

Added lines #L49 - L50 were not covered by tests
return c
}

type Option func(*config)

func ExpirationTime(expirationTime time.Duration) Option {
return func(c *config) {
c.expirationTime = expirationTime
}

Check warning on line 59 in pkg/redis/redisstore.go

View check run for this annotation

Codecov / codecov/patch

pkg/redis/redisstore.go#L56-L59

Added lines #L56 - L59 were not covered by tests
}

// NewStore returns a new instance of a redis store with the provided serialization/deserialization functions
func NewStore[Key, Value any](
fromRedis func(string) (Value, error),
toRedis func(Value) (string, error),
keyString func(Key) string,
client Client) *Store[Key, Value] {
return &Store[Key, Value]{fromRedis, toRedis, keyString, client}
client Client,
opts ...Option) *Store[Key, Value] {
return &Store[Key, Value]{fromRedis, toRedis, keyString, client, newConfig(opts)}
}

// Get returns deserialized values from redis
Expand Down Expand Up @@ -80,7 +104,7 @@ func (rs *Store[Key, Value]) Set(ctx context.Context, key Key, value Value, expi
func (rs *Store[Key, Value]) SetExpirable(ctx context.Context, key Key, expires bool) error {
var err error
if expires {
err = rs.client.Expire(ctx, rs.keyString(key), DefaultExpire).Err()
err = rs.client.Expire(ctx, rs.keyString(key), rs.config.expirationTime).Err()
} else {
err = rs.client.Persist(ctx, rs.keyString(key)).Err()
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/redis/shareddagindexstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ var (
type ShardedDagIndexStore = Store[types.EncodedContextID, blobindex.ShardedDagIndexView]

// NewShardedDagIndexStore returns a new instance of a ShardedDagIndex store using the given redis client
func NewShardedDagIndexStore(client Client) *ShardedDagIndexStore {
return &Store[types.EncodedContextID, blobindex.ShardedDagIndexView]{shardedDagIndexFromRedis, shardedDagIndexToRedis, encodedContextIDKeyString, client}
func NewShardedDagIndexStore(client Client, opts ...Option) *ShardedDagIndexStore {
return NewStore(shardedDagIndexFromRedis, shardedDagIndexToRedis, encodedContextIDKeyString, client, opts...)
}

func shardedDagIndexFromRedis(data string) (blobindex.ShardedDagIndexView, error) {
Expand Down

0 comments on commit fd6a513

Please sign in to comment.