Skip to content

Commit

Permalink
refactor(notify): optimize locking mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
Th3Shadowbroker committed Feb 17, 2025
1 parent f7f38c7 commit d141102
Showing 1 changed file with 35 additions and 28 deletions.
63 changes: 35 additions & 28 deletions internal/notify/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,17 @@ import (
type NotificationListener struct{}

func (n NotificationListener) OnAdd(event *hazelcast.EntryNotified, obj message.CircuitBreakerMessage) {
// Acquire lock for entry (with lease, to enforce unlocking after 30s)
lockCtx := cache.NotificationSender.NewLockContext(context.Background())
acquired, _ := cache.NotificationSender.TryLockWithLease(lockCtx, event.Key, 30*time.Second)
if !acquired {
log.Debug().Str("subscriptionId", event.Key.(string)).Msg("Could not acquire notification-lock")
return
}
log.Debug().Str("subscriptionId", event.Key.(string)).Str("trigger", "add").Msg("Acquired notification-lock")

// Manually release lock when done
defer func() {
_ = cache.NotificationSender.Unlock(lockCtx, event.Key)
}()

notificationConfig := config.Current.Notifications
circuitBreakerOpen := obj.Status == enum.CircuitBreakerStatusOpen

// Circuit breaker initialized
if obj.LoopCounter == 0 && circuitBreakerOpen {
lockAcquired, unlock := n.lockKey(event.Key, "add", 30*time.Second)
if !lockAcquired {
return
}
defer unlock()

template := notificationConfig.Mail.Templates.OpenCircuitBreaker
if err := notifyConsumer(&obj, notificationConfig.Mail.Subject.OpenCircuitBreaker, template); err != nil {
log.Error().
Expand All @@ -51,25 +43,17 @@ func (n NotificationListener) OnAdd(event *hazelcast.EntryNotified, obj message.
}

func (n NotificationListener) OnUpdate(event *hazelcast.EntryNotified, obj message.CircuitBreakerMessage, oldObj message.CircuitBreakerMessage) {
// Acquire lock for entry (with lease, to enforce unlocking after 30s)
lockCtx := cache.NotificationSender.NewLockContext(context.Background())
acquired, _ := cache.NotificationSender.TryLockWithLease(lockCtx, event.Key, 30*time.Second)
if !acquired {
log.Debug().Str("subscriptionId", event.Key.(string)).Msg("Could not acquire notification-lock")
return
}
log.Debug().Str("subscriptionId", event.Key.(string)).Str("trigger", "update").Msg("Acquired notification-lock")

// Manually release lock when done
defer func() {
_ = cache.NotificationSender.Unlock(lockCtx, event.Key)
}()

notificationConfig := config.Current.Notifications
circuitBreakerOpen := obj.Status == enum.CircuitBreakerStatusOpen

// Circuit-breaker reset
if circuitBreakerOpen && (obj.LoopCounter == 0 && oldObj.LoopCounter > 0) {
lockAcquired, unlock := n.lockKey(event.Key, "update", 30*time.Second)
if !lockAcquired {
return
}
defer unlock()

template := notificationConfig.Mail.Templates.OpenCircuitBreaker
if err := notifyConsumer(&obj, notificationConfig.Mail.Subject.OpenCircuitBreaker, template); err != nil {
log.Error().
Expand All @@ -78,10 +62,18 @@ func (n NotificationListener) OnUpdate(event *hazelcast.EntryNotified, obj messa
Err(err).
Msg("Failed to send notification when circuit-breaker was updated")
}

return
}

// Loop detected
if circuitBreakerOpen && (obj.LoopCounter > oldObj.LoopCounter && obj.LoopCounter%notificationConfig.LoopModulo == 0) {
lockAcquired, unlock := n.lockKey(event.Key, "update", 30*time.Second)
if !lockAcquired {
return
}
defer unlock()

template := notificationConfig.Mail.Templates.LoopDetected
if err := notifyConsumer(&obj, notificationConfig.Mail.Subject.LoopDetected, template); err != nil {
log.Error().
Expand All @@ -103,6 +95,21 @@ func (n NotificationListener) OnError(event *hazelcast.EntryNotified, err error)
Msg("Could not listen for circuit breaker changes")
}

func (n NotificationListener) lockKey(key any, action string, leaseDuration time.Duration) (bool, func()) {
lockCache := cache.NotificationSender
lockCtx := lockCache.NewLockContext(context.Background())
acquired, _ := lockCache.TryLockWithLease(lockCtx, key, leaseDuration)
if !acquired {
log.Debug().Str("subscriptionId", key.(string)).Str("trigger", action).Msg("Could not acquire notification-lock")
return false, nil
}
log.Debug().Str("subscriptionId", key.(string)).Str("trigger", action).Msg("Acquired notification-lock")

return true, func() {
_ = lockCache.Unlock(lockCtx, key)
}
}

func notifyConsumer(cbMessage *message.CircuitBreakerMessage, subject string, template string) error {
log.Debug().
Str("subscriptionId", cbMessage.SubscriptionId).
Expand Down

0 comments on commit d141102

Please sign in to comment.