Skip to content

Commit

Permalink
Add support for blob type sources.
Browse files Browse the repository at this point in the history
  • Loading branch information
alan-kut committed Jul 22, 2024
1 parent bed077b commit 8d56f66
Show file tree
Hide file tree
Showing 8 changed files with 434 additions and 62 deletions.
5 changes: 5 additions & 0 deletions core/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ require (
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/gax-go/v2 v2.12.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
github.com/huandu/xstrings v1.4.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
Expand All @@ -82,15 +83,19 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
gocloud.dev v0.37.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/term v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/api v0.169.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
google.golang.org/protobuf v1.34.2 // indirect
Expand Down
9 changes: 9 additions & 0 deletions core/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,8 @@ github.com/googleapis/gax-go/v2 v2.8.0/go.mod h1:4orTrqY6hXxxaUL4LHIPl6lGo8vAE38
github.com/googleapis/gax-go/v2 v2.10.0/go.mod h1:4UOEnMCrxsSqQ940WnTiD6qJ63le2ev3xfyagutxiPw=
github.com/googleapis/gax-go/v2 v2.11.0/go.mod h1:DxmR61SGKkGLa2xigwuZIQpkCI2S5iydzRfb3peWZJI=
github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU=
github.com/googleapis/gax-go/v2 v2.12.2 h1:mhN09QQW1jEWeMF74zGR81R30z4VJzjZsfkUhuHF+DA=
github.com/googleapis/gax-go/v2 v2.12.2/go.mod h1:61M8vcyyXR2kqKFxKrfA22jaA8JGF7Dc8App1U3H6jc=
github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4=
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
Expand Down Expand Up @@ -1572,6 +1574,7 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs=
go.opentelemetry.io/otel v1.26.0/go.mod h1:UmLkJHUAidDval2EICqBMbnAd0/m2vmpf/dAM+fvFs4=
Expand Down Expand Up @@ -1642,6 +1645,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
gocloud.dev v0.37.0 h1:XF1rN6R0qZI/9DYjN16Uy0durAmSlf58DHOcb28GPro=
gocloud.dev v0.37.0/go.mod h1:7/O4kqdInCNsc6LqgmuFnS0GRew4XNNYWpA44yQnwco=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down Expand Up @@ -2086,6 +2091,8 @@ golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU=
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
gomodules.xyz/jsonpatch/v2 v2.4.0 h1:Ci3iUJyx9UeRx7CeFN8ARgGbkESwJK+KB9lLcWxY/Zw=
gomodules.xyz/jsonpatch/v2 v2.4.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
Expand Down Expand Up @@ -2161,6 +2168,8 @@ google.golang.org/api v0.126.0/go.mod h1:mBwVAtz+87bEN6CbA1GtZPDOqY2R5ONPqJeIlvy
google.golang.org/api v0.128.0/go.mod h1:Y611qgqaE92On/7g65MQgxYul3c0rEB894kniWLY750=
google.golang.org/api v0.139.0/go.mod h1:CVagp6Eekz9CjGZ718Z+sloknzkDJE7Vc1Ckj9+viBk=
google.golang.org/api v0.149.0/go.mod h1:Mwn1B7JTXrzXtnvmzQE2BD6bYZQ8DShKZDZbeN9I7qI=
google.golang.org/api v0.169.0 h1:QwWPy71FgMWqJN/l6jVlFHUa29a7dcUy02I8o799nPY=
google.golang.org/api v0.169.0/go.mod h1:gpNOiMA2tZ4mf5R9Iwf4rK/Dcz0fbdIgWYWVoxmsyLg=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
Expand Down
144 changes: 144 additions & 0 deletions core/pkg/sync/blob/blob_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package blob

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"time"

"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
"gocloud.dev/blob"
//nolint:gosec
)

type Sync struct {
Bucket string
Object string
BlobURLMux *blob.URLMux
Cron Cron
Logger *logger.Logger
Interval uint32
ready bool
lastUpdated time.Time
}

