From 8bb0a6fe32838746f051729878190d6d21b5057a Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Mon, 20 Aug 2018 23:07:05 +0200 Subject: [PATCH] Fix deadlock when file integrity monitor is started A deadlock is possible in auditbeat's file_integrity module under Windows: When enough events arrive while watches are being installed, the event channel can fill causing the installation of a watch to block. This patch makes sure that events are received while watches are being installed, and at the same time ensures that no event is lost. (cherry picked from commit 51267b4e4a02ef27ab37c3dabad8682f5228aece) --- CHANGELOG.asciidoc | 1 + .../file_integrity/eventreader_fsnotify.go | 51 ++++++++++++-- .../module/file_integrity/eventreader_test.go | 70 +++++++++++++++++++ 3 files changed, 117 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 7c8f2cb51168..aa4ec45fc9af 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -47,6 +47,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff] - Fixed the RPM by designating the config file as configuration data in the RPM spec. {issue}8075[8075] - Fixed a concurrent map write panic in the auditd module. {pull}8158[8158] - Fixed a data race in the file_integrity module. {issue}8009[8009] +- Fixed a deadlock in the file_integrity module. {pull}8027[8027] *Filebeat* diff --git a/auditbeat/module/file_integrity/eventreader_fsnotify.go b/auditbeat/module/file_integrity/eventreader_fsnotify.go index 7417147e9f93..98052157a7a4 100644 --- a/auditbeat/module/file_integrity/eventreader_fsnotify.go +++ b/auditbeat/module/file_integrity/eventreader_fsnotify.go @@ -47,7 +47,6 @@ func NewEventReader(c Config) (EventProducer, error) { return &reader{ watcher: watcher, config: c, - eventC: make(chan Event, 1), log: logp.NewLogger(moduleName), }, nil } @@ -56,7 +55,16 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) { if err := r.watcher.Start(); err != nil { return nil, errors.Wrap(err, "unable to start watcher") } - go r.consumeEvents(done) + + queueDone := make(chan struct{}) + queueC := make(chan []*Event) + + // Launch a separate goroutine to fetch all events that happen while + // watches are being installed. + go func() { + defer close(queueC) + queueC <- r.enqueueEvents(queueDone) + }() // Windows implementation of fsnotify needs to have the watched paths // installed after the event consumer is started, to avoid a potential @@ -73,21 +81,53 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) { } } + close(queueDone) + events := <-queueC + + // Populate callee's event channel with the previously received events + r.eventC = make(chan Event, 1+len(events)) + for _, ev := range events { + r.eventC <- *ev + } + + go r.consumeEvents(done) + r.log.Infow("Started fsnotify watcher", "file_path", r.config.Paths, "recursive", r.config.Recursive) return r.eventC, nil } +func (r *reader) enqueueEvents(done <-chan struct{}) (events []*Event) { + for { + ev := r.nextEvent(done) + if ev == nil { + return + } + events = append(events, ev) + } +} + func (r *reader) consumeEvents(done <-chan struct{}) { defer close(r.eventC) defer r.watcher.Close() for { - select { - case <-done: + ev := r.nextEvent(done) + if ev == nil { r.log.Debug("fsnotify reader terminated") return + } + r.eventC <- *ev + } +} + +func (r *reader) nextEvent(done <-chan struct{}) *Event { + for { + select { + case <-done: + return nil + case event := <-r.watcher.EventChannel(): if event.Name == "" || r.config.IsExcludedPath(event.Name) { continue @@ -101,7 +141,8 @@ func (r *reader) consumeEvents(done <-chan struct{}) { r.config.MaxFileSizeBytes, r.config.HashTypes) e.rtt = time.Since(start) - r.eventC <- e + return &e + case err := <-r.watcher.ErrorChannel(): // a bug in fsnotify can cause spurious nil errors to be sent // on the error channel. diff --git a/auditbeat/module/file_integrity/eventreader_test.go b/auditbeat/module/file_integrity/eventreader_test.go index f0bf074902cb..f444aa6ea884 100644 --- a/auditbeat/module/file_integrity/eventreader_test.go +++ b/auditbeat/module/file_integrity/eventreader_test.go @@ -18,10 +18,12 @@ package file_integrity import ( + "fmt" "io/ioutil" "os" "path/filepath" "runtime" + "strings" "syscall" "testing" "time" @@ -233,6 +235,74 @@ func TestEventReader(t *testing.T) { }) } +func TestRaces(t *testing.T) { + const ( + fileMode os.FileMode = 0640 + N = 100 + ) + + var dirs []string + + for i := 0; i < N; i++ { + dir, err := ioutil.TempDir("", "audit") + if err != nil { + t.Fatal(err) + } + if dir, err = filepath.EvalSymlinks(dir); err != nil { + t.Fatal(err) + } + dirs = append(dirs, dir) + } + + defer func() { + for _, dir := range dirs { + os.RemoveAll(dir) + } + }() + + // Create a new EventProducer. + config := defaultConfig + config.Paths = dirs + config.Recursive = true + r, err := NewEventReader(config) + if err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + defer close(done) + + // Generate a lot of events in parallel to Start() so there is a chance of + // events arriving before all watched dirs are Add()-ed + go func() { + for i := 0; i < 10; i++ { + for _, dir := range dirs { + fname := filepath.Join(dir, fmt.Sprintf("%d.dat", i)) + ioutil.WriteFile(fname, []byte("hello"), fileMode) + } + } + }() + eventC, err := r.Start(done) + if err != nil { + t.Fatal(err) + } + + const marker = "test_file" + for _, dir := range dirs { + fname := filepath.Join(dir, marker) + ioutil.WriteFile(fname, []byte("hello"), fileMode) + } + + got := 0 + for i := 0; got < N; i++ { + ev := readTimeout(t, eventC) + if strings.Contains(ev.Path, marker) { + got++ + } + } + assert.Equal(t, N, got) +} + // readTimeout reads one event from the channel and returns it. If it does // not receive an event after one second it will time-out and fail the test. func readTimeout(t testing.TB, events <-chan Event) Event {