From 6ae9c777e2fe150a315cc1e7ca3a8d35e58938b9 Mon Sep 17 00:00:00 2001 From: Jochen Ullrich Date: Wed, 2 Nov 2022 14:00:02 +0100 Subject: [PATCH 01/15] Add backup to bucket functionality Adds functionality to back up processed files to another or the same bucket, with an optional prefix. If deletion is enabled, files are removed from the source bucket after they have been backed up. --- x-pack/filebeat/input/awss3/config.go | 24 ++++ x-pack/filebeat/input/awss3/config_test.go | 68 +++++++++++ x-pack/filebeat/input/awss3/input.go | 4 +- .../input/awss3/input_benchmark_test.go | 12 +- x-pack/filebeat/input/awss3/interfaces.go | 29 +++++ .../input/awss3/mock_interfaces_test.go | 26 ++++ x-pack/filebeat/input/awss3/s3_objects.go | 33 ++++- .../filebeat/input/awss3/s3_objects_test.go | 115 +++++++++++++++++- x-pack/filebeat/input/awss3/s3_test.go | 4 +- 9 files changed, 303 insertions(+), 12 deletions(-) diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index cd26b57d0b19..5b40fd629169 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -37,6 +37,7 @@ type config struct { ReaderConfig readerConfig `config:",inline"` // Reader options to apply when no file_selectors are used. PathStyle bool `config:"path_style"` ProviderOverride string `config:"provider"` + BackupConfig backupConfig `config:",inline"` } func defaultConfig() config { @@ -106,10 +107,33 @@ func (c *config) Validate() error { if c.ProviderOverride != "" && c.NonAWSBucketName == "" { return errors.New("provider can only be overriden when polling non-AWS S3 services") } + if c.BackupConfig.NonAWSBackupToBucketName != "" && c.NonAWSBucketName == "" { + return errors.New("backup to non-AWS bucket can only be used for non-AWS sources") + } + if c.BackupConfig.BackupToBucketArn != "" && c.BucketARN == "" { + return errors.New("backup to AWS bucket can only be used for AWS sources") + } + if c.BackupConfig.BackupToBucketArn != "" && c.BackupConfig.NonAWSBackupToBucketName != "" { + return errors.New("backup_to_bucket_arn and non_aws_backup_to_bucket_name cannot be used together") + } return nil } +type backupConfig struct { + BackupToBucketArn string `config:"backup_to_bucket_arn"` + NonAWSBackupToBucketName string `config:"non_aws_backup_to_bucket_name"` + BackupToBucketPrefix string `config:"backup_to_bucket_prefix"` + Delete bool `config:"delete"` +} + +func (c *backupConfig) GetBucketName() string { + if c.BackupToBucketArn != "" { + return getBucketNameFromARN(c.BackupToBucketArn) + } + return c.NonAWSBackupToBucketName +} + // fileSelectorConfig defines reader configuration that applies to a subset // of S3 objects whose URL matches the given regex.
type fileSelectorConfig struct { diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go index d1c03e06e68f..0552f9417889 100644 --- a/x-pack/filebeat/input/awss3/config_test.go +++ b/x-pack/filebeat/input/awss3/config_test.go @@ -377,6 +377,74 @@ func TestConfig(t *testing.T) { "provider can only be overriden when polling non-AWS S3 services", nil, }, + { + "backup_to_bucket with AWS", + queueURL, + "", + "", + mapstr.M{ + "bucket_arn": "arn:aws:s3:::aBucket", + "backup_to_bucket_arn": "arn:aws:s3:::bBucket", + "backup_to_bucket_prefix": "backup", + "number_of_workers": 5, + }, + "", + func(queueURL, s3Bucket string, nonAWSS3Bucket string) config { + c := makeConfig("", s3Bucket, "") + c.BucketARN = "arn:aws:s3:::aBucket" + c.BackupConfig.BackupToBucketArn = "arn:aws:s3:::bBucket" + c.BackupConfig.BackupToBucketPrefix = "backup" + c.NumberOfWorkers = 5 + return c + }, + }, + { + "backup_to_bucket with non-AWS", + queueURL, + "", + "", + mapstr.M{ + "non_aws_bucket_name": "aBucket", + "non_aws_backup_to_bucket_name": "bBucket", + "backup_to_bucket_prefix": "backup", + "number_of_workers": 5, + }, + "", + func(queueURL, s3Bucket string, nonAWSS3Bucket string) config { + c := makeConfig("", s3Bucket, "") + c.NonAWSBucketName = "aBucket" + c.BackupConfig.NonAWSBackupToBucketName = "bBucket" + c.BackupConfig.BackupToBucketPrefix = "backup" + c.NumberOfWorkers = 5 + return c + }, + }, + { + "error with AWS backup and non-AWS source", + queueURL, + "", + "", + mapstr.M{ + "non_aws_backup_to_bucket_name": "bBucket", + "bucket_arn": "arn:aws:s3:::aBucket", + "number_of_workers": 5, + }, + "backup to non-AWS bucket can only be used for non-AWS sources", + nil, + }, + { + "error with non-AWS backup and AWS source", + queueURL, + "", + "", + mapstr.M{ + "non_aws_bucket_name": "aBucket", + "backup_to_bucket_arn": "arn:aws:s3:::bBucket", + "number_of_workers": 5, + }, + "backup to AWS bucket can only be used for AWS sources", + nil, + }, } for _, tc := range testCases { diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index e93ff4216938..46eebc863b19 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -192,7 +192,7 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsRe if err != nil { return nil, err } - s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, fileSelectors) + s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, fileSelectors, in.config.BackupConfig) sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, s3EventHandlerFactory) sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler) @@ -267,7 +267,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli if len(in.config.FileSelectors) == 0 { fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}} } - s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, fileSelectors) + s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, fileSelectors, in.config.BackupConfig) s3Poller := newS3Poller(log.Named("s3_poller"), metrics, s3API, diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go 
b/x-pack/filebeat/input/awss3/input_benchmark_test.go index b1e652ef635c..b8fd62a90636 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -144,6 +144,14 @@ func (c constantS3) GetObject(ctx context.Context, bucket, key string) (*s3.GetO return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil } +func (c constantS3) CopyObject(ctx context.Context, fromBucket, toBucket, fromKey, toKey string) (*s3.CopyObjectOutput, error) { + return nil, nil +} + +func (c constantS3) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) { + return nil, nil +} + func (c constantS3) ListObjectsPaginator(bucket, prefix string) s3Pager { return c.pagerConstant } @@ -175,7 +183,7 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR defer close(client.Channel) conf := makeBenchmarkConfig(t) - s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors) + s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors, backupConfig{}) sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, s3EventHandlerFactory) sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, maxMessagesInflight, sqsMessageHandler) @@ -313,7 +321,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult return } - s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, config.FileSelectors) + s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, config.FileSelectors, backupConfig{}) s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", listPrefix, "region", "provider", numberOfWorkers, time.Second) if err := s3Poller.Poll(ctx); err != nil { diff --git a/x-pack/filebeat/input/awss3/interfaces.go b/x-pack/filebeat/input/awss3/interfaces.go index 0196f831af9a..38a239919a6d 100644 --- a/x-pack/filebeat/input/awss3/interfaces.go +++ b/x-pack/filebeat/input/awss3/interfaces.go @@ -68,6 +68,7 @@ type sqsProcessor interface { type s3API interface { s3Getter + s3Mover s3Lister } @@ -75,6 +76,11 @@ type s3Getter interface { GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) } +type s3Mover interface { + CopyObject(ctx context.Context, fromBucket, toBucket, fromKey, toKey string) (*s3.CopyObjectOutput, error) + DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) +} + type s3Lister interface { ListObjectsPaginator(bucket, prefix string) s3Pager } @@ -227,6 +233,29 @@ func (a *awsS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetOb return getObjectOutput, nil } +func (a *awsS3API) CopyObject(ctx context.Context, fromBucket, toBucket, fromKey, toKey string) (*s3.CopyObjectOutput, error) { + copyObjectOutput, err := a.client.CopyObject(ctx, &s3.CopyObjectInput{ + Bucket: awssdk.String(toBucket), + CopySource: awssdk.String(fmt.Sprintf("%s/%s", fromBucket, fromKey)), + Key: awssdk.String(toKey), + }) + if err != nil { + return nil, fmt.Errorf("s3 CopyObject failed: %w", err) + } + return copyObjectOutput, nil +} + +func (a *awsS3API) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) { + deleteObjectOutput, err := a.client.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: awssdk.String(bucket), + Key: awssdk.String(key), + }) + if err != nil { + return nil, fmt.Errorf("s3 DeleteObject failed: %w", err) + } + return deleteObjectOutput, nil +} + func (a *awsS3API) ListObjectsPaginator(bucket, prefix string) s3Pager { pager := s3.NewListObjectsV2Paginator(a.client, &s3.ListObjectsV2Input{ Bucket: awssdk.String(bucket), diff --git a/x-pack/filebeat/input/awss3/mock_interfaces_test.go b/x-pack/filebeat/input/awss3/mock_interfaces_test.go index 63d269183026..4d61dc251e0b 100644 --- a/x-pack/filebeat/input/awss3/mock_interfaces_test.go +++ b/x-pack/filebeat/input/awss3/mock_interfaces_test.go @@ -274,6 +274,32 @@ func (mr *MockS3APIMockRecorder) GetObject(ctx, bucket, key interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*MockS3API)(nil).GetObject), ctx, bucket, key) } +func (m *MockS3API) CopyObject(ctx context.Context, fromBucket, toBucket, fromKey, toKey string) (*s3.CopyObjectOutput, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CopyObject", ctx, fromBucket, toBucket, fromKey, toKey) + ret0, _ := ret[0].(*s3.CopyObjectOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (mr *MockS3APIMockRecorder) CopyObject(ctx, fromBucket, toBucket, fromKey, toKey interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CopyObject", reflect.TypeOf((*MockS3API)(nil).CopyObject), ctx, fromBucket, toBucket, fromKey, toKey) +} + +func (m *MockS3API) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteObject", ctx, bucket, key) + ret0, _ := ret[0].(*s3.DeleteObjectOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (mr *MockS3APIMockRecorder) DeleteObject(ctx, bucket, key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockS3API)(nil).DeleteObject), ctx, bucket, key) +} + // ListObjectsPaginator mocks base method.
func (m *MockS3API) ListObjectsPaginator(bucket, prefix string) s3Pager { m.ctrl.T.Helper() diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index a1d70c604c26..cfc106437cc3 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -41,12 +41,13 @@ const ( type s3ObjectProcessorFactory struct { log *logp.Logger metrics *inputMetrics - s3 s3Getter + s3 s3API publisher beat.Client fileSelectors []fileSelectorConfig + backupConfig backupConfig } -func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3Getter, publisher beat.Client, sel []fileSelectorConfig) *s3ObjectProcessorFactory { +func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3API, publisher beat.Client, sel []fileSelectorConfig, backupConfig backupConfig) *s3ObjectProcessorFactory { if metrics == nil { metrics = newInputMetrics(monitoring.NewRegistry(), "") } @@ -61,6 +62,7 @@ func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3G s3: s3, publisher: publisher, fileSelectors: sel, + backupConfig: backupConfig, } } @@ -158,11 +160,17 @@ func (p *s3ObjectProcessor) ProcessS3Object() error { default: err = p.readFile(reader) } + if err != nil { return fmt.Errorf("failed reading s3 object (elapsed_time_ns=%d): %w", time.Since(start).Nanoseconds(), err) } + err = p.finalizeObject() + if err != nil { + return fmt.Errorf("failed to finalize s3 object (elapsed_time_ns=%d): %w", time.Since(start).Nanoseconds(), err) + } + return nil } @@ -361,6 +369,27 @@ func (p *s3ObjectProcessor) createEvent(message string, offset int64) beat.Event return event } +func (p *s3ObjectProcessor) finalizeObject() error { + bucketName := p.backupConfig.GetBucketName() + if bucketName != "" { + backupKey := p.s3Obj.S3.Object.Key + if p.backupConfig.BackupToBucketPrefix != "" { + backupKey = fmt.Sprintf("%s%s", p.backupConfig.BackupToBucketPrefix, backupKey) + } + _, err := p.s3.CopyObject(p.ctx, p.s3Obj.S3.Bucket.Name, bucketName, p.s3Obj.S3.Object.Key, backupKey) + if err != nil { + return fmt.Errorf("failed to copy object to backup bucket: %w", err) + } + if p.backupConfig.Delete { + _, err = p.s3.DeleteObject(p.ctx, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key) + if err != nil { + return fmt.Errorf("failed to delete object from bucket: %w", err) + } + } + } + return nil +} + func objectID(objectHash string, offset int64) string { return fmt.Sprintf("%s-%012d", objectHash, offset) } diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index 4541874303ba..f697b05b3d03 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -144,7 +144,7 @@ func TestS3ObjectProcessor(t *testing.T) { GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). Return(nil, errFakeConnectivityFailure) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil, backupConfig{}) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object() require.Error(t, err) @@ -166,7 +166,7 @@ func TestS3ObjectProcessor(t *testing.T) { GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). 
Return(nil, nil) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil, backupConfig{}) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object() require.Error(t, err) @@ -193,7 +193,114 @@ func TestS3ObjectProcessor(t *testing.T) { Times(2), ) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil, backupConfig{}) + ack := awscommon.NewEventACKTracker(ctx) + err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object() + require.NoError(t, err) + }) + + t.Run("backs up objects after reading", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + mockS3API := NewMockS3API(ctrl) + mockPublisher := NewMockBeatClient(ctrl) + s3Event, s3Resp := newS3Object(t, "testdata/log.txt", "") + + backupCfg := backupConfig{ + BackupToBucketArn: "arn:aws:s3:::backup", + } + + var events []beat.Event + gomock.InOrder( + mockS3API.EXPECT(). + GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). + Return(s3Resp, nil), + mockPublisher.EXPECT(). + Publish(gomock.Any()). + Do(func(event beat.Event) { events = append(events, event) }). + Times(2), + mockS3API.EXPECT(). + CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)). + Return(nil, nil), + ) + + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil, backupCfg) + ack := awscommon.NewEventACKTracker(ctx) + err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object() + require.NoError(t, err) + }) + + t.Run("deletes objects after backing up", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + mockS3API := NewMockS3API(ctrl) + mockPublisher := NewMockBeatClient(ctrl) + s3Event, s3Resp := newS3Object(t, "testdata/log.txt", "") + + backupCfg := backupConfig{ + BackupToBucketArn: "arn:aws:s3:::backup", + Delete: true, + } + + var events []beat.Event + gomock.InOrder( + mockS3API.EXPECT(). + GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). + Return(s3Resp, nil), + mockPublisher.EXPECT(). + Publish(gomock.Any()). + Do(func(event beat.Event) { events = append(events, event) }). + Times(2), + mockS3API.EXPECT(). + CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)). + Return(nil, nil), + mockS3API.EXPECT(). + DeleteObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
+ Return(nil, nil), + ) + + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil, backupCfg) + ack := awscommon.NewEventACKTracker(ctx) + err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object() + require.NoError(t, err) + }) + + t.Run("prefixes objects when backing up", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + mockS3API := NewMockS3API(ctrl) + mockPublisher := NewMockBeatClient(ctrl) + s3Event, s3Resp := newS3Object(t, "testdata/log.txt", "") + + backupCfg := backupConfig{ + BackupToBucketArn: s3Event.S3.Bucket.ARN, + BackupToBucketPrefix: "backup/", + } + + var events []beat.Event + gomock.InOrder( + mockS3API.EXPECT(). + GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). + Return(s3Resp, nil), + mockPublisher.EXPECT(). + Publish(gomock.Any()). + Do(func(event beat.Event) { events = append(events, event) }). + Times(2), + mockS3API.EXPECT(). + CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq("backup/testdata/log.txt")). + Return(nil, nil), + ) + + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil, backupCfg) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object() require.NoError(t, err) @@ -231,7 +338,7 @@ func _testProcessS3Object(t testing.TB, file, contentType string, numEvents int, Times(numEvents), ) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, selectors) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, selectors, backupConfig{}) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object() diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go index 367f707b1832..a24c707ac238 100644 --- a/x-pack/filebeat/input/awss3/s3_test.go +++ b/x-pack/filebeat/input/awss3/s3_test.go @@ -125,7 +125,7 @@ func TestS3Poller(t *testing.T) { GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key5")). Return(nil, errFakeConnectivityFailure) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil, backupConfig{}) receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", "provider", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) assert.Equal(t, numberOfWorkers, receiver.workerSem.Available()) @@ -248,7 +248,7 @@ func TestS3Poller(t *testing.T) { GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key5")). 
Return(nil, errFakeConnectivityFailure) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil, backupConfig{}) receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", "provider", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) assert.Equal(t, numberOfWorkers, receiver.workerSem.Available()) From 962fc91f84891833d63a24d830c653b9da7c1dfe Mon Sep 17 00:00:00 2001 From: Jochen Ullrich Date: Wed, 2 Nov 2022 14:19:42 +0100 Subject: [PATCH 02/15] Add documentation for backup_to_bucket configuration parameters --- .../docs/inputs/input-aws-s3.asciidoc | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 19a47e3e543c..7b9839a6c49c 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -437,6 +437,34 @@ This is only supported with 3rd party S3 providers. AWS does not support path s In order to make AWS API calls, `aws-s3` input requires AWS credentials. Please see <> for more details. +[float] +==== `backup_to_bucket_arn` + +The ARN of the bucket to back up processed files to. The processed file is copied to this bucket once it has been fully read. +When the source is configured with `non_aws_bucket_name`, use `non_aws_backup_to_bucket_name` instead. + +Naming of the backed-up files can be controlled with `backup_to_bucket_prefix`. + +[float] +==== `backup_to_bucket_prefix` + +This prefix will be prepended to the object key when backing it up to another (or the same) bucket. + +[float] +==== `non_aws_backup_to_bucket_name` + +The name of the bucket to back up processed files to. Use this parameter for non-AWS buckets. The processed file is copied to this bucket once it has been fully read. +When the source is configured with `bucket_arn`, use `backup_to_bucket_arn` instead. + +Naming of the backed-up files can be controlled with `backup_to_bucket_prefix`. + +[float] +==== `delete` + +Controls whether fully processed files are deleted from the source bucket. + +This option can only be used together with the backup functionality. + [float] === AWS Permissions From da593876afa571c43721db5bcdb7ccc695383db9 Mon Sep 17 00:00:00 2001 From: Jochen Ullrich Date: Wed, 2 Nov 2022 14:23:47 +0100 Subject: [PATCH 03/15] Add configuration to reference config file --- x-pack/filebeat/filebeat.reference.yml | 282 +++++++++++++------------ 1 file changed, 146 insertions(+), 136 deletions(-) diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index e61f7bfdfced..d62b7ab73598 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -169,6 +169,16 @@ filebeat.modules: # Configures the SSL settings, ie. set trusted CAs, ignore certificate verification....
#var.ssl: + # Configures backing up processed files to another (or the same) bucket + #var.backup_to_bucket_arn: 'arn:aws:s3:::mybucket' + #var.non_aws_backup_to_bucket_name: 'mybucket' + + # Sets a prefix to prepend to object keys when backing up + #var.backup_to_bucket_prefix: 'backup/' + + # Controls deletion of objects after backing them up + #var.delete: false + cloudwatch: enabled: false @@ -843,7 +853,7 @@ filebeat.modules: #------------------------------- Coredns Module ------------------------------- - module: coredns # Fileset for native deployment - log: + log: enabled: false # Set custom paths for the log files. If left empty, @@ -852,7 +862,7 @@ filebeat.modules: #----------------------------- Crowdstrike Module ----------------------------- - module: crowdstrike - + falcon: enabled: false @@ -944,7 +954,7 @@ filebeat.modules: #------------------------------ Envoyproxy Module ------------------------------ - module: envoyproxy # Fileset for native deployment - log: + log: enabled: false # Set custom paths for the log files. If left empty, @@ -1473,7 +1483,7 @@ filebeat.modules: # Oauth Token URL, should include the tenant ID #var.oauth2.token_url: "https://login.microsoftonline.com/TENANT-ID/oauth2/v2.0/token" - + # Related scopes, default should be included #var.oauth2.scopes: # - "https://api.security.microsoft.com/.default" @@ -1857,133 +1867,133 @@ filebeat.modules: #var.password: #------------------------------ Salesforce Module ------------------------------ -- module: salesforce - - apex-rest: - enabled: false - - # Oauth Client ID - #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Client Secret - #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Token URL - #var.token_url: "https://login.salesforce.com/services/oauth2/token" - - # Oauth User, should include the User mail - #var.user: "abc.xyz@mail.com" - - # Oauth password, should include the User password - #var.password: "P@$$W0₹D" - - # URL, should include the instance_url - #var.url: "https://instance_id.my.salesforce.com" - - login-rest: - enabled: false - - # Oauth Client ID - #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Client Secret - #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Token URL - #var.token_url: "https://login.salesforce.com/services/oauth2/token" - - # Oauth User, should include the User mail - #var.user: "abc.xyz@mail.com" - - # Oauth password, should include the User password - #var.password: "P@$$W0₹D" - - # URL, should include the instance_url - #var.url: "https://instance_id.my.salesforce.com" - - login-stream: - enabled: false - - # Oauth Client ID - #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Client Secret - #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Token URL - #var.token_url: "https://login.salesforce.com/services/oauth2/token" - - # Oauth User, should include the User mail - #var.user: "abc.xyz@mail.com" - - # Oauth password, should include the User password - #var.password: "P@$$W0₹D" - - # URL, should include the instance_url - #var.url: "https://instance_id.my.salesforce.com" - - logout-rest: - enabled: false - - # Oauth Client ID - #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Client Secret - #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Token URL - #var.token_url: "https://login.salesforce.com/services/oauth2/token" - - # Oauth User, should include the User mail - 
#var.user: "abc.xyz@mail.com" - - # Oauth password, should include the User password - #var.password: "P@$$W0₹D" - - # URL, should include the instance_url - #var.url: "https://instance_id.my.salesforce.com" - - logout-stream: - enabled: false - - # Oauth Client ID - #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Client Secret - #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Token URL - #var.token_url: "https://login.salesforce.com/services/oauth2/token" - - # Oauth User, should include the User mail - #var.user: "abc.xyz@mail.com" - - # Oauth password, should include the User password - #var.password: "P@$$W0₹D" - - # URL, should include the instance_url - #var.url: "https://instance_id.my.salesforce.com" - - setupaudittrail-rest: - enabled: false - - # Oauth Client ID - #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Client Secret - #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Token URL - #var.token_url: "https://login.salesforce.com/services/oauth2/token" - - # Oauth User, should include the User mail - #var.user: "abc.xyz@mail.com" - - # Oauth password, should include the User password - #var.password: "P@$$W0₹D" - - # URL, should include the instance_url - #var.url: "https://instance_id.my.salesforce.com" +- module: salesforce + + apex-rest: + enabled: false + + # Oauth Client ID + #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Client Secret + #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Token URL + #var.token_url: "https://login.salesforce.com/services/oauth2/token" + + # Oauth User, should include the User mail + #var.user: "abc.xyz@mail.com" + + # Oauth password, should include the User password + #var.password: "P@$$W0₹D" + + # URL, should include the instance_url + #var.url: "https://instance_id.my.salesforce.com" + + login-rest: + enabled: false + + # Oauth Client ID + #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Client Secret + #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Token URL + #var.token_url: "https://login.salesforce.com/services/oauth2/token" + + # Oauth User, should include the User mail + #var.user: "abc.xyz@mail.com" + + # Oauth password, should include the User password + #var.password: "P@$$W0₹D" + + # URL, should include the instance_url + #var.url: "https://instance_id.my.salesforce.com" + + login-stream: + enabled: false + + # Oauth Client ID + #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Client Secret + #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Token URL + #var.token_url: "https://login.salesforce.com/services/oauth2/token" + + # Oauth User, should include the User mail + #var.user: "abc.xyz@mail.com" + + # Oauth password, should include the User password + #var.password: "P@$$W0₹D" + + # URL, should include the instance_url + #var.url: "https://instance_id.my.salesforce.com" + + logout-rest: + enabled: false + + # Oauth Client ID + #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Client Secret + #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Token URL + #var.token_url: "https://login.salesforce.com/services/oauth2/token" + + # Oauth User, should include the User mail + #var.user: "abc.xyz@mail.com" + + # Oauth password, should include the User password + #var.password: "P@$$W0₹D" + + # URL, should include the instance_url + 
#var.url: "https://instance_id.my.salesforce.com" + + logout-stream: + enabled: false + + # Oauth Client ID + #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Client Secret + #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Token URL + #var.token_url: "https://login.salesforce.com/services/oauth2/token" + + # Oauth User, should include the User mail + #var.user: "abc.xyz@mail.com" + + # Oauth password, should include the User password + #var.password: "P@$$W0₹D" + + # URL, should include the instance_url + #var.url: "https://instance_id.my.salesforce.com" + + setupaudittrail-rest: + enabled: false + + # Oauth Client ID + #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Client Secret + #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Token URL + #var.token_url: "https://login.salesforce.com/services/oauth2/token" + + # Oauth User, should include the User mail + #var.user: "abc.xyz@mail.com" + + # Oauth password, should include the User password + #var.password: "P@$$W0₹D" + + # URL, should include the instance_url + #var.url: "https://instance_id.my.salesforce.com" #----------------------------- Google Santa Module ----------------------------- - module: santa @@ -2973,7 +2983,7 @@ filebeat.inputs: # Source of container events. Available options: all, stdin, stderr. #stream: all - # Format of the container events. Available options: auto, cri, docker, json-file + # Format of the container events. Available options: auto, cri, docker, json-file #format: auto ### Log rotation @@ -2983,7 +2993,7 @@ filebeat.inputs: #rotation.external.strategy.copytruncate: # Regex that matches the rotated files. # suffix_regex: \.\d$ - # If the rotated filename suffix is a datetime, set it here. + # If the rotated filename suffix is a datetime, set it here. # dateformat: -20060102 ### State options @@ -3180,7 +3190,7 @@ filebeat.inputs: #hosts: #- kafka-broker-1:9092 #- kafka-broker-2:9092 - + # A list of topics to read from. #topics: ["my-topic", "important-logs"] @@ -3215,7 +3225,7 @@ filebeat.inputs: # The minimum number of bytes to wait for. #fetch.min: 1 - + # The default number of bytes to read per request. #fetch.default: 1MB @@ -4538,7 +4548,7 @@ output.elasticsearch: # Permissions to use for file creation. The default is 0600. #permissions: 0600 - + # Configure automatic file rotation on every startup. The default is true. #rotate_on_startup: true From 58d72483b0398dbbf2b7d8fbaf3a60869512a32f Mon Sep 17 00:00:00 2001 From: Jochen Ullrich Date: Wed, 2 Nov 2022 14:33:20 +0100 Subject: [PATCH 04/15] Revert "Add configuration to reference config file" This reverts commit da593876afa571c43721db5bcdb7ccc695383db9. --- x-pack/filebeat/filebeat.reference.yml | 282 ++++++++++++------------- 1 file changed, 136 insertions(+), 146 deletions(-) diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index d62b7ab73598..e61f7bfdfced 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -169,16 +169,6 @@ filebeat.modules: # Configures the SSL settings, ie. set trusted CAs, ignore certificate verification.... 
#var.ssl: - # Configures backing up processed files to another (or the same) bucket - #var.backup_to_bucket_arn: 'arn:aws:s3:::mybucket' - #var.non_aws_backup_to_bucket_name: 'mybucket' - - # Sets a prefix to prepend to object keys when backing up - #var.backup_to_bucket_prefix: 'backup/' - - # Controls deletion of objects after backing them up - #var.delete: false - cloudwatch: enabled: false @@ -853,7 +843,7 @@ filebeat.modules: #------------------------------- Coredns Module ------------------------------- - module: coredns # Fileset for native deployment - log: + log: enabled: false # Set custom paths for the log files. If left empty, @@ -862,7 +852,7 @@ filebeat.modules: #----------------------------- Crowdstrike Module ----------------------------- - module: crowdstrike - + falcon: enabled: false @@ -954,7 +944,7 @@ filebeat.modules: #------------------------------ Envoyproxy Module ------------------------------ - module: envoyproxy # Fileset for native deployment - log: + log: enabled: false # Set custom paths for the log files. If left empty, @@ -1483,7 +1473,7 @@ filebeat.modules: # Oauth Token URL, should include the tenant ID #var.oauth2.token_url: "https://login.microsoftonline.com/TENANT-ID/oauth2/v2.0/token" - + # Related scopes, default should be included #var.oauth2.scopes: # - "https://api.security.microsoft.com/.default" @@ -1867,133 +1857,133 @@ filebeat.modules: #var.password: #------------------------------ Salesforce Module ------------------------------ -- module: salesforce - - apex-rest: - enabled: false - - # Oauth Client ID - #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Client Secret - #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Token URL - #var.token_url: "https://login.salesforce.com/services/oauth2/token" - - # Oauth User, should include the User mail - #var.user: "abc.xyz@mail.com" - - # Oauth password, should include the User password - #var.password: "P@$$W0₹D" - - # URL, should include the instance_url - #var.url: "https://instance_id.my.salesforce.com" - - login-rest: - enabled: false - - # Oauth Client ID - #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Client Secret - #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Token URL - #var.token_url: "https://login.salesforce.com/services/oauth2/token" - - # Oauth User, should include the User mail - #var.user: "abc.xyz@mail.com" - - # Oauth password, should include the User password - #var.password: "P@$$W0₹D" - - # URL, should include the instance_url - #var.url: "https://instance_id.my.salesforce.com" - - login-stream: - enabled: false - - # Oauth Client ID - #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Client Secret - #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Token URL - #var.token_url: "https://login.salesforce.com/services/oauth2/token" - - # Oauth User, should include the User mail - #var.user: "abc.xyz@mail.com" - - # Oauth password, should include the User password - #var.password: "P@$$W0₹D" - - # URL, should include the instance_url - #var.url: "https://instance_id.my.salesforce.com" - - logout-rest: - enabled: false - - # Oauth Client ID - #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Client Secret - #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Token URL - #var.token_url: "https://login.salesforce.com/services/oauth2/token" - - # Oauth User, should include the User mail - 
#var.user: "abc.xyz@mail.com" - - # Oauth password, should include the User password - #var.password: "P@$$W0₹D" - - # URL, should include the instance_url - #var.url: "https://instance_id.my.salesforce.com" - - logout-stream: - enabled: false - - # Oauth Client ID - #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Client Secret - #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Token URL - #var.token_url: "https://login.salesforce.com/services/oauth2/token" - - # Oauth User, should include the User mail - #var.user: "abc.xyz@mail.com" - - # Oauth password, should include the User password - #var.password: "P@$$W0₹D" - - # URL, should include the instance_url - #var.url: "https://instance_id.my.salesforce.com" - - setupaudittrail-rest: - enabled: false - - # Oauth Client ID - #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Client Secret - #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - - # Oauth Token URL - #var.token_url: "https://login.salesforce.com/services/oauth2/token" - - # Oauth User, should include the User mail - #var.user: "abc.xyz@mail.com" - - # Oauth password, should include the User password - #var.password: "P@$$W0₹D" - - # URL, should include the instance_url - #var.url: "https://instance_id.my.salesforce.com" +- module: salesforce + + apex-rest: + enabled: false + + # Oauth Client ID + #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Client Secret + #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Token URL + #var.token_url: "https://login.salesforce.com/services/oauth2/token" + + # Oauth User, should include the User mail + #var.user: "abc.xyz@mail.com" + + # Oauth password, should include the User password + #var.password: "P@$$W0₹D" + + # URL, should include the instance_url + #var.url: "https://instance_id.my.salesforce.com" + + login-rest: + enabled: false + + # Oauth Client ID + #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Client Secret + #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Token URL + #var.token_url: "https://login.salesforce.com/services/oauth2/token" + + # Oauth User, should include the User mail + #var.user: "abc.xyz@mail.com" + + # Oauth password, should include the User password + #var.password: "P@$$W0₹D" + + # URL, should include the instance_url + #var.url: "https://instance_id.my.salesforce.com" + + login-stream: + enabled: false + + # Oauth Client ID + #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Client Secret + #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Token URL + #var.token_url: "https://login.salesforce.com/services/oauth2/token" + + # Oauth User, should include the User mail + #var.user: "abc.xyz@mail.com" + + # Oauth password, should include the User password + #var.password: "P@$$W0₹D" + + # URL, should include the instance_url + #var.url: "https://instance_id.my.salesforce.com" + + logout-rest: + enabled: false + + # Oauth Client ID + #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Client Secret + #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Token URL + #var.token_url: "https://login.salesforce.com/services/oauth2/token" + + # Oauth User, should include the User mail + #var.user: "abc.xyz@mail.com" + + # Oauth password, should include the User password + #var.password: "P@$$W0₹D" + + # URL, should include the instance_url + 
#var.url: "https://instance_id.my.salesforce.com" + + logout-stream: + enabled: false + + # Oauth Client ID + #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Client Secret + #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Token URL + #var.token_url: "https://login.salesforce.com/services/oauth2/token" + + # Oauth User, should include the User mail + #var.user: "abc.xyz@mail.com" + + # Oauth password, should include the User password + #var.password: "P@$$W0₹D" + + # URL, should include the instance_url + #var.url: "https://instance_id.my.salesforce.com" + + setupaudittrail-rest: + enabled: false + + # Oauth Client ID + #var.client_id: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Client Secret + #var.client_secret: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + + # Oauth Token URL + #var.token_url: "https://login.salesforce.com/services/oauth2/token" + + # Oauth User, should include the User mail + #var.user: "abc.xyz@mail.com" + + # Oauth password, should include the User password + #var.password: "P@$$W0₹D" + + # URL, should include the instance_url + #var.url: "https://instance_id.my.salesforce.com" #----------------------------- Google Santa Module ----------------------------- - module: santa @@ -2983,7 +2973,7 @@ filebeat.inputs: # Source of container events. Available options: all, stdin, stderr. #stream: all - # Format of the container events. Available options: auto, cri, docker, json-file + # Format of the container events. Available options: auto, cri, docker, json-file #format: auto ### Log rotation @@ -2993,7 +2983,7 @@ filebeat.inputs: #rotation.external.strategy.copytruncate: # Regex that matches the rotated files. # suffix_regex: \.\d$ - # If the rotated filename suffix is a datetime, set it here. + # If the rotated filename suffix is a datetime, set it here. # dateformat: -20060102 ### State options @@ -3190,7 +3180,7 @@ filebeat.inputs: #hosts: #- kafka-broker-1:9092 #- kafka-broker-2:9092 - + # A list of topics to read from. #topics: ["my-topic", "important-logs"] @@ -3225,7 +3215,7 @@ filebeat.inputs: # The minimum number of bytes to wait for. #fetch.min: 1 - + # The default number of bytes to read per request. #fetch.default: 1MB @@ -4548,7 +4538,7 @@ output.elasticsearch: # Permissions to use for file creation. The default is 0600. #permissions: 0600 - + # Configure automatic file rotation on every startup. The default is true. #rotate_on_startup: true From e1d5089b25fc55a1b2e9d5fb45f8e8ae3f1f1bed Mon Sep 17 00:00:00 2001 From: Jochen Ullrich Date: Wed, 2 Nov 2022 14:39:01 +0100 Subject: [PATCH 05/15] Add back reference config changes without whitespace changes --- x-pack/filebeat/filebeat.reference.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index e61f7bfdfced..34d57546619f 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -3454,6 +3454,16 @@ filebeat.inputs: # Overrides the `cloud.provider` field for non-AWS S3 buckets. See docs for auto recognized providers. 
#provider: minio + # Configures backing up processed files to another (or the same) bucket + #backup_to_bucket_arn: 'arn:aws:s3:::mybucket' + #non_aws_backup_to_bucket_name: 'mybucket' + + # Sets a prefix to prepend to object keys when backing up + #backup_to_bucket_prefix: 'backup/' + + # Controls deletion of objects after backing them up + #delete: false + #------------------------------ AWS CloudWatch input -------------------------------- # Beta: Config options for AWS CloudWatch input #- type: aws-cloudwatch From 183f908679417ad7781ca8bbbe812b512d73888e Mon Sep 17 00:00:00 2001 From: Jochen Ullrich Date: Wed, 2 Nov 2022 14:39:56 +0100 Subject: [PATCH 06/15] fix typo that makes linter fail --- x-pack/filebeat/input/awss3/config.go | 2 +- x-pack/filebeat/input/awss3/config_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index 5b40fd629169..304ec8e60ce5 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -105,7 +105,7 @@ func (c *config) Validate() error { return errors.New("path_style can only be used when polling non-AWS S3 services") } if c.ProviderOverride != "" && c.NonAWSBucketName == "" { - return errors.New("provider can only be overriden when polling non-AWS S3 services") + return errors.New("provider can only be overridden when polling non-AWS S3 services") } if c.BackupConfig.NonAWSBackupToBucketName != "" && c.NonAWSBucketName == "" { return errors.New("backup to non-AWS bucket can only be used for non-AWS sources") diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go index 0552f9417889..71b5d467ff54 100644 --- a/x-pack/filebeat/input/awss3/config_test.go +++ b/x-pack/filebeat/input/awss3/config_test.go @@ -361,7 +361,7 @@ func TestConfig(t *testing.T) { "number_of_workers": 5, "provider": "asdf", }, - "provider can only be overriden when polling non-AWS S3 services", + "provider can only be overridden when polling non-AWS S3 services", nil, }, { @@ -374,7 +374,7 @@ func TestConfig(t *testing.T) { "number_of_workers": 5, "provider": "asdf", }, - "provider can only be overriden when polling non-AWS S3 services", + "provider can only be overridden when polling non-AWS S3 services", nil, }, { From 7bcd68edcc002b8ccb5a8afbae4dd00e3dc07c64 Mon Sep 17 00:00:00 2001 From: Jochen Ullrich Date: Wed, 2 Nov 2022 14:50:47 +0100 Subject: [PATCH 07/15] change reference config the right way --- .../config/filebeat.inputs.reference.xpack.yml.tmpl | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl index c13c00d25520..44d8ce51d57a 100644 --- a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl +++ b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl @@ -102,6 +102,16 @@ # Overrides the `cloud.provider` field for non-AWS S3 buckets. See docs for auto recognized providers. 
#provider: minio + # Configures backing up processed files to another (or the same) bucket + #backup_to_bucket_arn: 'arn:aws:s3:::mybucket' + #non_aws_backup_to_bucket_name: 'mybucket' + + # Sets a prefix to prepend to object keys when backing up + #backup_to_bucket_prefix: 'backup/' + + # Controls deletion of objects after backing them up + #delete: false + #------------------------------ AWS CloudWatch input -------------------------------- # Beta: Config options for AWS CloudWatch input #- type: aws-cloudwatch From ee03b6b82f5e1e12f7f852279dc92ba49249df60 Mon Sep 17 00:00:00 2001 From: Jochen Ullrich Date: Tue, 15 Nov 2022 14:33:14 +0100 Subject: [PATCH 08/15] Add later finalizing, missing tests for now --- x-pack/filebeat/input/awss3/config.go | 10 ++++++ x-pack/filebeat/input/awss3/input.go | 5 +++ x-pack/filebeat/input/awss3/interfaces.go | 4 +++ x-pack/filebeat/input/awss3/s3.go | 38 ++++++++++++++------- x-pack/filebeat/input/awss3/s3_objects.go | 7 +--- x-pack/filebeat/input/awss3/sqs_s3_event.go | 26 +++++++++++--- 6 files changed, 67 insertions(+), 23 deletions(-) diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index 304ec8e60ce5..cdc7d6cd011a 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -116,6 +116,16 @@ func (c *config) Validate() error { if c.BackupConfig.BackupToBucketArn != "" && c.BackupConfig.NonAWSBackupToBucketName != "" { return errors.New("backup_to_bucket_arn and non_aws_backup_to_bucket_name cannot be used together") } + if c.BucketARN != "" || c.NonAWSBucketName != "" { + if c.BackupConfig.BackupToBucketArn == c.BucketARN || c.BackupConfig.NonAWSBackupToBucketName == c.NonAWSBucketName { + if c.BackupConfig.BackupToBucketPrefix == "" { + return errors.New("backup_to_bucket_prefix is a required property when source and backup bucket are the same") + } + if c.BackupConfig.BackupToBucketPrefix == c.BucketListPrefix { + return errors.New("backup_to_bucket_prefix cannot be the same as bucket_list_prefix, this will create an infinite loop") + } + } + } return nil } diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 46eebc863b19..c3593d9dbb3d 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -181,6 +181,11 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsRe log.Infof("AWS SQS visibility_timeout is set to %v.", in.config.VisibilityTimeout) log.Infof("AWS SQS max_number_of_messages is set to %v.", in.config.MaxNumberOfMessages) + if in.config.BackupConfig.GetBucketName() != "" { + log.Warnf("You have the backup_to_bucket functionality activated with SQS. Please make sure to set appropriate destination buckets " + + "or prefixes to avoid an infinite loop.") + } + metricRegistry := monitoring.GetNamespace("dataset").GetRegistry() metrics := newInputMetrics(metricRegistry, ctx.ID) diff --git a/x-pack/filebeat/input/awss3/interfaces.go b/x-pack/filebeat/input/awss3/interfaces.go index 38a239919a6d..ac2f02e7d6c1 100644 --- a/x-pack/filebeat/input/awss3/interfaces.go +++ b/x-pack/filebeat/input/awss3/interfaces.go @@ -105,6 +105,10 @@ type s3ObjectHandler interface { // determine this). ProcessS3Object() error + // FinalizeS3Object finalizes processing of an S3 object after the current + // batch is finished. + FinalizeS3Object() error + // Wait waits for every event published by ProcessS3Object() to be ACKed // by the publisher before returning. Internally it uses the // s3ObjectHandler eventACKTracker's Wait() method diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index 349d5f7cfddc..114bd75b4767 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -115,6 +115,19 @@ func (p *s3Poller) handlePurgingLock(info s3ObjectInfo, isStored bool) { } } +func (p *s3Poller) createS3ObjectProcessor(ctx context.Context, state state) (s3ObjectHandler, s3EventV2) { + event := s3EventV2{} + event.AWSRegion = p.region + event.Provider = p.provider + event.S3.Bucket.Name = state.Bucket + event.S3.Bucket.ARN = p.bucket + event.S3.Object.Key = state.Key + + acker := awscommon.NewEventACKTracker(ctx) + + return p.s3ObjectHandler.Create(ctx, p.log, acker, event), event +} + func (p *s3Poller) ProcessObject(s3ObjectPayloadChan <-chan *s3ObjectPayload) error { var errs []error @@ -205,16 +218,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<- p.states.Update(state, "") - event := s3EventV2{} - event.AWSRegion = p.region - event.Provider = p.provider - event.S3.Bucket.Name = bucketName - event.S3.Bucket.ARN = p.bucket - event.S3.Object.Key = filename - - acker := awscommon.NewEventACKTracker(ctx) - - s3Processor := p.s3ObjectHandler.Create(ctx, p.log, acker, event) + s3Processor, event := p.createS3ObjectProcessor(ctx, state) if s3Processor == nil { p.log.Debugw("empty s3 processor.", "state", state) continue @@ -272,7 +276,7 @@ func (p *s3Poller) Purge() { lock.(*sync.Mutex).Lock() - keys := map[string]struct{}{} + states := map[string]*state{} latestStoredTimeByBucketAndListPrefix := make(map[string]time.Time, 0) for _, state := range p.states.GetStatesByListingID(listingID) { @@ -283,7 +287,8 @@ func (p *s3Poller) Purge() { } var latestStoredTime time.Time - keys[state.ID] = struct{}{} + state := state // copy the loop variable so the stored pointer does not alias it + states[state.ID] = &state latestStoredTime, ok := latestStoredTimeByBucketAndListPrefix[state.Bucket+state.ListPrefix] if !ok { var commitWriteState commitWriteState @@ -307,7 +311,7 @@ func (p *s3Poller) Purge() { } } - for key := range keys { + for key := range states { p.states.Delete(key) } @@ -325,6 +329,14 @@ func (p *s3Poller) Purge() { lock.(*sync.Mutex).Unlock() p.workersListingMap.Delete(listingID) p.states.DeleteListing(listingID) + + // Listing is removed from all states, we can finalize now + for _, state := range states { + processor, _ := p.createS3ObjectProcessor(context.TODO(), *state) + if err := processor.FinalizeS3Object(); err != nil { + p.log.Errorw("Failed to finalize S3 object", "key", state.Key, "error", err) + } + } } } diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index cfc106437cc3..0f41bd5737ad 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -166,11 +166,6 @@ func (p *s3ObjectProcessor) ProcessS3Object() error { time.Since(start).Nanoseconds(), err) } - err = p.finalizeObject() - if err != nil { - return fmt.Errorf("failed to finalize s3 object (elapsed_time_ns=%d): %w", time.Since(start).Nanoseconds(), err) - } - return nil } @@ -369,7 +364,7 @@ func (p *s3ObjectProcessor) createEvent(message string, offset int64) beat.Event return event } -func (p *s3ObjectProcessor) finalizeObject() error { +func (p *s3ObjectProcessor) FinalizeS3Object() error { bucketName := p.backupConfig.GetBucketName() if bucketName != "" { backupKey := p.s3Obj.S3.Object.Key diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go
b/x-pack/filebeat/input/awss3/sqs_s3_event.go index 72489b1550cb..6f9d8de0c1da 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -123,7 +123,7 @@ func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *types.Message keepaliveWg.Add(1) go p.keepalive(keepaliveCtx, log, &keepaliveWg, msg) - processingErr := p.processS3Events(ctx, log, *msg.Body) + handles, processingErr := p.processS3Events(ctx, log, *msg.Body) // Stop keepalive routine before changing visibility. keepaliveCancel() @@ -135,6 +135,10 @@ func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *types.Message return fmt.Errorf("failed deleting message from SQS queue (it may be reprocessed): %w", msgDelErr) } p.metrics.sqsMessagesDeletedTotal.Inc() + // SQS message finished and deleted, finalize s3 objects + if finalizeErr := p.finalizeS3Objects(handles); finalizeErr != nil { + return fmt.Errorf("failed finalizing message from SQS queue (manual cleanup is required): %w", finalizeErr) + } return nil } @@ -265,14 +269,14 @@ func (*sqsS3EventProcessor) isObjectCreatedEvents(event s3EventV2) bool { return event.EventSource == "aws:s3" && strings.HasPrefix(event.EventName, "ObjectCreated:") } -func (p *sqsS3EventProcessor) processS3Events(ctx context.Context, log *logp.Logger, body string) error { +func (p *sqsS3EventProcessor) processS3Events(ctx context.Context, log *logp.Logger, body string) ([]s3ObjectHandler, error) { s3Events, err := p.getS3Notifications(body) if err != nil { if errors.Is(err, context.Canceled) { // Messages that are in-flight at shutdown should be returned to SQS. - return err + return nil, err } - return &nonRetryableError{err} + return nil, &nonRetryableError{err} } log.Debugf("SQS message contained %d S3 event notifications.", len(s3Events)) defer log.Debug("End processing SQS S3 event notifications.") @@ -282,11 +286,13 @@ func (p *sqsS3EventProcessor) processS3Events(ctx context.Context, log *logp.Log defer acker.Wait() var errs []error + var handles []s3ObjectHandler for i, event := range s3Events { s3Processor := p.s3ObjectHandler.Create(ctx, log, acker, event) if s3Processor == nil { continue } + handles = append(handles, s3Processor) // Process S3 object (download, parse, create events). if err := s3Processor.ProcessS3Object(); err != nil { @@ -296,5 +302,17 @@ func (p *sqsS3EventProcessor) processS3Events(ctx context.Context, log *logp.Log } } + return handles, multierr.Combine(errs...) +} + +func (p *sqsS3EventProcessor) finalizeS3Objects(handles []s3ObjectHandler) error { + var errs []error + for i, handle := range handles { + if err := handle.FinalizeS3Object(); err != nil { + errs = append(errs, fmt.Errorf( + "failed finalizing S3 event (object record %d of %d in SQS notification): %w", + i+1, len(handles), err)) + } + } return multierr.Combine(errs...)
From 978f1903d5dd80818a9179383aff7b9b0caab4de Mon Sep 17 00:00:00 2001
From: Jochen Ullrich
Date: Wed, 16 Nov 2022 15:58:09 +0100
Subject: [PATCH 09/15] Add code review feedback & unit tests

---
 x-pack/filebeat/input/awss3/config.go         |  5 +-
 x-pack/filebeat/input/awss3/config_test.go    | 53 ++++++++++++++-----
 .../input/awss3/mock_interfaces_test.go       | 14 +++++
 x-pack/filebeat/input/awss3/s3_objects.go     | 34 ++++++------
 .../filebeat/input/awss3/s3_objects_test.go   | 44 ++++-----------
 x-pack/filebeat/input/awss3/sqs_s3_event.go   | 22 +++++---
 .../filebeat/input/awss3/sqs_s3_event_test.go |  1 +
 7 files changed, 100 insertions(+), 73 deletions(-)

diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go
index cdc7d6cd011a..ca2025561d49 100644
--- a/x-pack/filebeat/input/awss3/config.go
+++ b/x-pack/filebeat/input/awss3/config.go
@@ -116,8 +116,9 @@ func (c *config) Validate() error {
 	if c.BackupConfig.BackupToBucketArn != "" && c.BackupConfig.NonAWSBackupToBucketName != "" {
 		return errors.New("backup_to_bucket_arn and non_aws_backup_to_bucket_name cannot be used together")
 	}
-	if c.BucketARN != "" || c.NonAWSBucketName != "" {
-		if c.BackupConfig.BackupToBucketArn == c.BucketARN || c.BackupConfig.NonAWSBackupToBucketName == c.NonAWSBucketName {
+	if c.BackupConfig.GetBucketName() != "" && c.QueueURL == "" {
+		if (c.BackupConfig.BackupToBucketArn != "" && c.BackupConfig.BackupToBucketArn == c.BucketARN) ||
+			(c.BackupConfig.NonAWSBackupToBucketName != "" && c.BackupConfig.NonAWSBackupToBucketName == c.NonAWSBucketName) {
 			if c.BackupConfig.BackupToBucketPrefix == "" {
 				return errors.New("backup_to_bucket_prefix is a required property when source and backup bucket are the same")
 			}
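The tightened validation above can be exercised directly. A test-style sketch, under the same assumptions as the package's own table tests (in particular, that the makeConfig helper fills bucket_arn from its second argument, as the existing tests suggest):

func TestSameBucketBackupValidation(t *testing.T) {
	c := makeConfig("", "arn:aws:s3:::aBucket", "")
	c.BackupConfig.BackupToBucketArn = "arn:aws:s3:::aBucket" // same as source

	// Without a prefix, processed objects would be re-listed and re-ingested.
	if err := c.Validate(); err == nil {
		t.Fatal("expected error: same-bucket backup requires backup_to_bucket_prefix")
	}

	// A prefix that differs from bucket_list_prefix keeps backup copies out of
	// the polling scope, so this configuration is accepted.
	c.BackupConfig.BackupToBucketPrefix = "processed/"
	if err := c.Validate(); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}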
"arn:aws:s3:::aBucket", "number_of_workers": 5, }, "backup to non-AWS bucket can only be used for non-AWS sources", nil, }, { - "error with non-AWS backup and AWS source", - queueURL, + "error with AWS backup and non-AWS source", "", "", + nonAWSS3Bucket, mapstr.M{ - "non_aws_bucket_name": "aBucket", + "non_aws_bucket_name": nonAWSS3Bucket, "backup_to_bucket_arn": "arn:aws:s3:::bBucket", "number_of_workers": 5, }, "backup to AWS bucket can only be used for AWS sources", nil, }, + { + "error with same bucket backup and empty backup prefix", + "", + "", + nonAWSS3Bucket, + mapstr.M{ + "non_aws_bucket_name": nonAWSS3Bucket, + "non_aws_backup_to_bucket_name": nonAWSS3Bucket, + "number_of_workers": 5, + }, + "backup_to_bucket_prefix is a required property when source and backup bucket are the same", + nil, + }, + { + "error with same bucket backup and backup prefix equal to list prefix", + "", + "", + nonAWSS3Bucket, + mapstr.M{ + "non_aws_bucket_name": nonAWSS3Bucket, + "non_aws_backup_to_bucket_name": nonAWSS3Bucket, + "number_of_workers": 5, + "backup_to_bucket_prefix": "processed_", + "bucket_list_prefix": "processed_", + }, + "backup_to_bucket_prefix cannot be the same as bucket_list_prefix, this will create an infinite loop", + nil, + }, } for _, tc := range testCases { diff --git a/x-pack/filebeat/input/awss3/mock_interfaces_test.go b/x-pack/filebeat/input/awss3/mock_interfaces_test.go index ef00b846c228..39889de990ce 100644 --- a/x-pack/filebeat/input/awss3/mock_interfaces_test.go +++ b/x-pack/filebeat/input/awss3/mock_interfaces_test.go @@ -521,6 +521,20 @@ func (mr *MockS3ObjectHandlerMockRecorder) ProcessS3Object() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessS3Object", reflect.TypeOf((*MockS3ObjectHandler)(nil).ProcessS3Object)) } +// ProcessS3Object mocks base method. +func (m *MockS3ObjectHandler) FinalizeS3Object() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FinalizeS3Object") + ret0, _ := ret[0].(error) + return ret0 +} + +// ProcessS3Object indicates an expected call of ProcessS3Object. +func (mr *MockS3ObjectHandlerMockRecorder) FinalizeS3Object() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FinalizeS3Object", reflect.TypeOf((*MockS3ObjectHandler)(nil).FinalizeS3Object)) +} + // Wait mocks base method. 
 func (m *MockS3ObjectHandler) Wait() {
 	m.ctrl.T.Helper()

diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go
index 38aa3f026c0d..0295ad5341cb 100644
--- a/x-pack/filebeat/input/awss3/s3_objects.go
+++ b/x-pack/filebeat/input/awss3/s3_objects.go
@@ -46,7 +46,7 @@ type s3ObjectProcessorFactory struct {
 	backupConfig backupConfig
 }
 
-func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3Getter, sel []fileSelectorConfig, backupConfig backupConfig) *s3ObjectProcessorFactory {
+func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3API, sel []fileSelectorConfig, backupConfig backupConfig) *s3ObjectProcessorFactory {
 	if metrics == nil {
 		metrics = newInputMetrics(monitoring.NewRegistry(), "")
 	}
@@ -366,21 +366,23 @@ func (p *s3ObjectProcessor) createEvent(message string, offset int64) beat.Event
 
 func (p *s3ObjectProcessor) FinalizeS3Object() error {
 	bucketName := p.backupConfig.GetBucketName()
-	if bucketName != "" {
-		backupKey := p.s3Obj.S3.Object.Key
-		if p.backupConfig.BackupToBucketPrefix != "" {
-			backupKey = fmt.Sprintf("%s%s", p.backupConfig.BackupToBucketPrefix, backupKey)
-		}
-		_, err := p.s3.CopyObject(p.ctx, p.s3Obj.S3.Bucket.Name, bucketName, p.s3Obj.S3.Object.Key, backupKey)
-		if err != nil {
-			return fmt.Errorf("failed to copy object to backup bucket: %w", err)
-		}
-		if p.backupConfig.Delete {
-			_, err = p.s3.DeleteObject(p.ctx, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key)
-			if err != nil {
-				return fmt.Errorf("failed to delete object from bucket: %w", err)
-			}
-		}
+	if bucketName == "" {
+		return nil
+	}
+	backupKey := p.s3Obj.S3.Object.Key
+	if p.backupConfig.BackupToBucketPrefix != "" {
+		backupKey = fmt.Sprintf("%s%s", p.backupConfig.BackupToBucketPrefix, backupKey)
+	}
+	_, err := p.s3.CopyObject(p.ctx, p.s3Obj.S3.Bucket.Name, bucketName, p.s3Obj.S3.Object.Key, backupKey)
+	if err != nil {
+		return fmt.Errorf("failed to copy object to backup bucket: %w", err)
+	}
+	if !p.backupConfig.Delete {
+		return nil
+	}
+	_, err = p.s3.DeleteObject(p.ctx, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key)
+	if err != nil {
+		return fmt.Errorf("failed to delete object from bucket: %w", err)
 	}
 	return nil
 }
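Because S3 offers no server-side move, FinalizeS3Object emulates one as copy-then-delete. A standalone sketch of the same pattern written directly against the AWS SDK for Go v2 (function and variable names are illustrative, not the input's own API); the failure mode is deliberately duplicate-rather-than-lost data:

package main

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// moveObject copies srcKey to dstBucket/dstKey, then deletes the source.
// A crash between the two calls leaves the object in both buckets, which is
// the safe failure mode for an at-least-once pipeline.
func moveObject(ctx context.Context, client *s3.Client, srcBucket, dstBucket, srcKey, dstKey string) error {
	if _, err := client.CopyObject(ctx, &s3.CopyObjectInput{
		Bucket: aws.String(dstBucket),
		// Keys containing special characters must be URL-encoded here.
		CopySource: aws.String(srcBucket + "/" + srcKey),
		Key:        aws.String(dstKey),
	}); err != nil {
		return fmt.Errorf("copy failed, source object left untouched: %w", err)
	}
	if _, err := client.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(srcBucket),
		Key:    aws.String(srcKey),
	}); err != nil {
		return fmt.Errorf("delete failed, object now exists in both buckets: %w", err)
	}
	return nil
}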
diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go
index 45e306885c94..0c30496cfcec 100644
--- a/x-pack/filebeat/input/awss3/s3_objects_test.go
+++ b/x-pack/filebeat/input/awss3/s3_objects_test.go
@@ -195,11 +195,11 @@ func TestS3ObjectProcessor(t *testing.T) {
 
 		s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{})
 		ack := awscommon.NewEventACKTracker(ctx)
-		err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object()
+		err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object()
 		require.NoError(t, err)
 	})
 
-	t.Run("backups objects after reading", func(t *testing.T) {
+	t.Run("backups objects on finalize call", func(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
 		defer cancel()
 
 		ctrl := gomock.NewController(t)
 		defer ctrl.Finish()
 		mockS3API := NewMockS3API(ctrl)
 		mockPublisher := NewMockBeatClient(ctrl)
-		s3Event, s3Resp := newS3Object(t, "testdata/log.txt", "")
+		s3Event, _ := newS3Object(t, "testdata/log.txt", "")
 
 		backupCfg := backupConfig{
 			BackupToBucketArn: "arn:aws:s3:::backup",
 		}
 
-		var events []beat.Event
 		gomock.InOrder(
-			mockS3API.EXPECT().
-				GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
-				Return(s3Resp, nil),
-			mockPublisher.EXPECT().
-				Publish(gomock.Any()).
-				Do(func(event beat.Event) { events = append(events, event) }).
-				Times(2),
 			mockS3API.EXPECT().
 				CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)).
 				Return(nil, nil),
 		)
 
-		s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil, backupCfg)
+		s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg)
 		ack := awscommon.NewEventACKTracker(ctx)
-		err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object()
+		err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).FinalizeS3Object()
 		require.NoError(t, err)
 	})
 
@@ -241,22 +233,14 @@ func TestS3ObjectProcessor(t *testing.T) {
 		defer cancel()
 
 		ctrl := gomock.NewController(t)
 		defer ctrl.Finish()
 		mockS3API := NewMockS3API(ctrl)
 		mockPublisher := NewMockBeatClient(ctrl)
-		s3Event, s3Resp := newS3Object(t, "testdata/log.txt", "")
+		s3Event, _ := newS3Object(t, "testdata/log.txt", "")
 
 		backupCfg := backupConfig{
 			BackupToBucketArn: "arn:aws:s3:::backup",
 			Delete:            true,
 		}
 
-		var events []beat.Event
 		gomock.InOrder(
-			mockS3API.EXPECT().
-				GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
-				Return(s3Resp, nil),
-			mockPublisher.EXPECT().
-				Publish(gomock.Any()).
-				Do(func(event beat.Event) { events = append(events, event) }).
-				Times(2),
 			mockS3API.EXPECT().
 				CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)).
 				Return(nil, nil),
@@ -265,9 +249,9 @@ func TestS3ObjectProcessor(t *testing.T) {
 			mockS3API.EXPECT().
 				DeleteObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
 				Return(nil, nil),
 		)
 
-		s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil, backupCfg)
+		s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg)
 		ack := awscommon.NewEventACKTracker(ctx)
-		err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object()
+		err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).FinalizeS3Object()
 		require.NoError(t, err)
 	})
 
@@ -279,22 +263,14 @@ func TestS3ObjectProcessor(t *testing.T) {
 		defer cancel()
 
 		ctrl := gomock.NewController(t)
 		defer ctrl.Finish()
 		mockS3API := NewMockS3API(ctrl)
 		mockPublisher := NewMockBeatClient(ctrl)
-		s3Event, s3Resp := newS3Object(t, "testdata/log.txt", "")
+		s3Event, _ := newS3Object(t, "testdata/log.txt", "")
 
 		backupCfg := backupConfig{
 			BackupToBucketArn:    s3Event.S3.Bucket.ARN,
 			BackupToBucketPrefix: "backup/",
 		}
 
-		var events []beat.Event
 		gomock.InOrder(
-			mockS3API.EXPECT().
-				GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
-				Return(s3Resp, nil),
-			mockPublisher.EXPECT().
-				Publish(gomock.Any()).
-				Do(func(event beat.Event) { events = append(events, event) }).
-				Times(2),
 			mockS3API.EXPECT().
 				CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq("backup/testdata/log.txt")).
 				Return(nil, nil),
@@ -302,7 +278,7 @@ func TestS3ObjectProcessor(t *testing.T) {
 
 		s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg)
 		ack := awscommon.NewEventACKTracker(ctx)
-		err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object()
+		err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).FinalizeS3Object()
 		require.NoError(t, err)
 	})
 }

diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go
index db8838b9202a..68800921f7a6 100644
--- a/x-pack/filebeat/input/awss3/sqs_s3_event.go
+++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go
@@ -135,7 +135,7 @@ func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *types.Message
 	keepaliveWg.Add(1)
 	go p.keepalive(keepaliveCtx, log, &keepaliveWg, msg)
 
-	processingErr, handles := p.processS3Events(ctx, log, *msg.Body)
+	handles, processingErr := p.processS3Events(ctx, log, *msg.Body)
 
 	// Stop keepalive routine before changing visibility.
 	keepaliveCancel()
@@ -281,20 +281,20 @@ func (*sqsS3EventProcessor) isObjectCreatedEvents(event s3EventV2) bool {
 	return event.EventSource == "aws:s3" && strings.HasPrefix(event.EventName, "ObjectCreated:")
 }
 
-func (p *sqsS3EventProcessor) processS3Events(ctx context.Context, log *logp.Logger, body string) (error, []s3ObjectHandler) {
+func (p *sqsS3EventProcessor) processS3Events(ctx context.Context, log *logp.Logger, body string) ([]s3ObjectHandler, error) {
 	s3Events, err := p.getS3Notifications(body)
 	if err != nil {
 		if errors.Is(err, context.Canceled) {
 			// Messages that are in-flight at shutdown should be returned to SQS.
-			return err, nil
+			return nil, err
 		}
-		return &nonRetryableError{err}, nil
+		return nil, &nonRetryableError{err}
 	}
 	log.Debugf("SQS message contained %d S3 event notifications.", len(s3Events))
 	defer log.Debug("End processing SQS S3 event notifications.")
 
 	if len(s3Events) == 0 {
-		return nil
+		return nil, nil
 	}
 
 	// Create a pipeline client scoped to this goroutine.
@@ -307,7 +307,7 @@ func (p *sqsS3EventProcessor) processS3Events(ctx context.Context, log *logp.Log
 		},
 	})
 	if err != nil {
-		return err
+		return nil, err
 	}
 	defer client.Close()
 
@@ -322,17 +322,23 @@ func (p *sqsS3EventProcessor) processS3Events(ctx context.Context, log *logp.Log
 		if s3Processor == nil {
 			continue
 		}
-		handles = append(handles, s3Processor)
 
 		// Process S3 object (download, parse, create events).
 		if err := s3Processor.ProcessS3Object(); err != nil {
 			errs = append(errs, fmt.Errorf(
 				"failed processing S3 event for object key %q in bucket %q (object record %d of %d in SQS notification): %w",
 				event.S3.Object.Key, event.S3.Bucket.Name, i+1, len(s3Events), err))
+		} else {
+			handles = append(handles, s3Processor)
 		}
 	}
 
-	return multierr.Combine(errs...), handles
+	// Make sure all s3 events were processed successfully
+	if len(handles) == len(s3Events) {
+		return handles, multierr.Combine(errs...)
+	}
+
+	return nil, multierr.Combine(errs...)
} func (p *sqsS3EventProcessor) finalizeS3Objects(handles []s3ObjectHandler) error { diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go index 886fdfe17116..b1020cd49435 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go @@ -124,6 +124,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockS3Handler.EXPECT().ProcessS3Object().Return(nil), mockClient.EXPECT().Close(), mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&msg)).Return(nil), + mockS3Handler.EXPECT().FinalizeS3Object().Return(nil), ) p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, visibilityTimeout, 5, mockBeatPipeline, mockS3HandlerFactory) From 5d998bc50bcb194f443c6aca0af3fefe5b652bdf Mon Sep 17 00:00:00 2001 From: Jochen Ullrich Date: Fri, 18 Nov 2022 12:56:24 +0100 Subject: [PATCH 10/15] Try fix G601 error --- x-pack/filebeat/input/awss3/s3.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index f64c75b33600..2e96c1bc24bf 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -284,7 +284,8 @@ func (p *s3Poller) Purge() { states := map[string]*state{} latestStoredTimeByBucketAndListPrefix := make(map[string]time.Time, 0) - for _, state := range p.states.GetStatesByListingID(listingID) { + listingStates := p.states.GetStatesByListingID(listingID) + for i, state := range listingStates { // it is not stored, keep if !state.Stored { p.log.Debugw("state not stored, skip purge", "state", state) @@ -292,7 +293,7 @@ func (p *s3Poller) Purge() { } var latestStoredTime time.Time - states[state.ID] = &state + states[state.ID] = &listingStates[i] latestStoredTime, ok := latestStoredTimeByBucketAndListPrefix[state.Bucket+state.ListPrefix] if !ok { var commitWriteState commitWriteState From eb2f836926ff2fdbbc59e2c33a8aca392089d16d Mon Sep 17 00:00:00 2001 From: Jochen Ullrich Date: Tue, 22 Nov 2022 14:18:24 +0100 Subject: [PATCH 11/15] Fix last code review feedback --- x-pack/filebeat/input/awss3/config_test.go | 13 +++++++++++++ x-pack/filebeat/input/awss3/s3.go | 6 +++--- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go index 189c40d8c63b..27038d0114b8 100644 --- a/x-pack/filebeat/input/awss3/config_test.go +++ b/x-pack/filebeat/input/awss3/config_test.go @@ -447,6 +447,19 @@ func TestConfig(t *testing.T) { { "error with same bucket backup and empty backup prefix", "", + s3Bucket, + "", + mapstr.M{ + "bucket_arn": s3Bucket, + "backup_to_bucket_arn": s3Bucket, + "number_of_workers": 5, + }, + "backup_to_bucket_prefix is a required property when source and backup bucket are the same", + nil, + }, + { + "error with same bucket backup (non-AWS) and empty backup prefix", + "", "", nonAWSS3Bucket, mapstr.M{ diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index 2e96c1bc24bf..6d0d456b3832 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -264,7 +264,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<- } } -func (p *s3Poller) Purge() { +func (p *s3Poller) Purge(ctx context.Context) { listingIDs := p.states.GetListingIDs() p.log.Debugw("purging listing.", "listingIDs", listingIDs) for _, listingID := range listingIDs { @@ -338,7 +338,7 @@ func (p *s3Poller) 
From eb2f836926ff2fdbbc59e2c33a8aca392089d16d Mon Sep 17 00:00:00 2001
From: Jochen Ullrich
Date: Tue, 22 Nov 2022 14:18:24 +0100
Subject: [PATCH 11/15] Fix last code review feedback

---
 x-pack/filebeat/input/awss3/config_test.go | 13 +++++++++++++
 x-pack/filebeat/input/awss3/s3.go          |  6 +++---
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go
index 189c40d8c63b..27038d0114b8 100644
--- a/x-pack/filebeat/input/awss3/config_test.go
+++ b/x-pack/filebeat/input/awss3/config_test.go
@@ -447,6 +447,19 @@ func TestConfig(t *testing.T) {
 		{
 			"error with same bucket backup and empty backup prefix",
 			"",
+			s3Bucket,
+			"",
+			mapstr.M{
+				"bucket_arn":           s3Bucket,
+				"backup_to_bucket_arn": s3Bucket,
+				"number_of_workers":    5,
+			},
+			"backup_to_bucket_prefix is a required property when source and backup bucket are the same",
+			nil,
+		},
+		{
+			"error with same bucket backup (non-AWS) and empty backup prefix",
+			"",
 			"",
 			nonAWSS3Bucket,
 			mapstr.M{

diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go
index 2e96c1bc24bf..6d0d456b3832 100644
--- a/x-pack/filebeat/input/awss3/s3.go
+++ b/x-pack/filebeat/input/awss3/s3.go
@@ -264,7 +264,7 @@
-func (p *s3Poller) Purge() {
+func (p *s3Poller) Purge(ctx context.Context) {
 	listingIDs := p.states.GetListingIDs()
 	p.log.Debugw("purging listing.", "listingIDs", listingIDs)
 	for _, listingID := range listingIDs {
@@ -338,7 +338,7 @@ func (p *s3Poller) Purge() {
 
 		// Listing is removed from all states, we can finalize now
 		for _, state := range states {
-			processor, _ := p.createS3ObjectProcessor(context.TODO(), *state)
+			processor, _ := p.createS3ObjectProcessor(ctx, *state)
 			if err := processor.FinalizeS3Object(); err != nil {
 				p.log.Errorw("Failed to finalize S3 object", "key", state.Key, "error", err)
 			}
@@ -371,7 +371,7 @@ func (p *s3Poller) Poll(ctx context.Context) error {
 		}()
 
 		p.GetS3Objects(ctx, s3ObjectPayloadChan)
-		p.Purge()
+		p.Purge(ctx)
 	}()
 
 	workerWg.Add(workers)

From 4f1a2d0fa396cf8ebb058446201ea8ba5a71aea2 Mon Sep 17 00:00:00 2001
From: Jochen Ullrich
Date: Wed, 23 Nov 2022 15:02:59 +0100
Subject: [PATCH 12/15] Add missing unit test

---
 x-pack/filebeat/input/awss3/config_test.go | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go
index 27038d0114b8..9606ff1445ad 100644
--- a/x-pack/filebeat/input/awss3/config_test.go
+++ b/x-pack/filebeat/input/awss3/config_test.go
@@ -473,6 +473,21 @@ func TestConfig(t *testing.T) {
 		{
 			"error with same bucket backup and backup prefix equal to list prefix",
 			"",
+			s3Bucket,
+			"",
+			mapstr.M{
+				"bucket_arn":              s3Bucket,
+				"backup_to_bucket_arn":    s3Bucket,
+				"number_of_workers":       5,
+				"backup_to_bucket_prefix": "processed_",
+				"bucket_list_prefix":      "processed_",
+			},
+			"backup_to_bucket_prefix cannot be the same as bucket_list_prefix, this will create an infinite loop",
+			nil,
+		},
+		{
+			"error with same bucket backup (non-AWS) and backup prefix equal to list prefix",
+			"",
 			"",
 			nonAWSS3Bucket,
 			mapstr.M{

From 8bd859a6942722920d4f3feb43e8a17beddae710 Mon Sep 17 00:00:00 2001
From: Jochen Ullrich
Date: Thu, 24 Nov 2022 11:36:02 +0100
Subject: [PATCH 13/15] add entry to changelog

---
 CHANGELOG.next.asciidoc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index e46b86b33edb..83ad11bddcca 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -167,6 +167,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
 - Add Common Expression Language input. {pull}31233[31233]
 - Add support for http+unix and http+npipe schemes in httpjson input. {issue}33571[33571] {pull}33610[33610]
 - Add support for http+unix and http+npipe schemes in cel input. {issue}33571[33571] {pull}33712[33712]
+- Add backup to bucket and delete functionality for the `aws-s3` input. {issue}30696[30696] {pull}33559[33559]
 
 *Auditbeat*

From c48fd174dd1ba1ac024be468733f2c08a0b77815 Mon Sep 17 00:00:00 2001
From: Andrea Spacca
Date: Thu, 22 Dec 2022 17:10:20 +0900
Subject: [PATCH 14/15] rename `delete` to `delete_after_backup`, add
 permissions required for backup feature in docs

---
 .../config/filebeat.inputs.reference.xpack.yml.tmpl |  2 +-
 x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc   | 12 +++++++++++-
 x-pack/filebeat/filebeat.reference.yml              |  2 +-
 x-pack/filebeat/input/awss3/config.go               |  2 +-
 4 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl
index 44d8ce51d57a..07ca2e8d10b6 100644
--- a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl
+++ b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl
@@ -110,7 +110,7 @@
   #backup_to_bucket_prefix: 'backup/'
 
   # Controls deletion of objects after backing them up
-  #delete: false
+  #delete_after_backup: false
 
 #------------------------------ AWS CloudWatch input --------------------------------
 # Beta: Config options for AWS CloudWatch input

diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
index 6e05854702e9..b83dfc7da287 100644
--- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
+++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -459,7 +459,7 @@ When using the `bucket_arn`, please use `backup_to_bucket_arn` accordingly.
 Naming of the backed up files can be controlled with `backup_to_bucket_prefix`.
 
 [float]
-==== `delete`
+==== `delete_after_backup`
 
 Controls whether fully processed files will be deleted from the bucket.
 
@@ -487,6 +487,16 @@
 s3:ListBucket
 s3:GetBucketLocation
 ----
 
+In case `backup_to_bucket_arn` or `non_aws_backup_to_bucket_name` are set the following permission is required as well:
+----
+s3:PutObject
+----
+
+In case `delete_after_backup` is set the following permission is required as well:
+----
+s3:DeleteObject
+----
+
 [float]
 === S3 and SQS setup

diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml
index b87d011f22db..2304e9607a13 100644
--- a/x-pack/filebeat/filebeat.reference.yml
+++ b/x-pack/filebeat/filebeat.reference.yml
@@ -3465,7 +3465,7 @@ filebeat.inputs:
   #backup_to_bucket_prefix: 'backup/'
 
   # Controls deletion of objects after backing them up
-  #delete: false
+  #delete_after_backup: false
 
 #------------------------------ AWS CloudWatch input --------------------------------
 # Beta: Config options for AWS CloudWatch input

diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go
index ca2025561d49..7297425c5742 100644
--- a/x-pack/filebeat/input/awss3/config.go
+++ b/x-pack/filebeat/input/awss3/config.go
@@ -135,7 +135,7 @@ type backupConfig struct {
 	BackupToBucketArn        string `config:"backup_to_bucket_arn"`
 	NonAWSBackupToBucketName string `config:"non_aws_backup_to_bucket_name"`
 	BackupToBucketPrefix     string `config:"backup_to_bucket_prefix"`
-	Delete                   bool   `config:"delete"`
+	Delete                   bool   `config:"delete_after_backup"`
 }
 
 func (c *backupConfig) GetBucketName() string {

From 065307cf19d91f555728177cfd44193af75d4b87 Mon Sep 17 00:00:00 2001
From: Andrea Spacca
Date: Thu, 22 Dec 2022 20:00:01 +0900
Subject: [PATCH 15/15] fix integration tests

---
 x-pack/filebeat/input/awss3/input.go          | 36 +++++++++----
 .../input/awss3/input_integration_test.go     | 50 +++++--------------
 x-pack/filebeat/input/awss3/metrics.go        |  1 +
 3 files changed, 39 insertions(+), 48 deletions(-)

diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go
index 66c31401bcbb..040c25e5f74e 100644
--- a/x-pack/filebeat/input/awss3/input.go
+++ b/x-pack/filebeat/input/awss3/input.go
@@ -50,26 +50,38 @@ func (im *s3InputManager) Create(cfg *conf.C) (v2.Input, error) {
 		return nil, err
 	}
 
-	return newInput(config, im.store)
+	return newInput(config, im.store, true)
+}
+
+func (im *s3InputManager) CreateWithoutClosingMetrics(cfg *conf.C) (v2.Input, error) {
+	// Workaround: the integration tests cannot inject their own metrics, so they
+	// need a way to opt out of metrics.Close() being deferred in Run().
+	config := defaultConfig()
+	if err := cfg.Unpack(&config); err != nil {
+		return nil, err
+	}
+
+	return newInput(config, im.store, false)
 }
 
 // s3Input is a input for reading logs from S3 when triggered by an SQS message.
 type s3Input struct {
-	config    config
-	awsConfig awssdk.Config
-	store     beater.StateStore
+	closeMetrics bool
+	config       config
+	awsConfig    awssdk.Config
+	store        beater.StateStore
 }
 
-func newInput(config config, store beater.StateStore) (*s3Input, error) {
+func newInput(config config, store beater.StateStore, closeMetrics bool) (*s3Input, error) {
 	awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig)
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err)
 	}
 
 	return &s3Input{
-		config:    config,
-		awsConfig: awsConfig,
-		store:     store,
+		closeMetrics: closeMetrics,
+		config:       config,
+		awsConfig:    awsConfig,
+		store:        store,
 	}, nil
 }
 
@@ -120,7 +132,9 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
 		if err != nil {
 			return fmt.Errorf("failed to initialize sqs receiver: %w", err)
 		}
-		defer receiver.metrics.Close()
+		if in.closeMetrics {
+			defer receiver.metrics.Close()
+		}
 
 		if err := receiver.Receive(ctx); err != nil {
 			return err
@@ -148,7 +162,9 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
 		if err != nil {
 			return fmt.Errorf("failed to initialize s3 poller: %w", err)
 		}
-		defer poller.metrics.Close()
+		if in.closeMetrics {
+			defer poller.metrics.Close()
+		}
 
 		if err := poller.Poll(ctx); err != nil {
 			return err
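Condensed from the integration-test changes that follow, the test-side call pattern for the new hook looks like this (error handling trimmed):

func createTestInput(t *testing.T, cfg *conf.C) *s3Input {
	manager := Plugin(openTestStatestore()).Manager.(*s3InputManager)

	// Bypass Create() so Run() will not defer metrics.Close(); the test
	// inspects the shared metrics registry after the input has stopped.
	inputV2, err := manager.CreateWithoutClosingMetrics(cfg)
	if err != nil {
		t.Fatal(err)
	}
	return inputV2.(*s3Input)
}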
- monitoring.GetNamespace("inputs").GetRegistry().Remove(inputID) + monitoring.GetNamespace("dataset").GetRegistry().Remove(inputID) uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketName, "testdata/events-array.json", @@ -180,18 +180,9 @@ func TestInputRunSQS(t *testing.T) { cancel() }) - client := pubtest.NewChanClient(0) - defer close(client.Channel) - go func() { - for event := range client.Channel { - // Fake the ACK handling that's not implemented in pubtest. - event.Private.(*awscommon.EventACKTracker).ACK() - } - }() - var errGroup errgroup.Group errGroup.Go(func() error { - pipeline := pubtest.PublisherWithClient(client) + pipeline := &fakePipeline{} return s3Input.Run(inputCtx, pipeline) }) @@ -200,7 +191,7 @@ func TestInputRunSQS(t *testing.T) { } snap := mapstr.M(monitoring.CollectStructSnapshot( - monitoring.GetNamespace("inputs").GetRegistry(), + monitoring.GetNamespace("dataset").GetRegistry(), monitoring.Full, false)) t.Log(snap.StringToPrint()) @@ -222,7 +213,7 @@ func TestInputRunS3(t *testing.T) { tfConfig := getTerraformOutputs(t) // Ensure metrics are removed before testing. - monitoring.GetNamespace("inputs").GetRegistry().Remove(inputID) + monitoring.GetNamespace("dataset").GetRegistry().Remove(inputID) uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketName, "testdata/events-array.json", @@ -243,18 +234,9 @@ func TestInputRunS3(t *testing.T) { cancel() }) - client := pubtest.NewChanClient(0) - defer close(client.Channel) - go func() { - for event := range client.Channel { - // Fake the ACK handling that's not implemented in pubtest. - event.Private.(*awscommon.EventACKTracker).ACK() - } - }() - var errGroup errgroup.Group errGroup.Go(func() error { - pipeline := pubtest.PublisherWithClient(client) + pipeline := &fakePipeline{} return s3Input.Run(inputCtx, pipeline) }) @@ -263,7 +245,7 @@ func TestInputRunS3(t *testing.T) { } snap := mapstr.M(monitoring.CollectStructSnapshot( - monitoring.GetNamespace("inputs").GetRegistry(), + monitoring.GetNamespace("dataset").GetRegistry(), monitoring.Full, false)) t.Log(snap.StringToPrint()) @@ -438,7 +420,7 @@ func TestInputRunSNS(t *testing.T) { drainSQS(t, tfConfig.AWSRegion, tfConfig.QueueURLForSNS) // Ensure metrics are removed before testing. - monitoring.GetNamespace("inputs").GetRegistry().Remove(inputID) + monitoring.GetNamespace("dataset").GetRegistry().Remove(inputID) uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketNameForSNS, "testdata/events-array.json", @@ -459,17 +441,9 @@ func TestInputRunSNS(t *testing.T) { cancel() }) - client := pubtest.NewChanClient(0) - defer close(client.Channel) - go func() { - for event := range client.Channel { - event.Private.(*awscommon.EventACKTracker).ACK() - } - }() - var errGroup errgroup.Group errGroup.Go(func() error { - pipeline := pubtest.PublisherWithClient(client) + pipeline := &fakePipeline{} return s3Input.Run(inputCtx, pipeline) }) @@ -478,7 +452,7 @@ func TestInputRunSNS(t *testing.T) { } snap := mapstr.M(monitoring.CollectStructSnapshot( - monitoring.GetNamespace("inputs").GetRegistry(), + monitoring.GetNamespace("dataset").GetRegistry(), monitoring.Full, false)) t.Log(snap.StringToPrint()) diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 2e56a55847f2..16cb2fb1363a 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -36,6 +36,7 @@ type inputMetrics struct { // Close removes the metrics from the registry. 
func (m *inputMetrics) Close() { + // Thanks to this integration tests fail m.unregister() }
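The integration tests above now hand the input a fakePipeline whose definition is not part of this excerpt. A rough stand-in satisfying libbeat's beat.Pipeline interface, reproducing the immediate-ACK behaviour the removed pubtest client loop used to fake, could look like this (hypothetical sketch; the real helper may differ):

package awss3

import (
	"github.com/elastic/beats/v7/libbeat/beat"
	awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
)

// fakePipeline hands out clients that ACK every published event immediately,
// so the input's eventACKTracker can make progress without a real pipeline.
type fakePipeline struct{}

func (*fakePipeline) Connect() (beat.Client, error) { return &ackingClient{}, nil }

func (*fakePipeline) ConnectWith(beat.ClientConfig) (beat.Client, error) {
	return &ackingClient{}, nil
}

type ackingClient struct{}

func (c *ackingClient) Publish(event beat.Event) {
	// Mirror the old channel-based loop: ACK the tracker stored in Private.
	if acker, ok := event.Private.(*awscommon.EventACKTracker); ok {
		acker.ACK()
	}
}

func (c *ackingClient) PublishAll(events []beat.Event) {
	for _, e := range events {
		c.Publish(e)
	}
}

func (c *ackingClient) Close() error { return nil }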