// Cron defines the behaviour required of a cron
type Cron interface {
AddFunc(spec string, cmd func()) error
Start()
Stop()
}

func (hs *Sync) Init(ctx context.Context) error {
return nil
}

func (hs *Sync) IsReady() bool {
return hs.ready
}

func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
hs.Logger.Info(fmt.Sprintf("starting sync from %s/%s with interval %d", hs.Bucket, hs.Object, hs.Interval))
// Initial fetch
hs.Logger.Debug(fmt.Sprintf("initial sync of the %s/%s", hs.Bucket, hs.Object))
err := hs.ReSync(ctx, dataSync)
if err != nil {
return err
}
hs.ready = true

hs.Logger.Debug(fmt.Sprintf("polling %s/%s every %d seconds", hs.Bucket, hs.Object, hs.Interval))
_ = hs.Cron.AddFunc(fmt.Sprintf("*/%d * * * *", hs.Interval), func() {
hs.Logger.Debug(fmt.Sprintf("fetching configuration from %s/%s", hs.Bucket, hs.Object))
bucket, err := hs.getBucket(ctx)
if err != nil {
hs.Logger.Warn(fmt.Sprintf("couldn't get bucket: %v", err))
return
}
defer bucket.Close()
updated, err := hs.fetchObjectModificationTime(ctx, bucket)
if err != nil {
hs.Logger.Warn(fmt.Sprintf("couldn't get object attributes: %v", err))
return
}
if hs.lastUpdated == updated {
hs.Logger.Debug("configuration hasn't changed, skipping fetching full object")
return
}
msg, err := hs.fetchObject(ctx, bucket)
if err != nil {
hs.Logger.Warn(fmt.Sprintf("couldn't get object: %v", err))
return
}
hs.Logger.Info(fmt.Sprintf("configuration updated: %s", msg))
dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + "/" + hs.Object, Type: sync.ALL}
hs.lastUpdated = updated
})

hs.Cron.Start()

<-ctx.Done()
hs.Cron.Stop()

return nil
}

func (hs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
bucket, err := hs.getBucket(ctx)
if err != nil {
return err
}
defer bucket.Close()
updated, err := hs.fetchObjectModificationTime(ctx, bucket)
if err != nil {
return err
}
msg, err := hs.fetchObject(ctx, bucket)
if err != nil {
return err
}
hs.Logger.Info(fmt.Sprintf("configuration updated: %s", msg))
dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + "/" + hs.Object, Type: sync.ALL}
hs.lastUpdated = updated
return nil
}

func (hs *Sync) getBucket(ctx context.Context) (*blob.Bucket, error) {
if hs.Bucket == "" {
return nil, errors.New("no bucket string set")
}
return hs.BlobURLMux.OpenBucket(ctx, hs.Bucket)
}

func (hs *Sync) fetchObjectModificationTime(ctx context.Context, bucket *blob.Bucket) (time.Time, error) {
if hs.Object == "" {
return time.Time{}, errors.New("no object string set")
}
attrs, err := bucket.Attributes(ctx, hs.Object)
if err != nil {
return time.Time{}, fmt.Errorf("error fetching attributes for object %s/%s: %w", hs.Bucket, hs.Object, err)
}
return attrs.ModTime, nil
}

func (hs *Sync) fetchObject(ctx context.Context, bucket *blob.Bucket) (string, error) {
if hs.Object == "" {
return "", errors.New("no object string set")
}
r, err := bucket.NewReader(ctx, hs.Object, nil)
if err != nil {
return "", fmt.Errorf("error creating reader for object %s/%s: %w", hs.Bucket, hs.Object, err)
}
defer r.Close()

buf := bytes.NewBuffer(nil)
_, err = io.Copy(buf, r)
if err != nil {
return "", fmt.Errorf("error reading object %s/%s: %w", hs.Bucket, hs.Object, err)
}

return string(buf.Bytes()), nil
}
127 changes: 127 additions & 0 deletions core/pkg/sync/blob/blob_sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package blob

