diff --git a/maestro/redis/consumer/hashset.go b/maestro/redis/consumer/hashset.go index 920392eb4..61eabd766 100644 --- a/maestro/redis/consumer/hashset.go +++ b/maestro/redis/consumer/hashset.go @@ -193,6 +193,7 @@ func (es eventStore) PublishSinkStateChange(sink *sinkspb.SinkRes, status string record := &redis2.XAddArgs{ Stream: streamID, + MaxLen: 1000, Values: event.Encode(), } err = es.streamRedisClient.XAdd(context.Background(), record).Err() diff --git a/sinker/redis/producer/streams.go b/sinker/redis/producer/streams.go index 6b6d93167..1d283b396 100644 --- a/sinker/redis/producer/streams.go +++ b/sinker/redis/producer/streams.go @@ -37,6 +37,7 @@ func (e eventStore) DeployCollector(ctx context.Context, config config.SinkConfi } recordToSink := &redis.XAddArgs{ Stream: streamID, + MaxLen: 1000, Values: eventToSink.Encode(), } err = e.client.XAdd(ctx, recordToSink).Err() @@ -66,6 +67,7 @@ func (e eventStore) Add(config config.SinkConfig) error { } record := &redis.XAddArgs{ Stream: streamID, + MaxLen: 1000, Values: event.Encode(), } err = e.client.XAdd(context.Background(), record).Err() @@ -89,6 +91,7 @@ func (e eventStore) Remove(ownerID string, sinkID string) error { } record := &redis.XAddArgs{ Stream: streamID, + MaxLen: 1000, Values: event.Encode(), } err = e.client.XAdd(context.Background(), record).Err() @@ -117,6 +120,7 @@ func (e eventStore) Edit(config config.SinkConfig) error { } record := &redis.XAddArgs{ Stream: streamID, + MaxLen: 1000, Values: event.Encode(), } err = e.client.XAdd(context.Background(), record).Err() diff --git a/sinker/redis/sinker.go b/sinker/redis/sinker.go index 1fd2c14f1..fab64c203 100644 --- a/sinker/redis/sinker.go +++ b/sinker/redis/sinker.go @@ -132,6 +132,7 @@ func (s *sinkerCache) DeployCollector(ctx context.Context, config sinkerconfig.S encodeEvent := redis.XAddArgs{ ID: config.SinkID, Stream: idPrefix, + MaxLen: 1000, Values: event, } if cmd := s.client.XAdd(ctx, &encodeEvent); cmd.Err() != nil {