From 5bbef9ea4b1960686e58298c2c2e192ca99f072f Mon Sep 17 00:00:00 2001 From: Skye Gill Date: Tue, 6 Dec 2022 15:12:20 +0000 Subject: [PATCH] fix: changed eventing configuration mutex to rwmutex and added missing lock (#220) Fixes https://github.com/open-feature/flagd/issues/219 --- pkg/service/connect_service.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/pkg/service/connect_service.go b/pkg/service/connect_service.go index 7c21418e9..a089c67dc 100644 --- a/pkg/service/connect_service.go +++ b/pkg/service/connect_service.go @@ -45,7 +45,7 @@ type ConnectServiceConfiguration struct { } type eventingConfiguration struct { - mu *sync.Mutex + mu *sync.RWMutex subs map[interface{}]chan Notification } @@ -53,7 +53,7 @@ func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator) error s.Eval = eval s.eventingConfiguration = &eventingConfiguration{ subs: make(map[interface{}]chan Notification), - mu: &sync.Mutex{}, + mu: &sync.RWMutex{}, } lis, err := s.setupServer() if err != nil { @@ -144,13 +144,16 @@ func (s *ConnectService) EventStream( req *connect.Request[emptypb.Empty], stream *connect.ServerStream[schemaV1.EventStreamResponse], ) error { - s.eventingConfiguration.subs[req] = make(chan Notification, 1) + requestNotificationChan := make(chan Notification, 1) + s.eventingConfiguration.mu.Lock() + s.eventingConfiguration.subs[req] = requestNotificationChan + s.eventingConfiguration.mu.Unlock() defer func() { s.eventingConfiguration.mu.Lock() delete(s.eventingConfiguration.subs, req) s.eventingConfiguration.mu.Unlock() }() - s.eventingConfiguration.subs[req] <- Notification{ + requestNotificationChan <- Notification{ Type: ProviderReady, } for { @@ -162,7 +165,7 @@ func (s *ConnectService) EventStream( if err != nil { s.Logger.Error(err.Error()) } - case notification := <-s.eventingConfiguration.subs[req]: + case notification := <-requestNotificationChan: d, err := structpb.NewStruct(notification.Data) if err != nil { s.Logger.Error(err.Error()) @@ -181,11 +184,11 @@ func (s *ConnectService) EventStream( } func (s *ConnectService) Notify(n Notification) { - s.eventingConfiguration.mu.Lock() + s.eventingConfiguration.mu.RLock() + defer s.eventingConfiguration.mu.RUnlock() for _, send := range s.eventingConfiguration.subs { send <- n } - s.eventingConfiguration.mu.Unlock() } func (s *ConnectService) ResolveBoolean(