import (
"context"
"log"
"testing"
"time"

"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
synctesting "github.com/open-feature/flagd/core/pkg/sync/testing"
"go.uber.org/mock/gomock"
)

const (
scheme = "xyz"
bucket = "b"
object = "o"
)

func TestSync(t *testing.T) {
ctrl := gomock.NewController(t)
mockCron := synctesting.NewMockCron(ctrl)
mockCron.EXPECT().AddFunc(gomock.Any(), gomock.Any()).DoAndReturn(func(spec string, cmd func()) error {
return nil
})
mockCron.EXPECT().Start().Times(1)

blobSync := &Sync{
Bucket: scheme + "://" + bucket,
Object: object,
Cron: mockCron,
Logger: logger.NewLogger(nil, false),
}
blobMock := NewMockBlob(scheme, func() *Sync {
return blobSync
})
blobSync.BlobURLMux = blobMock.URLMux()

ctx := context.Background()
dataSyncChan := make(chan sync.DataSync, 1)

config := "my-config"
blobMock.AddObject(object, config)

go func() {
err := blobSync.Sync(ctx, dataSyncChan)
if err != nil {
log.Fatalf("Error start sync: %s", err.Error())
return
}
}()

data := <-dataSyncChan // initial sync
if data.FlagData != config {
t.Errorf("expected content: %s, but received content: %s", config, data.FlagData)
}
tickWithConfigChange(t, mockCron, dataSyncChan, blobMock, "new config")
tickWithoutConfigChange(t, mockCron, dataSyncChan)
tickWithConfigChange(t, mockCron, dataSyncChan, blobMock, "new config 2")
tickWithoutConfigChange(t, mockCron, dataSyncChan)
tickWithoutConfigChange(t, mockCron, dataSyncChan)
}

func tickWithConfigChange(t *testing.T, mockCron *synctesting.MockCron, dataSyncChan chan sync.DataSync, blobMock *MockBlob, newConfig string) {
time.Sleep(time.Millisecond) // sleep so the new file has different modification date
blobMock.AddObject(object, newConfig)
mockCron.Tick()
select {
case data, ok := <-dataSyncChan:
if ok {
if data.FlagData != newConfig {
t.Errorf("expected content: %s, but received content: %s", newConfig, data.FlagData)
}
} else {
t.Errorf("data channel unexpecdly closed")
}
default:
t.Errorf("data channel has no expected update")
}
}

func tickWithoutConfigChange(t *testing.T, mockCron *synctesting.MockCron, dataSyncChan chan sync.DataSync) {
mockCron.Tick()
select {
case data, ok := <-dataSyncChan:
if ok {
t.Errorf("unexpected update: %s", data.FlagData)
} else {
t.Errorf("data channel unexpecdly closed")
}
default:
}
}

func TestReSync(t *testing.T) {
ctrl := gomock.NewController(t)
mockCron := synctesting.NewMockCron(ctrl)

blobSync := &Sync{
Bucket: scheme + "://" + bucket,
Object: object,
Cron: mockCron,
Logger: logger.NewLogger(nil, false),
}
blobMock := NewMockBlob(scheme, func() *Sync {
return blobSync
})
blobSync.BlobURLMux = blobMock.URLMux()

ctx := context.Background()
dataSyncChan := make(chan sync.DataSync, 1)

config := "my-config"
blobMock.AddObject(object, config)

err := blobSync.ReSync(ctx, dataSyncChan)
if err != nil {
log.Fatalf("Error start sync: %s", err.Error())
return
}

data := <-dataSyncChan
if data.FlagData != config {
t.Errorf("expected content: %s, but received content: %s", config, data.FlagData)
}
}
Loading

0 comments on commit 8d56f66

Please sign in to comment.