Skip to content

Commit

Permalink
[FIX] Add MaxLen to avoid redis be filled (#2279)
Browse files Browse the repository at this point in the history
* Update sinker.go

* adding MaxLen for capped stream args

* add MaxLan on streams
  • Loading branch information
etaques authored Mar 22, 2023
1 parent 3509223 commit 6db998a
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 0 deletions.
1 change: 1 addition & 0 deletions maestro/redis/consumer/hashset.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func (es eventStore) PublishSinkStateChange(sink *sinkspb.SinkRes, status string

record := &redis2.XAddArgs{
Stream: streamID,
MaxLen: 1000,
Values: event.Encode(),
}
err = es.streamRedisClient.XAdd(context.Background(), record).Err()
Expand Down
4 changes: 4 additions & 0 deletions sinker/redis/producer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (e eventStore) DeployCollector(ctx context.Context, config config.SinkConfi
}
recordToSink := &redis.XAddArgs{
Stream: streamID,
MaxLen: 1000,
Values: eventToSink.Encode(),
}
err = e.client.XAdd(ctx, recordToSink).Err()
Expand Down Expand Up @@ -66,6 +67,7 @@ func (e eventStore) Add(config config.SinkConfig) error {
}
record := &redis.XAddArgs{
Stream: streamID,
MaxLen: 1000,
Values: event.Encode(),
}
err = e.client.XAdd(context.Background(), record).Err()
Expand All @@ -89,6 +91,7 @@ func (e eventStore) Remove(ownerID string, sinkID string) error {
}
record := &redis.XAddArgs{
Stream: streamID,
MaxLen: 1000,
Values: event.Encode(),
}
err = e.client.XAdd(context.Background(), record).Err()
Expand Down Expand Up @@ -117,6 +120,7 @@ func (e eventStore) Edit(config config.SinkConfig) error {
}
record := &redis.XAddArgs{
Stream: streamID,
MaxLen: 1000,
Values: event.Encode(),
}
err = e.client.XAdd(context.Background(), record).Err()
Expand Down
1 change: 1 addition & 0 deletions sinker/redis/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (s *sinkerCache) DeployCollector(ctx context.Context, config sinkerconfig.S
encodeEvent := redis.XAddArgs{
ID: config.SinkID,
Stream: idPrefix,
MaxLen: 1000,
Values: event,
}
if cmd := s.client.XAdd(ctx, &encodeEvent); cmd.Err() != nil {
Expand Down

0 comments on commit 6db998a

Please sign in to comment.