diff --git a/maestro/config/config_builder.go b/maestro/config/config_builder.go index 6048f7b07..3262b3aa5 100644 --- a/maestro/config/config_builder.go +++ b/maestro/config/config_builder.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/ns1labs/orb/pkg/errors" "gopkg.in/yaml.v2" + "strconv" "strings" ) @@ -179,7 +180,9 @@ func GetDeploymentJson(sinkId, sinkUrl, sinkUsername, sinkPassword string) (stri // ReturnConfigYamlFromSink this is the main method, which will generate the YAML file from the func ReturnConfigYamlFromSink(_ context.Context, kafkaUrlConfig, sinkId, sinkUrl, sinkUsername, sinkPassword string) (string, error) { - + if _, err := strconv.Atoi(sinkUsername); err != nil { + sinkUsername = "#$#" + sinkUsername + } config := OtelConfigFile{ Receivers: Receivers{ Kafka: KafkaReceiver{ @@ -233,9 +236,9 @@ func ReturnConfigYamlFromSink(_ context.Context, kafkaUrlConfig, sinkId, sinkUrl return "", err } returnedString := "---\n" + string(marshal) - returnString := strings.Replace(returnedString, "\"", "", -1) - returnString = strings.Replace(returnedString, "\n", `\n`, -1) - return returnString, nil + s := strings.ReplaceAll(returnedString, "\"", "") + s = strings.ReplaceAll(s, "\n", `\n`) + return s, nil } diff --git a/maestro/config/config_builder_test.go b/maestro/config/config_builder_test.go index 90f23bf9d..8f028fcbc 100644 --- a/maestro/config/config_builder_test.go +++ b/maestro/config/config_builder_test.go @@ -25,9 +25,9 @@ func TestReturnConfigYamlFromSink(t *testing.T) { kafkaUrlConfig: "kafka:9092", sinkId: "sink-id-222", sinkUrl: "https://mysinkurl:9922", - sinkUsername: "wile.e.coyote", + sinkUsername: "1234123", sinkPassword: "CarnivorousVulgaris", - }, want: `---\nreceivers:\n kafka:\n brokers:\n - kafka:9092\n topic: otlp_metrics-sink-id-222\n protocol_version: 2.0.0\nextensions:\n health_check: {}\n pprof:\n endpoint: :1888\n basicauth/exporter:\n client_auth:\n username: wile.e.coyote\n password: CarnivorousVulgaris\nexporters:\n prometheusremotewrite:\n endpoint: https://mysinkurl:9922\n auth:\n authenticator: basicauth/exporter\nservice:\n extensions:\n - pprof\n - health_check\n - basicauth/exporter\n pipelines:\n metrics:\n receivers:\n - kafka\n exporters:\n - prometheusremotewrite\n`, + }, want: `---\nreceivers:\n kafka:\n brokers:\n - kafka:9092\n topic: otlp_metrics-sink-id-222\n protocol_version: 2.0.0\nextensions:\n health_check: {}\n pprof:\n endpoint: :1888\n basicauth/exporter:\n client_auth:\n username: 1234123\n password: CarnivorousVulgaris\nexporters:\n prometheusremotewrite:\n endpoint: https://mysinkurl:9922\n auth:\n authenticator: basicauth/exporter\nservice:\n extensions:\n - pprof\n - health_check\n - basicauth/exporter\n pipelines:\n metrics:\n receivers:\n - kafka\n exporters:\n - prometheusremotewrite\n`, wantErr: false}, } for _, tt := range tests { diff --git a/sinker/redis/consumer/streams.go b/sinker/redis/consumer/streams.go index 9d550e9a9..5c86d1bd1 100644 --- a/sinker/redis/consumer/streams.go +++ b/sinker/redis/consumer/streams.go @@ -18,8 +18,8 @@ const ( sinksPrefix = "sinks." sinksUpdate = sinksPrefix + "update" sinksCreate = sinksPrefix + "create" - - exists = "BUSYGROUP Consumer Group name already exists" + sinksDelete = sinksPrefix + "remove" + exists = "BUSYGROUP Consumer Group name already exists" ) type Subscriber interface { @@ -75,6 +75,13 @@ func (es eventStore) Subscribe(context context.Context) error { break } err = es.handleSinksUpdate(context, rte) + case sinksDelete: + rte, derr := decodeSinksRemove(event) + if derr != nil { + err = derr + break + } + err = es.handleSinksRemove(context, rte) } if err != nil { es.logger.Error("Failed to handle event", zap.String("operation", event["operation"].(string)), zap.Error(err)) @@ -124,6 +131,30 @@ func decodeSinksUpdate(event map[string]interface{}) (updateSinkEvent, error) { return val, nil } +func decodeSinksRemove(event map[string]interface{}) (updateSinkEvent, error) { + val := updateSinkEvent{ + sinkID: read(event, "sink_id", ""), + owner: read(event, "owner", ""), + timestamp: time.Time{}, + } + var metadata types.Metadata + if err := json.Unmarshal([]byte(read(event, "config", "")), &metadata); err != nil { + return updateSinkEvent{}, err + } + val.config = metadata + return val, nil +} + +func (es eventStore) handleSinksRemove(_ context.Context, e updateSinkEvent) error { + if ok := es.configRepo.Exists(e.owner, e.sinkID); ok { + err := es.configRepo.Remove(e.owner, e.sinkID) + if err != nil { + return err + } + } + return nil +} + func (es eventStore) handleSinksUpdate(_ context.Context, e updateSinkEvent) error { data, err := json.Marshal(e.config) if err != nil {