Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add 'watcher' interface to file sync #1365

Merged
merged 15 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 42 additions & 10 deletions core/pkg/sync/builder/syncbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"net/http"
"os"
"regexp"
msync "sync"
"time"

"github.com/open-feature/flagd/core/pkg/logger"
Expand All @@ -26,6 +25,8 @@ import (

const (
syncProviderFile = "file"
syncProviderFsNotify = "fsnotify"
syncProviderFileInfo = "fileinfo"
beeme1mr marked this conversation as resolved.
Show resolved Hide resolved
syncProviderGrpc = "grpc"
syncProviderKubernetes = "kubernetes"
syncProviderHTTP = "http"
Expand Down Expand Up @@ -91,8 +92,13 @@ func (sb *SyncBuilder) SyncsFromConfig(sourceConfigs []sync.SourceConfig, logger
func (sb *SyncBuilder) syncFromConfig(sourceConfig sync.SourceConfig, logger *logger.Logger) (sync.ISync, error) {
switch sourceConfig.Provider {
case syncProviderFile:
logger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", sourceConfig.URI))
return sb.newFile(sourceConfig.URI, logger), nil
case syncProviderFsNotify:
logger.Debug(fmt.Sprintf("using fsnotify sync-provider for: %q", sourceConfig.URI))
return sb.newFsNotify(sourceConfig.URI, logger), nil
case syncProviderFileInfo:
logger.Debug(fmt.Sprintf("using fileinfo sync-provider for: %q", sourceConfig.URI))
return sb.newFileInfo(sourceConfig.URI, logger), nil
case syncProviderKubernetes:
logger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", sourceConfig.URI))
return sb.newK8s(sourceConfig.URI, logger)
Expand All @@ -107,20 +113,46 @@ func (sb *SyncBuilder) syncFromConfig(sourceConfig sync.SourceConfig, logger *lo
return sb.newGcs(sourceConfig, logger), nil

default:
return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s' or '%s'",
sourceConfig.Provider, syncProviderFile, syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes)
return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s', %s', '%s' or '%s'",
sourceConfig.Provider, syncProviderFile, syncProviderFsNotify, syncProviderFileInfo,
syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes)
}
}

// newFile returns an fsinfo sync if we are in k8s or fileinfo if not
func (sb *SyncBuilder) newFile(uri string, logger *logger.Logger) *file.Sync {
return &file.Sync{
URI: regFile.ReplaceAllString(uri, ""),
Logger: logger.WithFields(
switch os.Getenv("KUBERNETES_SERVICE_HOST") {
case "":
// no k8s service host env; use fileinfo
return sb.newFileInfo(uri, logger)
default:
// default to fsnotify
return sb.newFsNotify(uri, logger)
}
beeme1mr marked this conversation as resolved.
Show resolved Hide resolved
}

// return a new file.Sync that uses fsnotify under the hood
func (sb *SyncBuilder) newFsNotify(uri string, logger *logger.Logger) *file.Sync {
return file.NewFileSync(
beeme1mr marked this conversation as resolved.
Show resolved Hide resolved
regFile.ReplaceAllString(uri, ""),
file.FSNOTIFY,
logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "filepath"),
zap.String("sync", syncProviderFsNotify),
),
Mux: &msync.RWMutex{},
}
)
}

// return a new file.Sync that uses os.Stat/fs.FileInfo under the hood
func (sb *SyncBuilder) newFileInfo(uri string, logger *logger.Logger) *file.Sync {
return file.NewFileSync(
regFile.ReplaceAllString(uri, ""),
file.FILEINFO,
logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", syncProviderFileInfo),
),
)
}

func (sb *SyncBuilder) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) {
Expand Down
202 changes: 202 additions & 0 deletions core/pkg/sync/file/fileinfo_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package file

import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"sync"
"time"

"github.com/fsnotify/fsnotify"
"github.com/open-feature/flagd/core/pkg/logger"
)

// Implements file.Watcher using a timer and os.FileInfo
type fileInfoWatcher struct {
// Event Chan
evChan chan fsnotify.Event
// Errors Chan
erChan chan error
// logger
logger *logger.Logger
// Func to wrap os.Stat (injection point for test helpers)
statFunc func(string) (fs.FileInfo, error)
beeme1mr marked this conversation as resolved.
Show resolved Hide resolved
// thread-safe interface to underlying files we are watching
mu sync.RWMutex
watches map[string]fs.FileInfo // filename -> info
}

