From a839c64b49e3ac39def6b5aa13f2097e22b14c11 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Mon, 20 Aug 2018 23:01:35 +0200 Subject: [PATCH 1/2] Install watches in single goroutine (#8009) Auditbeat's file_integrity module had a data race in its recursive watcher implementation for Linux. Was caused by file watches being installed from different goroutines. This patch fixes the problem by moving watch installation to the same goroutine that processes inotify's events, guaranteeing that watches are installed in order, which is necessary for consistence. Fixes #8009 --- CHANGELOG.asciidoc | 1 + .../file_integrity/monitor/monitor_test.go | 3 +- .../file_integrity/monitor/recursive.go | 50 ++++++++++++++----- 3 files changed, 41 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 82258c89f592..79f5b9534a49 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -45,6 +45,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] *Auditbeat* - Fixed a crash in the file_integrity module under Linux. {issue}7753[7753] +- Fixed a data race in the file_integrity module. {issue}8009[8009] *Filebeat* diff --git a/auditbeat/module/file_integrity/monitor/monitor_test.go b/auditbeat/module/file_integrity/monitor/monitor_test.go index f9be16f10d0a..0f923ff25721 100644 --- a/auditbeat/module/file_integrity/monitor/monitor_test.go +++ b/auditbeat/module/file_integrity/monitor/monitor_test.go @@ -211,8 +211,9 @@ func TestRecursiveSubdirPermissions(t *testing.T) { watcher, err := New(true) assertNoError(t, err) - assertNoError(t, watcher.Add(dir)) assertNoError(t, watcher.Start()) + assertNoError(t, watcher.Add(dir)) + defer func() { assertNoError(t, watcher.Close()) }() diff --git a/auditbeat/module/file_integrity/monitor/recursive.go b/auditbeat/module/file_integrity/monitor/recursive.go index 2b1f060f89d9..0e7202c38586 100644 --- a/auditbeat/module/file_integrity/monitor/recursive.go +++ b/auditbeat/module/file_integrity/monitor/recursive.go @@ -27,17 +27,21 @@ import ( ) type recursiveWatcher struct { - inner *fsnotify.Watcher - tree FileTree - eventC chan fsnotify.Event - done chan bool + inner *fsnotify.Watcher + tree FileTree + eventC chan fsnotify.Event + done chan bool + addC chan string + addErrC chan error } func newRecursiveWatcher(inner *fsnotify.Watcher) *recursiveWatcher { return &recursiveWatcher{ - inner: inner, - tree: FileTree{}, - eventC: make(chan fsnotify.Event, 1), + inner: inner, + tree: FileTree{}, + eventC: make(chan fsnotify.Event, 1), + addC: make(chan string), + addErrC: make(chan error), } } @@ -48,6 +52,10 @@ func (watcher *recursiveWatcher) Start() error { } func (watcher *recursiveWatcher) Add(path string) error { + if watcher.done != nil { + watcher.addC <- path + return <-watcher.addErrC + } return watcher.addRecursive(path) } @@ -104,6 +112,21 @@ func (watcher *recursiveWatcher) close() error { return watcher.inner.Close() } +func (watcher *recursiveWatcher) deliver(ev fsnotify.Event) { + for { + select { + case <-watcher.done: + return + + case path := <-watcher.addC: + watcher.addErrC <- watcher.addRecursive(path) + + case watcher.eventC <- ev: + return + } + } +} + func (watcher *recursiveWatcher) forwardEvents() error { defer watcher.close() @@ -112,6 +135,9 @@ func (watcher *recursiveWatcher) forwardEvents() error { case <-watcher.done: return nil + case path := <-watcher.addC: + watcher.addErrC <- watcher.addRecursive(path) + case event, ok := <-watcher.inner.Events: if !ok { return nil @@ -125,19 +151,19 @@ func (watcher *recursiveWatcher) forwardEvents() error { watcher.inner.Errors <- errors.Wrapf(err, "unable to recurse path '%s'", event.Name) } watcher.tree.Visit(event.Name, PreOrder, func(path string, _ bool) error { - watcher.eventC <- fsnotify.Event{ + watcher.deliver(fsnotify.Event{ Name: path, Op: event.Op, - } + }) return nil }) case fsnotify.Remove: watcher.tree.Visit(event.Name, PostOrder, func(path string, _ bool) error { - watcher.eventC <- fsnotify.Event{ + watcher.deliver(fsnotify.Event{ Name: path, Op: event.Op, - } + }) return nil }) watcher.tree.Remove(event.Name) @@ -151,7 +177,7 @@ func (watcher *recursiveWatcher) forwardEvents() error { fallthrough default: - watcher.eventC <- event + watcher.deliver(event) } } } From 84cb605451761a563b5006fdc023bcc918611875 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Mon, 20 Aug 2018 23:07:05 +0200 Subject: [PATCH 2/2] Fix deadlock when file integrity monitor is started A deadlock is possible in auditbeat's file_integrity module under Windows: When enough events arrive while watches are being installed, the event channel can fill causing the installation of a watch to block. This patch makes sure that events are received while watches are being installed, and at the same time ensures that no event is lost. --- CHANGELOG.asciidoc | 1 + .../file_integrity/eventreader_fsnotify.go | 51 ++++++++++++-- .../module/file_integrity/eventreader_test.go | 70 +++++++++++++++++++ 3 files changed, 117 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 79f5b9534a49..e5fa34c91af7 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -46,6 +46,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Fixed a crash in the file_integrity module under Linux. {issue}7753[7753] - Fixed a data race in the file_integrity module. {issue}8009[8009] +- Fixed a deadlock in the file_integrity module. {pull}8027[8027] *Filebeat* diff --git a/auditbeat/module/file_integrity/eventreader_fsnotify.go b/auditbeat/module/file_integrity/eventreader_fsnotify.go index c08e8186a6e5..b228ebc59a32 100644 --- a/auditbeat/module/file_integrity/eventreader_fsnotify.go +++ b/auditbeat/module/file_integrity/eventreader_fsnotify.go @@ -47,7 +47,6 @@ func NewEventReader(c Config) (EventProducer, error) { return &reader{ watcher: watcher, config: c, - eventC: make(chan Event, 1), log: logp.NewLogger(moduleName), }, nil } @@ -56,7 +55,16 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) { if err := r.watcher.Start(); err != nil { return nil, errors.Wrap(err, "unable to start watcher") } - go r.consumeEvents(done) + + queueDone := make(chan struct{}) + queueC := make(chan []*Event) + + // Launch a separate goroutine to fetch all events that happen while + // watches are being installed. + go func() { + defer close(queueC) + queueC <- r.enqueueEvents(queueDone) + }() // Windows implementation of fsnotify needs to have the watched paths // installed after the event consumer is started, to avoid a potential @@ -73,21 +81,53 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) { } } + close(queueDone) + events := <-queueC + + // Populate callee's event channel with the previously received events + r.eventC = make(chan Event, 1+len(events)) + for _, ev := range events { + r.eventC <- *ev + } + + go r.consumeEvents(done) + r.log.Infow("Started fsnotify watcher", "file_path", r.config.Paths, "recursive", r.config.Recursive) return r.eventC, nil } +func (r *reader) enqueueEvents(done <-chan struct{}) (events []*Event) { + for { + ev := r.nextEvent(done) + if ev == nil { + return + } + events = append(events, ev) + } +} + func (r *reader) consumeEvents(done <-chan struct{}) { defer close(r.eventC) defer r.watcher.Close() for { - select { - case <-done: + ev := r.nextEvent(done) + if ev == nil { r.log.Debug("fsnotify reader terminated") return + } + r.eventC <- *ev + } +} + +func (r *reader) nextEvent(done <-chan struct{}) *Event { + for { + select { + case <-done: + return nil + case event := <-r.watcher.EventChannel(): if event.Name == "" || r.config.IsExcludedPath(event.Name) || !r.config.IsIncludedPath(event.Name) { @@ -102,7 +142,8 @@ func (r *reader) consumeEvents(done <-chan struct{}) { r.config.MaxFileSizeBytes, r.config.HashTypes) e.rtt = time.Since(start) - r.eventC <- e + return &e + case err := <-r.watcher.ErrorChannel(): // a bug in fsnotify can cause spurious nil errors to be sent // on the error channel. diff --git a/auditbeat/module/file_integrity/eventreader_test.go b/auditbeat/module/file_integrity/eventreader_test.go index f0bf074902cb..f444aa6ea884 100644 --- a/auditbeat/module/file_integrity/eventreader_test.go +++ b/auditbeat/module/file_integrity/eventreader_test.go @@ -18,10 +18,12 @@ package file_integrity import ( + "fmt" "io/ioutil" "os" "path/filepath" "runtime" + "strings" "syscall" "testing" "time" @@ -233,6 +235,74 @@ func TestEventReader(t *testing.T) { }) } +func TestRaces(t *testing.T) { + const ( + fileMode os.FileMode = 0640 + N = 100 + ) + + var dirs []string + + for i := 0; i < N; i++ { + dir, err := ioutil.TempDir("", "audit") + if err != nil { + t.Fatal(err) + } + if dir, err = filepath.EvalSymlinks(dir); err != nil { + t.Fatal(err) + } + dirs = append(dirs, dir) + } + + defer func() { + for _, dir := range dirs { + os.RemoveAll(dir) + } + }() + + // Create a new EventProducer. + config := defaultConfig + config.Paths = dirs + config.Recursive = true + r, err := NewEventReader(config) + if err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + defer close(done) + + // Generate a lot of events in parallel to Start() so there is a chance of + // events arriving before all watched dirs are Add()-ed + go func() { + for i := 0; i < 10; i++ { + for _, dir := range dirs { + fname := filepath.Join(dir, fmt.Sprintf("%d.dat", i)) + ioutil.WriteFile(fname, []byte("hello"), fileMode) + } + } + }() + eventC, err := r.Start(done) + if err != nil { + t.Fatal(err) + } + + const marker = "test_file" + for _, dir := range dirs { + fname := filepath.Join(dir, marker) + ioutil.WriteFile(fname, []byte("hello"), fileMode) + } + + got := 0 + for i := 0; got < N; i++ { + ev := readTimeout(t, eventC) + if strings.Contains(ev.Path, marker) { + got++ + } + } + assert.Equal(t, N, got) +} + // readTimeout reads one event from the channel and returns it. If it does // not receive an event after one second it will time-out and fail the test. func readTimeout(t testing.TB, events <-chan Event) Event {