Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(os/gfsnotify): add recursive watching for created subfolders and sub-files under folders that already watched #3830

Merged
merged 2 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions os/gfpool/gfpool_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (p *Pool) File() (*File, error) {
}
// It firstly checks using !p.init.Val() for performance purpose.
if !p.init.Val() && p.init.Cas(false, true) {
_, _ = gfsnotify.Add(f.path, func(event *gfsnotify.Event) {
var watchCallback = func(event *gfsnotify.Event) {
// If the file is removed or renamed, recreates the pool by increasing the pool id.
if event.IsRemove() || event.IsRename() {
// It drops the old pool.
Expand All @@ -110,7 +110,8 @@ func (p *Pool) File() (*File, error) {
// Whenever the pool id changes, the pool will be recreated.
p.id.Add(1)
}
}, false)
}
_, _ = gfsnotify.Add(f.path, watchCallback, gfsnotify.WatchOption{NoRecursive: true})
}
return f, nil
}
Expand Down
33 changes: 24 additions & 9 deletions os/gfsnotify/gfsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Callback struct {
Path string // Bound file path (absolute).
name string // Registered name for AddOnce.
elem *glist.Element // Element in the callbacks of watcher.
recursive bool // Is bound to path recursively or not.
recursive bool // Is bound to sub-path recursively or not.
}

// Event is the event produced by underlying fsnotify.
Expand All @@ -53,6 +53,15 @@ type Event struct {
Watcher *Watcher // Parent watcher.
}

// WatchOption holds the option for watching.
type WatchOption struct {
// NoRecursive explicitly specifies no recursive watching.
// Recursive watching will also watch all its current and following created subfolders and sub-files.
//
// Note that the recursive watching is enabled in default.
NoRecursive bool
}

// Op is the bits union for file operations.
type Op uint32

