Skip to content

Commit

Permalink
Fix lint and data race
Browse files Browse the repository at this point in the history
  • Loading branch information
alan-kut committed Jul 23, 2024
1 parent 3dae59e commit 5cab2db
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 48 deletions.
75 changes: 34 additions & 41 deletions core/pkg/sync/blob/blob_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/open-feature/flagd/core/pkg/sync"
"gocloud.dev/blob"
_ "gocloud.dev/blob/gcsblob" // needed to initialize GCS driver
//nolint:gosec
)

type Sync struct {
Expand All @@ -33,7 +32,7 @@ type Cron interface {
Stop()
}

func (hs *Sync) Init(ctx context.Context) error {
func (hs *Sync) Init(_ context.Context) error {
return nil
}

Expand All @@ -43,75 +42,69 @@ func (hs *Sync) IsReady() bool {

func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
hs.Logger.Info(fmt.Sprintf("starting sync from %s/%s with interval %d", hs.Bucket, hs.Object, hs.Interval))
_ = hs.Cron.AddFunc(fmt.Sprintf("*/%d * * * *", hs.Interval), func() {
err := hs.sync(ctx, dataSync, false)
if err != nil {
hs.Logger.Warn(fmt.Sprintf("sync failed: %v", err))
}
})
// Initial fetch
hs.Logger.Debug(fmt.Sprintf("initial sync of the %s/%s", hs.Bucket, hs.Object))
err := hs.ReSync(ctx, dataSync)
err := hs.sync(ctx, dataSync, false)
if err != nil {
return err
}
hs.ready = true

hs.Logger.Debug(fmt.Sprintf("polling %s/%s every %d seconds", hs.Bucket, hs.Object, hs.Interval))
_ = hs.Cron.AddFunc(fmt.Sprintf("*/%d * * * *", hs.Interval), func() {
hs.Logger.Debug(fmt.Sprintf("fetching configuration from %s/%s", hs.Bucket, hs.Object))
bucket, err := hs.getBucket(ctx)
if err != nil {
hs.Logger.Warn(fmt.Sprintf("couldn't get bucket: %v", err))
return
}
defer bucket.Close()
updated, err := hs.fetchObjectModificationTime(ctx, bucket)
if err != nil {
hs.Logger.Warn(fmt.Sprintf("couldn't get object attributes: %v", err))
return
}
if hs.lastUpdated == updated {
hs.Logger.Debug("configuration hasn't changed, skipping fetching full object")
return
}
msg, err := hs.fetchObject(ctx, bucket)
if err != nil {
hs.Logger.Warn(fmt.Sprintf("couldn't get object: %v", err))
return
}
hs.Logger.Info(fmt.Sprintf("configuration updated: %s", msg))
dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + hs.Object, Type: sync.ALL}
hs.lastUpdated = updated
})

hs.ready = true
hs.Cron.Start()

<-ctx.Done()
hs.Cron.Stop()

return nil
}

func (hs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
return hs.sync(ctx, dataSync, true)
}

func (hs *Sync) sync(ctx context.Context, dataSync chan<- sync.DataSync, skipCheckingModTime bool) error {
bucket, err := hs.getBucket(ctx)
if err != nil {
return err
return fmt.Errorf("couldn't get bucket: %v", err)
}
defer bucket.Close()
updated, err := hs.fetchObjectModificationTime(ctx, bucket)
if err != nil {
return err
var updated time.Time
if !skipCheckingModTime {
updated, err = hs.fetchObjectModificationTime(ctx, bucket)
if err != nil {
return fmt.Errorf("couldn't get object attributes: %v", err)
}
if hs.lastUpdated == updated {
hs.Logger.Debug("configuration hasn't changed, skipping fetching full object")
return nil
}
}
msg, err := hs.fetchObject(ctx, bucket)
if err != nil {
return err
return fmt.Errorf("couldn't get object: %v", err)
}
hs.Logger.Info(fmt.Sprintf("configuration updated: %s", msg))
if !skipCheckingModTime {
hs.lastUpdated = updated
}
dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + hs.Object, Type: sync.ALL}
hs.lastUpdated = updated
return nil
}

func (hs *Sync) getBucket(ctx context.Context) (*blob.Bucket, error) {
if hs.Bucket == "" {
return nil, errors.New("no bucket string set")
}
return hs.BlobURLMux.OpenBucket(ctx, hs.Bucket)
b, err := hs.BlobURLMux.OpenBucket(ctx, hs.Bucket)
if err != nil {
return nil, fmt.Errorf("error opening bucket %s: %v", hs.Bucket, err)
}
return b, nil
}

func (hs *Sync) fetchObjectModificationTime(ctx context.Context, bucket *blob.Bucket) (time.Time, error) {
Expand Down Expand Up @@ -141,5 +134,5 @@ func (hs *Sync) fetchObject(ctx context.Context, bucket *blob.Bucket) (string, e
return "", fmt.Errorf("error reading object %s/%s: %w", hs.Bucket, hs.Object, err)
}

return string(buf.Bytes()), nil
return buf.String(), nil
}
2 changes: 1 addition & 1 deletion core/pkg/sync/blob/blob_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestSync(t *testing.T) {
}

func tickWithConfigChange(t *testing.T, mockCron *synctesting.MockCron, dataSyncChan chan sync.DataSync, blobMock *MockBlob, newConfig string) {
time.Sleep(time.Millisecond) // sleep so the new file has different modification date
time.Sleep(1 * time.Millisecond) // sleep so the new file has different modification date
blobMock.AddObject(object, newConfig)
mockCron.Tick()
select {
Expand Down
7 changes: 3 additions & 4 deletions core/pkg/sync/blob/mock_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

type MockBlob struct {
mux *blob.URLMux
bucket *blob.Bucket
scheme string
opener *fakeOpener
}
Expand All @@ -23,13 +22,13 @@ type fakeOpener struct {
getSync func() *Sync
}

func (f *fakeOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*blob.Bucket, error) {
bucketUrl, err := url.Parse("mem://")
func (f *fakeOpener) OpenBucketURL(ctx context.Context, _ *url.URL) (*blob.Bucket, error) {
bucketURL, err := url.Parse("mem://")
if err != nil {
log.Fatalf("couldn't parse url: %s: %v", "mem://", err)
}
opener := &memblob.URLOpener{}
bucket, err := opener.OpenBucketURL(context.Background(), bucketUrl)
bucket, err := opener.OpenBucketURL(ctx, bucketURL)
if err != nil {
log.Fatalf("couldn't open in memory bucket: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions core/pkg/sync/builder/syncbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (sb *SyncBuilder) newGcs(config sync.SourceConfig, logger *logger.Logger) *
// Extract bucket uri and object name from the full URI:
// gs://bucket/path/to/object results in gs://bucket/ as bucketUri and
// path/to/object as an object name.
bucketUri := regGcs.FindString(config.URI)
bucketURI := regGcs.FindString(config.URI)
objectName := regGcs.ReplaceAllString(config.URI, "")

// Defaults to 5 seconds if interval is not set.
Expand All @@ -192,7 +192,7 @@ func (sb *SyncBuilder) newGcs(config sync.SourceConfig, logger *logger.Logger) *
}

return &blobSync.Sync{
Bucket: bucketUri,
Bucket: bucketURI,
Object: objectName,

BlobURLMux: blob.DefaultURLMux(),
Expand Down

0 comments on commit 5cab2db

Please sign in to comment.