Skip to content

Commit

Permalink
refactor: implement delete event type in filepath sync, expand tests (#…
Browse files Browse the repository at this point in the history
…369)

Signed-off-by: James Milligan <james@omnant.co.uk>
Signed-off-by: James Milligan <75740990+james-milligan@users.noreply.github.com>
Co-authored-by: Skye Gill <gill.skye95@gmail.com>
Co-authored-by: Michael Beemer <beeme1mr@users.noreply.github.com>
  • Loading branch information
3 people authored Feb 10, 2023
1 parent 8923bf2 commit e906757
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 52 deletions.
20 changes: 18 additions & 2 deletions pkg/store/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,26 @@ func (f *Flags) Update(logger *logger.Logger, source string, flags map[string]mo
// DeleteFlags matching flags from source.
func (f *Flags) DeleteFlags(logger *logger.Logger, source string, flags map[string]model.Flag) map[string]interface{} {
notifications := map[string]interface{}{}
if len(flags) == 0 {
allFlags := f.GetAll()
for key, flag := range allFlags {
if flag.Source != source {
continue
}
notifications[key] = map[string]interface{}{
"type": string(model.NotificationDelete),
"source": source,
}
f.Delete(key)
}
}

for k := range flags {
_, ok := f.Get(k)
flag, ok := f.Get(k)
if ok {
if flag.Source != source {
continue
}
notifications[k] = map[string]interface{}{
"type": string(model.NotificationDelete),
"source": source,
Expand All @@ -142,7 +158,7 @@ func (f *Flags) DeleteFlags(logger *logger.Logger, source string, flags map[stri
f.Delete(k)
} else {
logger.Warn(
fmt.Sprintf("failed to remove flag, flag with key %s from source %s does not exisit.",
fmt.Sprintf("failed to remove flag, flag with key %s from source %s does not exist.",
k,
source))
}
Expand Down
22 changes: 22 additions & 0 deletions pkg/store/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func TestFlags_Update(t *testing.T) {
func TestFlags_Delete(t *testing.T) {
mockLogger := logger.NewLogger(nil, false)
mockSource := "source"
mockSource2 := "source2"

tests := []struct {
name string
Expand All @@ -358,6 +359,7 @@ func TestFlags_Delete(t *testing.T) {
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
},
deleteRequest: map[string]model.Flag{
Expand All @@ -366,6 +368,7 @@ func TestFlags_Delete(t *testing.T) {
expectedState: &Flags{
Flags: map[string]model.Flag{
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
},
expectedNotificationKeys: []string{"A"},
Expand All @@ -376,6 +379,7 @@ func TestFlags_Delete(t *testing.T) {
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
},
deleteRequest: map[string]model.Flag{
Expand All @@ -385,10 +389,28 @@ func TestFlags_Delete(t *testing.T) {
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
},
expectedNotificationKeys: []string{},
},
{
name: "Remove all",
storedState: &Flags{
Flags: map[string]model.Flag{
"A": {Source: mockSource},
"B": {Source: mockSource},
"C": {Source: mockSource2},
},
},
deleteRequest: map[string]model.Flag{},
expectedState: &Flags{
Flags: map[string]model.Flag{
"C": {Source: mockSource2},
},
},
expectedNotificationKeys: []string{"A", "B"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
63 changes: 39 additions & 24 deletions pkg/sync/file/filepath_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type Sync struct {
fileType string
}

// default state is used to prevent EOF errors when handling filepath delete events + empty files
const defaultState = "{}"

//nolint:funlen
func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
fs.Logger.Info("Starting filepath sync notifier")
Expand All @@ -38,13 +41,7 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
return err
}

// file watcher is ready(and stable), fetch and emit the initial results
fetch, err := fs.fetch(ctx)
if err != nil {
return err
}

dataSync <- sync.DataSync{FlagData: fetch, Source: fs.URI, Type: sync.ALL}
fs.sendDataSync(ctx, sync.ALL, dataSync)

fs.Logger.Info(fmt.Sprintf("watching filepath: %s", fs.URI))
for {
Expand All @@ -56,24 +53,33 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
}

fs.Logger.Info(fmt.Sprintf("filepath event: %s %s", event.Name, event.Op.String()))

// event.Op is a bitmask and some systems may send multiple operations at once
// event.Has(...) checks that the bitmask contains the particular event (among others)
if event.Has(fsnotify.Create) || event.Has(fsnotify.Write) {
fs.sendDataSync(ctx, event, dataSync)
} else if event.Has(fsnotify.Remove) {
// Counterintuively, remove events are the only meanful ones seen in K8s.
// K8s handles mounted ConfigMap updates by modifying symbolic links, which is an atomic operation.
// At the point the remove event is fired, we have our new data, so we can send it down the channel.
fs.sendDataSync(ctx, event, dataSync)

switch {
case event.Has(fsnotify.Create) || event.Has(fsnotify.Write):
fs.sendDataSync(ctx, sync.ALL, dataSync)
case event.Has(fsnotify.Remove):
// K8s exposes config maps as symlinks.
// Updates cause a remove event, we need to re-add the watcher in this case.
err = watcher.Add(fs.URI)
if err != nil {
// the watcher could not be re-added, so the file must have been deleted
fs.Logger.Error(fmt.Sprintf("error restoring watcher, file may have been deleted: %s", err.Error()))
fs.sendDataSync(ctx, sync.DELETE, dataSync)
continue
}

// Counterintuitively, remove events are the only meaningful ones seen in K8s.
// K8s handles mounted ConfigMap updates by modifying symbolic links, which is an atomic operation.
// At the point the remove event is fired, we have our new data, so we can send it down the channel.
fs.sendDataSync(ctx, sync.ALL, dataSync)
case event.Has(fsnotify.Chmod):
// on linux the REMOVE event will not fire until all file descriptors are closed, this cannot happen
// while the file is being watched, os.Stat is used here to infer deletion
if _, err := os.Stat(fs.URI); errors.Is(err, os.ErrNotExist) {
fs.Logger.Error(fmt.Sprintf("file has been deleted: %s", err.Error()))
fs.sendDataSync(ctx, sync.DELETE, dataSync)
}
}

case err, ok := <-watcher.Errors:
if !ok {
return errors.New("watcher error")
Expand All @@ -87,14 +93,23 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
}
}

func (fs *Sync) sendDataSync(ctx context.Context, eventType fsnotify.Event, dataSync chan<- sync.DataSync) {
fs.Logger.Debug(fmt.Sprintf("Configuration %s: %s", fs.URI, eventType.Op.String()))
msg, err := fs.fetch(ctx)
if err != nil {
fs.Logger.Error(fmt.Sprintf("Error fetching after %s notification: %s", eventType.Op.String(), err.Error()))
func (fs *Sync) sendDataSync(ctx context.Context, syncType sync.Type, dataSync chan<- sync.DataSync) {
fs.Logger.Debug(fmt.Sprintf("Configuration %s: %s", fs.URI, syncType.String()))

msg := defaultState
if syncType != sync.DELETE {
m, err := fs.fetch(ctx)
if err != nil {
fs.Logger.Error(fmt.Sprintf("Error fetching %s: %s", fs.URI, err.Error()))
}
if m == "" {
fs.Logger.Warn(fmt.Sprintf("file %s is empty", fs.URI))
} else {
msg = m
}
}

dataSync <- sync.DataSync{FlagData: msg, Source: fs.URI, Type: sync.ALL}
dataSync <- sync.DataSync{FlagData: msg, Source: fs.URI, Type: syncType}
}

func (fs *Sync) fetch(_ context.Context) (string, error) {
Expand Down
Loading

0 comments on commit e906757

Please sign in to comment.