Expand All @@ -75,13 +84,15 @@ const (
var (
mu sync.Mutex // Mutex for concurrent safety of defaultWatcher.
defaultWatcher *Watcher // Default watcher.
callbackIdMap = gmap.NewIntAnyMap(true) // Id to callback mapping.
callbackIdMap = gmap.NewIntAnyMap(true) // Global callback id to callback function mapping.
callbackIdGenerator = gtype.NewInt() // Atomic id generator for callback.
)

// New creates and returns a new watcher.
// Note that the watcher number is limited by the file handle setting of the system.
// Eg: fs.inotify.max_user_instances system variable in linux systems.
// Example: fs.inotify.max_user_instances system variable in linux systems.
//
// In most case, you can use the default watcher for usage instead of creating one.
func New() (*Watcher, error) {
w := &Watcher{
cache: gcache.New(),
Expand All @@ -102,26 +113,30 @@ func New() (*Watcher, error) {
}

// Add monitors `path` using default watcher with callback function `callbackFunc`.
//
// The parameter `path` can be either a file or a directory path.
// The optional parameter `recursive` specifies whether monitoring the `path` recursively, which is true in default.
func Add(path string, callbackFunc func(event *Event), recursive ...bool) (callback *Callback, err error) {
func Add(path string, callbackFunc func(event *Event), option ...WatchOption) (callback *Callback, err error) {
w, err := getDefaultWatcher()
if err != nil {
return nil, err
}
return w.Add(path, callbackFunc, recursive...)
return w.Add(path, callbackFunc, option...)
}

// AddOnce monitors `path` using default watcher with callback function `callbackFunc` only once using unique name `name`.
// If AddOnce is called multiple times with the same `name` parameter, `path` is only added to monitor once. It returns error
// if it's called twice with the same `name`.
//
// If AddOnce is called multiple times with the same `name` parameter, `path` is only added to monitor once.
// It returns error if it's called twice with the same `name`.
//
// The parameter `path` can be either a file or a directory path.
// The optional parameter `recursive` specifies whether monitoring the `path` recursively, which is true in default.
func AddOnce(name, path string, callbackFunc func(event *Event), recursive ...bool) (callback *Callback, err error) {
func AddOnce(name, path string, callbackFunc func(event *Event), option ...WatchOption) (callback *Callback, err error) {
w, err := getDefaultWatcher()
if err != nil {
return nil, err
}
return w.AddOnce(name, path, callbackFunc, recursive...)
return w.AddOnce(name, path, callbackFunc, option...)
}

// Remove removes all monitoring callbacks of given `path` from watcher recursively.
Expand Down
6 changes: 2 additions & 4 deletions os/gfsnotify/gfsnotify_filefunc.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,12 @@ func doFileScanDir(path string, pattern string, recursive ...bool) ([]string, er
file, err = os.Open(path)
)
if err != nil {
err = gerror.Wrapf(err, `os.Open failed for path "%s"`, path)
return nil, err
return nil, gerror.Wrapf(err, `os.Open failed for path "%s"`, path)
}
defer file.Close()
names, err := file.Readdirnames(-1)
if err != nil {
err = gerror.Wrapf(err, `read directory files failed for path "%s"`, path)
return nil, err
return nil, gerror.Wrapf(err, `read directory files failed for path "%s"`, path)
}
filePath := ""
for _, name := range names {
Expand Down
143 changes: 72 additions & 71 deletions os/gfsnotify/gfsnotify_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ import (
)

// Add monitors `path` with callback function `callbackFunc` to the watcher.
//
// The parameter `path` can be either a file or a directory path.
// The optional parameter `recursive` specifies whether monitoring the `path` recursively,
// which is true in default.
func (w *Watcher) Add(path string, callbackFunc func(event *Event), recursive ...bool) (callback *Callback, err error) {
return w.AddOnce("", path, callbackFunc, recursive...)
func (w *Watcher) Add(
path string, callbackFunc func(event *Event), option ...WatchOption,
) (callback *Callback, err error) {
return w.AddOnce("", path, callbackFunc, option...)
}

// AddOnce monitors `path` with callback function `callbackFunc` only once using unique name
Expand All @@ -28,26 +32,40 @@ func (w *Watcher) Add(path string, callbackFunc func(event *Event), recursive ..
//
// It returns error if it's called twice with the same `name`.
//
// The parameter `path` can be either a file or a directory path.
// The optional parameter `recursive` specifies whether monitoring the `path` recursively,
// which is true in default.
func (w *Watcher) AddOnce(name, path string, callbackFunc func(event *Event), recursive ...bool) (callback *Callback, err error) {
func (w *Watcher) AddOnce(
name, path string, callbackFunc func(event *Event), option ...WatchOption,
) (callback *Callback, err error) {
var watchOption = w.getWatchOption(option...)
w.nameSet.AddIfNotExistFuncLock(name, func() bool {
// Firstly add the path to watcher.
callback, err = w.addWithCallbackFunc(name, path, callbackFunc, recursive...)
//
// A path can only be watched once; watching it more than once is a no-op and will
// not return an error.
callback, err = w.addWithCallbackFunc(
name, path, callbackFunc, option...,
)
if err != nil {
return false
}

// If it's recursive adding, it then adds all sub-folders to the monitor.
// NOTE:
// 1. It only recursively adds **folders** to the monitor, NOT files,
// because if the folders are monitored and their sub-files are also monitored.
// 2. It bounds no callbacks to the folders, because it will search the callbacks
// from its parent recursively if any event produced.
if fileIsDir(path) && (len(recursive) == 0 || recursive[0]) {
if fileIsDir(path) && !watchOption.NoRecursive {
for _, subPath := range fileAllDirs(path) {
if fileIsDir(subPath) {
if err = w.watcher.Add(subPath); err != nil {
err = gerror.Wrapf(err, `add watch failed for path "%s"`, subPath)
if watchAddErr := w.watcher.Add(subPath); watchAddErr != nil {
err = gerror.Wrapf(
err,
`add watch failed for path "%s", err: %s`,
subPath, watchAddErr.Error(),
)
} else {
intlog.Printf(context.TODO(), "watcher adds monitor for: %s", subPath)
}
Expand All @@ -62,25 +80,32 @@ func (w *Watcher) AddOnce(name, path string, callbackFunc func(event *Event), re
return
}

func (w *Watcher) getWatchOption(option ...WatchOption) WatchOption {
if len(option) > 0 {
return option[0]
}
return WatchOption{}
}

// addWithCallbackFunc adds the path to underlying monitor, creates and returns a callback object.
// Very note that if it calls multiple times with the same `path`, the latest one will overwrite the previous one.
func (w *Watcher) addWithCallbackFunc(name, path string, callbackFunc func(event *Event), recursive ...bool) (callback *Callback, err error) {
func (w *Watcher) addWithCallbackFunc(
name, path string, callbackFunc func(event *Event), option ...WatchOption,
) (callback *Callback, err error) {
var watchOption = w.getWatchOption(option...)
// Check and convert the given path to absolute path.
if t := fileRealPath(path); t == "" {
if realPath := fileRealPath(path); realPath == "" {
return nil, gerror.NewCodef(gcode.CodeInvalidParameter, `"%s" does not exist`, path)
} else {
path = t
path = realPath
}
// Create callback object.
callback = &Callback{
Id: callbackIdGenerator.Add(1),
Func: callbackFunc,
Path: path,
name: name,
recursive: true,
}
if len(recursive) > 0 {
callback.recursive = recursive[0]
recursive: !watchOption.NoRecursive,
}
// Register the callback to watcher.
w.callbacks.LockFunc(func(m map[string]interface{}) {
Expand Down Expand Up @@ -113,74 +138,50 @@ func (w *Watcher) Close() {
w.events.Close()
}

// Remove removes monitor and all callbacks associated with the `path` recursively.
// Remove removes watching and all callbacks associated with the `path` recursively.
// Note that, it's recursive in default if given `path` is a directory.
func (w *Watcher) Remove(path string) error {
// Firstly remove the callbacks of the path.
if value := w.callbacks.Remove(path); value != nil {
list := value.(*glist.List)
for {
if item := list.PopFront(); item != nil {
callbackIdMap.Remove(item.(*Callback).Id)
} else {
break
}
}
}
// Secondly remove monitor of all sub-files which have no callbacks.
if subPaths, err := fileScanDir(path, "*", true); err == nil && len(subPaths) > 0 {
for _, subPath := range subPaths {
if w.checkPathCanBeRemoved(subPath) {
if internalErr := w.watcher.Remove(subPath); internalErr != nil {
intlog.Errorf(context.TODO(), `%+v`, internalErr)
}
}
var (
err error
subPaths []string
removedPaths = make([]string, 0)
)
removedPaths = append(removedPaths, path)
if fileIsDir(path) {
subPaths, err = fileScanDir(path, "*", true)
if err != nil {
return err
}
removedPaths = append(removedPaths, subPaths...)
}
// Lastly remove the monitor of the path from underlying monitor.
err := w.watcher.Remove(path)
if err != nil {
err = gerror.Wrapf(err, `remove watch failed for path "%s"`, path)
}
return err
}

// checkPathCanBeRemoved checks whether the given path have no callbacks bound.
func (w *Watcher) checkPathCanBeRemoved(path string) bool {
// Firstly check the callbacks in the watcher directly.
if v := w.callbacks.Get(path); v != nil {
return false
}
// Secondly check its parent whether has callbacks.
dirPath := fileDir(path)
if v := w.callbacks.Get(dirPath); v != nil {
for _, c := range v.(*glist.List).FrontAll() {
if c.(*Callback).recursive {
return false
}
}
return false
}
// Recursively check its parent.
parentDirPath := ""
for {
parentDirPath = fileDir(dirPath)
if parentDirPath == dirPath {
break
}
if v := w.callbacks.Get(parentDirPath); v != nil {
for _, c := range v.(*glist.List).FrontAll() {
if c.(*Callback).recursive {
return false
for _, removedPath := range removedPaths {
// remove the callbacks of the path.
if value := w.callbacks.Remove(removedPath); value != nil {
list := value.(*glist.List)
for {
if item := list.PopFront(); item != nil {
callbackIdMap.Remove(item.(*Callback).Id)
} else {
break
}
}
return false
}
dirPath = parentDirPath
// remove the monitor of the path from underlying monitor.
if watcherRemoveErr := w.watcher.Remove(removedPath); watcherRemoveErr != nil {
err = gerror.Wrapf(
err,
`remove watch failed for path "%s", err: %s`,
removedPath, watcherRemoveErr.Error(),
)
}
}
return true
return err
}

// RemoveCallback removes callback with given callback id from watcher.
//
// Note that, it auto removes the path watching if there's no callback bound on it.
func (w *Watcher) RemoveCallback(callbackId int) {
callback := (*Callback)(nil)
if r := callbackIdMap.Get(callbackId); r != nil {
Expand Down
Loading
Loading