From 8647e021c10bbbab31c86163872475e926bf5c48 Mon Sep 17 00:00:00 2001 From: Dave Josephsen Date: Sun, 21 Jul 2024 22:27:20 -0500 Subject: [PATCH 01/13] Wire up 'watcher' interface for file-watching Implement fsnotify and `os.Stat` based watchers fixes: #1344 Signed-off-by: Dave Josephsen --- core/pkg/sync/file/fileinfo_watcher.go | 199 ++++++++++++++++++++ core/pkg/sync/file/fileinfo_watcher_test.go | 124 ++++++++++++ core/pkg/sync/file/filepath_sync.go | 61 ++++-- core/pkg/sync/file/fsnotify_watcher.go | 51 +++++ 4 files changed, 420 insertions(+), 15 deletions(-) create mode 100644 core/pkg/sync/file/fileinfo_watcher.go create mode 100644 core/pkg/sync/file/fileinfo_watcher_test.go create mode 100644 core/pkg/sync/file/fsnotify_watcher.go diff --git a/core/pkg/sync/file/fileinfo_watcher.go b/core/pkg/sync/file/fileinfo_watcher.go new file mode 100644 index 000000000..637404d1f --- /dev/null +++ b/core/pkg/sync/file/fileinfo_watcher.go @@ -0,0 +1,199 @@ +package file + +import ( + "context" + "errors" + "io/fs" + "os" + "sync" + "time" + + "github.com/fsnotify/fsnotify" + "golang.org/x/sync/errgroup" +) + +// Implements file.Watcher using a timer and os.FileInfo +type fileInfoWatcher struct { + // Event Chan + evChan chan fsnotify.Event + // Errors Chan + erChan chan error + // timer thread errgroup + eg errgroup.Group + // Func to wrap os.Stat (injection point for test helpers) + statFunc func(string) (fs.FileInfo, error) + // thread-safe interface to underlying files we are watching + mu sync.RWMutex + watches map[string]fs.FileInfo // filename -> info +} + +// NewFsNotifyWatcher returns a new fsNotifyWatcher +func NewFileInfoWatcher() *fileInfoWatcher { + return &fileInfoWatcher{ + evChan: make(chan fsnotify.Event), + erChan: make(chan error), + } +} + +// fileInfoWatcher explicitly implements file.Watcher +var _ Watcher = &fileInfoWatcher{} + +// Close calls close on the underlying fsnotify.Watcher +func (f *fileInfoWatcher) Close() error { + // close all channels and exit + close(f.evChan) + close(f.erChan) + return nil +} + +// Add calls Add on the underlying fsnotify.Watcher +func (f *fileInfoWatcher) Add(name string) error { + f.mu.Lock() + defer f.mu.Unlock() + + // exit early if name already exists + if _, ok := f.watches[name]; ok { + return nil + } + + info, err := f.statFunc(name) + if err != nil { + return err + } + + f.watches[name] = info + + return nil +} + +// Remove calls Remove on the underlying fsnotify.Watcher +func (f *fileInfoWatcher) Remove(name string) error { + f.mu.Lock() + defer f.mu.Unlock() + + // no need to exit early, deleting non-existent key is a no-op + delete(f.watches, name) + + return nil +} + +// Watchlist calls watchlist on the underlying fsnotify.Watcher +func (f *fileInfoWatcher) WatchList() []string { + f.mu.RLock() + defer f.mu.Unlock() + out := []string{} + for name := range f.watches { + n := name + out = append(out, n) + } + return out +} + +// Events returns the underlying watcher's Events chan +func (f *fileInfoWatcher) Events() chan fsnotify.Event { + return f.evChan +} + +// Errors returns the underlying watcher's Errors chan +func (f *fileInfoWatcher) Errors() chan error { + return f.erChan +} + +// Run is a blocking function that starts the filewatcher's timer thread +func (f *fileInfoWatcher) Run(ctx context.Context, s time.Duration) error { + // timer thread + f.eg.Go(func() error { + // execute update on the configured interval of time + ticker := time.NewTicker(s) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + if err := f.update(); err != nil { + return err + } + } + } + }) + + return f.eg.Wait() +} + +func (f *fileInfoWatcher) update() error { + event := &fsnotify.Event{} + f.mu.Lock() + defer f.mu.Unlock() + + for path, info := range f.watches { + newInfo, err := getFileInfo(path) + if err != nil { + // if the file isn't there, it must have been removed + // fire off a remove event and remove it from the watches + if errors.Is(err, os.ErrNotExist) { + f.evChan <- fsnotify.Event{ + Name: path, + Op: fsnotify.Remove, + } + delete(f.watches, path) + } + return err + } + + // if the new stat doesn't match the old stat, figure out what changed + if info != newInfo { + event, err = f.generateEvent(path, newInfo) + if err != nil { + f.erChan <- err + } else { + if event != nil { + f.evChan <- *event + } + } + f.watches[path] = newInfo + } + } + return nil +} + +// generateEvent figures out what changed and generates an fsnotify.Event for it. (if we care) +func (f *fileInfoWatcher) generateEvent(path string, newInfo fs.FileInfo) (*fsnotify.Event, error) { + info := f.watches[path] + switch { + // new mod time is more recent than old mod time, generate a write event + case newInfo.ModTime().After(info.ModTime()): + return &fsnotify.Event{ + Name: path, + Op: fsnotify.Write, + }, nil + // the file modes changed, generate a chmod event + case info.Mode() != newInfo.Mode(): + return &fsnotify.Event{ + Name: path, + Op: fsnotify.Chmod, + }, nil + } + return nil, nil +} + +// getFileInfo returns the fs.FileInfo for the given path +// TODO: verify this works correctly on windows +func getFileInfo(path string) (fs.FileInfo, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + + info, err := f.Stat() + if err != nil { + return info, err + } + + if err := f.Close(); err != nil { + return info, err + } + + return info, nil +} diff --git a/core/pkg/sync/file/fileinfo_watcher_test.go b/core/pkg/sync/file/fileinfo_watcher_test.go new file mode 100644 index 000000000..58ac1830e --- /dev/null +++ b/core/pkg/sync/file/fileinfo_watcher_test.go @@ -0,0 +1,124 @@ +package file + +import ( + "io/fs" + "testing" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/google/go-cmp/cmp" +) + +func Test_fileInfoWatcher_Close(t *testing.T) { + type fields struct{} + tests := []struct { + name string + watcher *fileInfoWatcher + wantErr bool + }{ + { + name: "all chans close", + watcher: makeTestWatcher(t, map[string]fs.FileInfo{}), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := tt.watcher.Close(); (err != nil) != tt.wantErr { + t.Errorf("fileInfoWatcher.Close() error = %v, wantErr %v", err, tt.wantErr) + } + if _, ok := (<-tt.watcher.Errors()); ok != false { + t.Error("fileInfoWatcher.Close() failed to close error chan") + } + if _, ok := (<-tt.watcher.Events()); ok != false { + t.Error("fileInfoWatcher.Close() failed to close events chan") + } + }) + } +} + +func Test_fileInfoWatcher_Add(t *testing.T) { + tests := []struct { + name string + watcher *fileInfoWatcher + add []string + want map[string]fs.FileInfo + wantErr bool + }{ + { + name: "add one watch", + watcher: makeTestWatcher(t, map[string]fs.FileInfo{}), + add: []string{"/foo"}, + want: map[string]fs.FileInfo{ + "/foo": &mockFileInfo{}, + }, + }, + } + for _, tt := range tests { + tt.watcher.statFunc = makeStatFunc(t, &mockFileInfo{}) + t.Run(tt.name, func(t *testing.T) { + for _, path := range tt.add { + if err := tt.watcher.Add(path); (err != nil) != tt.wantErr { + t.Errorf("fileInfoWatcher.Add() error = %v, wantErr %v", err, tt.wantErr) + } + } + if !cmp.Equal(tt.watcher.watches, tt.want, cmp.AllowUnexported(mockFileInfo{})) { + t.Errorf("fileInfoWatcher.Add(): want-, got+: %v ", cmp.Diff(tt.want, tt.watcher.watches)) + } + }) + } +} + +// makeTestWatcher returns a pointer to a fileInfoWatcher suitable for testing +func makeTestWatcher(t *testing.T, watches map[string]fs.FileInfo) *fileInfoWatcher { + t.Helper() + + return &fileInfoWatcher{ + evChan: make(chan fsnotify.Event), + erChan: make(chan error), + watches: watches, + } +} + +// makeStateFunc returns an os.Stat wrapper that parrots back whatever its +// constructor is given +func makeStatFunc(t *testing.T, fi fs.FileInfo) func(string) (fs.FileInfo, error) { + t.Helper() + return func(s string) (fs.FileInfo, error) { + return fi, nil + } +} + +// mockFileInfo implements fs.FileInfo for mocks +type mockFileInfo struct { + name string // base name of the file + size int64 // length in bytes for regular files; system-dependent for others + mode fs.FileMode // file mode bits + modTime time.Time // modification time +} + +// explicitly impements fs.FileInfo +var _ fs.FileInfo = &mockFileInfo{} + +func (mfi *mockFileInfo) Name() string { + return mfi.name +} + +func (mfi *mockFileInfo) Size() int64 { + return mfi.size +} + +func (mfi *mockFileInfo) Mode() fs.FileMode { + return mfi.mode +} + +func (mfi *mockFileInfo) ModTime() time.Time { + return mfi.modTime +} + +func (mfi *mockFileInfo) IsDir() bool { + return false +} + +func (mfi *mockFileInfo) Sys() any { + return "foo" +} diff --git a/core/pkg/sync/file/filepath_sync.go b/core/pkg/sync/file/filepath_sync.go index 6b2899a13..7ad4aae81 100644 --- a/core/pkg/sync/file/filepath_sync.go +++ b/core/pkg/sync/file/filepath_sync.go @@ -8,6 +8,7 @@ import ( "os" "strings" msync "sync" + "time" "github.com/fsnotify/fsnotify" "github.com/open-feature/flagd/core/pkg/logger" @@ -15,21 +16,38 @@ import ( "gopkg.in/yaml.v3" ) +const ( + FSNOTIFY = "fsnotify" + FILEINFO = "fileinfo" +) + +type Watcher interface { + Close() error + Add(name string) error + Remove(name string) error + WatchList() []string + Events() chan fsnotify.Event + Errors() chan error +} + type Sync struct { URI string Logger *logger.Logger // FileType indicates the file type e.g., json, yaml/yml etc., fileType string - watcher *fsnotify.Watcher - ready bool - Mux *msync.RWMutex + // watchType indicates how to watch the file FSNOTIFY|FILEINFO + watchType string + watcher Watcher + ready bool + Mux *msync.RWMutex } -func NewFileSync(uri string, logger *logger.Logger) *Sync { +func NewFileSync(uri string, watchType string, logger *logger.Logger) *Sync { return &Sync{ - URI: uri, - Logger: logger, - Mux: &msync.RWMutex{}, + URI: uri, + watchType: watchType, + Logger: logger, + Mux: &msync.RWMutex{}, } } @@ -41,14 +59,27 @@ func (fs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error return nil } -func (fs *Sync) Init(_ context.Context) error { +func (fs *Sync) Init(ctx context.Context) error { fs.Logger.Info("Starting filepath sync notifier") - w, err := fsnotify.NewWatcher() - if err != nil { - return fmt.Errorf("error creating filepath watcher: %w", err) + + switch fs.watchType { + case FSNOTIFY, "": + w, err := NewFSNotifyWatcher() + if err != nil { + return fmt.Errorf("error creating fsnotify watcher: %w", err) + } + fs.watcher = w + case FILEINFO: + w := NewFileInfoWatcher() + // start the timer + // TODO: wire up configuration for the fileinfo sync duration + w.Run(ctx, 1*time.Second) + fs.watcher = w + default: + return fmt.Errorf("unknown watcher type: '%s'", fs.watchType) } - fs.watcher = w - if err = fs.watcher.Add(fs.URI); err != nil { + + if err := fs.watcher.Add(fs.URI); err != nil { return fmt.Errorf("error adding watcher %s: %w", fs.URI, err) } return nil @@ -74,7 +105,7 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { fs.Logger.Info(fmt.Sprintf("watching filepath: %s", fs.URI)) for { select { - case event, ok := <-fs.watcher.Events: + case event, ok := <-fs.watcher.Events(): if !ok { fs.Logger.Info("filepath notifier closed") return errors.New("filepath notifier closed") @@ -108,7 +139,7 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { } } - case err, ok := <-fs.watcher.Errors: + case err, ok := <-fs.watcher.Errors(): if !ok { fs.setReady(false) return errors.New("watcher error") diff --git a/core/pkg/sync/file/fsnotify_watcher.go b/core/pkg/sync/file/fsnotify_watcher.go new file mode 100644 index 000000000..06ae224eb --- /dev/null +++ b/core/pkg/sync/file/fsnotify_watcher.go @@ -0,0 +1,51 @@ +package file + +import "github.com/fsnotify/fsnotify" + +// Implements file.Watcher by wrapping fsnotify.Watcher +// This is only necessary because fsnotify.Watcher directly exposes its Errors +// and Events channels rather than returning them by method invocation +type fsNotifyWatcher struct { + watcher *fsnotify.Watcher +} + +// NewFsNotifyWatcher returns a new fsNotifyWatcher +func NewFSNotifyWatcher() (*fsNotifyWatcher, error) { + fsn, err := fsnotify.NewWatcher() + return &fsNotifyWatcher{ + watcher: fsn, + }, err +} + +// explicitly implements file.Watcher +var _ Watcher = &fsNotifyWatcher{} + +// Close calls close on the underlying fsnotify.Watcher +func (f *fsNotifyWatcher) Close() error { + return f.watcher.Close() +} + +// Add calls Add on the underlying fsnotify.Watcher +func (f *fsNotifyWatcher) Add(name string) error { + return f.watcher.Add(name) +} + +// Remove calls Remove on the underlying fsnotify.Watcher +func (f *fsNotifyWatcher) Remove(name string) error { + return f.watcher.Remove(name) +} + +// Watchlist calls watchlist on the underlying fsnotify.Watcher +func (f *fsNotifyWatcher) WatchList() []string { + return f.watcher.WatchList() +} + +// Events returns the underlying watcher's Events chan +func (f *fsNotifyWatcher) Events() chan fsnotify.Event { + return f.watcher.Events +} + +// Errors returns the underlying watcher's Errors chan +func (f *fsNotifyWatcher) Errors() chan error { + return f.watcher.Errors +} From dfd1d39adcc92fc9d8d57577901de39f78fddba1 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 22 Jul 2024 19:24:07 +0000 Subject: [PATCH 02/13] fix(deps): update module connectrpc.com/otelconnect to v0.7.1 (#1367) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit [![Mend Renovate](https://app.renovatebot.com/images/banner.svg)](https://renovatebot.com) This PR contains the following updates: | Package | Change | Age | Adoption | Passing | Confidence | |---|---|---|---|---|---| | [connectrpc.com/otelconnect](https://togithub.com/connectrpc/otelconnect-go) | `v0.7.0` -> `v0.7.1` | [![age](https://developer.mend.io/api/mc/badges/age/go/connectrpc.com%2fotelconnect/v0.7.1?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![adoption](https://developer.mend.io/api/mc/badges/adoption/go/connectrpc.com%2fotelconnect/v0.7.1?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![passing](https://developer.mend.io/api/mc/badges/compatibility/go/connectrpc.com%2fotelconnect/v0.7.0/v0.7.1?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://developer.mend.io/api/mc/badges/confidence/go/connectrpc.com%2fotelconnect/v0.7.0/v0.7.1?slim=true)](https://docs.renovatebot.com/merge-confidence/) | --- ### Release Notes
connectrpc/otelconnect-go (connectrpc.com/otelconnect) ### [`v0.7.1`](https://togithub.com/connectrpc/otelconnect-go/releases/tag/v0.7.1) [Compare Source](https://togithub.com/connectrpc/otelconnect-go/compare/v0.7.0...v0.7.1) This is a bug-fix release that addresses a race condition when closing a stream. #### What's Changed ##### Bugfixes - Fix data race in streaming client close by [@​emcfarlane](https://togithub.com/emcfarlane) in [#​173](https://togithub.com/connectrpc/otelconnect-go/issues/173) #### New Contributors - [@​gvacaliuc](https://togithub.com/gvacaliuc) made their first contribution in [#​163](https://togithub.com/connectrpc/otelconnect-go/issues/163) - [@​ytnsym](https://togithub.com/ytnsym) made their first contribution in [#​176](https://togithub.com/connectrpc/otelconnect-go/issues/176) - [@​drice-buf](https://togithub.com/drice-buf) made their first contribution in [#​178](https://togithub.com/connectrpc/otelconnect-go/issues/178) **Full Changelog**: https://github.com/connectrpc/otelconnect-go/compare/v0.7.0...v0.7.1
--- ### Configuration 📅 **Schedule**: Branch creation - At any time (no schedule defined), Automerge - At any time (no schedule defined). 🚦 **Automerge**: Enabled. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] If you want to rebase/retry this PR, check this box --- This PR was generated by [Mend Renovate](https://www.mend.io/free-developer-tools/renovate/). View the [repository job log](https://developer.mend.io/github/open-feature/flagd). Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> Signed-off-by: Dave Josephsen --- core/go.mod | 2 +- core/go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/core/go.mod b/core/go.mod index 5fb62a9fe..da840ded9 100644 --- a/core/go.mod +++ b/core/go.mod @@ -8,7 +8,7 @@ require ( buf.build/gen/go/open-feature/flagd/grpc/go v1.4.0-20240215170432-1e611e2999cc.2 buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.34.2-20240215170432-1e611e2999cc.2 connectrpc.com/connect v1.16.2 - connectrpc.com/otelconnect v0.7.0 + connectrpc.com/otelconnect v0.7.1 github.com/diegoholiveira/jsonlogic/v3 v3.5.3 github.com/fsnotify/fsnotify v1.7.0 github.com/open-feature/flagd-schemas v0.2.9-0.20240527214546-61523e5efe3e diff --git a/core/go.sum b/core/go.sum index aad35ef19..1e6351d10 100644 --- a/core/go.sum +++ b/core/go.sum @@ -1153,6 +1153,8 @@ connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE= connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc= connectrpc.com/otelconnect v0.7.0 h1:ZH55ZZtcJOTKWWLy3qmL4Pam4RzRWBJFOqTPyAqCXkY= connectrpc.com/otelconnect v0.7.0/go.mod h1:Bt2ivBymHZHqxvo4HkJ0EwHuUzQN6k2l0oH+mp/8nwc= +connectrpc.com/otelconnect v0.7.1 h1:scO5pOb0i4yUE66CnNrHeK1x51yq0bE0ehPg6WvzXJY= +connectrpc.com/otelconnect v0.7.1/go.mod h1:dh3bFgHBTb2bkqGCeVVOtHJreSns7uu9wwL2Tbz17ms= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8= git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc= From 5254782dc09c21dd2d2b59adea6323b3f04f5b8b Mon Sep 17 00:00:00 2001 From: Dave Josephsen Date: Wed, 24 Jul 2024 22:17:45 -0500 Subject: [PATCH 03/13] fix: review feedback Signed-off-by: Dave Josephsen --- core/pkg/sync/file/fileinfo_watcher.go | 30 +++++++++++++++----------- core/pkg/sync/file/filepath_sync.go | 6 +----- core/pkg/sync/file/fsnotify_watcher.go | 2 +- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/core/pkg/sync/file/fileinfo_watcher.go b/core/pkg/sync/file/fileinfo_watcher.go index 637404d1f..95cca2ed2 100644 --- a/core/pkg/sync/file/fileinfo_watcher.go +++ b/core/pkg/sync/file/fileinfo_watcher.go @@ -9,7 +9,7 @@ import ( "time" "github.com/fsnotify/fsnotify" - "golang.org/x/sync/errgroup" + "github.com/open-feature/flagd/core/pkg/logger" ) // Implements file.Watcher using a timer and os.FileInfo @@ -18,8 +18,8 @@ type fileInfoWatcher struct { evChan chan fsnotify.Event // Errors Chan erChan chan error - // timer thread errgroup - eg errgroup.Group + // logger + logger *logger.Logger // Func to wrap os.Stat (injection point for test helpers) statFunc func(string) (fs.FileInfo, error) // thread-safe interface to underlying files we are watching @@ -28,11 +28,15 @@ type fileInfoWatcher struct { } // NewFsNotifyWatcher returns a new fsNotifyWatcher -func NewFileInfoWatcher() *fileInfoWatcher { - return &fileInfoWatcher{ +func NewFileInfoWatcher(ctx context.Context, logger *logger.Logger) Watcher { + fiw := &fileInfoWatcher{ evChan: make(chan fsnotify.Event), erChan: make(chan error), + logger: logger, } + // TODO: wire in configs + fiw.run(ctx, (1 * time.Second)) + return fiw } // fileInfoWatcher explicitly implements file.Watcher @@ -99,10 +103,10 @@ func (f *fileInfoWatcher) Errors() chan error { return f.erChan } -// Run is a blocking function that starts the filewatcher's timer thread -func (f *fileInfoWatcher) Run(ctx context.Context, s time.Duration) error { +// run is a blocking function that starts the filewatcher's timer thread +func (f *fileInfoWatcher) run(ctx context.Context, s time.Duration) { // timer thread - f.eg.Go(func() error { + go func() { // execute update on the configured interval of time ticker := time.NewTicker(s) defer ticker.Stop() @@ -110,16 +114,15 @@ func (f *fileInfoWatcher) Run(ctx context.Context, s time.Duration) error { for { select { case <-ctx.Done(): - return nil + return case <-ticker.C: if err := f.update(); err != nil { - return err + f.erChan <- err + return } } } - }) - - return f.eg.Wait() + }() } func (f *fileInfoWatcher) update() error { @@ -159,6 +162,7 @@ func (f *fileInfoWatcher) update() error { } // generateEvent figures out what changed and generates an fsnotify.Event for it. (if we care) +// file removal events are handled above in the update() method func (f *fileInfoWatcher) generateEvent(path string, newInfo fs.FileInfo) (*fsnotify.Event, error) { info := f.watches[path] switch { diff --git a/core/pkg/sync/file/filepath_sync.go b/core/pkg/sync/file/filepath_sync.go index 7ad4aae81..c67ca57c8 100644 --- a/core/pkg/sync/file/filepath_sync.go +++ b/core/pkg/sync/file/filepath_sync.go @@ -8,7 +8,6 @@ import ( "os" "strings" msync "sync" - "time" "github.com/fsnotify/fsnotify" "github.com/open-feature/flagd/core/pkg/logger" @@ -70,10 +69,7 @@ func (fs *Sync) Init(ctx context.Context) error { } fs.watcher = w case FILEINFO: - w := NewFileInfoWatcher() - // start the timer - // TODO: wire up configuration for the fileinfo sync duration - w.Run(ctx, 1*time.Second) + w := NewFileInfoWatcher(ctx, fs.Logger) fs.watcher = w default: return fmt.Errorf("unknown watcher type: '%s'", fs.watchType) diff --git a/core/pkg/sync/file/fsnotify_watcher.go b/core/pkg/sync/file/fsnotify_watcher.go index 06ae224eb..3eec5fc08 100644 --- a/core/pkg/sync/file/fsnotify_watcher.go +++ b/core/pkg/sync/file/fsnotify_watcher.go @@ -10,7 +10,7 @@ type fsNotifyWatcher struct { } // NewFsNotifyWatcher returns a new fsNotifyWatcher -func NewFSNotifyWatcher() (*fsNotifyWatcher, error) { +func NewFSNotifyWatcher() (Watcher, error) { fsn, err := fsnotify.NewWatcher() return &fsNotifyWatcher{ watcher: fsn, From 21770b6c24810dbb10cf80afc9f10bd6c2703a96 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Tue, 23 Jul 2024 22:42:40 +0000 Subject: [PATCH 04/13] fix(deps): update module github.com/open-feature/open-feature-operator/apis to v0.2.44 (#1368) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit [![Mend Renovate](https://app.renovatebot.com/images/banner.svg)](https://renovatebot.com) This PR contains the following updates: | Package | Change | Age | Adoption | Passing | Confidence | |---|---|---|---|---|---| | [github.com/open-feature/open-feature-operator/apis](https://togithub.com/open-feature/open-feature-operator) | `v0.2.43` -> `v0.2.44` | [![age](https://developer.mend.io/api/mc/badges/age/go/github.com%2fopen-feature%2fopen-feature-operator%2fapis/v0.2.44?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![adoption](https://developer.mend.io/api/mc/badges/adoption/go/github.com%2fopen-feature%2fopen-feature-operator%2fapis/v0.2.44?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![passing](https://developer.mend.io/api/mc/badges/compatibility/go/github.com%2fopen-feature%2fopen-feature-operator%2fapis/v0.2.43/v0.2.44?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://developer.mend.io/api/mc/badges/confidence/go/github.com%2fopen-feature%2fopen-feature-operator%2fapis/v0.2.43/v0.2.44?slim=true)](https://docs.renovatebot.com/merge-confidence/) | --- ### Configuration 📅 **Schedule**: Branch creation - At any time (no schedule defined), Automerge - At any time (no schedule defined). 🚦 **Automerge**: Enabled. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] If you want to rebase/retry this PR, check this box --- This PR was generated by [Mend Renovate](https://www.mend.io/free-developer-tools/renovate/). View the [repository job log](https://developer.mend.io/github/open-feature/flagd). Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> Signed-off-by: Dave Josephsen --- core/go.mod | 4 ++-- core/go.sum | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/core/go.mod b/core/go.mod index da840ded9..7af3fe0f7 100644 --- a/core/go.mod +++ b/core/go.mod @@ -11,8 +11,8 @@ require ( connectrpc.com/otelconnect v0.7.1 github.com/diegoholiveira/jsonlogic/v3 v3.5.3 github.com/fsnotify/fsnotify v1.7.0 - github.com/open-feature/flagd-schemas v0.2.9-0.20240527214546-61523e5efe3e - github.com/open-feature/open-feature-operator/apis v0.2.43 + github.com/open-feature/flagd-schemas v0.2.9-0.20240708163558-2aa89b314322 + github.com/open-feature/open-feature-operator/apis v0.2.44 github.com/prometheus/client_golang v1.19.1 github.com/robfig/cron v1.2.0 github.com/stretchr/testify v1.9.0 diff --git a/core/go.sum b/core/go.sum index 1e6351d10..d38c29f87 100644 --- a/core/go.sum +++ b/core/go.sum @@ -1478,10 +1478,14 @@ github.com/open-feature/flagd-schemas v0.2.9-0.20240408192555-ea4f119d2bd7 h1:oP github.com/open-feature/flagd-schemas v0.2.9-0.20240408192555-ea4f119d2bd7/go.mod h1:WKtwo1eW9/K6D+4HfgTXWBqCDzpvMhDa5eRxW7R5B2U= github.com/open-feature/flagd-schemas v0.2.9-0.20240527214546-61523e5efe3e h1:j1xFE8kIrFXf4fZtJUsR457rEG4mxsq2YCVyy92I0HU= github.com/open-feature/flagd-schemas v0.2.9-0.20240527214546-61523e5efe3e/go.mod h1:WKtwo1eW9/K6D+4HfgTXWBqCDzpvMhDa5eRxW7R5B2U= +github.com/open-feature/flagd-schemas v0.2.9-0.20240708163558-2aa89b314322 h1:5zbNHqcZAc9jlhSrC0onuVL2RPpvYcDaNvW2wOZBfUY= +github.com/open-feature/flagd-schemas v0.2.9-0.20240708163558-2aa89b314322/go.mod h1:WKtwo1eW9/K6D+4HfgTXWBqCDzpvMhDa5eRxW7R5B2U= github.com/open-feature/open-feature-operator/apis v0.2.40 h1:h2w8vE2KZ9jrYk43zIIpku/GsTMZwzWm14D4K/Z47YI= github.com/open-feature/open-feature-operator/apis v0.2.40/go.mod h1:I/4tLd5D4JpWpaFZxe2o8R2S1isWGNwHDSC/H5h7o3A= github.com/open-feature/open-feature-operator/apis v0.2.43 h1:2icWjr2cEdKJ7LphY4tdBOMD+a+mOMCsFh0QAVW1vPg= github.com/open-feature/open-feature-operator/apis v0.2.43/go.mod h1:Af4tzkbKp0PLoYy2nJk6SJZnFWz8KrVRa5JqoVMN8jc= +github.com/open-feature/open-feature-operator/apis v0.2.44 h1:0r4Z+RnJltuHdRBv79NFgAckhna6/M3Wcec6gzNX5vI= +github.com/open-feature/open-feature-operator/apis v0.2.44/go.mod h1:xB2uLzvUkbydieX7q6/NqannBz3bt/e5BS2DeOyyw4Q= github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY= github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= github.com/phpdave11/gofpdi v1.0.13/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= From dac4c43f862fff33c1984d285e7746b289754078 Mon Sep 17 00:00:00 2001 From: Dave Josephsen Date: Mon, 29 Jul 2024 22:14:39 -0500 Subject: [PATCH 05/13] feat: wire up configs Signed-off-by: Dave Josephsen --- core/pkg/sync/builder/syncbuilder.go | 51 ++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/core/pkg/sync/builder/syncbuilder.go b/core/pkg/sync/builder/syncbuilder.go index 525ad7147..f358cf3e9 100644 --- a/core/pkg/sync/builder/syncbuilder.go +++ b/core/pkg/sync/builder/syncbuilder.go @@ -5,7 +5,6 @@ import ( "net/http" "os" "regexp" - msync "sync" "time" "github.com/open-feature/flagd/core/pkg/logger" @@ -24,6 +23,8 @@ import ( const ( syncProviderFile = "file" + syncProviderFsNotify = "fsnotify" + syncProviderFileInfo = "fileinfo" syncProviderGrpc = "grpc" syncProviderKubernetes = "kubernetes" syncProviderHTTP = "http" @@ -86,8 +87,13 @@ func (sb *SyncBuilder) SyncsFromConfig(sourceConfigs []sync.SourceConfig, logger func (sb *SyncBuilder) syncFromConfig(sourceConfig sync.SourceConfig, logger *logger.Logger) (sync.ISync, error) { switch sourceConfig.Provider { case syncProviderFile: - logger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", sourceConfig.URI)) return sb.newFile(sourceConfig.URI, logger), nil + case syncProviderFsNotify: + logger.Debug(fmt.Sprintf("using fsnotify sync-provider for: %q", sourceConfig.URI)) + return sb.newFsNotify(sourceConfig.URI, logger), nil + case syncProviderFileInfo: + logger.Debug(fmt.Sprintf("using fileinfo sync-provider for: %q", sourceConfig.URI)) + return sb.newFileInfo(sourceConfig.URI, logger), nil case syncProviderKubernetes: logger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", sourceConfig.URI)) return sb.newK8s(sourceConfig.URI, logger) @@ -99,20 +105,45 @@ func (sb *SyncBuilder) syncFromConfig(sourceConfig sync.SourceConfig, logger *lo return sb.newGRPC(sourceConfig, logger), nil default: - return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s' or '%s'", - sourceConfig.Provider, syncProviderFile, syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes) + return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s', %s', '%s' or '%s'", + sourceConfig.Provider, syncProviderFile, syncProviderFsNotify, syncProviderFileInfo, syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes) } } +// newFile returns an fsinfo sync if we are in k8s or fileinfo if not func (sb *SyncBuilder) newFile(uri string, logger *logger.Logger) *file.Sync { - return &file.Sync{ - URI: regFile.ReplaceAllString(uri, ""), - Logger: logger.WithFields( + switch os.Getenv("KUBERNETES_SERVICE_HOST") { + case "": + // no k8s service host env; use fileinfo + return sb.newFsNotify(uri, logger) + default: + // default to fsnotify + return sb.newFileInfo(uri, logger) + } +} + +// return a new file.Sync that uses fsnotify under the hood +func (sb *SyncBuilder) newFsNotify(uri string, logger *logger.Logger) *file.Sync { + return file.NewFileSync( + regFile.ReplaceAllString(uri, ""), + file.FSNOTIFY, + logger.WithFields( zap.String("component", "sync"), - zap.String("sync", "filepath"), + zap.String("sync", syncProviderFsNotify), ), - Mux: &msync.RWMutex{}, - } + ) +} + +// return a new file.Sync that uses os.Stat/fs.FileInfo under the hood +func (sb *SyncBuilder) newFileInfo(uri string, logger *logger.Logger) *file.Sync { + return file.NewFileSync( + regFile.ReplaceAllString(uri, ""), + file.FILEINFO, + logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", syncProviderFileInfo), + ), + ) } func (sb *SyncBuilder) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) { From 474fe138eee8766d400dc413c38101871560619f Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Tue, 30 Jul 2024 08:28:15 -0700 Subject: [PATCH 06/13] chore: update otel example configurations (#1370) Signed-off-by: Kavindu Dodanduwa Signed-off-by: Dave Josephsen --- docs/reference/monitoring.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/reference/monitoring.md b/docs/reference/monitoring.md index 6f957650b..57b1c5d71 100644 --- a/docs/reference/monitoring.md +++ b/docs/reference/monitoring.md @@ -79,7 +79,6 @@ official [OTEL collector example](https://github.com/open-telemetry/opentelemetr #### docker-compose.yaml ```yaml -version: "3" services: # Jaeger jaeger-all-in-one: @@ -122,13 +121,14 @@ receivers: otlp: protocols: grpc: + endpoint: 0.0.0.0:4317 exporters: prometheus: endpoint: "0.0.0.0:8889" const_labels: label1: value1 - jaeger: - endpoint: jaeger-all-in-one:14250 + otlp/jaeger: + endpoint: jaeger-all-in-one:4317 tls: insecure: true processors: @@ -138,7 +138,7 @@ service: traces: receivers: [ otlp ] processors: [ batch ] - exporters: [ jaeger ] + exporters: [ otlp/jaeger ] metrics: receivers: [ otlp ] processors: [ batch ] From 4e408e79d305a756c983127d6ae973ec013b7d8f Mon Sep 17 00:00:00 2001 From: Dave Josephsen Date: Tue, 30 Jul 2024 13:05:23 -0500 Subject: [PATCH 07/13] fix: review feedback Signed-off-by: Dave Josephsen --- core/pkg/sync/builder/syncbuilder.go | 4 ++-- docs/reference/sync-configuration.md | 17 ++++++++++++++++- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/core/pkg/sync/builder/syncbuilder.go b/core/pkg/sync/builder/syncbuilder.go index f358cf3e9..d363c4bb0 100644 --- a/core/pkg/sync/builder/syncbuilder.go +++ b/core/pkg/sync/builder/syncbuilder.go @@ -115,10 +115,10 @@ func (sb *SyncBuilder) newFile(uri string, logger *logger.Logger) *file.Sync { switch os.Getenv("KUBERNETES_SERVICE_HOST") { case "": // no k8s service host env; use fileinfo - return sb.newFsNotify(uri, logger) + return sb.newFileInfo(uri, logger) default: // default to fsnotify - return sb.newFileInfo(uri, logger) + return sb.newFsNotify(uri, logger) } } diff --git a/docs/reference/sync-configuration.md b/docs/reference/sync-configuration.md index 0c3433295..fa9cfb2be 100644 --- a/docs/reference/sync-configuration.md +++ b/docs/reference/sync-configuration.md @@ -30,7 +30,7 @@ Alternatively, these configurations can be passed to flagd via config file, spec | Field | Type | Note | | ----------- | ------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | uri | required `string` | Flag configuration source of the sync | -| provider | required `string` | Provider type - `file`, `kubernetes`, `http`, or `grpc` | +| provider | required `string` | Provider type - `file`, `fsnotify`, `fileinfo`, `kubernetes`, `http`, or `grpc` | | authHeader | optional `string` | Used for http sync; set this to include the complete `Authorization` header value for any authentication scheme (e.g., "Bearer token_here", "Basic base64_credentials", etc.). Cannot be used with `bearerToken` | | bearerToken | optional `string` | (Deprecated) Used for http sync; token gets appended to `Authorization` header with [bearer schema](https://www.rfc-editor.org/rfc/rfc6750#section-2.1). Cannot be used with `authHeader` | | interval | optional `uint32` | Used for http sync; requests will be made at this interval. Defaults to 5 seconds. | @@ -44,11 +44,20 @@ The `uri` field values **do not** follow the [URI patterns](#uri-patterns). The from the `provider` field. Only exception is the remote provider where `http(s)://` is expected by default. Incorrect URIs will result in a flagd start-up failure with errors from the respective sync provider implementation. +The `file` provider type uses either an `fsnotify` poller (on systems that +support it), or a timer-based poller that relies on `os.Stat` and `fs.FileInfo`. +The moniker: `file` defaults to using `fsnotify` when flagd detects it is +running in kubernetes and `fileinfo` in all other cases, but you may explicitly +select either polling back-end by setting the provider value to either +`fsnotify` or `fileinfo`. + Given below are example sync providers, startup command and equivalent config file definition: Sync providers: - `file` - config/samples/example_flags.json +- `fsnotify` - config/samples/example_flags.json +- `fileinfo` - config/samples/example_flags.json - `http` - - `https` - - `kubernetes` - default/my-flag-config @@ -60,6 +69,8 @@ Startup command: ```sh ./bin/flagd start --sources='[{"uri":"config/samples/example_flags.json","provider":"file"}, + {"uri":"config/samples/example_flags.json","provider":"fsnotify"}, + {"uri":"config/samples/example_flags.json","provider":"fileinfo"}, {"uri":"http://my-flag-source.json","provider":"http","bearerToken":"bearer-dji34ld2l"}, {"uri":"https://secure-remote/bearer-auth","provider":"http","authHeader":"Bearer bearer-dji34ld2l"}, {"uri":"https://secure-remote/basic-auth","provider":"http","authHeader":"Basic dXNlcjpwYXNz"}, @@ -75,6 +86,10 @@ Configuration file, sources: - uri: config/samples/example_flags.json provider: file + - uri: config/samples/example_flags.json + provider: fsnotify + - uri: config/samples/example_flags.json + provider: fileinfo - uri: http://my-flag-source.json provider: http bearerToken: bearer-dji34ld2l From b2f10b791564e9ec5d9d601bf8fb3420c48a462d Mon Sep 17 00:00:00 2001 From: Dave Josephsen Date: Tue, 30 Jul 2024 15:25:07 -0500 Subject: [PATCH 08/13] Update docs/reference/sync-configuration.md Co-authored-by: Michael Beemer Signed-off-by: Dave Josephsen --- docs/reference/sync-configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/sync-configuration.md b/docs/reference/sync-configuration.md index fa9cfb2be..588147250 100644 --- a/docs/reference/sync-configuration.md +++ b/docs/reference/sync-configuration.md @@ -44,7 +44,7 @@ The `uri` field values **do not** follow the [URI patterns](#uri-patterns). The from the `provider` field. Only exception is the remote provider where `http(s)://` is expected by default. Incorrect URIs will result in a flagd start-up failure with errors from the respective sync provider implementation. -The `file` provider type uses either an `fsnotify` poller (on systems that +The `file` provider type uses either an `fsnotify` notification (on systems that support it), or a timer-based poller that relies on `os.Stat` and `fs.FileInfo`. The moniker: `file` defaults to using `fsnotify` when flagd detects it is running in kubernetes and `fileinfo` in all other cases, but you may explicitly From 5c0bcacba04fd7b72bab6ffb8e866dc9076d56a0 Mon Sep 17 00:00:00 2001 From: Dave Josephsen Date: Tue, 6 Aug 2024 11:28:15 -0500 Subject: [PATCH 09/13] chore: wire up tests, fix bugs discovered with tests Signed-off-by: Dave Josephsen --- core/pkg/sync/file/fileinfo_watcher.go | 11 +- core/pkg/sync/file/fileinfo_watcher_test.go | 130 +++++++++++++++++++- 2 files changed, 134 insertions(+), 7 deletions(-) diff --git a/core/pkg/sync/file/fileinfo_watcher.go b/core/pkg/sync/file/fileinfo_watcher.go index 95cca2ed2..3d3d24c6b 100644 --- a/core/pkg/sync/file/fileinfo_watcher.go +++ b/core/pkg/sync/file/fileinfo_watcher.go @@ -30,8 +30,8 @@ type fileInfoWatcher struct { // NewFsNotifyWatcher returns a new fsNotifyWatcher func NewFileInfoWatcher(ctx context.Context, logger *logger.Logger) Watcher { fiw := &fileInfoWatcher{ - evChan: make(chan fsnotify.Event), - erChan: make(chan error), + evChan: make(chan fsnotify.Event, 256), + erChan: make(chan error, 256), logger: logger, } // TODO: wire in configs @@ -84,7 +84,7 @@ func (f *fileInfoWatcher) Remove(name string) error { // Watchlist calls watchlist on the underlying fsnotify.Watcher func (f *fileInfoWatcher) WatchList() []string { f.mu.RLock() - defer f.mu.Unlock() + defer f.mu.RUnlock() out := []string{} for name := range f.watches { n := name @@ -131,7 +131,7 @@ func (f *fileInfoWatcher) update() error { defer f.mu.Unlock() for path, info := range f.watches { - newInfo, err := getFileInfo(path) + newInfo, err := f.statFunc(path) if err != nil { // if the file isn't there, it must have been removed // fire off a remove event and remove it from the watches @@ -141,6 +141,7 @@ func (f *fileInfoWatcher) update() error { Op: fsnotify.Remove, } delete(f.watches, path) + continue } return err } @@ -162,7 +163,7 @@ func (f *fileInfoWatcher) update() error { } // generateEvent figures out what changed and generates an fsnotify.Event for it. (if we care) -// file removal events are handled above in the update() method +// file removal are handled above in the update() method func (f *fileInfoWatcher) generateEvent(path string, newInfo fs.FileInfo) (*fsnotify.Event, error) { info := f.watches[path] switch { diff --git a/core/pkg/sync/file/fileinfo_watcher_test.go b/core/pkg/sync/file/fileinfo_watcher_test.go index 58ac1830e..ab1804fbf 100644 --- a/core/pkg/sync/file/fileinfo_watcher_test.go +++ b/core/pkg/sync/file/fileinfo_watcher_test.go @@ -1,7 +1,10 @@ package file import ( + "errors" + "fmt" "io/fs" + "os" "testing" "time" @@ -68,13 +71,136 @@ func Test_fileInfoWatcher_Add(t *testing.T) { } } +func Test_fileInfoWatcher_Remove(t *testing.T) { + tests := []struct { + name string + watcher *fileInfoWatcher + removeThis string + want []string + }{{ + name: "remove foo", + watcher: makeTestWatcher(t, map[string]fs.FileInfo{"foo": &mockFileInfo{}, "bar": &mockFileInfo{}}), + removeThis: "foo", + want: []string{"bar"}, + }} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.watcher.Remove(tt.removeThis) + if err != nil { + t.Errorf("fileInfoWatcher.Remove() error = %v", err) + } + if !cmp.Equal(tt.watcher.WatchList(), tt.want) { + t.Errorf("fileInfoWatcher.Add(): want-, got+: %v ", cmp.Diff(tt.want, tt.watcher.WatchList())) + } + }) + } +} + +func Test_fileInfoWatcher_update(t *testing.T) { + tests := []struct { + name string + watcher *fileInfoWatcher + statFunc func(string) (fs.FileInfo, error) + wantErr bool + want *fsnotify.Event + }{ + { + name: "chmod", + watcher: makeTestWatcher(t, + map[string]fs.FileInfo{ + "foo": &mockFileInfo{ + name: "foo", + mode: 0, + }, + }, + ), + statFunc: func(path string) (fs.FileInfo, error) { + return &mockFileInfo{ + name: "foo", + mode: 1, + }, nil + }, + want: &fsnotify.Event{Name: "foo", Op: fsnotify.Chmod}, + }, + { + name: "write", + watcher: makeTestWatcher(t, + map[string]fs.FileInfo{ + "foo": &mockFileInfo{ + name: "foo", + modTime: time.Now().Local(), + }, + }, + ), + statFunc: func(path string) (fs.FileInfo, error) { + return &mockFileInfo{ + name: "foo", + modTime: (time.Now().Local().Add(5 * time.Minute)), + }, nil + }, + want: &fsnotify.Event{Name: "foo", Op: fsnotify.Write}, + }, + { + name: "remove", + watcher: makeTestWatcher(t, + map[string]fs.FileInfo{ + "foo": &mockFileInfo{ + name: "foo", + }, + }, + ), + statFunc: func(path string) (fs.FileInfo, error) { + return nil, fmt.Errorf("mock file-no-existy error: %w", os.ErrNotExist) + }, + want: &fsnotify.Event{Name: "foo", Op: fsnotify.Remove}, + }, + { + name: "unknown error", + watcher: makeTestWatcher(t, + map[string]fs.FileInfo{ + "foo": &mockFileInfo{ + name: "foo", + }, + }, + ), + statFunc: func(path string) (fs.FileInfo, error) { + return nil, errors.New("unhandled error") + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // set the statFunc + tt.watcher.statFunc = tt.statFunc + // run an update + // this also flexes fileinfowatcher.generateEvent() + err := tt.watcher.update() + if err != nil { + if tt.wantErr { + return + } else { + t.Errorf("fileInfoWatcher.update() unexpected error = %v, wantErr %v", err, tt.wantErr) + } + } + // slurp an event off the event chan + out := <-tt.watcher.Events() + if out != *tt.want { + t.Errorf("fileInfoWatcher.update() wanted %v, got %v", tt.want, out) + } + }) + } +} + +// Helpers + // makeTestWatcher returns a pointer to a fileInfoWatcher suitable for testing func makeTestWatcher(t *testing.T, watches map[string]fs.FileInfo) *fileInfoWatcher { t.Helper() return &fileInfoWatcher{ - evChan: make(chan fsnotify.Event), - erChan: make(chan error), + evChan: make(chan fsnotify.Event, 512), + erChan: make(chan error, 512), watches: watches, } } From 996e40e673b96afd1b2358628e44616099384a64 Mon Sep 17 00:00:00 2001 From: Dave Josephsen Date: Thu, 15 Aug 2024 11:19:55 -0500 Subject: [PATCH 10/13] fix: review feedback and linter fixes Signed-off-by: Dave Josephsen --- core/pkg/sync/file/fileinfo_watcher.go | 37 ++++++++++----------- core/pkg/sync/file/fileinfo_watcher_test.go | 4 +-- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/core/pkg/sync/file/fileinfo_watcher.go b/core/pkg/sync/file/fileinfo_watcher.go index 3d3d24c6b..cecc89daf 100644 --- a/core/pkg/sync/file/fileinfo_watcher.go +++ b/core/pkg/sync/file/fileinfo_watcher.go @@ -3,6 +3,7 @@ package file import ( "context" "errors" + "fmt" "io/fs" "os" "sync" @@ -30,11 +31,11 @@ type fileInfoWatcher struct { // NewFsNotifyWatcher returns a new fsNotifyWatcher func NewFileInfoWatcher(ctx context.Context, logger *logger.Logger) Watcher { fiw := &fileInfoWatcher{ - evChan: make(chan fsnotify.Event, 256), - erChan: make(chan error, 256), - logger: logger, + evChan: make(chan fsnotify.Event, 32), + erChan: make(chan error, 32), + statFunc: getFileInfo, + logger: logger, } - // TODO: wire in configs fiw.run(ctx, (1 * time.Second)) return fiw } @@ -126,7 +127,6 @@ func (f *fileInfoWatcher) run(ctx context.Context, s time.Duration) { } func (f *fileInfoWatcher) update() error { - event := &fsnotify.Event{} f.mu.Lock() defer f.mu.Unlock() @@ -148,13 +148,9 @@ func (f *fileInfoWatcher) update() error { // if the new stat doesn't match the old stat, figure out what changed if info != newInfo { - event, err = f.generateEvent(path, newInfo) - if err != nil { - f.erChan <- err - } else { - if event != nil { - f.evChan <- *event - } + event := f.generateEvent(path, newInfo) + if event != nil { + f.evChan <- *event } f.watches[path] = newInfo } @@ -164,7 +160,7 @@ func (f *fileInfoWatcher) update() error { // generateEvent figures out what changed and generates an fsnotify.Event for it. (if we care) // file removal are handled above in the update() method -func (f *fileInfoWatcher) generateEvent(path string, newInfo fs.FileInfo) (*fsnotify.Event, error) { +func (f *fileInfoWatcher) generateEvent(path string, newInfo fs.FileInfo) *fsnotify.Event { info := f.watches[path] switch { // new mod time is more recent than old mod time, generate a write event @@ -172,32 +168,33 @@ func (f *fileInfoWatcher) generateEvent(path string, newInfo fs.FileInfo) (*fsno return &fsnotify.Event{ Name: path, Op: fsnotify.Write, - }, nil + } // the file modes changed, generate a chmod event case info.Mode() != newInfo.Mode(): return &fsnotify.Event{ Name: path, Op: fsnotify.Chmod, - }, nil + } + // nothing changed that we care about + default: + return nil } - return nil, nil } // getFileInfo returns the fs.FileInfo for the given path -// TODO: verify this works correctly on windows func getFileInfo(path string) (fs.FileInfo, error) { f, err := os.Open(path) if err != nil { - return nil, err + return nil, fmt.Errorf("error from os.Open(%s): %w", path, err) } info, err := f.Stat() if err != nil { - return info, err + return info, fmt.Errorf("error from fs.Stat(%s): %w", path, err) } if err := f.Close(); err != nil { - return info, err + return info, fmt.Errorf("err from fs.Close(%s): %w", path, err) } return info, nil diff --git a/core/pkg/sync/file/fileinfo_watcher_test.go b/core/pkg/sync/file/fileinfo_watcher_test.go index ab1804fbf..ccf537b87 100644 --- a/core/pkg/sync/file/fileinfo_watcher_test.go +++ b/core/pkg/sync/file/fileinfo_watcher_test.go @@ -13,7 +13,6 @@ import ( ) func Test_fileInfoWatcher_Close(t *testing.T) { - type fields struct{} tests := []struct { name string watcher *fileInfoWatcher @@ -179,9 +178,8 @@ func Test_fileInfoWatcher_update(t *testing.T) { if err != nil { if tt.wantErr { return - } else { - t.Errorf("fileInfoWatcher.update() unexpected error = %v, wantErr %v", err, tt.wantErr) } + t.Errorf("fileInfoWatcher.update() unexpected error = %v, wantErr %v", err, tt.wantErr) } // slurp an event off the event chan out := <-tt.watcher.Events() From fc855e3148d2de3fd3ab2c1f677738c5eb9f0863 Mon Sep 17 00:00:00 2001 From: Dave Josephsen Date: Fri, 16 Aug 2024 16:26:24 -0500 Subject: [PATCH 11/13] fix: final linter pass Signed-off-by: Dave Josephsen --- core/pkg/sync/file/fileinfo_watcher_test.go | 10 ++++---- core/pkg/sync/file/fsnotify_watcher.go | 26 +++++++++++++++++---- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/core/pkg/sync/file/fileinfo_watcher_test.go b/core/pkg/sync/file/fileinfo_watcher_test.go index ccf537b87..5917e9ab6 100644 --- a/core/pkg/sync/file/fileinfo_watcher_test.go +++ b/core/pkg/sync/file/fileinfo_watcher_test.go @@ -113,7 +113,7 @@ func Test_fileInfoWatcher_update(t *testing.T) { }, }, ), - statFunc: func(path string) (fs.FileInfo, error) { + statFunc: func(_ string) (fs.FileInfo, error) { return &mockFileInfo{ name: "foo", mode: 1, @@ -131,7 +131,7 @@ func Test_fileInfoWatcher_update(t *testing.T) { }, }, ), - statFunc: func(path string) (fs.FileInfo, error) { + statFunc: func(_ string) (fs.FileInfo, error) { return &mockFileInfo{ name: "foo", modTime: (time.Now().Local().Add(5 * time.Minute)), @@ -148,7 +148,7 @@ func Test_fileInfoWatcher_update(t *testing.T) { }, }, ), - statFunc: func(path string) (fs.FileInfo, error) { + statFunc: func(_ string) (fs.FileInfo, error) { return nil, fmt.Errorf("mock file-no-existy error: %w", os.ErrNotExist) }, want: &fsnotify.Event{Name: "foo", Op: fsnotify.Remove}, @@ -162,7 +162,7 @@ func Test_fileInfoWatcher_update(t *testing.T) { }, }, ), - statFunc: func(path string) (fs.FileInfo, error) { + statFunc: func(_ string) (fs.FileInfo, error) { return nil, errors.New("unhandled error") }, wantErr: true, @@ -207,7 +207,7 @@ func makeTestWatcher(t *testing.T, watches map[string]fs.FileInfo) *fileInfoWatc // constructor is given func makeStatFunc(t *testing.T, fi fs.FileInfo) func(string) (fs.FileInfo, error) { t.Helper() - return func(s string) (fs.FileInfo, error) { + return func(_ string) (fs.FileInfo, error) { return fi, nil } } diff --git a/core/pkg/sync/file/fsnotify_watcher.go b/core/pkg/sync/file/fsnotify_watcher.go index 3eec5fc08..93c98ce1c 100644 --- a/core/pkg/sync/file/fsnotify_watcher.go +++ b/core/pkg/sync/file/fsnotify_watcher.go @@ -1,6 +1,10 @@ package file -import "github.com/fsnotify/fsnotify" +import ( + "fmt" + + "github.com/fsnotify/fsnotify" +) // Implements file.Watcher by wrapping fsnotify.Watcher // This is only necessary because fsnotify.Watcher directly exposes its Errors @@ -12,9 +16,12 @@ type fsNotifyWatcher struct { // NewFsNotifyWatcher returns a new fsNotifyWatcher func NewFSNotifyWatcher() (Watcher, error) { fsn, err := fsnotify.NewWatcher() + if err != nil { + return nil, fmt.Errorf("fsnotify: %w", err) + } return &fsNotifyWatcher{ watcher: fsn, - }, err + }, nil } // explicitly implements file.Watcher @@ -22,17 +29,26 @@ var _ Watcher = &fsNotifyWatcher{} // Close calls close on the underlying fsnotify.Watcher func (f *fsNotifyWatcher) Close() error { - return f.watcher.Close() + if err := f.watcher.Close(); err != nil { + return fmt.Errorf("fsnotify: %w", err) + } + return nil } // Add calls Add on the underlying fsnotify.Watcher func (f *fsNotifyWatcher) Add(name string) error { - return f.watcher.Add(name) + if err := f.watcher.Add(name); err != nil { + return fmt.Errorf("fsnotify: %w", err) + } + return nil } // Remove calls Remove on the underlying fsnotify.Watcher func (f *fsNotifyWatcher) Remove(name string) error { - return f.watcher.Remove(name) + if err := f.watcher.Remove(name); err != nil { + return fmt.Errorf("fsnotify: %w", err) + } + return nil } // Watchlist calls watchlist on the underlying fsnotify.Watcher From e0966cd24fd965bd083029473dc2d97684330795 Mon Sep 17 00:00:00 2001 From: Dave Josephsen Date: Sun, 18 Aug 2024 15:14:43 -0500 Subject: [PATCH 12/13] fix: never say `final` linter pass Signed-off-by: Dave Josephsen --- core/pkg/sync/builder/syncbuilder.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/pkg/sync/builder/syncbuilder.go b/core/pkg/sync/builder/syncbuilder.go index 752e893dd..d5e6a1a9f 100644 --- a/core/pkg/sync/builder/syncbuilder.go +++ b/core/pkg/sync/builder/syncbuilder.go @@ -114,7 +114,8 @@ func (sb *SyncBuilder) syncFromConfig(sourceConfig sync.SourceConfig, logger *lo default: return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s', %s', '%s' or '%s'", - sourceConfig.Provider, syncProviderFile, syncProviderFsNotify, syncProviderFileInfo, syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes) + sourceConfig.Provider, syncProviderFile, syncProviderFsNotify, syncProviderFileInfo, + syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes) } } From b8b90f70689b1837eda7eeed1f9bb837ae5f875d Mon Sep 17 00:00:00 2001 From: Dave Josephsen Date: Tue, 20 Aug 2024 12:15:52 -0500 Subject: [PATCH 13/13] fix: constructor returns nil map Signed-off-by: Dave Josephsen --- core/pkg/sync/file/fileinfo_watcher.go | 1 + 1 file changed, 1 insertion(+) diff --git a/core/pkg/sync/file/fileinfo_watcher.go b/core/pkg/sync/file/fileinfo_watcher.go index cecc89daf..21173ae36 100644 --- a/core/pkg/sync/file/fileinfo_watcher.go +++ b/core/pkg/sync/file/fileinfo_watcher.go @@ -35,6 +35,7 @@ func NewFileInfoWatcher(ctx context.Context, logger *logger.Logger) Watcher { erChan: make(chan error, 32), statFunc: getFileInfo, logger: logger, + watches: make(map[string]fs.FileInfo), } fiw.run(ctx, (1 * time.Second)) return fiw