Skip to content

Commit

Permalink
[Fix] Sinker state checker for otel (#1958)
Browse files Browse the repository at this point in the history
* feat(sinker): verify if sink uses opentelemetry before remove from cache

* feat(sinker): add log

* fix(sinker): edit cache instead of removing sink

* test

* remove typo

* remove

* remove strconv

* back to right

Co-authored-by: mclcavalcante <mariana.cavalcante@encora.com>
  • Loading branch information
etaques and mclcavalcante authored Nov 2, 2022
1 parent 639f544 commit 02cb787
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
4 changes: 0 additions & 4 deletions maestro/config/config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"github.com/ns1labs/orb/pkg/errors"
"gopkg.in/yaml.v2"
"strconv"
"strings"
)

Expand Down Expand Up @@ -180,9 +179,6 @@ func GetDeploymentJson(sinkId, sinkUrl, sinkUsername, sinkPassword string) (stri

// ReturnConfigYamlFromSink this is the main method, which will generate the YAML file from the
func ReturnConfigYamlFromSink(_ context.Context, kafkaUrlConfig, sinkId, sinkUrl, sinkUsername, sinkPassword string) (string, error) {
if _, err := strconv.Atoi(sinkUsername); err != nil {
sinkUsername = "#$#" + sinkUsername
}
config := OtelConfigFile{
Receivers: Receivers{
Kafka: KafkaReceiver{
Expand Down
23 changes: 15 additions & 8 deletions sinker/config_state_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,21 @@ func (svc *SinkerService) checkState(_ time.Time) {
if cfg.LastRemoteWrite.Add(DefaultTimeout).Before(time.Now()) {
svc.logger.Info("opentelemetry:", zap.String("otel", cfg.Opentelemetry))
if cfg.State == config.Active {
err := cfg.State.SetFromString("idle")
if err != nil {
svc.logger.Error("error updating sink config cache", zap.Error(err))
return
}
if err := svc.sinkerCache.Edit(cfg); err != nil {
svc.logger.Error("error updating sink config cache", zap.Error(err))
return
if cfg.Opentelemetry == "enabled" {
err := cfg.State.SetFromString("idle")
if err != nil {
svc.logger.Error("error updating sink state otel", zap.Error(err))
return
}
if err := svc.sinkerCache.Edit(cfg); err != nil {
svc.logger.Error("error updating sink config cache otel", zap.Error(err))
return
}
} else {
if err := svc.sinkerCache.Remove(cfg.OwnerID, cfg.SinkID); err != nil {
svc.logger.Error("error updating sink config cache", zap.Error(err))
return
}
}
}
}
Expand Down

0 comments on commit 02cb787

Please sign in to comment.