diff --git a/cmd/enduro-a3m-worker/main.go b/cmd/enduro-a3m-worker/main.go index 98e04fea..2834b76f 100644 --- a/cmd/enduro-a3m-worker/main.go +++ b/cmd/enduro-a3m-worker/main.go @@ -115,7 +115,7 @@ func main() { } // Set up the event service. - evsvc, err := event.NewEventServiceRedis(logger.WithName("events"), &cfg.Event) + evsvc, err := event.NewEventServiceRedis(logger.WithName("events"), tp, &cfg.Event) if err != nil { logger.Error(err, "Error creating Event service.") os.Exit(1) @@ -130,7 +130,7 @@ func main() { // Set up the watcher service. var wsvc watcher.Service { - wsvc, err = watcher.New(ctx, logger.WithName("watcher"), &cfg.Watcher) + wsvc, err = watcher.New(ctx, tp, logger.WithName("watcher"), &cfg.Watcher) if err != nil { logger.Error(err, "Error setting up watchers.") os.Exit(1) diff --git a/cmd/enduro-am-worker/main.go b/cmd/enduro-am-worker/main.go index d7b911e7..f9613b97 100644 --- a/cmd/enduro-am-worker/main.go +++ b/cmd/enduro-am-worker/main.go @@ -118,7 +118,7 @@ func main() { // Set up the watcher service. var wsvc watcher.Service { - wsvc, err = watcher.New(ctx, logger.WithName("watcher"), &cfg.Watcher) + wsvc, err = watcher.New(ctx, tp, logger.WithName("watcher"), &cfg.Watcher) if err != nil { logger.Error(err, "Error setting up watchers.") os.Exit(1) @@ -126,7 +126,7 @@ func main() { } // Set up the event service. - evsvc, err := event.NewEventServiceRedis(logger.WithName("events"), &cfg.Event) + evsvc, err := event.NewEventServiceRedis(logger.WithName("events"), tp, &cfg.Event) if err != nil { logger.Error(err, "Error creating Event service.") os.Exit(1) diff --git a/go.mod b/go.mod index 314dcbc7..8dc1a778 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/aws/aws-sdk-go-v2 v1.25.3 github.com/aws/aws-sdk-go-v2/config v1.27.7 github.com/aws/aws-sdk-go-v2/credentials v1.17.7 - github.com/aws/aws-sdk-go-v2/service/s3 v1.52.0 + github.com/aws/aws-sdk-go-v2/service/s3 v1.52.1 github.com/coreos/go-oidc/v3 v3.9.0 github.com/cyphar/filepath-securejoin v0.2.4 github.com/dolmen-go/contextio v1.0.0 @@ -37,6 +37,7 @@ require ( github.com/pkg/sftp v1.13.6 github.com/prometheus/client_golang v1.19.0 github.com/radovskyb/watcher v1.0.7 + github.com/redis/go-redis/extra/redisotel/v9 v9.0.5 github.com/redis/go-redis/v9 v9.5.1 github.com/rukavina/sftpblob v0.0.0-20201030103652-e8e9601e6511 github.com/spf13/afero v1.11.0 @@ -131,6 +132,7 @@ require ( github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + github.com/redis/go-redis/extra/rediscmd/v9 v9.0.5 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect diff --git a/go.sum b/go.sum index 94709b23..b43be039 100644 --- a/go.sum +++ b/go.sum @@ -467,8 +467,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.5 h1:K/NXvIftO github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.5/go.mod h1:cl9HGLV66EnCmMNzq4sYOti+/xo8w34CsgzVtm2GgsY= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.3 h1:4t+QEX7BsXz98W8W1lNvMAG+NX8qHz2CjLBxQKku40g= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.3/go.mod h1:oFcjjUq5Hm09N9rpxTdeMeLeQcxS7mIkBkL8qUKng+A= -github.com/aws/aws-sdk-go-v2/service/s3 v1.52.0 h1:k7gL76sSR0e2pLphjfmjD/+pDDtoOHvWp8ezpTsdyes= -github.com/aws/aws-sdk-go-v2/service/s3 v1.52.0/go.mod h1:MGTaf3x/+z7ZGugCGvepnx2DS6+caCYYqKhzVoLNYPk= +github.com/aws/aws-sdk-go-v2/service/s3 v1.52.1 h1:Y/TTvxMdYwNvhzolvneV1wEEN/ncQUSd1AnzFGTMPqM= +github.com/aws/aws-sdk-go-v2/service/s3 v1.52.1/go.mod h1:MGTaf3x/+z7ZGugCGvepnx2DS6+caCYYqKhzVoLNYPk= github.com/aws/aws-sdk-go-v2/service/sso v1.20.2 h1:XOPfar83RIRPEzfihnp+U6udOveKZJvPQ76SKWrLRHc= github.com/aws/aws-sdk-go-v2/service/sso v1.20.2/go.mod h1:Vv9Xyk1KMHXrR3vNQe8W5LMFdTjSeWk0gBZBzvf3Qa0= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.2 h1:pi0Skl6mNl2w8qWZXcdOyg197Zsf4G97U7Sso9JXGZE= @@ -480,8 +480,10 @@ github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= @@ -847,6 +849,11 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/radovskyb/watcher v1.0.7 h1:AYePLih6dpmS32vlHfhCeli8127LzkIgwJGcwwe8tUE= github.com/radovskyb/watcher v1.0.7/go.mod h1:78okwvY5wPdzcb1UYnip1pvrZNIVEIh/Cm+ZuvsUYIg= +github.com/redis/go-redis/extra/rediscmd/v9 v9.0.5 h1:EaDatTxkdHG+U3Bk4EUr+DZ7fOGwTfezUiUJMaIcaho= +github.com/redis/go-redis/extra/rediscmd/v9 v9.0.5/go.mod h1:fyalQWdtzDBECAQFBJuQe5bzQ02jGd5Qcbgb97Flm7U= +github.com/redis/go-redis/extra/redisotel/v9 v9.0.5 h1:EfpWLLCyXw8PSM2/XNJLjI3Pb27yVE+gIAfeqp8LUCc= +github.com/redis/go-redis/extra/redisotel/v9 v9.0.5/go.mod h1:WZjPDy7VNzn77AAfnAfVjZNvfJTYfPetfZk5yoSTLaQ= +github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= diff --git a/go.work.sum b/go.work.sum index 73c8fbb2..6a3fc7f5 100644 --- a/go.work.sum +++ b/go.work.sum @@ -992,6 +992,8 @@ github.com/aws/aws-sdk-go-v2/service/kms v1.26.3 h1:li5dFiK1tkAFXvOC9QPWAVWqTu8Z github.com/aws/aws-sdk-go-v2/service/kms v1.26.3/go.mod h1:N3++/sLV97B8Zliz7KRqNcojOX7iMBZWKiuit5FKtH0= github.com/aws/aws-sdk-go-v2/service/kms v1.29.2 h1:3UaqodPQqPh5XowXJ9fWM4TQqwuftYYFvej+RI5uIO8= github.com/aws/aws-sdk-go-v2/service/kms v1.29.2/go.mod h1:elLDaj+1RNl9Ovn3dB6dWLVo5WQ+VLSUMKegl7N96fY= +github.com/aws/aws-sdk-go-v2/service/s3 v1.52.1 h1:Y/TTvxMdYwNvhzolvneV1wEEN/ncQUSd1AnzFGTMPqM= +github.com/aws/aws-sdk-go-v2/service/s3 v1.52.1/go.mod h1:MGTaf3x/+z7ZGugCGvepnx2DS6+caCYYqKhzVoLNYPk= github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.20.1 h1:AD8gRAXAXDU9+XTm0Q3D+NBsMCX4TlpN/qnNYbbQLO4= github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.20.1/go.mod h1:aFRHxQ3V4bs/uVQYpg8Wm6szKWuB2KnraKcIGp5JS/I= github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.23.3 h1:NurfTBFmaehSiWMv5drydRWs3On0kwoBe1gWYFt+5ws= @@ -1020,7 +1022,6 @@ github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuP github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/boombuler/barcode v1.0.1 h1:NDBbPmhS+EqABEs5Kg3n/5ZNjy73Pz7SIV+KCeqyXcs= -github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w= github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= github.com/bwesterb/go-ristretto v1.2.3 h1:1w53tCkGhCQ5djbat3+MH0BAQ5Kfgbt56UZQ/JMzngw= github.com/ccojocar/zxcvbn-go v1.0.1 h1:+sxrANSCj6CdadkcMnvde/GWU1vZiiXRbqYSCalV4/4= diff --git a/internal/api/auth/ticket_store.go b/internal/api/auth/ticket_store.go index 4170d55f..407a67d3 100644 --- a/internal/api/auth/ticket_store.go +++ b/internal/api/auth/ticket_store.go @@ -3,11 +3,14 @@ package auth import ( "context" "errors" + "fmt" "strings" "sync" "time" + "github.com/redis/go-redis/extra/redisotel/v9" "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel/trace" ) // TicketStore persists expirable tickets. @@ -39,14 +42,23 @@ const ( keyClassifier = "ticket" ) -func NewRedisStore(ctx context.Context, cfg *RedisConfig) (*RedisStore, error) { +func NewRedisStore(ctx context.Context, tp trace.TracerProvider, cfg *RedisConfig) (*RedisStore, error) { opts, err := redis.ParseURL(cfg.Address) if err != nil { return nil, err } + client := redis.NewClient(opts) + if err := redisotel.InstrumentTracing( + client, + redisotel.WithTracerProvider(tp), + redisotel.WithDBStatement(false), + ); err != nil { + return nil, fmt.Errorf("instrument redis client tracing: %v", err) + } + return &RedisStore{ - client: redis.NewClient(opts), + client: client, prefix: strings.TrimSuffix(cfg.Prefix, keySeparator), }, nil } diff --git a/internal/api/auth/ticket_store_test.go b/internal/api/auth/ticket_store_test.go index 0ac77469..0f341af6 100644 --- a/internal/api/auth/ticket_store_test.go +++ b/internal/api/auth/ticket_store_test.go @@ -7,6 +7,7 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel/trace/noop" "gotest.tools/v3/assert" "github.com/artefactual-sdps/enduro/internal/api/auth" @@ -16,13 +17,14 @@ func TestRedisStore(t *testing.T) { t.Parallel() storeKey := "key" + tp := noop.NewTracerProvider() t.Run("Fails when parsing invalid URL", func(t *testing.T) { t.Parallel() ctx := context.Background() - _, err := auth.NewRedisStore(ctx, &auth.RedisConfig{ + _, err := auth.NewRedisStore(ctx, tp, &auth.RedisConfig{ Address: "scheme://unknown", }) assert.Error(t, err, "redis: invalid URL scheme: scheme") @@ -32,7 +34,7 @@ func TestRedisStore(t *testing.T) { t.Parallel() ctx := context.Background() - s, err := auth.NewRedisStore(ctx, &auth.RedisConfig{ + s, err := auth.NewRedisStore(ctx, tp, &auth.RedisConfig{ Address: "redis://127.0.0.1:12345", }) assert.NilError(t, err) @@ -48,7 +50,7 @@ func TestRedisStore(t *testing.T) { redisServer := miniredis.RunT(t) redisClient := redis.NewClient(&redis.Options{Addr: redisServer.Addr()}) - store, err := auth.NewRedisStore(ctx, &auth.RedisConfig{ + store, err := auth.NewRedisStore(ctx, tp, &auth.RedisConfig{ Address: "redis://" + redisServer.Addr(), Prefix: "prefix", }) @@ -75,7 +77,7 @@ func TestRedisStore(t *testing.T) { redisServer := miniredis.RunT(t) redisClient := redis.NewClient(&redis.Options{Addr: redisServer.Addr()}) - store, err := auth.NewRedisStore(ctx, &auth.RedisConfig{ + store, err := auth.NewRedisStore(ctx, tp, &auth.RedisConfig{ Address: "redis://" + redisServer.Addr(), Prefix: "prefix:", }) @@ -100,7 +102,7 @@ func TestRedisStore(t *testing.T) { err := redisClient.SetEx(ctx, "prefix:ticket:"+storeKey, "", time.Minute).Err() assert.NilError(t, err) - store, err := auth.NewRedisStore(ctx, &auth.RedisConfig{ + store, err := auth.NewRedisStore(ctx, tp, &auth.RedisConfig{ Address: "redis://" + redisServer.Addr(), Prefix: "prefix", }) @@ -123,7 +125,7 @@ func TestRedisStore(t *testing.T) { err := redisClient.SetEx(ctx, "prefix:ticket:"+storeKey, "", time.Second*5).Err() assert.NilError(t, err) - store, err := auth.NewRedisStore(ctx, &auth.RedisConfig{ + store, err := auth.NewRedisStore(ctx, tp, &auth.RedisConfig{ Address: "redis://" + redisServer.Addr(), Prefix: "prefix", }) @@ -140,7 +142,7 @@ func TestRedisStore(t *testing.T) { ctx := context.Background() redisServer := miniredis.RunT(t) - store, err := auth.NewRedisStore(ctx, &auth.RedisConfig{ + store, err := auth.NewRedisStore(ctx, tp, &auth.RedisConfig{ Address: "redis://" + redisServer.Addr(), Prefix: "prefix", }) diff --git a/internal/event/event.go b/internal/event/event.go index ea5d6e85..c7982019 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -16,7 +16,7 @@ const ( type EventService interface { // Publishes an event to a user's event listeners. // If the user is not currently subscribed then this is a no-op. - PublishEvent(event *goapackage.MonitorEvent) + PublishEvent(ctx context.Context, event *goapackage.MonitorEvent) // Creates a subscription. Caller must call Subscription.Close() when done // with the subscription. @@ -28,7 +28,7 @@ func NopEventService() EventService { return &nopEventService{} } type nopEventService struct{} -func (*nopEventService) PublishEvent(event *goapackage.MonitorEvent) {} +func (*nopEventService) PublishEvent(ctx context.Context, event *goapackage.MonitorEvent) {} func (*nopEventService) Subscribe(ctx context.Context) (Subscription, error) { panic("not implemented") diff --git a/internal/event/inmem.go b/internal/event/inmem.go index 7a922fba..5b076f53 100644 --- a/internal/event/inmem.go +++ b/internal/event/inmem.go @@ -26,7 +26,7 @@ func NewEventServiceInMemImpl() *EventServiceInMemImpl { // // If user's channel is full then the user is disconnected. This is to prevent // slow users from blocking progress. -func (s *EventServiceInMemImpl) PublishEvent(event *goapackage.MonitorEvent) { +func (s *EventServiceInMemImpl) PublishEvent(ctx context.Context, event *goapackage.MonitorEvent) { s.mu.Lock() defer s.mu.Unlock() diff --git a/internal/event/inmem_test.go b/internal/event/inmem_test.go index 79f97866..7c557f92 100644 --- a/internal/event/inmem_test.go +++ b/internal/event/inmem_test.go @@ -24,7 +24,7 @@ func TestEventService(t *testing.T) { } // Publish event to both users - s.PublishEvent(&goapackage.MonitorEvent{}) + s.PublishEvent(ctx, &goapackage.MonitorEvent{}) // Verify both subscribers received the update. select { @@ -50,7 +50,7 @@ func TestEventService(t *testing.T) { } // Publish event & close. - s.PublishEvent(&goapackage.MonitorEvent{}) + s.PublishEvent(ctx, &goapackage.MonitorEvent{}) if err := sub.Close(); err != nil { t.Fatal(err) } diff --git a/internal/event/publish.go b/internal/event/publish.go index d6317da2..67d15831 100644 --- a/internal/event/publish.go +++ b/internal/event/publish.go @@ -32,5 +32,5 @@ func PublishEvent(ctx context.Context, events EventService, event interface{}) { panic("tried to publish unexpected event") } - events.PublishEvent(update) + events.PublishEvent(ctx, update) } diff --git a/internal/event/redis.go b/internal/event/redis.go index fe78d98b..d9c25bd3 100644 --- a/internal/event/redis.go +++ b/internal/event/redis.go @@ -3,10 +3,13 @@ package event import ( "context" "encoding/json" + "fmt" "time" "github.com/go-logr/logr" + "github.com/redis/go-redis/extra/redisotel/v9" "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel/trace" "github.com/artefactual-sdps/enduro/internal/api/gen/http/package_/client" "github.com/artefactual-sdps/enduro/internal/api/gen/http/package_/server" @@ -21,20 +24,30 @@ type EventServiceRedisImpl struct { var _ EventService = (*EventServiceRedisImpl)(nil) -func NewEventServiceRedis(logger logr.Logger, cfg *Config) (EventService, error) { +func NewEventServiceRedis(logger logr.Logger, tp trace.TracerProvider, cfg *Config) (EventService, error) { opts, err := redis.ParseURL(cfg.RedisAddress) if err != nil { return nil, err } + + client := redis.NewClient(opts) + if err := redisotel.InstrumentTracing( + client, + redisotel.WithTracerProvider(tp), + redisotel.WithDBStatement(false), + ); err != nil { + return nil, fmt.Errorf("instrument redis client tracing: %v", err) + } + return &EventServiceRedisImpl{ logger: logger, - client: redis.NewClient(opts), + client: client, cfg: cfg, }, nil } -func (s *EventServiceRedisImpl) PublishEvent(event *goapackage.MonitorEvent) { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) +func (s *EventServiceRedisImpl) PublishEvent(ctx context.Context, event *goapackage.MonitorEvent) { + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) defer cancel() blob, err := json.Marshal(server.NewMonitorResponseBody(event)) @@ -48,7 +61,7 @@ func (s *EventServiceRedisImpl) PublishEvent(event *goapackage.MonitorEvent) { } func (s *EventServiceRedisImpl) Subscribe(ctx context.Context) (Subscription, error) { - sub := NewSubscriptionRedis(s.logger, s.client, s.cfg.RedisChannel) + sub := NewSubscriptionRedis(ctx, s.logger, s.client, s.cfg.RedisChannel) return sub, nil } @@ -63,8 +76,7 @@ type SubscriptionRedisImpl struct { var _ Subscription = (*SubscriptionRedisImpl)(nil) -func NewSubscriptionRedis(logger logr.Logger, c redis.UniversalClient, channel string) Subscription { - ctx := context.Background() +func NewSubscriptionRedis(ctx context.Context, logger logr.Logger, c redis.UniversalClient, channel string) Subscription { pubsub := c.Subscribe(ctx, channel) // Call Receive to force the connection to wait a response from // Redis so the subscription is active immediately. diff --git a/internal/event/redis_test.go b/internal/event/redis_test.go index b4b7b9d0..81080c68 100644 --- a/internal/event/redis_test.go +++ b/internal/event/redis_test.go @@ -10,6 +10,7 @@ import ( "github.com/go-logr/logr/testr" "github.com/redis/go-redis/v9" "go.artefactual.dev/tools/ref" + "go.opentelemetry.io/otel/trace/noop" "gotest.tools/v3/assert" "github.com/artefactual-sdps/enduro/internal/api/gen/http/package_/server" @@ -52,10 +53,10 @@ func TestEventServiceRedisPublish(t *testing.T) { RedisAddress: "redis://" + s.Addr(), RedisChannel: channel, } - svc, err := event.NewEventServiceRedis(testr.New(t), &cfg) + svc, err := event.NewEventServiceRedis(testr.New(t), noop.NewTracerProvider(), &cfg) assert.NilError(t, err) - svc.PublishEvent(&goapackage.MonitorEvent{ + svc.PublishEvent(ctx, &goapackage.MonitorEvent{ Event: &goapackage.MonitorPingEvent{ Message: ref.New("hello"), }, @@ -80,7 +81,7 @@ func TestEventServiceRedisSubscribe(t *testing.T) { RedisAddress: "redis://" + s.Addr(), RedisChannel: "enduro-events", } - svc, err := event.NewEventServiceRedis(testr.New(t), &cfg) + svc, err := event.NewEventServiceRedis(testr.New(t), noop.NewTracerProvider(), &cfg) assert.NilError(t, err) sub, err := svc.Subscribe(ctx) diff --git a/internal/watcher/config.go b/internal/watcher/config.go index 9824d5c1..85a856b8 100644 --- a/internal/watcher/config.go +++ b/internal/watcher/config.go @@ -57,6 +57,7 @@ type MinioConfig struct { RedisAddress string RedisList string RedisFailedList string + RedisPopTimeout time.Duration Region string Endpoint string PathStyle bool diff --git a/internal/watcher/minio.go b/internal/watcher/minio.go index 6526f25f..163911c8 100644 --- a/internal/watcher/minio.go +++ b/internal/watcher/minio.go @@ -11,7 +11,9 @@ import ( "time" "github.com/go-logr/logr" + "github.com/redis/go-redis/extra/redisotel/v9" "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel/trace" "gocloud.dev/blob" "github.com/artefactual-sdps/enduro/internal/bucket" @@ -35,12 +37,20 @@ type MinioEventSet struct { var _ Watcher = (*minioWatcher)(nil) -func NewMinioWatcher(ctx context.Context, logger logr.Logger, config *MinioConfig) (*minioWatcher, error) { +func NewMinioWatcher(ctx context.Context, tp trace.TracerProvider, logger logr.Logger, config *MinioConfig) (*minioWatcher, error) { opts, err := redis.ParseURL(config.RedisAddress) if err != nil { return nil, err } + client := redis.NewClient(opts) + if err := redisotel.InstrumentTracing( + client, + redisotel.WithTracerProvider(tp), + redisotel.WithDBStatement(false), + ); err != nil { + return nil, fmt.Errorf("instrument redis client tracing: %v", err) + } bucketConfig := &bucket.Config{ Endpoint: config.Endpoint, diff --git a/internal/watcher/minio_test.go b/internal/watcher/minio_test.go index e54b8f54..a3867b85 100644 --- a/internal/watcher/minio_test.go +++ b/internal/watcher/minio_test.go @@ -10,6 +10,7 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/go-logr/logr" + "go.opentelemetry.io/otel/trace/noop" "gotest.tools/v3/assert" "gotest.tools/v3/fs" "gotest.tools/v3/poll" @@ -49,7 +50,7 @@ func newWatcher(t *testing.T, updateCfg func(c *watcher.MinioConfig)) (*miniredi updateCfg(config) } - w, err := watcher.NewMinioWatcher(context.Background(), logr.Discard(), config) + w, err := watcher.NewMinioWatcher(context.Background(), noop.NewTracerProvider(), logr.Discard(), config) if err != nil { t.Fatal(err) } @@ -69,8 +70,6 @@ func TestWatcherReturnsErrWhenNoMessages(t *testing.T) { }) defer cleanup(t, m) - // TODO: slow test, should inject smaller timeout. - check := func(t poll.LogT) poll.Result { _, _, err := w.Watch(context.Background()) @@ -85,7 +84,7 @@ func TestWatcherReturnsErrWhenNoMessages(t *testing.T) { return poll.Success() } - poll.WaitOn(t, check, poll.WithTimeout(time.Second*3)) + poll.WaitOn(t, check, poll.WithTimeout(time.Second*2)) } func TestWatcherReturnsErrOnInvalidMessages(t *testing.T) { diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index 3f59d6b6..203e7806 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -9,6 +9,7 @@ import ( "time" "github.com/go-logr/logr" + "go.opentelemetry.io/otel/trace" "gocloud.dev/blob" ) @@ -94,12 +95,12 @@ type serviceImpl struct { var _ Service = (*serviceImpl)(nil) -func New(ctx context.Context, logger logr.Logger, c *Config) (*serviceImpl, error) { +func New(ctx context.Context, tp trace.TracerProvider, logger logr.Logger, c *Config) (*serviceImpl, error) { watchers := map[string]Watcher{} minioConfigs := append(c.Minio, c.Embedded) for _, item := range minioConfigs { - w, err := NewMinioWatcher(ctx, logger, item) + w, err := NewMinioWatcher(ctx, tp, logger, item) if err != nil { return nil, err } diff --git a/main.go b/main.go index ed7a1cbf..18d806e7 100644 --- a/main.go +++ b/main.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/pflag" "go.artefactual.dev/tools/log" + "go.opentelemetry.io/otel/codes" temporalsdk_activity "go.temporal.io/sdk/activity" temporalsdk_client "go.temporal.io/sdk/client" temporalsdk_contrib_opentelemetry "go.temporal.io/sdk/contrib/opentelemetry" @@ -156,7 +157,7 @@ func main() { } // Set up the event service. - evsvc, err := event.NewEventServiceRedis(logger.WithName("events"), &cfg.Event) + evsvc, err := event.NewEventServiceRedis(logger.WithName("events"), tp, &cfg.Event) if err != nil { logger.Error(err, "Error creating Event service.") os.Exit(1) @@ -183,7 +184,7 @@ func main() { if cfg.API.Auth.Enabled { if cfg.API.Auth.Ticket.Redis != nil { var err error - store, err = auth.NewRedisStore(ctx, cfg.API.Auth.Ticket.Redis) + store, err = auth.NewRedisStore(ctx, tp, cfg.API.Auth.Ticket.Redis) if err != nil { logger.Error(err, "Error creating ticket provider redis store.") os.Exit(1) @@ -240,7 +241,7 @@ func main() { // Set up the watcher service. var wsvc watcher.Service { - wsvc, err = watcher.New(ctx, logger.WithName("watcher"), &cfg.Watcher) + wsvc, err = watcher.New(ctx, tp, logger.WithName("watcher"), &cfg.Watcher) if err != nil { logger.Error(err, "Error setting up watchers.") os.Exit(1) @@ -270,7 +271,6 @@ func main() { { for _, w := range wsvc.Watchers() { done := make(chan struct{}) - cur := w g.Add( func() error { for { @@ -278,15 +278,20 @@ func main() { case <-done: return nil default: - event, clean, err := cur.Watch(ctx) + ctx, span := tp.Tracer("enduro").Start(ctx, "watcher.poll") + event, clean, err := w.Watch(ctx) if err != nil { if !errors.Is(err, watcher.ErrWatchTimeout) { - logger.Error(err, "Error monitoring watcher interface.", "watcher", cur) + logger.Error(err, "Error monitoring watcher interface.", "watcher", w) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) } + span.End() continue } logger.V(1).Info("Starting new workflow", "watcher", event.WatcherName, "bucket", event.Bucket, "key", event.Key, "dir", event.IsDir) go func() { + defer span.End() req := package_.ProcessingWorkflowRequest{ WatcherName: event.WatcherName, RetentionPeriod: event.RetentionPeriod, @@ -303,8 +308,13 @@ func main() { } if err := package_.InitProcessingWorkflow(ctx, temporalClient, &req); err != nil { logger.Error(err, "Error initializing processing workflow.") + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) } else { - _ = clean(ctx) + if err := clean(ctx); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } } }() }