Skip to content

Commit

Permalink
filestream: fix 'requires pointer' error (#33956) (#34042)
Browse files Browse the repository at this point in the history
This commit fixes the 'requires pointer' error log issued by
filestream when a file being harvested is renamed.

Filebeat already recovers from this error by using the file
identifier from the prospector instead of the metadata, so the log
message is downgraded to warn and contains more details about what
happened.

(cherry picked from commit 2263f3e)

Co-authored-by: Tiago Queiroz <tiago.queiroz@elastic.co>
  • Loading branch information
mergify[bot] and belimawr authored Dec 15, 2022
1 parent c46378c commit f7322cc
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix Google workspace pagination and document ID generation. {pull}33666[33666]
- Fix PANW handling of messages with event.original already set. {issue}33829[33829] {pull}33830[33830]
- Rename identity as identity_name when the value is a string in Azure Platform Logs. {pull}33654[33654]
- Fix 'requires pointer' error while getting cursor metadata. {pull}33956[33956]
- Fix input cancellation handling when HTTP client does not support contexts. {issue}33962[33962] {pull}33968[33968]
- Update mito CEL extension library to v0.0.0-20221207004749-2f0f2875e464 {pull}33974[33974]
- Fix CEL result deserialisation when evaluation fails. {issue}33992[33992] {pull}33996[33996]
Expand Down
7 changes: 4 additions & 3 deletions filebeat/input/filestream/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,12 @@ func (p *fileProspector) onRename(log *logp.Logger, ctx input.Context, fe loginp
} else {
// update file metadata as the path has changed
var meta fileMeta
err := s.FindCursorMeta(src, meta)
err := s.FindCursorMeta(src, &meta)
if err != nil {
log.Errorf("Error while getting cursor meta data of entry %s: %v", src.Name(), err)

meta.IdentifierName = p.identifier.Name()
log.Warnf("Error while getting cursor meta data of entry '%s': '%w'"+
", using prospector's identifier: '%s'",
src.Name(), err, meta.IdentifierName)
}
err = s.UpdateMetadata(src, fileMeta{Source: fe.NewPath, IdentifierName: meta.IdentifierName})
if err != nil {
Expand Down
73 changes: 70 additions & 3 deletions filebeat/input/filestream/prospector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,11 +574,11 @@ func (mu *mockMetadataUpdater) checkOffset(id string, offset int64) bool {
}

func (mu *mockMetadataUpdater) FindCursorMeta(s loginp.Source, v interface{}) error {
_, ok := mu.table[s.Name()]
meta, ok := mu.table[s.Name()]
if !ok {
return fmt.Errorf("no such id")
return fmt.Errorf("no such id [%q]", s.Name())
}
return nil
return typeconv.Convert(v, meta)
}

func (mu *mockMetadataUpdater) ResetCursor(s loginp.Source, cur interface{}) error {
Expand Down Expand Up @@ -654,3 +654,70 @@ func mustPathIdentifier(renamed bool) fileIdentifier {
}
return pathIdentifier
}

func TestOnRenameFileIdentity(t *testing.T) {
testCases := map[string]struct {
identifier string
events []loginp.FSEvent
populateStore bool
errMsg string
}{
"identifier name from meta is kept": {
identifier: "foo",
errMsg: "must be the same as in the registry",
populateStore: true,
events: []loginp.FSEvent{
{
Op: loginp.OpRename,
OldPath: "/old/path/to/file",
NewPath: "/new/path/to/file",
Info: testFileInfo{},
},
},
},
"identifier from prospector is used": {
identifier: "path",
errMsg: "must come from prospector configuration",
populateStore: false,
events: []loginp.FSEvent{
{
Op: loginp.OpRename,
OldPath: "/old/path/to/file",
NewPath: "/new/path/to/file",
Info: testFileInfo{},
},
},
},
}

for k, tc := range testCases {
t.Run(k, func(t *testing.T) {
p := fileProspector{
filewatcher: newMockFileWatcher(tc.events, len(tc.events)),
identifier: mustPathIdentifier(true),
stateChangeCloser: stateChangeCloserConfig{Renamed: true},
}
ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()}

path := "/new/path/to/file"
expectedIdentifier := tc.identifier
id := "path" + "::" + path

testStore := newMockMetadataUpdater()
if tc.populateStore {
testStore.table[id] = fileMeta{Source: path, IdentifierName: expectedIdentifier}
}

hg := newTestHarvesterGroup()
p.Run(ctx, testStore, hg)

got := testStore.table[id]
meta := fileMeta{}
typeconv.Convert(&meta, got)

if meta.IdentifierName != expectedIdentifier {
t.Errorf("fileMeta.IdentifierName %s, expecting: %q, got: %q", tc.errMsg, expectedIdentifier, meta.IdentifierName)
}
})
}
}

0 comments on commit f7322cc

Please sign in to comment.