Skip to content

Commit

Permalink
fix(redis): add debug logs
Browse files Browse the repository at this point in the history
Signed-off-by: Luiz Pegoraro <luiz.pegoraro@encora.com>
  • Loading branch information
lpegoraro committed Feb 12, 2023
1 parent 0ba14f9 commit 44f4b98
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 40 deletions.
77 changes: 39 additions & 38 deletions maestro/redis/consumer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,54 +81,55 @@ func (es eventStore) Subscribe(context context.Context) error {
streams, err := es.streamRedisClient.XReadGroup(context, &redis.XReadGroupArgs{
Group: groupMaestro,
Consumer: "orb_maestro-es-consumer",
Streams: []string{streamSinks, streamSinker, ">"},
Streams: []string{streamSinks, streamSinker},
Count: 100,
}).Result()
if err != nil || len(streams) == 0 {
continue
}

for _, msg := range streams[0].Messages {
event := msg.Values
switch event["operation"] {
case sinkerUpdate:
rte := decodeSinkerStateUpdate(event)
if rte.State == "active" {
err = es.handleSinkerCreateCollector(context, rte) //sinker request create collector
}
es.streamRedisClient.XAck(context, streamSinker, groupMaestro, msg.ID)
case sinksCreate:
rte, err := decodeSinksEvent(event, event["operation"].(string))
if err != nil {
es.logger.Error("error decoding sinks event", zap.Any("operation", event["operation"]), zap.Any("sink_event", event), zap.Error(err))
break
}
if v, ok := rte.Config["opentelemetry"]; ok && v.(string) == "enabled" {
err = es.handleSinksCreateCollector(context, rte) //should create collector
for _, stream := range streams {
for _, msg := range stream.Messages {
event := msg.Values
switch event["operation"] {
case sinkerUpdate:
rte := decodeSinkerStateUpdate(event)
if rte.State == "active" {
err = es.handleSinkerCreateCollector(context, rte) //sinker request create collector
}
es.streamRedisClient.XAck(context, streamSinker, groupMaestro, msg.ID)
case sinksCreate:
rte, err := decodeSinksEvent(event, event["operation"].(string))
if err != nil {
es.logger.Error("error decoding sinks event", zap.Any("operation", event["operation"]), zap.Any("sink_event", event), zap.Error(err))
break
}
if v, ok := rte.Config["opentelemetry"]; ok && v.(string) == "enabled" {
err = es.handleSinksCreateCollector(context, rte) //should create collector
}
es.streamRedisClient.XAck(context, streamSinks, groupMaestro, msg.ID)
case sinksUpdate:
rte, err := decodeSinksEvent(event, event["operation"].(string))
if err != nil {
es.logger.Error("error decoding sinks event", zap.Any("operation", event["operation"]), zap.Any("sink_event", event), zap.Error(err))
break
}
err = es.handleSinksUpdateCollector(context, rte) //should create collector
es.streamRedisClient.XAck(context, streamSinks, groupMaestro, msg.ID)
case sinksDelete:
rte, err := decodeSinksEvent(event, event["operation"].(string))
if err != nil {
es.logger.Error("error decoding sinks event", zap.Any("operation", event["operation"]), zap.Any("sink_event", event), zap.Error(err))
break
}
err = es.handleSinksDeleteCollector(context, rte) //should delete collector
es.streamRedisClient.XAck(context, streamSinks, groupMaestro, msg.ID)
}
es.streamRedisClient.XAck(context, streamSinks, groupMaestro, msg.ID)
case sinksUpdate:
rte, err := decodeSinksEvent(event, event["operation"].(string))
if err != nil {
es.logger.Error("error decoding sinks event", zap.Any("operation", event["operation"]), zap.Any("sink_event", event), zap.Error(err))
es.logger.Error("Failed to handle sinks event", zap.Any("operation", event["operation"]), zap.Error(err))
break
}
err = es.handleSinksUpdateCollector(context, rte) //should create collector
es.streamRedisClient.XAck(context, streamSinks, groupMaestro, msg.ID)
case sinksDelete:
rte, err := decodeSinksEvent(event, event["operation"].(string))
if err != nil {
es.logger.Error("error decoding sinks event", zap.Any("operation", event["operation"]), zap.Any("sink_event", event), zap.Error(err))
break
}
err = es.handleSinksDeleteCollector(context, rte) //should delete collector
es.streamRedisClient.XAck(context, streamSinks, groupMaestro, msg.ID)
}
if err != nil {
es.logger.Error("Failed to handle sinks event", zap.Any("operation", event["operation"]), zap.Error(err))
break
}

}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions sinker/redis/producer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (e eventStore) Add(config config.SinkConfig) error {
if err != nil {
e.logger.Error("error sending event to event store", zap.Error(err))
}
e.logger.Info("DEBUG sinker.update event with information", zap.Any("sinkID", event.SinkID))
return nil
}

Expand All @@ -95,6 +96,7 @@ func (e eventStore) Remove(ownerID string, sinkID string) error {
if err != nil {
e.logger.Error("error sending event to event store", zap.Error(err))
}
e.logger.Info("DEBUG sinker.update event with information", zap.Any("sinkID", event.SinkID))
return nil
}

Expand Down Expand Up @@ -123,6 +125,7 @@ func (e eventStore) Edit(config config.SinkConfig) error {
if err != nil {
e.logger.Error("error sending event to event store", zap.Error(err))
}
e.logger.Info("DEBUG sinker.update event with information", zap.Any("sinkID", event.SinkID))
return nil
}

Expand Down
5 changes: 3 additions & 2 deletions sinks/redis/producer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (es eventStore) CreateSink(ctx context.Context, token string, s sinks.Sink)
if err != nil {
es.logger.Error("error sending event to sinker event store", zap.Error(err))
}

es.logger.Info("DEBUG sinks.create event with information", zap.Any("sinkID", event.sinkID))
}()

return es.svc.CreateSink(ctx, token, s)
Expand Down Expand Up @@ -95,8 +95,8 @@ func (es eventStore) UpdateSink(ctx context.Context, token string, s sinks.Sink)
if err != nil {
es.logger.Error("error sending event to sinker event store", zap.Error(err))
}
es.logger.Info("DEBUG sinks.update event with information", zap.Any("sinkID", event.sinkID))
}()

return es.svc.UpdateSink(ctx, token, s)
}

Expand Down Expand Up @@ -147,6 +147,7 @@ func (es eventStore) DeleteSink(ctx context.Context, token, id string) (err erro
es.logger.Error("error sending event to sinker event store", zap.Error(err))
return err
}
es.logger.Info("DEBUG sinks.remove event with information", zap.Any("sinkID", event.sinkID))
return nil
}

Expand Down

0 comments on commit 44f4b98

Please sign in to comment.