Skip to content

Commit

Permalink
Detect new files under known paths in filestream input (#31268)
Browse files Browse the repository at this point in the history
## What does this PR do?

This PR fixes the `FileWatcher` of the `filestream` input. A path is now considered a new file when the scanner already found that path in the previous iteration but the underlying file is different.

In the PR the file comparator function is passed as a parameter to make unit testing easier.

## Why is it important?

The problem was that if an input file was renamed and a new file showed up under the original path, Filebeat did not register it as a new file. The new file was either considered updated or, if it was smaller than the previous file, deemed truncated, in which case the complete contents of the previous file were reread from the beginning.
  • Loading branch information
kvch authored Apr 13, 2022
1 parent fea8a70 commit 54997ac
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Fix Azure activitylogs identity field type and several missing fields. {pull}31170[31170]
- checkpoint: Fix ingest error when a message contains trailing spaces {pull}31197[31197]
- m365_defender: Fix processing when alerts.entities is an empty list. {issue}31223[31223] {pull}31227[31227]
- Prevent filestream from rereading whole files if they are rotated using rename. {pull}31268[31268]

*Heartbeat*

Expand Down
22 changes: 14 additions & 8 deletions filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type fileWatcher struct {
scanner loginp.FSScanner
log *logp.Logger
events chan loginp.FSEvent
sameFileFunc func(os.FileInfo, os.FileInfo) bool
}

func newFileWatcher(paths []string, ns *common.ConfigNamespace) (loginp.FSWatcher, error) {
Expand Down Expand Up @@ -108,6 +109,7 @@ func newScannerWatcher(paths []string, c *common.Config) (loginp.FSWatcher, erro
prev: make(map[string]os.FileInfo, 0),
scanner: scanner,
events: make(chan loginp.FSEvent),
sameFileFunc: os.SameFile,
}, nil
}

Expand All @@ -125,7 +127,7 @@ func (w *fileWatcher) Run(ctx unison.Canceler) {
// run initial scan before starting regular
w.watch(ctx)

timed.Periodic(ctx, w.interval, func() error {
_ = timed.Periodic(ctx, w.interval, func() error {
w.watch(ctx)

return nil
Expand All @@ -141,12 +143,16 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {

for path, info := range paths {

// if the scanner found a new path or an existing path
// with a different file, it is a new file
prevInfo, ok := w.prev[path]
if !ok {
newFiles[path] = paths[path]
if !ok || !w.sameFileFunc(prevInfo, info) {
newFiles[path] = info
continue
}

// if the two infos belong to the same file and it has been modified
// if the size is smaller than before, it is truncated, if bigger, it is a write event
if prevInfo.ModTime() != info.ModTime() {
if prevInfo.Size() > info.Size() || w.resendOnModTime && prevInfo.Size() == info.Size() {
select {
Expand All @@ -171,7 +177,7 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {
// either because they have been deleted or renamed
for removedPath, removedInfo := range w.prev {
for newPath, newInfo := range newFiles {
if os.SameFile(removedInfo, newInfo) {
if w.sameFileFunc(removedInfo, newInfo) {
select {
case <-ctx.Done():
return
Expand Down Expand Up @@ -290,13 +296,13 @@ func (s *fileScanner) resolveRecursiveGlobs(c fileScannerConfig) error {

// normalizeGlobPatterns calls `filepath.Abs` on all the globs from config
func (s *fileScanner) normalizeGlobPatterns() error {
var paths []string
for _, path := range s.paths {
paths := make([]string, len(s.paths))
for i, path := range s.paths {
pathAbs, err := filepath.Abs(path)
if err != nil {
return fmt.Errorf("failed to get the absolute path for %s: %v", path, err)
return fmt.Errorf("failed to get the absolute path for %s: %w", path, err)
}
paths = append(paths, pathAbs)
paths[i] = pathAbs
}
s.paths = paths
return nil
Expand Down
15 changes: 10 additions & 5 deletions filebeat/input/filestream/fswatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestFileScanner(t *testing.T) {
}

func setupFilesForScannerTest(t *testing.T, tmpDir string) {
err := os.Mkdir(filepath.Join(tmpDir, directoryPath), 750)
err := os.Mkdir(filepath.Join(tmpDir, directoryPath), 0750)
if err != nil {
t.Fatalf("cannot create non harvestable directory: %v", err)
}
Expand Down Expand Up @@ -201,10 +201,11 @@ func TestFileWatchNewDeleteModified(t *testing.T) {

t.Run(name, func(t *testing.T) {
w := fileWatcher{
log: logp.L(),
prev: test.prevFiles,
scanner: &mockScanner{test.nextFiles},
events: make(chan loginp.FSEvent),
log: logp.L(),
prev: test.prevFiles,
scanner: &mockScanner{test.nextFiles},
events: make(chan loginp.FSEvent),
sameFileFunc: testSameFile,
}

go w.watch(context.Background())
Expand Down Expand Up @@ -241,3 +242,7 @@ func (t testFileInfo) Mode() os.FileMode { return 0 }
// ModTime returns the modification time stored in the fixture's time field.
func (t testFileInfo) ModTime() time.Time {
	return t.time
}
// IsDir always reports false for this fixture.
func (t testFileInfo) IsDir() bool {
	return false
}
// Sys returns the value stored in the fixture's sys field.
func (t testFileInfo) Sys() interface{} {
	return t.sys
}

// testSameFile is a comparator for unit tests that can be used in place of
// os.SameFile: two infos are treated as the same file exactly when their
// Name() values are equal.
func testSameFile(fi1, fi2 os.FileInfo) bool {
	first, second := fi1.Name(), fi2.Name()
	return first == second
}
67 changes: 64 additions & 3 deletions filebeat/input/filestream/fswatch_test_non_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ package filestream

import (
"context"
"errors"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
"github.com/elastic/beats/v7/libbeat/common/match"
Expand Down Expand Up @@ -144,9 +146,10 @@ func TestFileWatcherRenamedFile(t *testing.T) {
t.Fatal(err)
}
w := fileWatcher{
log: logp.L(),
scanner: scanner,
events: make(chan loginp.FSEvent),
log: logp.L(),
scanner: scanner,
events: make(chan loginp.FSEvent),
sameFileFunc: testSameFile,
}

go w.watch(context.Background())
Expand All @@ -170,6 +173,64 @@ func TestFileWatcherRenamedFile(t *testing.T) {
assert.Equal(t, renamedPath, evt.NewPath)
}

// TestFileWatcherRenamedTruncated checks that rotating app.log to app.log.1 by
// rename and then creating a new, shorter app.log is reported as a rename
// event followed by a create event for the new file.
//
// This test is only supported on non-Windows platforms for now.
func TestFileWatcherRenamedTruncated(t *testing.T) {
	tmpDir := t.TempDir()

	fs, err := newFileScanner([]string{filepath.Join(tmpDir, "app.log*")}, fileScannerConfig{})
	if err != nil {
		t.Fatal(err)
	}
	w := fileWatcher{
		log:          logp.L(),
		scanner:      fs,
		events:       make(chan loginp.FSEvent),
		sameFileFunc: os.SameFile,
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// scan runs a single w.watch call in the background and returns a channel
	// that is closed once that call has returned, so the test can make sure
	// two scans never run concurrently on the same watcher.
	scan := func() <-chan struct{} {
		done := make(chan struct{})
		go func() {
			defer close(done)
			w.watch(ctx)
		}()
		return done
	}

	appLogPath := filepath.Join(tmpDir, "app.log")
	rotatedAppLogPath := filepath.Join(tmpDir, "app.log.1")

	// Create the file BEFORE starting the scan: the scan is started exactly
	// once here, so a file written after it has listed the directory would
	// never be reported and w.Event() would block forever.
	err = os.WriteFile(appLogPath, []byte("my longer log line"), 0o600)
	if err != nil {
		t.Fatal(err)
	}

	firstScan := scan()

	evt := w.Event()
	require.Equal(t, loginp.OpCreate, evt.Op, "new file should be detected")
	require.Equal(t, "", evt.OldPath, "new file does not have an old path set")
	require.Equal(t, appLogPath, evt.NewPath, "new file should be called app.log")

	// Let the first scan finish before touching the files again or starting
	// the next scan.
	<-firstScan

	err = os.Rename(appLogPath, rotatedAppLogPath)
	if err != nil {
		t.Fatalf("failed to rotate active file: %v", err)
	}

	if _, err := os.Stat(appLogPath); !errors.Is(err, os.ErrNotExist) {
		t.Fatalf("app.log should not exist")
	}

	err = os.WriteFile(appLogPath, []byte("shorter line"), 0o600)
	if err != nil {
		t.Fatal(err)
	}

	// Start the second scan only after the rotation is complete, so it sees
	// both app.log.1 (the renamed file) and the new app.log in one pass.
	_ = scan()

	evt = w.Event()
	require.Equal(t, loginp.OpRename, evt.Op, "app.log has been renamed to app.log.1, got: %s old_path=%s new_path=%s", evt.Op.String(), evt.OldPath, evt.NewPath)
	require.Equal(t, appLogPath, evt.OldPath, "old_path should be set to app.log because of rename")
	require.Equal(t, rotatedAppLogPath, evt.NewPath, "new_path should be set to app.log.1 because of rename")

	evt = w.Event()
	require.Equal(t, loginp.OpCreate, evt.Op, "new file app.log should be detected, got: %s for old_path=%s new_path=%s", evt.Op.String(), evt.OldPath, evt.NewPath)
	require.Equal(t, "", evt.OldPath, "new file should not have an old path set")
	require.Equal(t, appLogPath, evt.NewPath, "new file should be called app.log")
}

func mustAbsPath(filename string) string {
abspath, err := filepath.Abs(filename)
if err != nil {
Expand Down

0 comments on commit 54997ac

Please sign in to comment.