Skip to content

Commit

Permalink
core logic and wiring
Browse files Browse the repository at this point in the history
Signed-off-by: Kavindu Dodanduwa <kavindudodanduwa@gmail.com>
  • Loading branch information
Kavindu-Dodan committed Mar 5, 2024
1 parent d5ad018 commit 8b0323e
Show file tree
Hide file tree
Showing 6 changed files with 307 additions and 43 deletions.
12 changes: 7 additions & 5 deletions flagd/pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,

// build flag store, collect flag sources & fill sources details
s := store.NewFlags()
var sources []string
sources := []string{}

for _, provider := range config.SyncProviders {
s.FlagSources = append(s.FlagSources, provider.URI)
Expand All @@ -80,9 +80,11 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
recorder)

// todo - port as parameter and disable sync service by default
flagSyncService := flag_sync.NewSyncService(sources, flag_sync.SvcConfigurations{
Port: 8015,
Logger: logger.WithFields(zap.String("component", "FlagSyncService")),
flagSyncService := flag_sync.NewSyncService(flag_sync.SvcConfigurations{
Logger: logger.WithFields(zap.String("component", "FlagSyncService")),
Port: 8015,
Sources: sources,
Store: s,
})

// build sync providers
Expand All @@ -100,7 +102,7 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
return &Runtime{
Logger: logger.WithFields(zap.String("component", "runtime")),
Evaluator: evaluator,
FlagSync: flagSyncService,
FlagSync: &flagSyncService,
Service: connectService,
ServiceConfig: service.Configuration{
Port: config.ServicePort,
Expand Down
16 changes: 14 additions & 2 deletions flagd/pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
type Runtime struct {
Evaluator evaluator.IEvaluator
Logger *logger.Logger
FlagSync flag_sync.SyncService
FlagSync *flag_sync.Service
Service service.IFlagEvaluationService
ServiceConfig service.Configuration
SyncImpl []sync.ISync
Expand Down Expand Up @@ -88,6 +88,7 @@ func (r *Runtime) Start() error {
defer func() {
r.Logger.Info("Shutting down server...")
r.Service.Shutdown()
r.FlagSync.Shutdown()
r.Logger.Info("Server successfully shutdown.")
}()

Expand All @@ -101,7 +102,13 @@ func (r *Runtime) Start() error {
})

g.Go(func() error {
return r.FlagSync.Serve()
// todo delay start to allow syncs to complete
err := r.FlagSync.Serve()
if err != nil {
return fmt.Errorf("error from server: %w", err)
}

return nil
})

<-gCtx.Done()
Expand Down Expand Up @@ -139,5 +146,10 @@ func (r *Runtime) updateAndEmit(payload sync.DataSync) bool {
},
})

// Emit flag syncs only when re-syncs are not needed
if !resyncRequired {
r.FlagSync.Emit()
}

return resyncRequired
}
77 changes: 66 additions & 11 deletions flagd/pkg/service/flag-sync/handler.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,81 @@
package flag_sync
package sync

import (
"context"
"fmt"

syncv1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
"connectrpc.com/connect"
"context"
"github.com/open-feature/flagd/core/pkg/logger"
"google.golang.org/protobuf/types/known/structpb"
)

// Request handler
type syncHandler struct {
mux *syncMultiplexer
log *logger.Logger
}

func (s syncHandler) SyncFlags(ctx context.Context, c *connect.Request[syncv1.SyncFlagsRequest], c2 *connect.ServerStream[syncv1.SyncFlagsResponse]) error {
//TODO implement me
panic("implement me")
func (s *syncHandler) SyncFlags(ctx context.Context, req *connect.Request[syncv1.SyncFlagsRequest],
rsp *connect.ServerStream[syncv1.SyncFlagsResponse],
) error {
muxPayload := make(chan payload)
selector := req.Msg.GetSelector()

err := s.mux.register(ctx, selector, muxPayload)
if err != nil {
return err
}

for {
select {
case payload := <-muxPayload:
err := rsp.Send(&syncv1.SyncFlagsResponse{
FlagConfiguration: payload.flags,
})
if err != nil {
s.log.Debug(fmt.Sprintf("error sending stream response: %v", err))
return fmt.Errorf("error sending stream response: %w", err)
}
case <-ctx.Done():
s.mux.unregister(ctx, selector)
s.log.Debug("context done, exiting stream request")
return nil
}
}
}

func (s syncHandler) FetchAllFlags(ctx context.Context, c *connect.Request[syncv1.FetchAllFlagsRequest]) (*connect.Response[syncv1.FetchAllFlagsResponse], error) {
//TODO implement me
panic("implement me")
func (s *syncHandler) FetchAllFlags(_ context.Context, req *connect.Request[syncv1.FetchAllFlagsRequest]) (
*connect.Response[syncv1.FetchAllFlagsResponse], error,
) {
flags, err := s.mux.getALlFlags(req.Msg.GetSelector())
if err != nil {
return nil, err
}

return &connect.Response[syncv1.FetchAllFlagsResponse]{
Msg: &syncv1.FetchAllFlagsResponse{
FlagConfiguration: flags,
},
}, nil
}

func (s syncHandler) GetMetadata(ctx context.Context, c *connect.Request[syncv1.GetMetadataRequest]) (*connect.Response[syncv1.GetMetadataResponse], error) {
//TODO implement me
panic("implement me")
func (s *syncHandler) GetMetadata(_ context.Context, _ *connect.Request[syncv1.GetMetadataRequest]) (
*connect.Response[syncv1.GetMetadataResponse], error,
) {
// todo handle request
metadata, err := structpb.NewStruct(map[string]interface{}{
"sources": s.mux.sourcesAsMetadata(),
})
if err != nil {
s.log.Warn(fmt.Sprintf("error from struct creation: %v", err))
return nil, fmt.Errorf("error constructing response")
}

return &connect.Response[syncv1.GetMetadataResponse]{
Msg: &syncv1.GetMetadataResponse{
Metadata: metadata,
},
},
nil
}
183 changes: 181 additions & 2 deletions flagd/pkg/service/flag-sync/sync-multiplexer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,185 @@
package flag_sync
package sync

// multiplex data sync to listeners
import (
"encoding/json"
"fmt"
"slices"
"strings"
"sync"

"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/store"
)

type syncMultiplexer struct {
store *store.Flags
sources []string

subs map[interface{}]subscription // subscriptions on all sources
selectorSubs map[string]map[interface{}]subscription // source specific subscriptions

allFlags string // pre-calculated all flags in store as a string
selectorFlags map[string]string // pre-calculated selector scoped flags

mu sync.Mutex
}

type subscription struct {
id interface{}
channel chan payload
}

type payload struct {
flags string
}

func newMux(store *store.Flags, sources []string) *syncMultiplexer {
return &syncMultiplexer{
store: store,
sources: sources,
subs: map[interface{}]subscription{},
selectorSubs: map[string]map[interface{}]subscription{},
selectorFlags: map[string]string{},
}
}

func (r *syncMultiplexer) register(id interface{}, source string, con chan payload) error {
r.mu.Lock()
defer r.mu.Unlock()

if source != "" && !slices.Contains(r.sources, source) {
return fmt.Errorf("no flag watcher setup for source %s", source)
}

var initSync string
var err error

if source == "" {
// subscribe for flags from all source
r.subs[id] = subscription{
id: id,
channel: con,
}

initSync, err = r.store.String()
if err != nil {
return fmt.Errorf("errpr getting all flags: %w", err)
}
} else {
// subscribe for specific source
s, ok := r.selectorSubs[source]
if ok {
s[id] = subscription{
id: id,
channel: con,
}
} else {
r.selectorSubs[source] = map[interface{}]subscription{
id: {
id: id,
channel: con,
},
}
}

initSync = r.selectorFlags[source]
}

// Initial sync
con <- payload{flags: initSync}
return nil
}

func (r *syncMultiplexer) pushUpdates() error {
r.mu.Lock()
defer r.mu.Unlock()

err := r.extract()
if err != nil {
return err
}

// push to all source subs
for _, sub := range r.subs {
sub.channel <- payload{r.allFlags}
}

// push to selector subs
for source, flags := range r.selectorFlags {
for _, s := range r.selectorSubs[source] {
s.channel <- payload{flags}
}
}

return nil
}

func (r *syncMultiplexer) unregister(id interface{}, selector string) {
r.mu.Lock()
defer r.mu.Unlock()

var from map[interface{}]subscription

if selector == "" {
from = r.subs
} else {
from = r.selectorSubs[selector]
}

delete(from, id)
}

func (r *syncMultiplexer) getALlFlags(source string) (string, error) {
if source != "" && !slices.Contains(r.sources, source) {
return "", fmt.Errorf("no flag watcher setup for source %s", source)
}

if source == "" {
return r.allFlags, nil
}

return r.selectorFlags[source], nil
}

func (r *syncMultiplexer) sourcesAsMetadata() string {
r.mu.Lock()
defer r.mu.Unlock()

return strings.Join(r.store.FlagSources, ",")
}

func (r *syncMultiplexer) extract() error {
clear(r.selectorFlags)

all := r.store.GetAll()
bytes, err := json.Marshal(all)
if err != nil {
return fmt.Errorf("error from marshallin: %w", err)
}

r.allFlags = string(bytes)

collector := map[string]map[string]model.Flag{}

for key, flag := range all {
c, ok := collector[flag.Source]
if ok {
c[key] = flag
} else {
collector[flag.Source] = map[string]model.Flag{
key: flag,
}
}
}

for source, flags := range collector {
bytes, err := json.Marshal(flags)
if err != nil {
return fmt.Errorf("unable to marshal flags: %w", err)
}

r.selectorFlags[source] = string(bytes)
}

return nil
}
Loading

0 comments on commit 8b0323e

Please sign in to comment.