Skip to content

Commit

Permalink
feat: grpc tls connectivity (grpcs) (#477)
Browse files Browse the repository at this point in the history
## This PR

Introduce TLS connectivity for GRPC sync provider.

TLS can be enabled using schema `grpcs://`. For example, 

`./flagd start --uri grpcs://localhost:8090`

Further, a self-sign certificate can be provided for TLS connectivity
using configuration source field `certPath`

ex:- `./flagd start
--sources='[{"uri":"grpcs://localhost:9090","provider":"grpc",
"certPath":"<CA_CERT>"}]'`


### How to test 

Start mock server impl -
https://github.com/Kavindu-Dodan/flagd-grpc-sync-server & then run flagd
with grpc tls mode

---------

Signed-off-by: Kavindu Dodanduwa <kavindudodanduwa@gmail.com>
Co-authored-by: James Milligan <james@omnant.co.uk>
Co-authored-by: Skye Gill <gill.skye95@gmail.com>
  • Loading branch information
3 people authored Mar 9, 2023
1 parent 1762503 commit 228f430
Show file tree
Hide file tree
Showing 5 changed files with 489 additions and 99 deletions.
24 changes: 14 additions & 10 deletions docs/configuration/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ Config file expects the keys to have the exact naming as the flags.

Any URI passed to flagd via the `--uri` flag must follow one of the 4 following patterns to ensure that it is passed to the correct implementation:

| Sync | Pattern | Example |
|------------|------------------------------------|---------------------------------------|
| Sync | Pattern | Example |
|------------|---------------------------------------|---------------------------------------|
| Kubernetes | `core.openfeature.dev/namespace/name` | `core.openfeature.dev/default/my-crd` |
| Filepath | `file:path/to/my/flag` | `file:etc/flagd/my-flags.json` |
| Remote | `http(s)://flag-source-url` | `https://my-flags.com/flags` |
| Grpc | `grpc://flag-source-url` | `grpc://my-flags-server` |
| Filepath | `file:path/to/my/flag` | `file:etc/flagd/my-flags.json` |
| Remote | `http(s)://flag-source-url` | `https://my-flags.com/flags` |
| Grpc | `grpc(s)://flag-source-url` | `grpc://my-flags-server` |


### Customising sync providers
Expand All @@ -42,11 +42,12 @@ While a URI may be passed to flagd via the `--uri` flag, some implementations ma
The flag takes a string argument, which should be a JSON representation of an array of `SourceConfig` objects. Alternatively, these configurations should be passed to
flagd via config file, specified using the `--config` flag.

| Field | Type |
|------------|------------------------------------|
| uri | required `string` | |
| provider | required `string` (`file`, `kubernetes`, `http` or `grpc`) |
| bearerToken | optional `string` |
| Field | Type | Note |
|-------------|------------------------------------------------------------|----------------------------------------------------|
| uri | required `string` | |
| provider | required `string` (`file`, `kubernetes`, `http` or `grpc`) | |
| bearerToken | optional `string` | Used for http sync |
| certPath | optional `string` | Used for grpcs sync when TLS certificate is needed |

The `uri` field values do not need to follow the [URI patterns](#uri-patterns), the provider type is instead derived from the provider field. If the prefix is supplied, it will be removed on startup without error.

Expand All @@ -68,4 +69,7 @@ sources:
provider: kubernetes
- uri: grpc://my-flag-source:8080
provider: grpc
- uri: grpcs://my-flag-source:8080
provider: grpc
certPath: /certs/ca.cert
```
15 changes: 9 additions & 6 deletions pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,18 @@ const (
)

var (
regCrd *regexp.Regexp
regURL *regexp.Regexp
regGRPC *regexp.Regexp
regFile *regexp.Regexp
regCrd *regexp.Regexp
regURL *regexp.Regexp
regGRPC *regexp.Regexp
regGRPCSecure *regexp.Regexp
regFile *regexp.Regexp
)

func init() {
regCrd = regexp.MustCompile("^core.openfeature.dev/")
regURL = regexp.MustCompile("^https?://")
regGRPC = regexp.MustCompile("^" + grpc.Prefix)
regGRPCSecure = regexp.MustCompile("^" + grpc.PrefixSecure)
regFile = regexp.MustCompile("^file:")
}

Expand Down Expand Up @@ -120,11 +122,12 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error {

func (r *Runtime) newGRPC(config sync.SourceConfig, logger *logger.Logger) *grpc.Sync {
return &grpc.Sync{
Target: grpc.URLToGRPCTarget(config.URI),
URI: config.URI,
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "grpc"),
),
CertPath: config.CertPath,
}
}

Expand Down Expand Up @@ -211,7 +214,7 @@ func SyncProvidersFromURIs(uris []string) ([]sync.SourceConfig, error) {
URI: uri,
Provider: syncProviderHTTP,
})
case regGRPC.Match(uriB):
case regGRPC.Match(uriB), regGRPCSecure.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
URI: uri,
Provider: syncProviderGrpc,
Expand Down
141 changes: 95 additions & 46 deletions pkg/sync/grpc/grpc_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package grpc

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"math"
"os"
"strings"
msync "sync"
"time"

"google.golang.org/grpc/credentials"

"buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc"
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1"

Expand All @@ -18,47 +23,55 @@ import (
)

const (
// Prefix for GRPC URL inputs. GRPC does not define a prefix through standard. This prefix helps to differentiate
// remote URLs for REST APIs (i.e - HTTP) from GRPC endpoints.
Prefix = "grpc://"
// Prefix for GRPC URL inputs. GRPC does not define a standard prefix. This prefix helps to differentiate remote
// URLs for REST APIs (i.e - HTTP) from GRPC endpoints.
Prefix = "grpc://"
PrefixSecure = "grpcs://"

// Connection retry constants
// Back off period is calculated with backOffBase ^ #retry-iteration. However, when #retry-iteration count reach
// backOffLimit, retry delay fallback to constantBackOffDelay
backOffLimit = 3
backOffBase = 4
constantBackOffDelay = 60

tlsVersion = tls.VersionTLS12
)

var once msync.Once

type Sync struct {
Target string
URI string
ProviderID string
CertPath string
Logger *logger.Logger
Mux *msync.RWMutex

syncClient syncv1grpc.FlagSyncService_SyncFlagsClient
client syncv1grpc.FlagSyncServiceClient
options []grpc.DialOption
ready bool
client syncv1grpc.FlagSyncServiceClient
ready bool
}

func (g *Sync) connectClient(ctx context.Context) error {
// initial dial and connection. Failure here must result in a startup failure
dial, err := grpc.DialContext(ctx, g.Target, g.options...)
func (g *Sync) Init(ctx context.Context) error {
tCredentials, err := buildTransportCredentials(g.URI, g.CertPath)
if err != nil {
g.Logger.Error(fmt.Sprintf("error building transport credentials: %s", err.Error()))
return err
}

g.client = syncv1grpc.NewFlagSyncServiceClient(dial)
target, ok := sourceToGRPCTarget(g.URI)
if !ok {
return fmt.Errorf("invalid grpc source: %s", g.URI)
}

syncClient, err := g.client.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID})
// Derive reusable client connection
rpcCon, err := grpc.DialContext(ctx, target, grpc.WithTransportCredentials(tCredentials))
if err != nil {
g.Logger.Error(fmt.Sprintf("error calling streaming operation: %s", err.Error()))
g.Logger.Error(fmt.Sprintf("error initiating grpc client connection: %s", err.Error()))
return err
}
g.syncClient = syncClient

// Setup service client
g.client = syncv1grpc.NewFlagSyncServiceClient(rpcCon)

return nil
}

Expand All @@ -70,30 +83,28 @@ func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error
}
dataSync <- sync.DataSync{
FlagData: res.GetFlagConfiguration(),
Source: g.Target,
Source: g.URI,
Type: sync.ALL,
}
return nil
}

func (g *Sync) Init(ctx context.Context) error {
g.options = []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}

// initial dial and connection. Failure here must result in a startup failure
return g.connectClient(ctx)
}

func (g *Sync) IsReady() bool {
return g.ready
}

func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
// initial stream listening
err := g.handleFlagSync(g.syncClient, dataSync)
// Initialize SyncFlags client. This fails if server connection establishment fails (ex:- grpc server offline)
syncClient, err := g.client.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID})
if err != nil {
return err
}

// Initial stream listening. Error will be logged and continue and retry connection establishment
err = g.handleFlagSync(syncClient, dataSync)
if err == nil {
return nil
// This should not happen as handleFlagSync expects to return with an error
return err
}

g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error()))
Expand Down Expand Up @@ -141,20 +152,15 @@ func (g *Sync) connectWithRetry(
return nil, false
}

g.Logger.Warn(fmt.Sprintf("connection re-establishment attempt in-progress for grpc target: %s", g.Target))

if err := g.connectClient(ctx); err != nil {
g.Logger.Debug(fmt.Sprintf("error dialing target: %s", err.Error()))
continue
}
g.Logger.Warn(fmt.Sprintf("connection re-establishment attempt in-progress for grpc target: %s", g.URI))

syncClient, err := g.client.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID})
if err != nil {
g.Logger.Debug(fmt.Sprintf("error opening service client: %s", err.Error()))
continue
}

g.Logger.Info(fmt.Sprintf("connection re-established with grpc target: %s", g.Target))
g.Logger.Info(fmt.Sprintf("connection re-established with grpc target: %s", g.URI))
return syncClient, true
}
}
Expand All @@ -176,31 +182,31 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient,
case v1.SyncState_SYNC_STATE_ALL:
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.Target,
Source: g.URI,
Type: sync.ALL,
}

g.Logger.Debug("received full configuration payload")
case v1.SyncState_SYNC_STATE_ADD:
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.Target,
Source: g.URI,
Type: sync.ADD,
}

g.Logger.Debug("received an add payload")
case v1.SyncState_SYNC_STATE_UPDATE:
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.Target,
Source: g.URI,
Type: sync.UPDATE,
}

g.Logger.Debug("received an update payload")
case v1.SyncState_SYNC_STATE_DELETE:
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.Target,
Source: g.URI,
Type: sync.DELETE,
}

Expand All @@ -213,14 +219,57 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient,
}
}

// URLToGRPCTarget is a helper to derive GRPC target from a provided URL
// buildTransportCredentials is a helper to build grpc credentials.TransportCredentials based on source and cert path
func buildTransportCredentials(source string, certPath string) (credentials.TransportCredentials, error) {
if strings.Contains(source, Prefix) {
return insecure.NewCredentials(), nil
}

if !strings.Contains(source, PrefixSecure) {
return nil, fmt.Errorf("invalid source. grpc source must contain prefix %s or %s", Prefix, PrefixSecure)
}

if certPath == "" {
// Rely on CA certs provided from system
return credentials.NewTLS(&tls.Config{MinVersion: tlsVersion}), nil
}

// Rely on provided certificate
certBytes, err := os.ReadFile(certPath)
if err != nil {
return nil, err
}

cp := x509.NewCertPool()
if !cp.AppendCertsFromPEM(certBytes) {
return nil, fmt.Errorf("invalid certificate provided at path: %s", certPath)
}

return credentials.NewTLS(&tls.Config{
MinVersion: tlsVersion,
RootCAs: cp,
}), nil
}

// sourceToGRPCTarget is a helper to derive GRPC target from a provided URL
// For example, function returns the target localhost:9090 for the input grpc://localhost:9090
func URLToGRPCTarget(url string) string {
index := strings.Split(url, Prefix)
func sourceToGRPCTarget(url string) (string, bool) {
var separator string

switch {
case strings.Contains(url, Prefix):
separator = Prefix
case strings.Contains(url, PrefixSecure):
separator = PrefixSecure
default:
return "", false
}

index := strings.Split(url, separator)

if len(index) == 2 {
return index[1]
if len(index) == 2 && len(index[1]) != 0 {
return index[1], true
}

return index[0]
return "", false
}
Loading

0 comments on commit 228f430

Please sign in to comment.