Skip to content

Commit

Permalink
Fix deadlock when file integrity monitor is started
Browse files Browse the repository at this point in the history
A deadlock is possible in auditbeat's file_integrity module under
Windows: when enough events arrive while watches are being installed,
the event channel can fill up, causing the installation of a watch to
block.

This patch makes sure that events are received while watches are being
installed, and at the same time ensures that no event is lost.

(cherry picked from commit 51267b4)
  • Loading branch information
adriansr authored and Christoph Wurm committed Sep 5, 2018
1 parent f6e4bc7 commit 8bb0a6f
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
- Fixed the RPM by designating the config file as configuration data in the RPM spec. {issue}8075[8075]
- Fixed a concurrent map write panic in the auditd module. {pull}8158[8158]
- Fixed a data race in the file_integrity module. {issue}8009[8009]
- Fixed a deadlock in the file_integrity module. {pull}8027[8027]

*Filebeat*

Expand Down
51 changes: 46 additions & 5 deletions auditbeat/module/file_integrity/eventreader_fsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func NewEventReader(c Config) (EventProducer, error) {
return &reader{
watcher: watcher,
config: c,
eventC: make(chan Event, 1),
log: logp.NewLogger(moduleName),
}, nil
}
Expand All @@ -56,7 +55,16 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) {
if err := r.watcher.Start(); err != nil {
return nil, errors.Wrap(err, "unable to start watcher")
}
go r.consumeEvents(done)

queueDone := make(chan struct{})
queueC := make(chan []*Event)

// Launch a separate goroutine to fetch all events that happen while
// watches are being installed.
go func() {
defer close(queueC)
queueC <- r.enqueueEvents(queueDone)
}()

// Windows implementation of fsnotify needs to have the watched paths
// installed after the event consumer is started, to avoid a potential
Expand All @@ -73,21 +81,53 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) {
}
}

close(queueDone)
events := <-queueC

// Populate the caller's event channel with the previously received events
r.eventC = make(chan Event, 1+len(events))
for _, ev := range events {
r.eventC <- *ev
}

go r.consumeEvents(done)

r.log.Infow("Started fsnotify watcher",
"file_path", r.config.Paths,
"recursive", r.config.Recursive)
return r.eventC, nil
}

// enqueueEvents keeps pulling events from r.nextEvent and collecting them in
// arrival order. It stops as soon as nextEvent yields nil and returns whatever
// was gathered up to that point (a nil slice if nothing was received).
func (r *reader) enqueueEvents(done <-chan struct{}) (events []*Event) {
	for next := r.nextEvent(done); next != nil; next = r.nextEvent(done) {
		events = append(events, next)
	}
	return events
}

func (r *reader) consumeEvents(done <-chan struct{}) {
defer close(r.eventC)
defer r.watcher.Close()

for {
select {
case <-done:
ev := r.nextEvent(done)
if ev == nil {
r.log.Debug("fsnotify reader terminated")
return
}
r.eventC <- *ev
}
}

func (r *reader) nextEvent(done <-chan struct{}) *Event {
for {
select {
case <-done:
return nil

case event := <-r.watcher.EventChannel():
if event.Name == "" || r.config.IsExcludedPath(event.Name) {
continue
Expand All @@ -101,7 +141,8 @@ func (r *reader) consumeEvents(done <-chan struct{}) {
r.config.MaxFileSizeBytes, r.config.HashTypes)
e.rtt = time.Since(start)

r.eventC <- e
return &e

case err := <-r.watcher.ErrorChannel():
// a bug in fsnotify can cause spurious nil errors to be sent
// on the error channel.
Expand Down
70 changes: 70 additions & 0 deletions auditbeat/module/file_integrity/eventreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package file_integrity

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strings"
"syscall"
"testing"
"time"
Expand Down Expand Up @@ -233,6 +235,74 @@ func TestEventReader(t *testing.T) {
})
}

// TestRaces starts an event reader over N temporary directories while a
// background goroutine is concurrently creating files in them, so that events
// may arrive before every watch has been installed. It then writes one marker
// file per directory and reads events until N events mentioning the marker
// have been seen; readTimeout fails the test if the reader stalls.
func TestRaces(t *testing.T) {
	const (
		fileMode os.FileMode = 0640
		N                    = 100
	)

	var dirs []string

	// Register cleanup before creating anything so that a failure part-way
	// through the setup loop still removes the directories made so far.
	defer func() {
		for _, dir := range dirs {
			os.RemoveAll(dir)
		}
	}()

	for i := 0; i < N; i++ {
		dir, err := ioutil.TempDir("", "audit")
		if err != nil {
			t.Fatal(err)
		}
		resolved, err := filepath.EvalSymlinks(dir)
		if err != nil {
			// The directory is not tracked in dirs yet; remove it here.
			os.RemoveAll(dir)
			t.Fatal(err)
		}
		dirs = append(dirs, resolved)
	}

	// Create a new EventProducer.
	config := defaultConfig
	config.Paths = dirs
	config.Recursive = true
	r, err := NewEventReader(config)
	if err != nil {
		t.Fatal(err)
	}

	done := make(chan struct{})
	defer close(done)

	// Generate a lot of events in parallel to Start() so there is a chance of
	// events arriving before all watched dirs are Add()-ed
	writerDone := make(chan struct{})
	go func() {
		defer close(writerDone)
		for i := 0; i < 10; i++ {
			for _, dir := range dirs {
				fname := filepath.Join(dir, fmt.Sprintf("%d.dat", i))
				// t.Errorf (unlike t.Fatal) may be called from a goroutine
				// other than the one running the test.
				if err := ioutil.WriteFile(fname, []byte("hello"), fileMode); err != nil {
					t.Errorf("writing %s: %v", fname, err)
				}
			}
		}
	}()
	// Deferred calls run last-in-first-out, so this wait happens before done
	// is closed and before the directories are removed: the writer never
	// outlives the test nor races with the cleanup.
	defer func() { <-writerDone }()

	eventC, err := r.Start(done)
	if err != nil {
		t.Fatal(err)
	}

	const marker = "test_file"
	for _, dir := range dirs {
		fname := filepath.Join(dir, marker)
		// A marker that cannot be written can never be observed; fail now
		// with the real cause instead of timing out in the read loop below.
		if err := ioutil.WriteFile(fname, []byte("hello"), fileMode); err != nil {
			t.Fatal(err)
		}
	}

	got := 0
	for got < N {
		ev := readTimeout(t, eventC)
		if strings.Contains(ev.Path, marker) {
			got++
		}
	}
	assert.Equal(t, N, got)
}

// readTimeout reads one event from the channel and returns it. If it does
// not receive an event after one second it will time-out and fail the test.
func readTimeout(t testing.TB, events <-chan Event) Event {
Expand Down

0 comments on commit 8bb0a6f

Please sign in to comment.