Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(redis): fixing how maestro and sinker handles errors in reading and handling event errors #2206

Merged
merged 4 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions maestro/redis/consumer/hashset.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func decodeSinksEvent(event map[string]interface{}, operation string) (redis.Sin
val := redis.SinksUpdateEvent{
SinkID: read(event, "sink_id", ""),
Owner: read(event, "owner", ""),
Config: readMetadata(event, "config"),
Timestamp: time.Now(),
}
if operation != sinksDelete {
Expand Down
131 changes: 77 additions & 54 deletions maestro/redis/consumer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package consumer
import (
"context"
"github.com/ns1labs/orb/maestro/config"
"github.com/ns1labs/orb/pkg/errors"
"time"

"github.com/ns1labs/orb/maestro/kubecontrol"
redis2 "github.com/ns1labs/orb/maestro/redis"
maestroredis "github.com/ns1labs/orb/maestro/redis"
"github.com/ns1labs/orb/pkg/types"
sinkspb "github.com/ns1labs/orb/sinks/pb"

Expand Down Expand Up @@ -40,7 +41,8 @@ type Subscriber interface {
GetActivity(sinkID string) (int64, error)
RemoveSinkActivity(ctx context.Context, sinkId string) error

Subscribe(context context.Context) error
SubscribeSinksEvents(context context.Context) error
SubscribeSinkerEvents(context context.Context) error
}

type eventStore struct {
Expand All @@ -65,14 +67,49 @@ func NewEventStore(streamRedisClient, sinkerKeyRedisClient *redis.Client, kafkaU
}
}

// Subscribe to listen events from sinker to maestro
func (es eventStore) Subscribe(context context.Context) error {
//listening sinker events
err := es.streamRedisClient.XGroupCreateMkStream(context, streamSinks, groupMaestro, "$").Err()
// SubscribeSinkerEvents Subscribe to listen events from sinker to maestro
func (es eventStore) SubscribeSinkerEvents(ctx context.Context) error {
err := es.streamRedisClient.XGroupCreateMkStream(ctx, streamSinker, groupMaestro, "$").Err()
if err != nil && err.Error() != exists {
return err
}
err = es.streamRedisClient.XGroupCreateMkStream(context, streamSinker, groupMaestro, "$").Err()

for {
streams, err := es.streamRedisClient.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupMaestro,
Consumer: "orb_maestro-es-consumer",
Streams: []string{streamSinker, ">"},
Count: 100,
}).Result()
es.logger.Info("subscribed to stream", zap.String("stream", streams[0].Stream))
if err != nil || len(streams) == 0 {
continue
}
for _, msg := range streams[0].Messages {
event := msg.Values
rte := decodeSinkerStateUpdate(event)
es.logger.Info("received message in sinker event bus", zap.Any("operation", event["operation"]))
switch event["operation"] {
case sinkerUpdate:
if rte.State == "active" {
err = es.handleSinkerCreateCollector(ctx, rte) //sinker request create collector
}
es.streamRedisClient.XAck(ctx, streamSinker, groupMaestro, msg.ID)
case <-ctx.Done():
return errors.New("stopped listening to sinks, due to context cancellation")
}
if err != nil {
es.logger.Error("Failed to handle sinks event", zap.Any("operation", event["operation"]), zap.Error(err))
continue
}
}
}
}

// SubscribeSinksEvents Subscribe to listen events from sinks to maestro
func (es eventStore) SubscribeSinksEvents(context context.Context) error {
//listening sinker events
err := es.streamRedisClient.XGroupCreateMkStream(context, streamSinks, groupMaestro, "$").Err()
if err != nil && err.Error() != exists {
return err
}
Expand All @@ -81,61 +118,47 @@ func (es eventStore) Subscribe(context context.Context) error {
streams, err := es.streamRedisClient.XReadGroup(context, &redis.XReadGroupArgs{
Group: groupMaestro,
Consumer: "orb_maestro-es-consumer",
Streams: []string{streamSinks, streamSinker, ">"},
Streams: []string{streamSinks, ">"},
Count: 100,
}).Result()
es.logger.Info("subscribed to stream", zap.String("stream", streams[0].Stream))
if err != nil || len(streams) == 0 {
continue
}
for _, stream := range streams {
for _, msg := range stream.Messages {
event := msg.Values
switch event["operation"] {
case sinkerUpdate:
rte := decodeSinkerStateUpdate(event)
if rte.State == "active" {
err = es.handleSinkerCreateCollector(context, rte) //sinker request create collector
}
es.streamRedisClient.XAck(context, streamSinker, groupMaestro, msg.ID)
case sinksCreate:
rte, err := decodeSinksEvent(event, event["operation"].(string))
if err != nil {
es.logger.Error("error decoding sinks event", zap.Any("operation", event["operation"]), zap.Any("sink_event", event), zap.Error(err))
break
}
if v, ok := rte.Config["opentelemetry"]; ok && v.(string) == "enabled" {
err = es.handleSinksCreateCollector(context, rte) //should create collector
}
es.streamRedisClient.XAck(context, streamSinks, groupMaestro, msg.ID)
case sinksUpdate:
rte, err := decodeSinksEvent(event, event["operation"].(string))
if err != nil {
es.logger.Error("error decoding sinks event", zap.Any("operation", event["operation"]), zap.Any("sink_event", event), zap.Error(err))
break
}
err = es.handleSinksUpdateCollector(context, rte) //should create collector
es.streamRedisClient.XAck(context, streamSinks, groupMaestro, msg.ID)
case sinksDelete:
rte, err := decodeSinksEvent(event, event["operation"].(string))
if err != nil {
es.logger.Error("error decoding sinks event", zap.Any("operation", event["operation"]), zap.Any("sink_event", event), zap.Error(err))
break
}
err = es.handleSinksDeleteCollector(context, rte) //should delete collector
es.streamRedisClient.XAck(context, streamSinks, groupMaestro, msg.ID)
}
if err != nil {
es.logger.Error("Failed to handle sinks event", zap.Any("operation", event["operation"]), zap.Error(err))
break
for _, msg := range streams[0].Messages {
event := msg.Values
rte, err := decodeSinksEvent(event, event["operation"].(string))
if err != nil {
es.logger.Error("Failed to handle sinks event", zap.Any("operation", event["operation"]), zap.Error(err))
break
}
es.logger.Info("received message in sinks event bus", zap.Any("operation", event["operation"]))
switch event["operation"] {
case sinksCreate:
if v, ok := rte.Config["opentelemetry"]; ok && v.(string) == "enabled" {
err = es.handleSinksCreateCollector(context, rte) //should create collector
}
es.streamRedisClient.XAck(context, streamSinks, groupMaestro, msg.ID)
case sinksUpdate:
err = es.handleSinksUpdateCollector(context, rte) //should create collector
es.streamRedisClient.XAck(context, streamSinks, groupMaestro, msg.ID)
case sinksDelete:
err = es.handleSinksDeleteCollector(context, rte) //should delete collector
es.streamRedisClient.XAck(context, streamSinks, groupMaestro, msg.ID)
case <-context.Done():
return errors.New("stopped listening to sinks, due to context cancellation")
}
if err != nil {
es.logger.Error("Failed to handle sinks event", zap.Any("operation", event["operation"]), zap.Error(err))
continue
}
}
}
}

// handleSinkerDeleteCollector Delete collector
func (es eventStore) handleSinkerDeleteCollector(ctx context.Context, event redis2.SinkerUpdateEvent) error {
es.logger.Info("Received maestro DELETE event from sinker, sink state", zap.String("state", event.State), zap.String("sinkdID", event.SinkID), zap.String("ownerID", event.Owner))
func (es eventStore) handleSinkerDeleteCollector(ctx context.Context, event maestroredis.SinkerUpdateEvent) error {
es.logger.Info("Received maestro DELETE event from sinker, sink state", zap.String("state", event.State), zap.String("sinkID", event.SinkID), zap.String("ownerID", event.Owner))
_, err := es.GetDeploymentEntryFromSinkId(ctx, event.SinkID)
if err != nil {
return err
Expand All @@ -148,8 +171,8 @@ func (es eventStore) handleSinkerDeleteCollector(ctx context.Context, event redi
}

// handleSinkerCreateCollector Create collector
func (es eventStore) handleSinkerCreateCollector(ctx context.Context, event redis2.SinkerUpdateEvent) error {
es.logger.Info("Received maestro CREATE event from sinker, sink state", zap.String("state", event.State), zap.String("sinkdID", event.SinkID), zap.String("ownerID", event.Owner))
func (es eventStore) handleSinkerCreateCollector(ctx context.Context, event maestroredis.SinkerUpdateEvent) error {
es.logger.Info("Received maestro CREATE event from sinker, sink state", zap.String("state", event.State), zap.String("sinkID", event.SinkID), zap.String("ownerID", event.Owner))
deploymentEntry, err := es.GetDeploymentEntryFromSinkId(ctx, event.SinkID)
if err != nil {
es.logger.Error("could not find deployment entry from sink-id", zap.String("sinkID", event.SinkID), zap.Error(err))
Expand All @@ -163,8 +186,8 @@ func (es eventStore) handleSinkerCreateCollector(ctx context.Context, event redi
return nil
}

func decodeSinkerStateUpdate(event map[string]interface{}) redis2.SinkerUpdateEvent {
val := redis2.SinkerUpdateEvent{
func decodeSinkerStateUpdate(event map[string]interface{}) maestroredis.SinkerUpdateEvent {
val := maestroredis.SinkerUpdateEvent{
Owner: read(event, "owner", ""),
SinkID: read(event, "sink_id", ""),
State: read(event, "state", ""),
Expand Down
19 changes: 15 additions & 4 deletions maestro/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ func (svc *maestroService) Start(ctx context.Context, cancelFunction context.Can
}
}

go svc.subscribeToEventStore(ctx)
go svc.subscribeToSinksEvents(ctx)
go svc.subscribeToSinkerEvents(ctx)

monitorCtx := context.WithValue(ctx, "routine", "monitor")
err = svc.monitor.Start(monitorCtx, cancelFunction)
Expand All @@ -140,10 +141,20 @@ func (svc *maestroService) Start(ctx context.Context, cancelFunction context.Can
return nil
}

func (svc *maestroService) subscribeToEventStore(ctx context.Context) {
if err := svc.eventStore.Subscribe(ctx); err != nil {
func (svc *maestroService) subscribeToSinksEvents(ctx context.Context) {
if err := svc.eventStore.SubscribeSinksEvents(ctx); err != nil {
svc.logger.Error("Bootstrap service failed to subscribe to event sourcing", zap.Error(err))
return
}
svc.logger.Info("Subscribed to Redis Event Store")
svc.logger.Info("finished reading sinks events")
ctx.Done()
}

func (svc *maestroService) subscribeToSinkerEvents(ctx context.Context) {
if err := svc.eventStore.SubscribeSinksEvents(ctx); err != nil {
svc.logger.Error("Bootstrap service failed to subscribe to event sourcing", zap.Error(err))
return
}
svc.logger.Info("finished reading sinker events")
ctx.Done()
}
2 changes: 1 addition & 1 deletion sinker/redis/consumer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (es eventStore) Subscribe(context context.Context) error {
}
if err != nil {
es.logger.Error("Failed to handle event", zap.String("operation", event["operation"].(string)), zap.Error(err))
break
continue
}
es.client.XAck(context, stream, group, msg.ID)
}
Expand Down