Skip to content

Commit

Permalink
fix in handling states and purging (#33722)
Browse files Browse the repository at this point in the history
* fix in handling states and purging

* changelog

* increase coverage

* cr fixes

* remove code duplication in tests
  • Loading branch information
Andrea Spacca authored and chrisberkhout committed Jun 1, 2023
1 parent 0e777dd commit 4c3775e
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
*Filebeat*

- Fix EOF on single line not producing any event. {issue}30436[30436] {pull}33568[33568]
- Fix handling of error in states in direct aws-s3 listing input {issue}33513[33513] {pull}33722[33722]
- Fix `httpjson` input page number initialization and documentation. {pull}33400[33400]
- Add handling of AAA operations for Cisco ASA module. {issue}32257[32257] {pull}32789[32789]
- Fix gc.log always shipped even if gc fileset is disabled {issue}30995[30995]
Expand Down
25 changes: 0 additions & 25 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,8 @@ import (
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/filebeat/beater"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -134,28 +131,6 @@ file_selectors:
`, queueURL))
}

type testInputStore struct {
registry *statestore.Registry
}

func openTestStatestore() beater.StateStore {
return &testInputStore{
registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()),
}
}

func (s *testInputStore) Close() {
s.registry.Close()
}

func (s *testInputStore) Access() (*statestore.Store, error) {
return s.registry.Get("filebeat")
}

func (s *testInputStore) CleanupInterval() time.Duration {
return 24 * time.Hour
}

func createInput(t *testing.T, cfg *conf.C) *s3Input {
inputV2, err := Plugin(openTestStatestore()).Manager.Create(cfg)
if err != nil {
Expand Down
13 changes: 9 additions & 4 deletions x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,12 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<-
continue
}

p.states.Update(state, "")
// we have no previous state or the previous state
// is not stored: refresh the state
previousState := p.states.FindPrevious(state)
if previousState.IsEmpty() || !previousState.IsProcessed() {
p.states.Update(state, "")
}

event := s3EventV2{}
event.AWSRegion = p.region
Expand Down Expand Up @@ -282,8 +287,8 @@ func (p *s3Poller) Purge() {

for _, state := range p.states.GetStatesByListingID(listingID) {
// it is not stored, keep
if !state.Stored {
p.log.Debugw("state not stored, skip purge", "state", state)
if !state.IsProcessed() {
p.log.Debugw("state not stored or with error, skip purge", "state", state)
continue
}

Expand All @@ -294,7 +299,7 @@ func (p *s3Poller) Purge() {
var commitWriteState commitWriteState
err := p.store.Get(awsS3WriteCommitPrefix+state.Bucket+state.ListPrefix, &commitWriteState)
if err == nil {
// we have no entry in the map and we have no entry in the store
// we have no entry in the map, and we have no entry in the store
// set zero time
latestStoredTime = time.Time{}
p.log.Debugw("last stored time is zero time", "bucket", state.Bucket, "listPrefix", state.ListPrefix)
Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/input/awss3/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func (s *state) MarkAsError() {
s.Error = true
}

// IsProcessed checks if the state is either Stored or Error
func (s *state) IsProcessed() bool {
return s.Stored || s.Error
}

// IsEqual checks if the two states point to the same s3 object.
func (s *state) IsEqual(c *state) bool {
return s.Bucket == c.Bucket && s.Key == c.Key && s.Etag == c.Etag && s.LastModified.Equal(c.LastModified)
Expand Down
10 changes: 5 additions & 5 deletions x-pack/filebeat/input/awss3/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ func (s *states) MustSkip(state state, store *statestore.Store) bool {
return true
}

// we have no previous state or the previous state
// is not stored: refresh the state
if previousState.IsEmpty() || (!previousState.Stored && !previousState.Error) {
s.Update(state, "")
// the previous state is stored or has error: let's skip
if !previousState.IsEmpty() && previousState.IsProcessed() {
s.log.Debugw("previous state is stored or has error", "state", state)
return true
}

return false
Expand Down Expand Up @@ -166,7 +166,7 @@ func (s *states) Update(newState state, listingID string) {
s.log.Debug("New state added for ", newState.ID)
}

if listingID == "" || (!newState.Stored && !newState.Error) {
if listingID == "" || !newState.IsProcessed() {
return
}

Expand Down
139 changes: 124 additions & 15 deletions x-pack/filebeat/input/awss3/states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,76 +9,185 @@ import (
"testing"
"time"

"github.com/elastic/beats/v7/filebeat/beater"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"

"github.com/stretchr/testify/assert"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/elastic-agent-libs/logp"
)

type testInputStore struct {
registry *statestore.Registry
}

func openTestStatestore() beater.StateStore {
return &testInputStore{
registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()),
}
}

func (s *testInputStore) Close() {
_ = s.registry.Close()
}

func (s *testInputStore) Access() (*statestore.Store, error) {
return s.registry.Get("filebeat")
}

func (s *testInputStore) CleanupInterval() time.Duration {
return 24 * time.Hour
}

var inputCtx = v2.Context{
Logger: logp.NewLogger("test"),
Cancelation: context.Background(),
}

func TestStatesIsNew(t *testing.T) {
func TestStatesIsNewAndMustSkip(t *testing.T) {
type stateTestCase struct {
states func() *states
state state
expected bool
states func() *states
state state
mustBeNew bool
persistentStoreKV map[string]interface{}
expectedMustSkip bool
expectedIsNew bool
}
lastModified := time.Date(2022, time.June, 30, 14, 13, 00, 0, time.UTC)
tests := map[string]stateTestCase{
"with empty states": {
states: func() *states {
return newStates(inputCtx)
},
state: newState("bucket", "key", "etag", "listPrefix", lastModified),
expected: true,
state: newState("bucket", "key", "etag", "listPrefix", lastModified),
expectedMustSkip: false,
expectedIsNew: true,
},
"not existing state": {
states: func() *states {
states := newStates(inputCtx)
states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "")
return states
},
state: newState("bucket1", "key1", "etag1", "listPrefix1", lastModified),
expected: true,
state: newState("bucket1", "key1", "etag1", "listPrefix1", lastModified),
expectedMustSkip: false,
expectedIsNew: true,
},
"existing state": {
states: func() *states {
states := newStates(inputCtx)
states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "")
return states
},
state: newState("bucket", "key", "etag", "listPrefix", lastModified),
expected: false,
state: newState("bucket", "key", "etag", "listPrefix", lastModified),
expectedMustSkip: true,
expectedIsNew: false,
},
"with different etag": {
states: func() *states {
states := newStates(inputCtx)
states.Update(newState("bucket", "key", "etag1", "listPrefix", lastModified), "")
return states
},
state: newState("bucket", "key", "etag2", "listPrefix", lastModified),
expected: true,
state: newState("bucket", "key", "etag2", "listPrefix", lastModified),
expectedMustSkip: false,
expectedIsNew: true,
},
"with different lastmodified": {
states: func() *states {
states := newStates(inputCtx)
states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "")
return states
},
state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(1*time.Second)),
expected: true,
state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(1*time.Second)),
expectedMustSkip: false,
expectedIsNew: true,
},
"with stored state": {
states: func() *states {
states := newStates(inputCtx)
aState := newState("bucket", "key", "etag", "listPrefix", lastModified)
aState.Stored = true
states.Update(aState, "")
return states
},
state: newState("bucket", "key", "etag", "listPrefix", lastModified),
mustBeNew: true,
expectedMustSkip: true,
expectedIsNew: true,
},
"with error state": {
states: func() *states {
states := newStates(inputCtx)
aState := newState("bucket", "key", "etag", "listPrefix", lastModified)
aState.Error = true
states.Update(aState, "")
return states
},
state: newState("bucket", "key", "etag", "listPrefix", lastModified),
mustBeNew: true,
expectedMustSkip: true,
expectedIsNew: true,
},
"before commit write": {
states: func() *states {
return newStates(inputCtx)
},
persistentStoreKV: map[string]interface{}{
awsS3WriteCommitPrefix + "bucket" + "listPrefix": &commitWriteState{lastModified},
},
state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(-1*time.Second)),
expectedMustSkip: true,
expectedIsNew: true,
},
"same commit write": {
states: func() *states {
return newStates(inputCtx)
},
persistentStoreKV: map[string]interface{}{
awsS3WriteCommitPrefix + "bucket" + "listPrefix": &commitWriteState{lastModified},
},
state: newState("bucket", "key", "etag", "listPrefix", lastModified),
expectedMustSkip: true,
expectedIsNew: true,
},
"after commit write": {
states: func() *states {
return newStates(inputCtx)
},
persistentStoreKV: map[string]interface{}{
awsS3WriteCommitPrefix + "bucket" + "listPrefix": &commitWriteState{lastModified},
},
state: newState("bucket", "key", "etag", "listPrefix", lastModified.Add(time.Second)),
expectedMustSkip: false,
expectedIsNew: true,
},
}

for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
states := test.states()
store := openTestStatestore()
persistentStore, err := store.Access()
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
for key, value := range test.persistentStoreKV {
_ = persistentStore.Set(key, value)
}

if test.mustBeNew {
test.state.LastModified = test.state.LastModified.Add(1 * time.Second)
}

isNew := states.IsNew(test.state)
assert.Equal(t, test.expected, isNew)
assert.Equal(t, test.expectedIsNew, isNew)

mustSkip := states.MustSkip(test.state, persistentStore)
assert.Equal(t, test.expectedMustSkip, mustSkip)
})
}
}
Expand Down

0 comments on commit 4c3775e

Please sign in to comment.