From eeb419b3b9e09797a304bed0e3dda621ced7e936 Mon Sep 17 00:00:00 2001 From: Andrea Spacca Date: Mon, 12 Dec 2022 19:28:26 +0900 Subject: [PATCH] fix in handling states and purging (#33722) * fix in handling states and purging * changelog * increase coverage * cr fixes * remove code duplication in tests (cherry picked from commit 01f9d43c96bdbcb46ed4f1d6e75363906a34112b) --- CHANGELOG.next.asciidoc | 14 ++ .../input/awss3/input_integration_test.go | 25 ---- x-pack/filebeat/input/awss3/s3.go | 13 +- x-pack/filebeat/input/awss3/state.go | 5 + x-pack/filebeat/input/awss3/states.go | 10 +- x-pack/filebeat/input/awss3/states_test.go | 139 ++++++++++++++++-- 6 files changed, 157 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 546de8da1b8..43d2ad2d21d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -43,6 +43,20 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] *Filebeat* - Fix EOF on single line not producing any event. {issue}30436[30436] {pull}33568[33568] +- Fix handling of error in states in direct aws-s3 listing input {issue}33513[33513] {pull}33722[33722] +- Fix `httpjson` input page number initialization and documentation. {pull}33400[33400] +- Add handling of AAA operations for Cisco ASA module. {issue}32257[32257] {pull}32789[32789] +- Fix gc.log always shipped even if gc fileset is disabled {issue}30995[30995] +- Fix handling of empty array in httpjson input. {pull}32001[32001] +- Fix reporting of `filebeat.events.active` in log events such that the current value is always reported instead of the difference from the last value. {pull}33597[33597] +- Fix splitting array of strings/arrays in httpjson input {issue}30345[30345] {pull}33609[33609] +- Fix Google workspace pagination and document ID generation. {pull}33666[33666] +- Fix PANW handling of messages with event.original already set. {issue}33829[33829] {pull}33830[33830] +- Rename identity as identity_name when the value is a string in Azure Platform Logs. {pull}33654[33654] +- Fix input cancellation handling when HTTP client does not support contexts. {issue}33962[33962] {pull}33968[33968] +- Update mito CEL extension library to v0.0.0-20221207004749-2f0f2875e464 {pull}33974[33974] +- Fix CEL result deserialisation when evaluation fails. {issue}33992[33992] {pull}33996[33996] +- Fix handling of non-200/non-429 status codes. {issue}33999[33999] {pull}34002[34002] *Heartbeat* diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 024caa23ab5..6f4d793f42a 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -31,11 +31,8 @@ import ( "golang.org/x/sync/errgroup" "gopkg.in/yaml.v2" - "github.com/elastic/beats/v7/filebeat/beater" v2 "github.com/elastic/beats/v7/filebeat/input/v2" pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" - "github.com/elastic/beats/v7/libbeat/statestore" - "github.com/elastic/beats/v7/libbeat/statestore/storetest" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -134,28 +131,6 @@ file_selectors: `, queueURL)) } -type testInputStore struct { - registry *statestore.Registry -} - -func openTestStatestore() beater.StateStore { - return &testInputStore{ - registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()), - } -} - -func (s *testInputStore) Close() { - s.registry.Close() -} - -func (s *testInputStore) Access() (*statestore.Store, error) { - return s.registry.Get("filebeat") -} - -func (s *testInputStore) CleanupInterval() time.Duration { - return 24 * time.Hour -} - func createInput(t *testing.T, cfg *conf.C) *s3Input { inputV2, err := Plugin(openTestStatestore()).Manager.Create(cfg) if err != nil { diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index 349d5f7cfdd..c97a38f716e 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -203,7 +203,12 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<- continue } - p.states.Update(state, "") + // we have no previous state or the previous state + // is not stored: refresh the state + previousState := p.states.FindPrevious(state) + if previousState.IsEmpty() || !previousState.IsProcessed() { + p.states.Update(state, "") + } event := s3EventV2{} event.AWSRegion = p.region @@ -277,8 +282,8 @@ func (p *s3Poller) Purge() { for _, state := range p.states.GetStatesByListingID(listingID) { // it is not stored, keep - if !state.Stored { - p.log.Debugw("state not stored, skip purge", "state", state) + if !state.IsProcessed() { + p.log.Debugw("state not stored or with error, skip purge", "state", state) continue } @@ -289,7 +294,7 @@ func (p *s3Poller) Purge() { var commitWriteState commitWriteState err := p.store.Get(awsS3WriteCommitPrefix+state.Bucket+state.ListPrefix, &commitWriteState) if err == nil { - // we have no entry in the map and we have no entry in the store + // we have no entry in the map, and we have no entry in the store // set zero time latestStoredTime = time.Time{} p.log.Debugw("last stored time is zero time", "bucket", state.Bucket, "listPrefix", state.ListPrefix) diff --git a/x-pack/filebeat/input/awss3/state.go b/x-pack/filebeat/input/awss3/state.go index 1e2761ce742..97fb8d538cd 100644 --- a/x-pack/filebeat/input/awss3/state.go +++ b/x-pack/filebeat/input/awss3/state.go @@ -60,6 +60,11 @@ func (s *state) MarkAsError() { s.Error = true } +// IsProcessed checks if the state is either Stored or Error +func (s *state) IsProcessed() bool { + return s.Stored || s.Error +} + // IsEqual checks if the two states point to the same s3 object. func (s *state) IsEqual(c *state) bool { return s.Bucket == c.Bucket && s.Key == c.Key && s.Etag == c.Etag && s.LastModified.Equal(c.LastModified) diff --git a/x-pack/filebeat/input/awss3/states.go b/x-pack/filebeat/input/awss3/states.go index 46ecfa1a320..449219a867f 100644 --- a/x-pack/filebeat/input/awss3/states.go +++ b/x-pack/filebeat/input/awss3/states.go @@ -78,10 +78,10 @@ func (s *states) MustSkip(state state, store *statestore.Store) bool { return true } - // we have no previous state or the previous state - // is not stored: refresh the state - if previousState.IsEmpty() || (!previousState.Stored && !previousState.Error) { - s.Update(state, "") + // the previous state is stored or has error: let's skip + if !previousState.IsEmpty() && previousState.IsProcessed() { + s.log.Debugw("previous state is stored or has error", "state", state) + return true } return false @@ -166,7 +166,7 @@ func (s *states) Update(newState state, listingID string) { s.log.Debug("New state added for ", newState.ID) } - if listingID == "" || (!newState.Stored && !newState.Error) { + if listingID == "" || !newState.IsProcessed() { return } diff --git a/x-pack/filebeat/input/awss3/states_test.go b/x-pack/filebeat/input/awss3/states_test.go index b8a7cbb63d1..39dc4cf82e6 100644 --- a/x-pack/filebeat/input/awss3/states_test.go +++ b/x-pack/filebeat/input/awss3/states_test.go @@ -9,22 +9,51 @@ import ( "testing" "time" + "github.com/elastic/beats/v7/filebeat/beater" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/storetest" + "github.com/stretchr/testify/assert" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/elastic-agent-libs/logp" ) +type testInputStore struct { + registry *statestore.Registry +} + +func openTestStatestore() beater.StateStore { + return &testInputStore{ + registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()), + } +} + +func (s *testInputStore) Close() { + _ = s.registry.Close() +} + +func (s *testInputStore) Access() (*statestore.Store, error) { + return s.registry.Get("filebeat") +} + +func (s *testInputStore) CleanupInterval() time.Duration { + return 24 * time.Hour +} + var inputCtx = v2.Context{ Logger: logp.NewLogger("test"), Cancelation: context.Background(), } -func TestStatesIsNew(t *testing.T) { +func TestStatesIsNewAndMustSkip(t *testing.T) { type stateTestCase struct { - states func() *states - state state - expected bool + states func() *states + state state + mustBeNew bool + persistentStoreKV map[string]interface{} + expectedMustSkip bool + expectedIsNew bool } lastModified := time.Date(2022, time.June, 30, 14, 13, 00, 0, time.UTC) tests := map[string]stateTestCase{ @@ -32,8 +61,9 @@ func TestStatesIsNew(t *testing.T) { states: func() *states { return newStates(inputCtx) }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified), - expected: true, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + expectedMustSkip: false, + expectedIsNew: true, }, "not existing state": { states: func() *states { @@ -41,8 +71,9 @@ func TestStatesIsNew(t *testing.T) { states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "") return states }, - state: newState("bucket1", "key1", "etag1", "listPrefix1", lastModified), - expected: true, + state: newState("bucket1", "key1", "etag1", "listPrefix1", lastModified), + expectedMustSkip: false, + expectedIsNew: true, }, "existing state": { states: func() *states { @@ -50,8 +81,9 @@ func TestStatesIsNew(t *testing.T) { states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "") return states }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified), - expected: false, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + expectedMustSkip: true, + expectedIsNew: false, }, "with different etag": { states: func() *states { @@ -59,8 +91,9 @@ func TestStatesIsNew(t *testing.T) { states.Update(newState("bucket", "key", "etag1", "listPrefix", lastModified), "") return states }, - state: newState("bucket", "key", "etag2", "listPrefix", lastModified), - expected: true, + state: newState("bucket", "key", "etag2", "listPrefix", lastModified), + expectedMustSkip: false, + expectedIsNew: true, }, "with different lastmodified": { states: func() *states { @@ -68,8 +101,68 @@ func TestStatesIsNew(t *testing.T) { states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "") return states }, - state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(1*time.Second)), - expected: true, + state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(1*time.Second)), + expectedMustSkip: false, + expectedIsNew: true, + }, + "with stored state": { + states: func() *states { + states := newStates(inputCtx) + aState := newState("bucket", "key", "etag", "listPrefix", lastModified) + aState.Stored = true + states.Update(aState, "") + return states + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + mustBeNew: true, + expectedMustSkip: true, + expectedIsNew: true, + }, + "with error state": { + states: func() *states { + states := newStates(inputCtx) + aState := newState("bucket", "key", "etag", "listPrefix", lastModified) + aState.Error = true + states.Update(aState, "") + return states + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + mustBeNew: true, + expectedMustSkip: true, + expectedIsNew: true, + }, + "before commit write": { + states: func() *states { + return newStates(inputCtx) + }, + persistentStoreKV: map[string]interface{}{ + awsS3WriteCommitPrefix + "bucket" + "listPrefix": &commitWriteState{lastModified}, + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(-1*time.Second)), + expectedMustSkip: true, + expectedIsNew: true, + }, + "same commit write": { + states: func() *states { + return newStates(inputCtx) + }, + persistentStoreKV: map[string]interface{}{ + awsS3WriteCommitPrefix + "bucket" + "listPrefix": &commitWriteState{lastModified}, + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified), + expectedMustSkip: true, + expectedIsNew: true, + }, + "after commit write": { + states: func() *states { + return newStates(inputCtx) + }, + persistentStoreKV: map[string]interface{}{ + awsS3WriteCommitPrefix + "bucket" + "listPrefix": &commitWriteState{lastModified}, + }, + state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(time.Second)), + expectedMustSkip: false, + expectedIsNew: true, }, } @@ -77,8 +170,24 @@ func TestStatesIsNew(t *testing.T) { test := test t.Run(name, func(t *testing.T) { states := test.states() + store := openTestStatestore() + persistentStore, err := store.Access() + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + for key, value := range test.persistentStoreKV { + _ = persistentStore.Set(key, value) + } + + if test.mustBeNew { + test.state.LastModified = test.state.LastModified.Add(1 * time.Second) + } + isNew := states.IsNew(test.state) - assert.Equal(t, test.expected, isNew) + assert.Equal(t, test.expectedIsNew, isNew) + + mustSkip := states.MustSkip(test.state, persistentStore) + assert.Equal(t, test.expectedMustSkip, mustSkip) }) } }