From c8946abc3b83795dd17e4b54009e42ccd2eaacbb Mon Sep 17 00:00:00 2001 From: mclcavalcante Date: Wed, 2 Nov 2022 14:13:36 -0300 Subject: [PATCH 1/8] feat(sinker): verify if sink uses opentelemetry before remove from cache --- sinker/config/types.go | 1 + sinker/config_state_check.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sinker/config/types.go b/sinker/config/types.go index e253ad179..7522a46a2 100644 --- a/sinker/config/types.go +++ b/sinker/config/types.go @@ -15,6 +15,7 @@ type SinkConfig struct { Url string `json:"remote_host"` User string `json:"username"` Password string `json:"password"` + Opentelemetry string `json:"opentelemetry"` State PrometheusState `json:"state,omitempty"` Msg string `json:"msg,omitempty"` LastRemoteWrite time.Time `json:"last_remote_write,omitempty"` diff --git a/sinker/config_state_check.go b/sinker/config_state_check.go index 666802af8..c32c2ee62 100644 --- a/sinker/config_state_check.go +++ b/sinker/config_state_check.go @@ -33,7 +33,7 @@ func (svc *SinkerService) checkState(_ time.Time) { for _, cfg := range configs { // Set idle if the sinker is more than 30 minutes not sending metrics (Remove from Redis) if cfg.LastRemoteWrite.Add(DefaultTimeout).Before(time.Now()) { - if cfg.State == config.Active { + if cfg.State == config.Active && cfg.Opentelemetry != "enabled" { if err := svc.sinkerCache.Remove(cfg.OwnerID, cfg.SinkID); err != nil { svc.logger.Error("error updating sink config cache", zap.Error(err)) return From 47d1cca423d6ef781df0c655093ffa0a589588ae Mon Sep 17 00:00:00 2001 From: mclcavalcante Date: Wed, 2 Nov 2022 14:16:56 -0300 Subject: [PATCH 2/8] feat(sinker): add log --- sinker/config_state_check.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sinker/config_state_check.go b/sinker/config_state_check.go index c32c2ee62..527f13a66 100644 --- a/sinker/config_state_check.go +++ b/sinker/config_state_check.go @@ -33,6 +33,7 @@ func (svc *SinkerService) checkState(_ time.Time) { for _, cfg := range configs { // Set idle if the sinker is more than 30 minutes not sending metrics (Remove from Redis) if cfg.LastRemoteWrite.Add(DefaultTimeout).Before(time.Now()) { + svc.logger.Info("opentelemetry:", zap.String("otel", cfg.Opentelemetry)) if cfg.State == config.Active && cfg.Opentelemetry != "enabled" { if err := svc.sinkerCache.Remove(cfg.OwnerID, cfg.SinkID); err != nil { svc.logger.Error("error updating sink config cache", zap.Error(err)) From e1eb37e9826100acbb31e0db7610286e405f0aa7 Mon Sep 17 00:00:00 2001 From: mclcavalcante Date: Wed, 2 Nov 2022 16:09:30 -0300 Subject: [PATCH 3/8] fix(sinker): edit cache instead of removing sink --- sinker/config_state_check.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sinker/config_state_check.go b/sinker/config_state_check.go index 527f13a66..97fe44133 100644 --- a/sinker/config_state_check.go +++ b/sinker/config_state_check.go @@ -33,9 +33,9 @@ func (svc *SinkerService) checkState(_ time.Time) { for _, cfg := range configs { // Set idle if the sinker is more than 30 minutes not sending metrics (Remove from Redis) if cfg.LastRemoteWrite.Add(DefaultTimeout).Before(time.Now()) { - svc.logger.Info("opentelemetry:", zap.String("otel", cfg.Opentelemetry)) - if cfg.State == config.Active && cfg.Opentelemetry != "enabled" { - if err := svc.sinkerCache.Remove(cfg.OwnerID, cfg.SinkID); err != nil { + if cfg.State == config.Active { + cfg.State = config.Idle + if err := svc.sinkerCache.Edit(cfg); err != nil { svc.logger.Error("error updating sink config cache", zap.Error(err)) return } From ee95fb1d8d83ad9dd0da2be0efa1414137925bad Mon Sep 17 00:00:00 2001 From: etaques <97463920+etaques@users.noreply.github.com> Date: Wed, 2 Nov 2022 18:24:33 -0300 Subject: [PATCH 4/8] test --- sinker/config_state_check.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/sinker/config_state_check.go b/sinker/config_state_check.go index 97fe44133..ed435a03b 100644 --- a/sinker/config_state_check.go +++ b/sinker/config_state_check.go @@ -34,10 +34,17 @@ func (svc *SinkerService) checkState(_ time.Time) { // Set idle if the sinker is more than 30 minutes not sending metrics (Remove from Redis) if cfg.LastRemoteWrite.Add(DefaultTimeout).Before(time.Now()) { if cfg.State == config.Active { - cfg.State = config.Idle - if err := svc.sinkerCache.Edit(cfg); err != nil { - svc.logger.Error("error updating sink config cache", zap.Error(err)) - return + if cfg.Opentelemetry == "enabled" { + cfg.State = config.Idle + if err := svc.sinkerCache.Edit(cfg); err != nil { + svc.logger.Error("error updating sink config cache for otel", zap.Error(err)) + return + } + } else { + if err := svc.sinkerCache.Remove(cfg.OwnerID, cfg.SinkID); err != nil { + svc.logger.Error("error updating sink config cache", zap.Error(err)) + return + } } } } From beb97f54d16beee5b4300a1bb7de75eb113a3abe Mon Sep 17 00:00:00 2001 From: etaques <97463920+etaques@users.noreply.github.com> Date: Wed, 2 Nov 2022 19:18:44 -0300 Subject: [PATCH 5/8] remove typo --- maestro/config/config_builder.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/maestro/config/config_builder.go b/maestro/config/config_builder.go index 3262b3aa5..aa216428c 100644 --- a/maestro/config/config_builder.go +++ b/maestro/config/config_builder.go @@ -105,7 +105,9 @@ var k8sOtelCollector = ` { "name": "data", "readOnly": true, - "mountPath": "/etc/otelcol-contrib/config.yaml", + if _, err := strconv.Atoi(sinkUsername); err != nil { + sinkUsername = "#$#" + sinkUsername + } "mountPath": "/etc/otelcol-contrib/config.yaml", "subPath": "config.yaml" } ], @@ -180,9 +182,6 @@ func GetDeploymentJson(sinkId, sinkUrl, sinkUsername, sinkPassword string) (stri // ReturnConfigYamlFromSink this is the main method, which will generate the YAML file from the func ReturnConfigYamlFromSink(_ context.Context, kafkaUrlConfig, sinkId, sinkUrl, sinkUsername, sinkPassword string) (string, error) { - if _, err := strconv.Atoi(sinkUsername); err != nil { - sinkUsername = "#$#" + sinkUsername - } config := OtelConfigFile{ Receivers: Receivers{ Kafka: KafkaReceiver{ From c2e92b2b9f9ce97d1cacc5a6ed856fdac3befaa1 Mon Sep 17 00:00:00 2001 From: etaques <97463920+etaques@users.noreply.github.com> Date: Wed, 2 Nov 2022 19:20:30 -0300 Subject: [PATCH 6/8] remove --- maestro/config/config_builder.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/maestro/config/config_builder.go b/maestro/config/config_builder.go index aa216428c..54a7aa8c8 100644 --- a/maestro/config/config_builder.go +++ b/maestro/config/config_builder.go @@ -105,9 +105,7 @@ var k8sOtelCollector = ` { "name": "data", "readOnly": true, - if _, err := strconv.Atoi(sinkUsername); err != nil { - sinkUsername = "#$#" + sinkUsername - } "mountPath": "/etc/otelcol-contrib/config.yaml", + "mountPath": "/etc/otelcol-contrib/config.yaml", "subPath": "config.yaml" } ], From 1efd3c4a36891627db591e1063e2965246b9045a Mon Sep 17 00:00:00 2001 From: etaques <97463920+etaques@users.noreply.github.com> Date: Wed, 2 Nov 2022 19:27:09 -0300 Subject: [PATCH 7/8] remove strconv --- maestro/config/config_builder.go | 1 - 1 file changed, 1 deletion(-) diff --git a/maestro/config/config_builder.go b/maestro/config/config_builder.go index 54a7aa8c8..57587ad48 100644 --- a/maestro/config/config_builder.go +++ b/maestro/config/config_builder.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/ns1labs/orb/pkg/errors" "gopkg.in/yaml.v2" - "strconv" "strings" ) From a582cc2706b0aa7575335d39d7d81dbcb9b4d8cd Mon Sep 17 00:00:00 2001 From: etaques <97463920+etaques@users.noreply.github.com> Date: Wed, 2 Nov 2022 19:43:03 -0300 Subject: [PATCH 8/8] back to right --- sinker/config_state_check.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sinker/config_state_check.go b/sinker/config_state_check.go index ca7bf5c7b..bd7679a8c 100644 --- a/sinker/config_state_check.go +++ b/sinker/config_state_check.go @@ -36,9 +36,13 @@ func (svc *SinkerService) checkState(_ time.Time) { svc.logger.Info("opentelemetry:", zap.String("otel", cfg.Opentelemetry)) if cfg.State == config.Active { if cfg.Opentelemetry == "enabled" { - cfg.State = config.Idle + err := cfg.State.SetFromString("idle") + if err != nil { + svc.logger.Error("error updating sink state otel", zap.Error(err)) + return + } if err := svc.sinkerCache.Edit(cfg); err != nil { - svc.logger.Error("error updating sink config cache for otel", zap.Error(err)) + svc.logger.Error("error updating sink config cache otel", zap.Error(err)) return } } else {