Skip to content

Commit

Permalink
[7.17](backport #33956) filestream: fix 'requires pointer' error (#34041
Browse files Browse the repository at this point in the history
)

This commit fixes the 'requires pointer' error log issued by
filestream when a file being harvested is renamed.

Filebeat already recovers from this error by using the file
identifier from the prospector instead of the metadata, so the log
message is downgraded to warn and contains more details about what
happened.

(cherry picked from commit 2263f3e)

Co-authored-by: Tiago Queiroz <tiago.queiroz@elastic.co>
Co-authored-by: Denis <denis.rechkunov@elastic.co>
  • Loading branch information
3 people authored Dec 20, 2022
1 parent c92aba5 commit 8f33651
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Filebeat*

- Rename identity as identity_name when the value is a string in Azure Platform Logs. {pull}33654[33654]
- Fix 'requires pointer' error while getting cursor metadata. {pull}33956[33956]

*Heartbeat*

Expand Down
7 changes: 4 additions & 3 deletions filebeat/input/filestream/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,12 @@ func (p *fileProspector) onRename(log *logp.Logger, ctx input.Context, fe loginp
} else {
// update file metadata as the path has changed
var meta fileMeta
err := s.FindCursorMeta(src, meta)
err := s.FindCursorMeta(src, &meta)
if err != nil {
log.Errorf("Error while getting cursor meta data of entry %s: %v", src.Name(), err)

meta.IdentifierName = p.identifier.Name()
log.Warnf("Error while getting cursor meta data of entry '%s': '%w'"+
", using prospector's identifier: '%s'",
src.Name(), err, meta.IdentifierName)
}
err = s.UpdateMetadata(src, fileMeta{Source: fe.NewPath, IdentifierName: meta.IdentifierName})
if err != nil {
Expand Down
74 changes: 70 additions & 4 deletions filebeat/input/filestream/prospector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,11 +478,11 @@ func (mu *mockMetadataUpdater) has(id string) bool {
}

func (mu *mockMetadataUpdater) FindCursorMeta(s loginp.Source, v interface{}) error {
_, ok := mu.table[s.Name()]
meta, ok := mu.table[s.Name()]
if !ok {
return fmt.Errorf("no such id")
return fmt.Errorf("no such id [%q]", s.Name())
}
return nil
return typeconv.Convert(v, meta)
}

func (mu *mockMetadataUpdater) ResetCursor(s loginp.Source, cur interface{}) error {
Expand Down Expand Up @@ -510,7 +510,6 @@ func (u *mockUnpackValue) UnpackCursorMeta(to interface{}) error {
type mockProspectorCleaner struct {
available map[string]loginp.Value
cleanedKeys []string
newEntries map[string]fileMeta
updatedKeys map[string]string
}

Expand Down Expand Up @@ -558,3 +557,70 @@ func mustPathIdentifier(renamed bool) fileIdentifier {
}
return pathIdentifier
}

func TestOnRenameFileIdentity(t *testing.T) {
testCases := map[string]struct {
identifier string
events []loginp.FSEvent
populateStore bool
errMsg string
}{
"identifier name from meta is kept": {
identifier: "foo",
errMsg: "must be the same as in the registry",
populateStore: true,
events: []loginp.FSEvent{
{
Op: loginp.OpRename,
OldPath: "/old/path/to/file",
NewPath: "/new/path/to/file",
Info: testFileInfo{},
},
},
},
"identifier from prospector is used": {
identifier: "path",
errMsg: "must come from prospector configuration",
populateStore: false,
events: []loginp.FSEvent{
{
Op: loginp.OpRename,
OldPath: "/old/path/to/file",
NewPath: "/new/path/to/file",
Info: testFileInfo{},
},
},
},
}

for k, tc := range testCases {
t.Run(k, func(t *testing.T) {
p := fileProspector{
filewatcher: &mockFileWatcher{events: tc.events},
identifier: mustPathIdentifier(true),
stateChangeCloser: stateChangeCloserConfig{Renamed: true},
}
ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()}

path := "/new/path/to/file"
expectedIdentifier := tc.identifier
id := "path" + "::" + path

testStore := newMockMetadataUpdater()
if tc.populateStore {
testStore.table[id] = fileMeta{Source: path, IdentifierName: expectedIdentifier}
}

hg := newTestHarvesterGroup()
p.Run(ctx, testStore, hg)

got := testStore.table[id]
meta := fileMeta{}
typeconv.Convert(&meta, got)

if meta.IdentifierName != expectedIdentifier {
t.Errorf("fileMeta.IdentifierName %s, expecting: %q, got: %q", tc.errMsg, expectedIdentifier, meta.IdentifierName)
}
})
}
}

0 comments on commit 8f33651

Please sign in to comment.