Skip to content

Commit

Permalink
Make TopicLock thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
RemindD committed Jul 16, 2024
1 parent baf9663 commit 0ee4781
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type RedisPubSubProvider struct {
Context *contexts.ManagerContext
ClaimedMessages map[string]bool
TopicLock map[string]*sync.Mutex
MapLock *sync.Mutex
}

type RedisMessageWrapper struct {
Expand Down Expand Up @@ -174,6 +175,7 @@ func (i *RedisPubSubProvider) Init(config providers.IProviderConfig) error {

i.Ctx, i.Cancel = context.WithCancel(context.Background())
i.ClaimedMessages = make(map[string]bool)
i.MapLock = &sync.Mutex{}
i.TopicLock = make(map[string]*sync.Mutex)

i.Subscribers = make(map[string][]v1alpha2.EventHandler)
Expand Down Expand Up @@ -220,8 +222,9 @@ func (i *RedisPubSubProvider) processMessage(msg RedisMessageWrapper) error {
return v1alpha2.NewCOAError(err, "failed to unmarshal event", v1alpha2.InternalError)
}
err = msg.Handler(msg.Topic, evt)
i.TopicLock[msg.Topic].Lock()
defer i.TopicLock[msg.Topic].Unlock()
lock := i.getTopicLock(msg.Topic)
lock.Lock()
defer lock.Unlock()
if err != nil {
delete(i.ClaimedMessages, msg.MessageID)
return v1alpha2.NewCOAError(err, fmt.Sprintf("failed to handle message %s", msg.MessageID), v1alpha2.InternalError)
Expand Down Expand Up @@ -250,7 +253,6 @@ func (i *RedisPubSubProvider) Subscribe(topic string, handler v1alpha2.EventHand
mLog.Errorf(" P (Redis PubSub) : failed to subscribe %v", err)
return v1alpha2.NewCOAError(err, fmt.Sprintf("failed to subsceribe to topic %s", topic), v1alpha2.InternalError)
}
i.TopicLock[topic] = &sync.Mutex{}
go i.pollNewMessagesLoop(topic, handler)
go i.ClaimMessageLoop(topic, i.Config.ConsumerID, handler, PendingMessagesScanInterval, ExtendMessageOwnershipWithIdleTime)
if i.Config.MultiInstance {
Expand Down Expand Up @@ -348,8 +350,9 @@ func (i *RedisPubSubProvider) reclaimPendingMessages(topic string, idleTime time
}
}
func (i *RedisPubSubProvider) XClaimWrapper(topic string, minIdle time.Duration, consumer string, msgIDs []string, handler v1alpha2.EventHandler) {
i.TopicLock[topic].Lock()
defer i.TopicLock[topic].Unlock()
lock := i.getTopicLock(topic)
lock.Lock()
defer lock.Unlock()
claimResult, err := i.Client.XClaim(i.Ctx, &redis.XClaimArgs{
Stream: topic,
Group: RedisGroup,
Expand Down Expand Up @@ -398,3 +401,12 @@ func toRedisPubSubProviderConfig(config providers.IProviderConfig) (RedisPubSubP
func generateConsumerIDSuffix() string {
return fmt.Sprintf("%d", time.Now().UnixNano())
}

func (i *RedisPubSubProvider) getTopicLock(topic string) *sync.Mutex {
i.MapLock.Lock()
defer i.MapLock.Unlock()
if _, ok := i.TopicLock[topic]; !ok {
i.TopicLock[topic] = &sync.Mutex{}
}
return i.TopicLock[topic]
}

0 comments on commit 0ee4781

Please sign in to comment.