Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: serve sync.proto on port 8015 #1237

Merged
merged 24 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
942fe3e
initial wire-up
Kavindu-Dodan Mar 1, 2024
ae50c71
core logic and wiring
Kavindu-Dodan Mar 4, 2024
8354ce8
finalize wiring
Kavindu-Dodan Mar 5, 2024
1d662a4
fix sync schema and remove unwanted change
Kavindu-Dodan Mar 6, 2024
6c491fc
unit tests
Kavindu-Dodan Mar 6, 2024
791488f
code docs
Kavindu-Dodan Mar 6, 2024
ef19fe7
add service test and fix identified issues
Kavindu-Dodan Mar 6, 2024
6bb0db5
review - param casing, import alias and method name
Kavindu-Dodan Mar 7, 2024
8810e45
minor improvements
Kavindu-Dodan Mar 7, 2024
437ab08
complete docs
Kavindu-Dodan Mar 7, 2024
baa34f6
Update docs/architecture.md
Kavindu-Dodan Mar 7, 2024
6c14a7f
improve cli help text
Kavindu-Dodan Mar 7, 2024
75f50cd
Update docs/architecture.md
Kavindu-Dodan Mar 8, 2024
28e890e
Update docs/reference/grpc-sync-service.md
Kavindu-Dodan Mar 8, 2024
4a56a63
Update docs/reference/grpc-sync-service.md
Kavindu-Dodan Mar 8, 2024
9ecbef4
Update docs/reference/grpc-sync-service.md
Kavindu-Dodan Mar 8, 2024
791b565
Update docs/reference/grpc-sync-service.md
Kavindu-Dodan Mar 8, 2024
c2a95ce
improve service shutdown
Kavindu-Dodan Mar 8, 2024
6abdc85
document source uri
Kavindu-Dodan Mar 8, 2024
c8e8697
improve service start delay and add tets
Kavindu-Dodan Mar 8, 2024
5eac82f
remove flag and enable flag syncs by default
Kavindu-Dodan Mar 11, 2024
997ed6b
review changes - rw lock, improve log
Kavindu-Dodan Mar 11, 2024
65e9234
change method name as recommended by review
Kavindu-Dodan Mar 11, 2024
ae6a547
rename chan getter
Kavindu-Dodan Mar 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ erDiagram

