diff --git a/maestro/service.go b/maestro/service.go index 9f9be798a..8ee979f3a 100644 --- a/maestro/service.go +++ b/maestro/service.go @@ -11,6 +11,7 @@ package maestro import ( "context" "encoding/json" + "fmt" "strings" "github.com/go-redis/redis/v8" @@ -93,6 +94,11 @@ func (svc *maestroService) Start(ctx context.Context, cancelFunction context.Can svc.logger.Warn("failed to create deploymentEntry for sink, skipping", zap.String("sink-id", sinkRes.Id)) continue } + err = svc.updateSinkCache(ctx, data) + if err != nil { + svc.logger.Warn("failed to update cache for sink", zap.String("sink-id", sinkRes.Id)) + continue + } svc.logger.Info("successfully created deploymentEntry for sink", zap.String("sink-id", sinkRes.Id), zap.String("state", sinkRes.State)) } @@ -135,6 +141,23 @@ func (svc *maestroService) Start(ctx context.Context, cancelFunction context.Can return nil } +func (svc *maestroService) updateSinkCache(ctx context.Context, data maestroconfig.SinkData) (err error) { + data.State = maestroconfig.Unknown + keyPrefix := "sinker_key" + skey := fmt.Sprintf("%s-%s:%s", keyPrefix, data.OwnerID, data.SinkID) + bytes, err := json.Marshal(data) + if err != nil { + return err + } + if err = svc.sinkerRedisClient.Set(ctx, skey, bytes, 0).Err(); err != nil { + return err + } + if err != nil { + return err + } + return +} + func (svc *maestroService) subscribeToSinkerES(ctx context.Context) { svc.logger.Info("Subscribed to Redis Event Store for sinker") if err := svc.eventStore.SubscribeSinker(ctx); err != nil {