Skip to content

Commit

Permalink
feat(maestro): update cache with creation of yaml. (#2169)
Browse files Browse the repository at this point in the history
Signed-off-by: Luiz Pegoraro <luiz.pegoraro@encora.com>
  • Loading branch information
lpegoraro authored Feb 1, 2023
1 parent 4ad0020 commit 51688e9
Showing 1 changed file with 23 additions and 0 deletions.
23 changes: 23 additions & 0 deletions maestro/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package maestro
import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -93,6 +94,11 @@ func (svc *maestroService) Start(ctx context.Context, cancelFunction context.Can
svc.logger.Warn("failed to create deploymentEntry for sink, skipping", zap.String("sink-id", sinkRes.Id))
continue
}
err = svc.updateSinkCache(ctx, data)
if err != nil {
svc.logger.Warn("failed to update cache for sink", zap.String("sink-id", sinkRes.Id))
continue
}
svc.logger.Info("successfully created deploymentEntry for sink", zap.String("sink-id", sinkRes.Id), zap.String("state", sinkRes.State))
}

Expand Down Expand Up @@ -135,6 +141,23 @@ func (svc *maestroService) Start(ctx context.Context, cancelFunction context.Can
return nil
}

func (svc *maestroService) updateSinkCache(ctx context.Context, data maestroconfig.SinkData) (err error) {
data.State = maestroconfig.Unknown
keyPrefix := "sinker_key"
skey := fmt.Sprintf("%s-%s:%s", keyPrefix, data.OwnerID, data.SinkID)
bytes, err := json.Marshal(data)
if err != nil {
return err
}
if err = svc.sinkerRedisClient.Set(ctx, skey, bytes, 0).Err(); err != nil {
return err
}
if err != nil {
return err
}
return
}

func (svc *maestroService) subscribeToSinkerES(ctx context.Context) {
svc.logger.Info("Subscribed to Redis Event Store for sinker")
if err := svc.eventStore.SubscribeSinker(ctx); err != nil {
Expand Down

0 comments on commit 51688e9

Please sign in to comment.