// NewFsNotifyWatcher returns a new fsNotifyWatcher
func NewFileInfoWatcher(ctx context.Context, logger *logger.Logger) Watcher {
fiw := &fileInfoWatcher{
evChan: make(chan fsnotify.Event, 32),
erChan: make(chan error, 32),
statFunc: getFileInfo,
logger: logger,
watches: make(map[string]fs.FileInfo),
}
fiw.run(ctx, (1 * time.Second))
return fiw
}

// fileInfoWatcher explicitly implements file.Watcher
var _ Watcher = &fileInfoWatcher{}

// Close calls close on the underlying fsnotify.Watcher
func (f *fileInfoWatcher) Close() error {
// close all channels and exit
close(f.evChan)
close(f.erChan)
return nil
}

// Add calls Add on the underlying fsnotify.Watcher
func (f *fileInfoWatcher) Add(name string) error {
f.mu.Lock()
defer f.mu.Unlock()

// exit early if name already exists
if _, ok := f.watches[name]; ok {
return nil
}

info, err := f.statFunc(name)
if err != nil {
return err
}

f.watches[name] = info

return nil
}

// Remove calls Remove on the underlying fsnotify.Watcher
func (f *fileInfoWatcher) Remove(name string) error {
f.mu.Lock()
defer f.mu.Unlock()

// no need to exit early, deleting non-existent key is a no-op
delete(f.watches, name)

return nil
}

// Watchlist calls watchlist on the underlying fsnotify.Watcher
func (f *fileInfoWatcher) WatchList() []string {
f.mu.RLock()
defer f.mu.RUnlock()
out := []string{}
for name := range f.watches {
n := name
out = append(out, n)
}
return out
}

// Events returns the underlying watcher's Events chan
func (f *fileInfoWatcher) Events() chan fsnotify.Event {
return f.evChan
}

// Errors returns the underlying watcher's Errors chan
func (f *fileInfoWatcher) Errors() chan error {
return f.erChan
}

// run is a blocking function that starts the filewatcher's timer thread
func (f *fileInfoWatcher) run(ctx context.Context, s time.Duration) {
// timer thread
go func() {
Kavindu-Dodan marked this conversation as resolved.
Show resolved Hide resolved
// execute update on the configured interval of time
ticker := time.NewTicker(s)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := f.update(); err != nil {
f.erChan <- err
return
}
}
}
}()
}

func (f *fileInfoWatcher) update() error {
f.mu.Lock()
defer f.mu.Unlock()

for path, info := range f.watches {
newInfo, err := f.statFunc(path)
if err != nil {
// if the file isn't there, it must have been removed
// fire off a remove event and remove it from the watches
if errors.Is(err, os.ErrNotExist) {
f.evChan <- fsnotify.Event{
Name: path,
Op: fsnotify.Remove,
}
delete(f.watches, path)
continue
}
return err
}

// if the new stat doesn't match the old stat, figure out what changed
if info != newInfo {
event := f.generateEvent(path, newInfo)
if event != nil {
f.evChan <- *event
}
f.watches[path] = newInfo
}
}
return nil
}

// generateEvent figures out what changed and generates an fsnotify.Event for it. (if we care)
// file removal are handled above in the update() method
func (f *fileInfoWatcher) generateEvent(path string, newInfo fs.FileInfo) *fsnotify.Event {
info := f.watches[path]
switch {
// new mod time is more recent than old mod time, generate a write event
case newInfo.ModTime().After(info.ModTime()):
return &fsnotify.Event{
Name: path,
Op: fsnotify.Write,
}
// the file modes changed, generate a chmod event
case info.Mode() != newInfo.Mode():
return &fsnotify.Event{
Name: path,
Op: fsnotify.Chmod,
}
// nothing changed that we care about
default:
return nil
}
}

// getFileInfo returns the fs.FileInfo for the given path
func getFileInfo(path string) (fs.FileInfo, error) {
f, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("error from os.Open(%s): %w", path, err)
}

info, err := f.Stat()
if err != nil {
return info, fmt.Errorf("error from fs.Stat(%s): %w", path, err)
}

if err := f.Close(); err != nil {
return info, fmt.Errorf("err from fs.Close(%s): %w", path, err)
}

return info, nil
}
Loading
Loading