-
Notifications
You must be signed in to change notification settings - Fork 64
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Support blob type sources and GCS as an example of such source. (…
…#1366) Signed-off-by: Alan Kutniewski <kutniewski@google.com>
- Loading branch information
Showing
14 changed files
with
589 additions
and
67 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
package blob | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"errors" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/open-feature/flagd/core/pkg/logger" | ||
"github.com/open-feature/flagd/core/pkg/sync" | ||
"gocloud.dev/blob" | ||
_ "gocloud.dev/blob/gcsblob" // needed to initialize GCS driver | ||
) | ||
|
||
type Sync struct { | ||
Bucket string | ||
Object string | ||
BlobURLMux *blob.URLMux | ||
Cron Cron | ||
Logger *logger.Logger | ||
Interval uint32 | ||
ready bool | ||
lastUpdated time.Time | ||
} | ||
|
||
// Cron defines the behaviour required of a cron | ||
type Cron interface { | ||
AddFunc(spec string, cmd func()) error | ||
Start() | ||
Stop() | ||
} | ||
|
||
func (hs *Sync) Init(_ context.Context) error { | ||
if hs.Bucket == "" { | ||
return errors.New("no bucket string set") | ||
} | ||
if hs.Object == "" { | ||
return errors.New("no object string set") | ||
} | ||
return nil | ||
} | ||
|
||
func (hs *Sync) IsReady() bool { | ||
return hs.ready | ||
} | ||
|
||
func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { | ||
hs.Logger.Info(fmt.Sprintf("starting sync from %s/%s with interval %ds", hs.Bucket, hs.Object, hs.Interval)) | ||
_ = hs.Cron.AddFunc(fmt.Sprintf("*/%d * * * *", hs.Interval), func() { | ||
err := hs.sync(ctx, dataSync, false) | ||
if err != nil { | ||
hs.Logger.Warn(fmt.Sprintf("sync failed: %v", err)) | ||
} | ||
}) | ||
// Initial fetch | ||
hs.Logger.Debug(fmt.Sprintf("initial sync of the %s/%s", hs.Bucket, hs.Object)) | ||
err := hs.sync(ctx, dataSync, false) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
hs.ready = true | ||
hs.Cron.Start() | ||
<-ctx.Done() | ||
hs.Cron.Stop() | ||
|
||
return nil | ||
} | ||
|
||
func (hs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error { | ||
return hs.sync(ctx, dataSync, true) | ||
} | ||
|
||
func (hs *Sync) sync(ctx context.Context, dataSync chan<- sync.DataSync, skipCheckingModTime bool) error { | ||
bucket, err := hs.getBucket(ctx) | ||
if err != nil { | ||
return fmt.Errorf("couldn't get bucket: %v", err) | ||
} | ||
defer bucket.Close() | ||
var updated time.Time | ||
if !skipCheckingModTime { | ||
updated, err = hs.fetchObjectModificationTime(ctx, bucket) | ||
if err != nil { | ||
return fmt.Errorf("couldn't get object attributes: %v", err) | ||
} | ||
if hs.lastUpdated == updated { | ||
hs.Logger.Debug("configuration hasn't changed, skipping fetching full object") | ||
return nil | ||
} | ||
if hs.lastUpdated.After(updated) { | ||
hs.Logger.Warn("configuration changed but the modification time decreased instead of increasing") | ||
} | ||
} | ||
msg, err := hs.fetchObject(ctx, bucket) | ||
if err != nil { | ||
return fmt.Errorf("couldn't get object: %v", err) | ||
} | ||
hs.Logger.Debug(fmt.Sprintf("configuration updated: %s", msg)) | ||
if !skipCheckingModTime { | ||
hs.lastUpdated = updated | ||
} | ||
dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + hs.Object, Type: sync.ALL} | ||
return nil | ||
} | ||
|
||
func (hs *Sync) getBucket(ctx context.Context) (*blob.Bucket, error) { | ||
b, err := hs.BlobURLMux.OpenBucket(ctx, hs.Bucket) | ||
if err != nil { | ||
return nil, fmt.Errorf("error opening bucket %s: %v", hs.Bucket, err) | ||
} | ||
return b, nil | ||
} | ||
|
||
func (hs *Sync) fetchObjectModificationTime(ctx context.Context, bucket *blob.Bucket) (time.Time, error) { | ||
if hs.Object == "" { | ||
return time.Time{}, errors.New("no object string set") | ||
} | ||
attrs, err := bucket.Attributes(ctx, hs.Object) | ||
if err != nil { | ||
return time.Time{}, fmt.Errorf("error fetching attributes for object %s/%s: %w", hs.Bucket, hs.Object, err) | ||
} | ||
return attrs.ModTime, nil | ||
} | ||
|
||
func (hs *Sync) fetchObject(ctx context.Context, bucket *blob.Bucket) (string, error) { | ||
buf := bytes.NewBuffer(nil) | ||
err := bucket.Download(ctx, hs.Object, buf, nil) | ||
if err != nil { | ||
return "", fmt.Errorf("error downloading object %s/%s: %w", hs.Bucket, hs.Object, err) | ||
} | ||
|
||
return buf.String(), nil | ||
} |
Oops, something went wrong.