In-process deployments embed the flagd evaluation engine directly into the client application through the use of an [in-process provider](./installation.md#in-process).
The in-process provider is connected via the sync protocol to an implementing [gRPC service](./concepts/syncs.md#grpc-sync) that provides the flag definitions.
This pattern requires an in-process implementation of the flagd evaluation engine, but has the benefit of no I/O overhead, since no inter-process communication is required.
You can use flagd as a [gRPC sync service](./reference/grpc-sync-service.md).
In this mode, the flag sync stream will expose aggregated flag configurations currently configured through [syncs](./concepts/syncs.md).
This pattern requires an in-process implementation of the flagd evaluation engine but has the benefit of no I/O overhead for flag evaluations, since no inter-process communication is required.

```mermaid
---
Expand Down
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ With flagd you can:
* perform pseudorandom assignments for experimentation
* perform progressive roll-outs of new features
* aggregate flag definitions from multiple sources
* expose aggregated flags as a gRPC stream to be used by in-process providers

It doesn't include a UI, management console or a persistence layer.
It's configurable entirely via a [POSIX-style CLI](./reference/flagd-cli/flagd.md).
Expand Down
1 change: 1 addition & 0 deletions docs/reference/flagd-cli/flagd_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ flagd start [flags]
-k, --server-key-path string Server side tls key path
-d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally.
-s, --sources string JSON representation of an array of SourceConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object: https://flagd.dev/reference/sync-configuration/#source-configuration
-g, --sync-port int32 gRPC Sync port (default 8015)
-f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath, URL (HTTP and gRPC) or FeatureFlag custom resource. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension.
```

Expand Down
42 changes: 42 additions & 0 deletions docs/reference/grpc-sync-service.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
---
description: flagd as a gRPC sync service
---

# Overview

flagd can expose a gRPC sync service, allowing in-process providers to obtain their flag definitions.
The gRPC sync stream contains flag definitions currently configured at flagd as [sync-configurations](./sync-configuration.md).

```mermaid
---
title: gRPC sync
---
erDiagram
flagd ||--o{ "sync (file)" : watches
flagd ||--o{ "sync (http)" : polls
flagd ||--o{ "sync (grpc)" : "sync.proto (gRPC/stream)"
flagd ||--o{ "sync (kubernetes)" : watches
"In-Process provider" ||--|| flagd : "gRPC sync stream (default port 8015)"
```

You may change the default port of the service using startup flag `--sync-port` (or `-g` shothand flag).

By default, the gRPC stream exposes all the flag configurations, with conflicting flag keys merged following flag's standard merge strategy.
You can read more about the merge strategy in our dedicated [concepts guide on syncs](../concepts/syncs.md).

If you specify a `selector` in the gRPC sync request, the gRPC service will attempt match the provided selector value to a source, and stream just the flags identified in that source.
For example, if `selector` is set to `myFlags.json`, service will stream flags observed from `myFlags.json` file.
toddbaert marked this conversation as resolved.
Show resolved Hide resolved
Note that, to observe flags from `myFlags.json` file, you may use startup option `uri` like `--uri myFlags.json` or `source` option `--sources='[{"uri":"myFlags.json", provider":"file"}]`.
And the request will fail if there is no flag source matching the requested `selector`.

flagd provider implementations expose the ability to define the `selector` value. Please consider below example for Java,

```java
final FlagdProvider flagdProvider =
new FlagdProvider(FlagdOptions.builder()
.resolverType(Config.Evaluator.IN_PROCESS)
.host("localhost")
.port(8015)
.selector("myFlags.json")
.build());
```
9 changes: 7 additions & 2 deletions flagd/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ import (
const (
corsFlagName = "cors-origin"
logFormatFlagName = "log-format"
metricsExporter = "metrics-exporter"
managementPortFlagName = "management-port"
metricsExporter = "metrics-exporter"
otelCollectorURI = "otel-collector-uri"
portFlagName = "port"
serverCertPathFlagName = "server-cert-path"
serverKeyPathFlagName = "server-key-path"
socketPathFlagName = "socket-path"
sourcesFlagName = "sources"
syncPortFlagName = "sync-port"
uriFlagName = "uri"
docsLinkConfiguration = "https://flagd.dev/reference/flagd-cli/flagd_start/"
)

func init() {
Expand All @@ -36,8 +36,11 @@ func init() {
// allows environment variables to use _ instead of -
viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) // sync-provider-args becomes SYNC_PROVIDER_ARGS
viper.SetEnvPrefix("FLAGD") // port becomes FLAGD_PORT

flags.Int32P(managementPortFlagName, "m", 8014, "Port for management operations")
flags.Int32P(portFlagName, "p", 8013, "Port to listen on")
flags.Int32P(syncPortFlagName, "g", 8015, "gRPC Sync port")

flags.StringP(socketPathFlagName, "d", "", "Flagd socket path. "+
"With grpc the service will become available on this address. "+
"With http(s) the grpc-gateway proxy will use this address internally.")
Expand Down Expand Up @@ -74,6 +77,7 @@ func init() {
_ = viper.BindPFlag(socketPathFlagName, flags.Lookup(socketPathFlagName))
_ = viper.BindPFlag(sourcesFlagName, flags.Lookup(sourcesFlagName))
_ = viper.BindPFlag(uriFlagName, flags.Lookup(uriFlagName))
_ = viper.BindPFlag(syncPortFlagName, flags.Lookup(syncPortFlagName))
}

// startCmd represents the start command
Expand Down Expand Up @@ -128,6 +132,7 @@ var startCmd = &cobra.Command{
ServiceKeyPath: viper.GetString(serverKeyPathFlagName),
ServicePort: viper.GetUint16(portFlagName),
ServiceSocketPath: viper.GetString(socketPathFlagName),
SyncServicePort: viper.GetUint16(syncPortFlagName),
SyncProviders: syncProviders,
})
if err != nil {
Expand Down
23 changes: 21 additions & 2 deletions flagd/pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
syncbuilder "github.com/open-feature/flagd/core/pkg/sync/builder"
"github.com/open-feature/flagd/core/pkg/telemetry"
flageval "github.com/open-feature/flagd/flagd/pkg/service/flag-evaluation"
flagsync "github.com/open-feature/flagd/flagd/pkg/service/flag-sync"
"go.uber.org/zap"
)

Expand All @@ -28,6 +29,7 @@ type Config struct {
ServiceKeyPath string
ServicePort uint16
ServiceSocketPath string
SyncServicePort uint16

SyncProviders []sync.SourceConfig
CORS []string
Expand Down Expand Up @@ -56,25 +58,41 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
return nil, fmt.Errorf("error building metrics recorder: %w", err)
}

// build flag store & fill sources details
// build flag store, collect flag sources & fill sources details
s := store.NewFlags()
sources := []string{}

for _, provider := range config.SyncProviders {
s.FlagSources = append(s.FlagSources, provider.URI)
s.SourceMetadata[provider.URI] = store.SourceDetails{
Source: provider.URI,
Selector: provider.Selector,
}
sources = append(sources, provider.URI)
}

// derive evaluator
evaluator := setupJSONEvaluator(logger, s)

// derive service
// derive services

// connect service
connectService := flageval.NewConnectService(
logger.WithFields(zap.String("component", "service")),
evaluator,
recorder)

// flag sync service
flagSyncService, err := flagsync.NewSyncService(flagsync.SvcConfigurations{
Logger: logger.WithFields(zap.String("component", "FlagSyncService")),
Port: config.SyncServicePort,
Sources: sources,
Store: s,
})
if err != nil {
return nil, fmt.Errorf("error creating sync service: %w", err)
}

// build sync providers
syncLogger := logger.WithFields(zap.String("component", "sync"))
iSyncs, err := syncProvidersFromConfig(syncLogger, config.SyncProviders)
Expand All @@ -90,6 +108,7 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
return &Runtime{
Logger: logger.WithFields(zap.String("component", "runtime")),
Evaluator: evaluator,
FlagSync: flagSyncService,
Service: connectService,
ServiceConfig: service.Configuration{
Port: config.ServicePort,
Expand Down
37 changes: 24 additions & 13 deletions flagd/pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/service"
"github.com/open-feature/flagd/core/pkg/sync"
flagsync "github.com/open-feature/flagd/flagd/pkg/service/flag-sync"
"golang.org/x/sync/errgroup"
)

type Runtime struct {
Evaluator evaluator.IEvaluator
Logger *logger.Logger
FlagSync flagsync.ISyncService
Service service.IFlagEvaluationService
ServiceConfig service.Configuration
SyncImpl []sync.ISync
Expand Down Expand Up @@ -49,18 +51,16 @@ func (r *Runtime) Start() error {
// resync events are triggered when a delete occurs during flag merges in the store
// resync events may trigger further resync events, however for a flag to be deleted from the store
// its source must match, preventing the opportunity for resync events to snowball
if resyncRequired := r.updateWithNotify(data); resyncRequired {
if resyncRequired := r.updateAndEmit(data); resyncRequired {
for _, s := range r.SyncImpl {
p := s
go func() {
g.Go(func() error {
err := p.ReSync(gCtx, dataSync)
if err != nil {
return fmt.Errorf("error resyncing sources: %w", err)
}
return nil
})
}()
g.Go(func() error {
err := p.ReSync(gCtx, dataSync)
if err != nil {
return fmt.Errorf("error resyncing sources: %w", err)
}
return nil
})
}
}
case <-gCtx.Done():
Expand Down Expand Up @@ -99,7 +99,16 @@ func (r *Runtime) Start() error {
}
return nil
})
<-gCtx.Done()

g.Go(func() error {
err := r.FlagSync.Start(gCtx)
if err != nil {
return fmt.Errorf("error from sync server: %w", err)
}

return nil
})

if err := g.Wait(); err != nil {
return fmt.Errorf("errgroup closed with error: %w", err)
}
Expand All @@ -116,8 +125,8 @@ func (r *Runtime) isReady() bool {
return true
}

// updateWithNotify helps to update state and notify listeners
func (r *Runtime) updateWithNotify(payload sync.DataSync) bool {
// updateAndEmit helps to update state, notify changes and trigger sync updates
func (r *Runtime) updateAndEmit(payload sync.DataSync) bool {
r.mu.Lock()
defer r.mu.Unlock()

Expand All @@ -134,5 +143,7 @@ func (r *Runtime) updateWithNotify(payload sync.DataSync) bool {
},
})

r.FlagSync.Emit(resyncRequired, payload.Source)

return resyncRequired
}
74 changes: 74 additions & 0 deletions flagd/pkg/service/flag-sync/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package sync

import (
"context"
"fmt"

"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
"buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
"github.com/open-feature/flagd/core/pkg/logger"
"google.golang.org/protobuf/types/known/structpb"
)

// syncHandler implements the sync contract
type syncHandler struct {
mux *Multiplexer
log *logger.Logger
}

func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.FlagSyncService_SyncFlagsServer) error {
muxPayload := make(chan payload, 1)
selector := req.GetSelector()

ctx := server.Context()

err := s.mux.Register(ctx, selector, muxPayload)
if err != nil {
return err
}

for {
select {
case payload := <-muxPayload:
err := server.Send(&syncv1.SyncFlagsResponse{FlagConfiguration: payload.flags})
if err != nil {
s.log.Debug(fmt.Sprintf("error sending stream response: %v", err))
return fmt.Errorf("error sending stream response: %w", err)
}
case <-ctx.Done():
s.mux.Unregister(ctx, selector)
s.log.Debug("context complete and exiting stream request")
return nil
}
}
}

func (s syncHandler) FetchAllFlags(_ context.Context, req *syncv1.FetchAllFlagsRequest) (
*syncv1.FetchAllFlagsResponse, error,
) {
flags, err := s.mux.GetAllFlags(req.GetSelector())
if err != nil {
return nil, err
}

return &syncv1.FetchAllFlagsResponse{
FlagConfiguration: flags,
}, nil
}

func (s syncHandler) GetMetadata(_ context.Context, _ *syncv1.GetMetadataRequest) (
*syncv1.GetMetadataResponse, error,
) {
metadata, err := structpb.NewStruct(map[string]interface{}{
"sources": s.mux.SourcesAsMetadata(),
})
if err != nil {
s.log.Warn(fmt.Sprintf("error from struct creation: %v", err))
return nil, fmt.Errorf("error constructing metadata response")
}

return &syncv1.GetMetadataResponse{
Metadata: metadata,
},
nil
}
Loading
Loading