Skip to content

Commit

Permalink
fix: changed eventing configuration mutex to rwmutex and added missin…
Browse files Browse the repository at this point in the history
…g lock (#220)

Fixes #219
  • Loading branch information
skyerus authored Dec 6, 2022
1 parent df7c6ee commit 5bbef9e
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions pkg/service/connect_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ type ConnectServiceConfiguration struct {
}

type eventingConfiguration struct {
mu *sync.Mutex
mu *sync.RWMutex
subs map[interface{}]chan Notification
}

func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator) error {
s.Eval = eval
s.eventingConfiguration = &eventingConfiguration{
subs: make(map[interface{}]chan Notification),
mu: &sync.Mutex{},
mu: &sync.RWMutex{},
}
lis, err := s.setupServer()
if err != nil {
Expand Down Expand Up @@ -144,13 +144,16 @@ func (s *ConnectService) EventStream(
req *connect.Request[emptypb.Empty],
stream *connect.ServerStream[schemaV1.EventStreamResponse],
) error {
s.eventingConfiguration.subs[req] = make(chan Notification, 1)
requestNotificationChan := make(chan Notification, 1)
s.eventingConfiguration.mu.Lock()
s.eventingConfiguration.subs[req] = requestNotificationChan
s.eventingConfiguration.mu.Unlock()
defer func() {
s.eventingConfiguration.mu.Lock()
delete(s.eventingConfiguration.subs, req)
s.eventingConfiguration.mu.Unlock()
}()
s.eventingConfiguration.subs[req] <- Notification{
requestNotificationChan <- Notification{
Type: ProviderReady,
}
for {
Expand All @@ -162,7 +165,7 @@ func (s *ConnectService) EventStream(
if err != nil {
s.Logger.Error(err.Error())
}
case notification := <-s.eventingConfiguration.subs[req]:
case notification := <-requestNotificationChan:
d, err := structpb.NewStruct(notification.Data)
if err != nil {
s.Logger.Error(err.Error())
Expand All @@ -181,11 +184,11 @@ func (s *ConnectService) EventStream(
}

func (s *ConnectService) Notify(n Notification) {
s.eventingConfiguration.mu.Lock()
s.eventingConfiguration.mu.RLock()
defer s.eventingConfiguration.mu.RUnlock()
for _, send := range s.eventingConfiguration.subs {
send <- n
}
s.eventingConfiguration.mu.Unlock()
}

func (s *ConnectService) ResolveBoolean(
Expand Down

0 comments on commit 5bbef9e

Please sign in to comment.