Skip to content

Commit

Permalink
Bug missing owner_id redis sinker update (#918)
Browse files Browse the repository at this point in the history
* fix redis message without owner id

* fix unit test
  • Loading branch information
dscabral authored Mar 7, 2022
1 parent 3be1021 commit 32f9e57
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 48 deletions.
19 changes: 11 additions & 8 deletions sinker/redis/consumer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func (es eventStore) handleSinksUpdate(ctx context.Context, e updateSinkEvent) e
if err != nil {
return err
}
var config config.SinkConfig
if err := json.Unmarshal(data, &config); err != nil {
var cfg config.SinkConfig
if err := json.Unmarshal(data, &cfg); err != nil {
return err
}

Expand All @@ -113,15 +113,18 @@ func (es eventStore) handleSinksUpdate(ctx context.Context, e updateSinkEvent) e
if err != nil {
return err
}
sinkConfig.Url = config.Url
sinkConfig.User = config.User
sinkConfig.Password = config.Password
sinkConfig.Url = cfg.Url
sinkConfig.User = cfg.User
sinkConfig.Password = cfg.Password
if sinkConfig.OwnerID == "" {
sinkConfig.OwnerID = e.owner
}

es.configRepo.Edit(sinkConfig)
} else {
config.SinkID = e.sinkID
config.OwnerID = e.owner
es.configRepo.Add(config)
cfg.SinkID = e.sinkID
cfg.OwnerID = e.owner
es.configRepo.Add(cfg)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion sinks/api/http/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func updateSinkEndpoint(svc sinks.SinkService) endpoint.Endpoint {
Description: req.Description,
}

if err := svc.UpdateSink(ctx, req.token, sink); err != nil {
if _, err := svc.UpdateSink(ctx, req.token, sink); err != nil {
return nil, err
}
res := sinkRes{
Expand Down
2 changes: 1 addition & 1 deletion sinks/api/http/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (l loggingMiddleware) CreateSink(ctx context.Context, token string, s sinks
return l.svc.CreateSink(ctx, token, s)
}

func (l loggingMiddleware) UpdateSink(ctx context.Context, token string, s sinks.Sink) (err error) {
func (l loggingMiddleware) UpdateSink(ctx context.Context, token string, s sinks.Sink) (sink sinks.Sink, err error) {
defer func(begin time.Time) {
if err != nil {
l.logger.Warn("method call: edit_sink",
Expand Down
6 changes: 3 additions & 3 deletions sinks/api/http/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ func (m metricsMiddleware) CreateSink(ctx context.Context, token string, s sinks
return m.svc.CreateSink(ctx, token, s)
}

func (m metricsMiddleware) UpdateSink(ctx context.Context, token string, s sinks.Sink) (err error) {
func (m metricsMiddleware) UpdateSink(ctx context.Context, token string, s sinks.Sink) (sink sinks.Sink, err error) {
defer func(begin time.Time) {
labels := []string{
"method", "updateSink",
"owner_id", s.MFOwnerID,
"sink_id", s.ID,
"owner_id", sink.MFOwnerID,
"sink_id", sink.ID,
}

m.counter.With(labels...).Add(1)
Expand Down
55 changes: 26 additions & 29 deletions sinks/redis/producer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,35 +41,32 @@ func (es eventStore) CreateSink(ctx context.Context, token string, s sinks.Sink)
return es.svc.CreateSink(ctx, token, s)
}

func (es eventStore) UpdateSink(ctx context.Context, token string, s sinks.Sink) (err error) {
if err := es.svc.UpdateSink(ctx, token, s); err != nil {
return err
}

event := updateSinkEvent{
sinkID: s.ID,
owner: s.MFOwnerID,
config: s.Config,
}

encode, err := event.Encode()
if err != nil {
es.logger.Error("error encoding object", zap.Error(err))
return err
}

record := &redis.XAddArgs{
Stream: streamID,
MaxLenApprox: streamLen,
Values: encode,
}

err = es.client.XAdd(ctx, record).Err()
if err != nil {
es.logger.Error("error sending event to event store", zap.Error(err))
return err
}
return nil
func (es eventStore) UpdateSink(ctx context.Context, token string, s sinks.Sink) (sink sinks.Sink,err error) {
defer func() {
event := updateSinkEvent{
sinkID: sink.ID,
owner: sink.MFOwnerID,
config: sink.Config,
}

encode, err := event.Encode()
if err != nil {
es.logger.Error("error encoding object", zap.Error(err))
}

record := &redis.XAddArgs{
Stream: streamID,
MaxLenApprox: streamLen,
Values: encode,
}

err = es.client.XAdd(ctx, record).Err()
if err != nil {
es.logger.Error("error sending event to event store", zap.Error(err))
}
}()

return es.svc.UpdateSink(ctx, token, s)
}

func (es eventStore) ListSinks(ctx context.Context, token string, pm sinks.PageMetadata) (sinks.Page, error) {
Expand Down
2 changes: 1 addition & 1 deletion sinks/sinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type SinkService interface {
// CreateSink creates new data sink
CreateSink(ctx context.Context, token string, s Sink) (Sink, error)
// UpdateSink by id
UpdateSink(ctx context.Context, token string, s Sink) error
UpdateSink(ctx context.Context, token string, s Sink) (Sink, error)
// ListSinks retrieves data about sinks
ListSinks(ctx context.Context, token string, pm PageMetadata) (Page, error)
// ListBackends retrieves a list of available backends
Expand Down
12 changes: 8 additions & 4 deletions sinks/sinks_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,22 @@ func (svc sinkService) CreateSink(ctx context.Context, token string, sink Sink)
return sink, nil
}

func (svc sinkService) UpdateSink(ctx context.Context, token string, sink Sink) error {
func (svc sinkService) UpdateSink(ctx context.Context, token string, sink Sink) (Sink, error) {
skOwnerID, err := svc.identify(token)
if err != nil {
return err
return Sink{}, err
}

if sink.Backend != "" || sink.Error != "" {
return errors.ErrUpdateEntity
return Sink{}, errors.ErrUpdateEntity
}

sink.MFOwnerID = skOwnerID
return svc.sinkRepo.Update(ctx, sink)
err = svc.sinkRepo.Update(ctx, sink)
if err != nil {
return Sink{}, err
}
return sink, nil
}

func (svc sinkService) ListBackends(ctx context.Context, token string) ([]string, error) {
Expand Down
2 changes: 1 addition & 1 deletion sinks/sinks_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestUpdateSink(t *testing.T) {

for desc, tc := range cases {
t.Run(desc, func(t *testing.T) {
err := service.UpdateSink(context.Background(), tc.token, tc.sink)
_, err := service.UpdateSink(context.Background(), tc.token, tc.sink)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %d got %d", desc, tc.err, err))
})
}
Expand Down

0 comments on commit 32f9e57

Please sign in to comment.