Skip to content

Commit

Permalink
Fix/quote remove on sinker redis config(#1957)
Browse files Browse the repository at this point in the history
* feat(sinker): remove from cache on sink deletion

Signed-off-by: Luiz Pegoraro <luiz.pegoraro@encora.com>

* feat(sinker): fix config builder using wrong variable

Signed-off-by: Luiz Pegoraro <luiz.pegoraro@encora.com>

Signed-off-by: Luiz Pegoraro <luiz.pegoraro@encora.com>
  • Loading branch information
lpegoraro authored Nov 2, 2022
1 parent 882808e commit 639f544
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 8 deletions.
11 changes: 7 additions & 4 deletions maestro/config/config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/ns1labs/orb/pkg/errors"
"gopkg.in/yaml.v2"
"strconv"
"strings"
)

Expand Down Expand Up @@ -179,7 +180,9 @@ func GetDeploymentJson(sinkId, sinkUrl, sinkUsername, sinkPassword string) (stri

// ReturnConfigYamlFromSink this is the main method, which will generate the YAML file from the
func ReturnConfigYamlFromSink(_ context.Context, kafkaUrlConfig, sinkId, sinkUrl, sinkUsername, sinkPassword string) (string, error) {

if _, err := strconv.Atoi(sinkUsername); err != nil {
sinkUsername = "#$#" + sinkUsername
}
config := OtelConfigFile{
Receivers: Receivers{
Kafka: KafkaReceiver{
Expand Down Expand Up @@ -233,9 +236,9 @@ func ReturnConfigYamlFromSink(_ context.Context, kafkaUrlConfig, sinkId, sinkUrl
return "", err
}
returnedString := "---\n" + string(marshal)
returnString := strings.Replace(returnedString, "\"", "", -1)
returnString = strings.Replace(returnedString, "\n", `\n`, -1)
return returnString, nil
s := strings.ReplaceAll(returnedString, "\"", "")
s = strings.ReplaceAll(s, "\n", `\n`)
return s, nil

}

Expand Down
4 changes: 2 additions & 2 deletions maestro/config/config_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ func TestReturnConfigYamlFromSink(t *testing.T) {
kafkaUrlConfig: "kafka:9092",
sinkId: "sink-id-222",
sinkUrl: "https://mysinkurl:9922",
sinkUsername: "wile.e.coyote",
sinkUsername: "1234123",
sinkPassword: "CarnivorousVulgaris",
}, want: `---\nreceivers:\n kafka:\n brokers:\n - kafka:9092\n topic: otlp_metrics-sink-id-222\n protocol_version: 2.0.0\nextensions:\n health_check: {}\n pprof:\n endpoint: :1888\n basicauth/exporter:\n client_auth:\n username: wile.e.coyote\n password: CarnivorousVulgaris\nexporters:\n prometheusremotewrite:\n endpoint: https://mysinkurl:9922\n auth:\n authenticator: basicauth/exporter\nservice:\n extensions:\n - pprof\n - health_check\n - basicauth/exporter\n pipelines:\n metrics:\n receivers:\n - kafka\n exporters:\n - prometheusremotewrite\n`,
}, want: `---\nreceivers:\n kafka:\n brokers:\n - kafka:9092\n topic: otlp_metrics-sink-id-222\n protocol_version: 2.0.0\nextensions:\n health_check: {}\n pprof:\n endpoint: :1888\n basicauth/exporter:\n client_auth:\n username: 1234123\n password: CarnivorousVulgaris\nexporters:\n prometheusremotewrite:\n endpoint: https://mysinkurl:9922\n auth:\n authenticator: basicauth/exporter\nservice:\n extensions:\n - pprof\n - health_check\n - basicauth/exporter\n pipelines:\n metrics:\n receivers:\n - kafka\n exporters:\n - prometheusremotewrite\n`,
wantErr: false},
}
for _, tt := range tests {
Expand Down
35 changes: 33 additions & 2 deletions sinker/redis/consumer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ const (
sinksPrefix = "sinks."
sinksUpdate = sinksPrefix + "update"
sinksCreate = sinksPrefix + "create"

exists = "BUSYGROUP Consumer Group name already exists"
sinksDelete = sinksPrefix + "remove"
exists = "BUSYGROUP Consumer Group name already exists"
)

type Subscriber interface {
Expand Down Expand Up @@ -75,6 +75,13 @@ func (es eventStore) Subscribe(context context.Context) error {
break
}
err = es.handleSinksUpdate(context, rte)
case sinksDelete:
rte, derr := decodeSinksRemove(event)
if derr != nil {
err = derr
break
}
err = es.handleSinksRemove(context, rte)
}
if err != nil {
es.logger.Error("Failed to handle event", zap.String("operation", event["operation"].(string)), zap.Error(err))
Expand Down Expand Up @@ -124,6 +131,30 @@ func decodeSinksUpdate(event map[string]interface{}) (updateSinkEvent, error) {
return val, nil
}

func decodeSinksRemove(event map[string]interface{}) (updateSinkEvent, error) {
val := updateSinkEvent{
sinkID: read(event, "sink_id", ""),
owner: read(event, "owner", ""),
timestamp: time.Time{},
}
var metadata types.Metadata
if err := json.Unmarshal([]byte(read(event, "config", "")), &metadata); err != nil {
return updateSinkEvent{}, err
}
val.config = metadata
return val, nil
}

func (es eventStore) handleSinksRemove(_ context.Context, e updateSinkEvent) error {
if ok := es.configRepo.Exists(e.owner, e.sinkID); ok {
err := es.configRepo.Remove(e.owner, e.sinkID)
if err != nil {
return err
}
}
return nil
}

func (es eventStore) handleSinksUpdate(_ context.Context, e updateSinkEvent) error {
data, err := json.Marshal(e.config)
if err != nil {
Expand Down

0 comments on commit 639f544

Please sign in to comment.