diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 52c7ad9ab758..433641a4ad72 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -52,6 +52,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] *Filebeat* - Fix EOF on single line not producing any event. {issue}30436[30436] {pull}33568[33568] +- Fix handling of states marked as error in the direct aws-s3 listing input. {issue}33513[33513] {pull}33722[33722] - Fix `httpjson` input page number initialization and documentation. {pull}33400[33400] - Add handling of AAA operations for Cisco ASA module. {issue}32257[32257] {pull}32789[32789] - Fix gc.log always shipped even if gc fileset is disabled {issue}30995[30995] diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 2b3b8c45ee7a..df3a5c47e43e 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -31,11 +31,8 @@ import ( "golang.org/x/sync/errgroup" "gopkg.in/yaml.v2" - "github.com/elastic/beats/v7/filebeat/beater" v2 "github.com/elastic/beats/v7/filebeat/input/v2" pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" - "github.com/elastic/beats/v7/libbeat/statestore" - "github.com/elastic/beats/v7/libbeat/statestore/storetest" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -134,28 +131,6 @@ file_selectors: `, queueURL)) } -type testInputStore struct { - registry *statestore.Registry -} - -func openTestStatestore() beater.StateStore { - return &testInputStore{ - registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()), - } -} - -func (s *testInputStore) Close() { - s.registry.Close() -} - -func (s *testInputStore) Access() (*statestore.Store, error) { - return s.registry.Get("filebeat") -} - -func (s *testInputStore) 
CleanupInterval() time.Duration { - return 24 * time.Hour -} - func createInput(t *testing.T, cfg *conf.C) *s3Input { inputV2, err := Plugin(openTestStatestore()).Manager.Create(cfg) if err != nil { diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index 54b52475ed9d..bb367c755b4a 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -208,7 +208,12 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<- continue } - p.states.Update(state, "") + // we have no previous state or the previous state + // is not stored: refresh the state + previousState := p.states.FindPrevious(state) + if previousState.IsEmpty() || !previousState.IsProcessed() { + p.states.Update(state, "") + } event := s3EventV2{} event.AWSRegion = p.region @@ -282,8 +287,8 @@ func (p *s3Poller) Purge() { for _, state := range p.states.GetStatesByListingID(listingID) { // it is not stored, keep - if !state.Stored { - p.log.Debugw("state not stored, skip purge", "state", state) + if !state.IsProcessed() { + p.log.Debugw("state not stored or with error, skip purge", "state", state) continue } @@ -294,7 +299,7 @@ func (p *s3Poller) Purge() { var commitWriteState commitWriteState err := p.store.Get(awsS3WriteCommitPrefix+state.Bucket+state.ListPrefix, &commitWriteState) if err == nil { - // we have no entry in the map and we have no entry in the store + // we have no entry in the map, and we have no entry in the store // set zero time latestStoredTime = time.Time{} p.log.Debugw("last stored time is zero time", "bucket", state.Bucket, "listPrefix", state.ListPrefix) diff --git a/x-pack/filebeat/input/awss3/state.go b/x-pack/filebeat/input/awss3/state.go index 1e2761ce7424..97fb8d538cd6 100644 --- a/x-pack/filebeat/input/awss3/state.go +++ b/x-pack/filebeat/input/awss3/state.go @@ -60,6 +60,11 @@ func (s *state) MarkAsError() { s.Error = true } +// IsProcessed checks if the state is either Stored or Error +func (s 
*state) IsProcessed() bool { + return s.Stored || s.Error +} + // IsEqual checks if the two states point to the same s3 object. func (s *state) IsEqual(c *state) bool { return s.Bucket == c.Bucket && s.Key == c.Key && s.Etag == c.Etag && s.LastModified.Equal(c.LastModified) diff --git a/x-pack/filebeat/input/awss3/states.go b/x-pack/filebeat/input/awss3/states.go index 46ecfa1a3200..449219a867f5 100644 --- a/x-pack/filebeat/input/awss3/states.go +++ b/x-pack/filebeat/input/awss3/states.go @@ -78,10 +78,10 @@ func (s *states) MustSkip(state state, store *statestore.Store) bool { return true } - // we have no previous state or the previous state - // is not stored: refresh the state - if previousState.IsEmpty() || (!previousState.Stored && !previousState.Error) { - s.Update(state, "") + // the previous state is stored or has error: let's skip + if !previousState.IsEmpty() && previousState.IsProcessed() { + s.log.Debugw("previous state is stored or has error", "state", state) + return true } return false @@ -166,7 +166,7 @@ func (s *states) Update(newState state, listingID string) { s.log.Debug("New state added for ", newState.ID) } - if listingID == "" || (!newState.Stored && !newState.Error) { + if listingID == "" || !newState.IsProcessed() { return } diff --git a/x-pack/filebeat/input/awss3/states_test.go b/x-pack/filebeat/input/awss3/states_test.go index b8a7cbb63d13..39dc4cf82e63 100644 --- a/x-pack/filebeat/input/awss3/states_test.go +++ b/x-pack/filebeat/input/awss3/states_test.go @@ -9,22 +9,51 @@ import ( "testing" "time" + "github.com/elastic/beats/v7/filebeat/beater" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/storetest" + "github.com/stretchr/testify/assert" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/elastic-agent-libs/logp" ) +type testInputStore struct { + registry *statestore.Registry +} + +func openTestStatestore() beater.StateStore { + return &testInputStore{ + 
registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()), + } +} + +func (s *testInputStore) Close() { + _ = s.registry.Close() +} + +func (s *testInputStore) Access() (*statestore.Store, error) { + return s.registry.Get("filebeat") +} + +func (s *testInputStore) CleanupInterval() time.Duration { + return 24 * time.Hour +} + var inputCtx = v2.Context{ Logger: logp.NewLogger("test"), Cancelation: context.Background(), } -func TestStatesIsNew(t *testing.T) { +func TestStatesIsNewAndMustSkip(t *testing.T) { type stateTestCase struct { - states func() *states - state state - expected bool + states func() *states + state state + mustBeNew bool + persistentStoreKV map[string]interface{} + expectedMustSkip bool + expectedIsNew bool } lastModified := time.Date(2022, time.June, 30, 14, 13, 00, 0, time.UTC) tests := map[string]stateTestCase{ @@ -32,8 +61,9 @@ func TestStatesIsNew(t *testing.T) { states: func() *states { return newStates(inputCtx) }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified), - expected: true, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + expectedMustSkip: false, + expectedIsNew: true, }, "not existing state": { states: func() *states { @@ -41,8 +71,9 @@ func TestStatesIsNew(t *testing.T) { states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "") return states }, - state: newState("bucket1", "key1", "etag1", "listPrefix1", lastModified), - expected: true, + state: newState("bucket1", "key1", "etag1", "listPrefix1", lastModified), + expectedMustSkip: false, + expectedIsNew: true, }, "existing state": { states: func() *states { @@ -50,8 +81,9 @@ func TestStatesIsNew(t *testing.T) { states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "") return states }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified), - expected: false, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + expectedMustSkip: true, + 
expectedIsNew: false, }, "with different etag": { states: func() *states { @@ -59,8 +91,9 @@ func TestStatesIsNew(t *testing.T) { states.Update(newState("bucket", "key", "etag1", "listPrefix", lastModified), "") return states }, - state: newState("bucket", "key", "etag2", "listPrefix", lastModified), - expected: true, + state: newState("bucket", "key", "etag2", "listPrefix", lastModified), + expectedMustSkip: false, + expectedIsNew: true, }, "with different lastmodified": { states: func() *states { @@ -68,8 +101,68 @@ func TestStatesIsNew(t *testing.T) { states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "") return states }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(1*time.Second)), - expected: true, + state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(1*time.Second)), + expectedMustSkip: false, + expectedIsNew: true, + }, + "with stored state": { + states: func() *states { + states := newStates(inputCtx) + aState := newState("bucket", "key", "etag", "listPrefix", lastModified) + aState.Stored = true + states.Update(aState, "") + return states + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + mustBeNew: true, + expectedMustSkip: true, + expectedIsNew: true, + }, + "with error state": { + states: func() *states { + states := newStates(inputCtx) + aState := newState("bucket", "key", "etag", "listPrefix", lastModified) + aState.Error = true + states.Update(aState, "") + return states + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + mustBeNew: true, + expectedMustSkip: true, + expectedIsNew: true, + }, + "before commit write": { + states: func() *states { + return newStates(inputCtx) + }, + persistentStoreKV: map[string]interface{}{ + awsS3WriteCommitPrefix + "bucket" + "listPrefix": &commitWriteState{lastModified}, + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(-1*time.Second)), + expectedMustSkip: 
true, + expectedIsNew: true, + }, + "same commit write": { + states: func() *states { + return newStates(inputCtx) + }, + persistentStoreKV: map[string]interface{}{ + awsS3WriteCommitPrefix + "bucket" + "listPrefix": &commitWriteState{lastModified}, + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + expectedMustSkip: true, + expectedIsNew: true, + }, + "after commit write": { + states: func() *states { + return newStates(inputCtx) + }, + persistentStoreKV: map[string]interface{}{ + awsS3WriteCommitPrefix + "bucket" + "listPrefix": &commitWriteState{lastModified}, + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(time.Second)), + expectedMustSkip: false, + expectedIsNew: true, }, } @@ -77,8 +170,24 @@ func TestStatesIsNew(t *testing.T) { test := test t.Run(name, func(t *testing.T) { states := test.states() + store := openTestStatestore() + persistentStore, err := store.Access() + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + for key, value := range test.persistentStoreKV { + _ = persistentStore.Set(key, value) + } + + if test.mustBeNew { + test.state.LastModified = test.state.LastModified.Add(1 * time.Second) + } + isNew := states.IsNew(test.state) - assert.Equal(t, test.expected, isNew) + assert.Equal(t, test.expectedIsNew, isNew) + + mustSkip := states.MustSkip(test.state, persistentStore) + assert.Equal(t, test.expectedMustSkip, mustSkip) }) } }