diff --git a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go index c2b48ba9f..8498be7e3 100644 --- a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go +++ b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go @@ -36,6 +36,7 @@ type RedisPubSubProvider struct { Context *contexts.ManagerContext ClaimedMessages map[string]bool TopicLock map[string]*sync.Mutex + MapLock *sync.Mutex } type RedisMessageWrapper struct { @@ -174,6 +175,7 @@ func (i *RedisPubSubProvider) Init(config providers.IProviderConfig) error { i.Ctx, i.Cancel = context.WithCancel(context.Background()) i.ClaimedMessages = make(map[string]bool) + i.MapLock = &sync.Mutex{} i.TopicLock = make(map[string]*sync.Mutex) i.Subscribers = make(map[string][]v1alpha2.EventHandler) @@ -220,8 +222,9 @@ func (i *RedisPubSubProvider) processMessage(msg RedisMessageWrapper) error { return v1alpha2.NewCOAError(err, "failed to unmarshal event", v1alpha2.InternalError) } err = msg.Handler(msg.Topic, evt) - i.TopicLock[msg.Topic].Lock() - defer i.TopicLock[msg.Topic].Unlock() + lock := i.getTopicLock(msg.Topic) + lock.Lock() + defer lock.Unlock() if err != nil { delete(i.ClaimedMessages, msg.MessageID) return v1alpha2.NewCOAError(err, fmt.Sprintf("failed to handle message %s", msg.MessageID), v1alpha2.InternalError) @@ -250,7 +253,6 @@ func (i *RedisPubSubProvider) Subscribe(topic string, handler v1alpha2.EventHand mLog.Errorf(" P (Redis PubSub) : failed to subscribe %v", err) return v1alpha2.NewCOAError(err, fmt.Sprintf("failed to subsceribe to topic %s", topic), v1alpha2.InternalError) } - i.TopicLock[topic] = &sync.Mutex{} go i.pollNewMessagesLoop(topic, handler) go i.ClaimMessageLoop(topic, i.Config.ConsumerID, handler, PendingMessagesScanInterval, ExtendMessageOwnershipWithIdleTime) if i.Config.MultiInstance { @@ -348,8 +350,9 @@ func (i *RedisPubSubProvider) reclaimPendingMessages(topic string, idleTime time } } func (i *RedisPubSubProvider) XClaimWrapper(topic string, minIdle time.Duration, consumer string, msgIDs []string, handler v1alpha2.EventHandler) { - i.TopicLock[topic].Lock() - defer i.TopicLock[topic].Unlock() + lock := i.getTopicLock(topic) + lock.Lock() + defer lock.Unlock() claimResult, err := i.Client.XClaim(i.Ctx, &redis.XClaimArgs{ Stream: topic, Group: RedisGroup, @@ -398,3 +401,12 @@ func toRedisPubSubProviderConfig(config providers.IProviderConfig) (RedisPubSubP func generateConsumerIDSuffix() string { return fmt.Sprintf("%d", time.Now().UnixNano()) } + +func (i *RedisPubSubProvider) getTopicLock(topic string) *sync.Mutex { + i.MapLock.Lock() + defer i.MapLock.Unlock() + if _, ok := i.TopicLock[topic]; !ok { + i.TopicLock[topic] = &sync.Mutex{} + } + return i.TopicLock[topic] +}