
Add backup to bucket functionality #33559

Merged: 28 commits, Dec 22, 2022
6ae9c77
Add backup to bucket functionality
jniebuhr Nov 2, 2022
48dbfaa
Merge branch 'main' into feature/backup-to-bucket
jniebuhr Nov 2, 2022
962fc91
Add documentation for backup_to_bucket configuration parameters
jniebuhr Nov 2, 2022
a2d47f9
Merge branch 'feature/backup-to-bucket' of github.com:jniebuhr/beats …
jniebuhr Nov 2, 2022
da59387
Add configuration to reference config file
jniebuhr Nov 2, 2022
58d7248
Revert "Add configuration to reference config file"
jniebuhr Nov 2, 2022
e1d5089
Add back reference config changes without whitespace changes
jniebuhr Nov 2, 2022
183f908
fix typo that makes linter fail
jniebuhr Nov 2, 2022
7bcd68e
change reference config the right way
jniebuhr Nov 2, 2022
1d69a75
Merge branch 'main' into feature/backup-to-bucket
Nov 15, 2022
ee03b6b
Add later finalizing, missing tests for now
jniebuhr Nov 15, 2022
4e27731
Merge branch 'main' into feature/backup-to-bucket
jniebuhr Nov 15, 2022
4cb3070
Merge branch 'main' into feature/backup-to-bucket
Nov 16, 2022
978f190
Add code review feedback & unit tests
jniebuhr Nov 16, 2022
5d998bc
Try fix G601 error
jniebuhr Nov 18, 2022
eb2f836
Fix last code review feedback
jniebuhr Nov 22, 2022
690c714
Merge branch 'main' into feature/backup-to-bucket
jniebuhr Nov 22, 2022
0d7bf2c
Merge branch 'main' into feature/backup-to-bucket
Nov 23, 2022
4f1a2d0
Add missing unit test
jniebuhr Nov 23, 2022
301c9d0
Merge branch 'feature/backup-to-bucket' of github.com:jniebuhr/beats …
jniebuhr Nov 23, 2022
8bd859a
add entry to changelog
jniebuhr Nov 24, 2022
1d5cc8d
Merge branch 'main' into feature/backup-to-bucket
jniebuhr Nov 24, 2022
0ec3d61
Merge branch 'main' into feature/backup-to-bucket
Nov 28, 2022
c02e5aa
Merge branch 'main' into feature/backup-to-bucket
Nov 29, 2022
508531b
Merge branch 'main' into feature/backup-to-bucket
jniebuhr Dec 5, 2022
7e4fc9a
Merge branch 'main' into feature/backup-to-bucket
Dec 22, 2022
c48fd17
rename to , add permissions required for backup feature in docs
Dec 22, 2022
065307c
fix integration tests
Dec 22, 2022
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -179,6 +179,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add support for http+unix and http+npipe schemes in httpjson input. {issue}33571[33571] {pull}33610[33610]
- Add support for http+unix and http+npipe schemes in cel input. {issue}33571[33571] {pull}33712[33712]
- Add `decode_duration`, `move_fields` processors. {pull}31301[31301]
- Add backup to bucket and delete functionality for the `aws-s3` input. {issue}30696[30696] {pull}33559[33559]
- Add metrics for UDP packet processing. {pull}33870[33870]
- Convert UDP input to v2 input. {pull}33930[33930]
- Improve collection of risk information from Okta debug data. {issue}33677[33677] {pull}34030[34030]
@@ -102,6 +102,16 @@
# Overrides the `cloud.provider` field for non-AWS S3 buckets. See docs for auto recognized providers.
#provider: minio

# Configures backing up processed files to another (or the same) bucket
#backup_to_bucket_arn: 'arn:aws:s3:::mybucket'
#non_aws_backup_to_bucket_name: 'mybucket'

# Sets a prefix to prepend to object keys when backing up
#backup_to_bucket_prefix: 'backup/'

# Controls deletion of objects after backing them up
#delete_after_backup: false

#------------------------------ AWS CloudWatch input --------------------------------
# Beta: Config options for AWS CloudWatch input
#- type: aws-cloudwatch
38 changes: 38 additions & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -437,6 +437,34 @@ This is only supported with 3rd party S3 providers. AWS does not support path s
In order to make AWS API calls, the `aws-s3` input requires AWS credentials. Please
see <<aws-credentials-config,AWS credentials options>> for more details.

[float]
==== `backup_to_bucket_arn`
[review comment] Contributor: Do we need to add s3:PutObject into the AWS Permissions section of this documentation at line 469?

[review comment] Reply: good catch! also s3:DeleteObject

The bucket ARN to back up processed files to. The processed file is copied after it has been fully read.
When using `non_aws_bucket_name`, use `non_aws_backup_to_bucket_name` instead.

Naming of the backed-up files can be controlled with `backup_to_bucket_prefix`.

[float]
==== `backup_to_bucket_prefix`

This prefix will be prepended to the object key when backing it up to another (or the same) bucket.

[float]
==== `non_aws_backup_to_bucket_name`

The bucket name to back up processed files to. Use this parameter when not using AWS buckets. The processed file is copied after it has been fully read.
When using `bucket_arn`, use `backup_to_bucket_arn` instead.

Naming of the backed-up files can be controlled with `backup_to_bucket_prefix`.

[float]
==== `delete_after_backup`

Controls whether fully processed files are deleted from the source bucket.

This option can only be used together with the backup functionality.
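For example, a minimal sketch of an `aws-s3` bucket-polling configuration that backs processed objects up into the same bucket under a separate prefix and then deletes the originals (the bucket name and prefix are illustrative; the prefix is required here because source and backup bucket are the same):

["source","yaml"]
----
filebeat.inputs:
- type: aws-s3
  bucket_arn: 'arn:aws:s3:::mybucket'
  number_of_workers: 5
  backup_to_bucket_arn: 'arn:aws:s3:::mybucket'
  backup_to_bucket_prefix: 'backup/'
  delete_after_backup: true
----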

[float]
=== AWS Permissions

@@ -459,6 +487,16 @@ s3:ListBucket
s3:GetBucketLocation
----

If `backup_to_bucket_arn` or `non_aws_backup_to_bucket_name` is set, the following permission is required as well:
----
s3:PutObject
----

If `delete_after_backup` is set, the following permission is required as well:
----
s3:DeleteObject
----
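These two permissions map onto the backup flow itself: the backup is a server-side S3 copy into the destination bucket (hence `s3:PutObject`), optionally followed by deleting the source object (hence `s3:DeleteObject`). The Go sketch below shows roughly how such a copy request is assembled — `copySource` is a hypothetical helper, not code from this PR; the AWS SDK's `CopyObject` call expects its `CopySource` parameter in the form `source-bucket/url-encoded-key`:

```go
package main

import (
	"fmt"
	"net/url"
)

// copySource builds the CopySource string for an S3 CopyObject request:
// "<source-bucket>/<url-encoded-object-key>".
func copySource(bucket, key string) string {
	return bucket + "/" + url.PathEscape(key)
}

func main() {
	// With aws-sdk-go-v2, the backup copy would look roughly like:
	//
	//   client.CopyObject(ctx, &s3.CopyObjectInput{
	//       Bucket:     aws.String("backup-bucket"),            // needs s3:PutObject
	//       Key:        aws.String("backup/" + "logs/app.log"), // backup_to_bucket_prefix + key
	//       CopySource: aws.String(copySource("source-bucket", "logs/app.log")),
	//   })
	//
	// and delete_after_backup would then issue a DeleteObject on the source
	// object, which is what requires s3:DeleteObject.
	fmt.Println(copySource("source-bucket", "logs/app.log"))
}
```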

[float]
=== S3 and SQS setup

10 changes: 10 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
@@ -3457,6 +3457,16 @@ filebeat.inputs:
# Overrides the `cloud.provider` field for non-AWS S3 buckets. See docs for auto recognized providers.
#provider: minio

# Configures backing up processed files to another (or the same) bucket
#backup_to_bucket_arn: 'arn:aws:s3:::mybucket'
#non_aws_backup_to_bucket_name: 'mybucket'

# Sets a prefix to prepend to object keys when backing up
#backup_to_bucket_prefix: 'backup/'

# Controls deletion of objects after backing them up
#delete_after_backup: false

#------------------------------ AWS CloudWatch input --------------------------------
# Beta: Config options for AWS CloudWatch input
#- type: aws-cloudwatch
37 changes: 36 additions & 1 deletion x-pack/filebeat/input/awss3/config.go
@@ -37,6 +37,7 @@ type config struct {
ReaderConfig readerConfig `config:",inline"` // Reader options to apply when no file_selectors are used.
PathStyle bool `config:"path_style"`
ProviderOverride string `config:"provider"`
BackupConfig backupConfig `config:",inline"`
}

func defaultConfig() config {
@@ -104,12 +105,46 @@ func (c *config) Validate() error {
return errors.New("path_style can only be used when polling non-AWS S3 services")
}
if c.ProviderOverride != "" && c.NonAWSBucketName == "" {
return errors.New("provider can only be overriden when polling non-AWS S3 services")
return errors.New("provider can only be overridden when polling non-AWS S3 services")
}
if c.BackupConfig.NonAWSBackupToBucketName != "" && c.NonAWSBucketName == "" {
return errors.New("backup to non-AWS bucket can only be used for non-AWS sources")
}
if c.BackupConfig.BackupToBucketArn != "" && c.BucketARN == "" {
return errors.New("backup to AWS bucket can only be used for AWS sources")
}
if c.BackupConfig.BackupToBucketArn != "" && c.BackupConfig.NonAWSBackupToBucketName != "" {
return errors.New("backup_to_bucket_arn and non_aws_backup_to_bucket_name cannot be used together")
}
if c.BackupConfig.GetBucketName() != "" && c.QueueURL == "" {
if (c.BackupConfig.BackupToBucketArn != "" && c.BackupConfig.BackupToBucketArn == c.BucketARN) ||
(c.BackupConfig.NonAWSBackupToBucketName != "" && c.BackupConfig.NonAWSBackupToBucketName == c.NonAWSBucketName) {
if c.BackupConfig.BackupToBucketPrefix == "" {
return errors.New("backup_to_bucket_prefix is a required property when source and backup bucket are the same")
}
if c.BackupConfig.BackupToBucketPrefix == c.BucketListPrefix {
return errors.New("backup_to_bucket_prefix cannot be the same as bucket_list_prefix, this will create an infinite loop")
}
}
}

return nil
}

type backupConfig struct {
BackupToBucketArn string `config:"backup_to_bucket_arn"`
NonAWSBackupToBucketName string `config:"non_aws_backup_to_bucket_name"`
BackupToBucketPrefix string `config:"backup_to_bucket_prefix"`
Delete bool `config:"delete_after_backup"`
}

func (c *backupConfig) GetBucketName() string {
if c.BackupToBucketArn != "" {
return getBucketNameFromARN(c.BackupToBucketArn)
}
return c.NonAWSBackupToBucketName
}
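The resolution above can be exercised in isolation. The sketch below mirrors `GetBucketName`; note that `getBucketNameFromARN` is defined elsewhere in the package and is not part of this diff — the `strings.TrimPrefix` version here merely assumes the standard `arn:aws:s3:::bucket` ARN form:

```go
package main

import (
	"fmt"
	"strings"
)

// backupConfig mirrors the struct added in config.go (only the fields
// needed for bucket-name resolution).
type backupConfig struct {
	BackupToBucketArn        string
	NonAWSBackupToBucketName string
}

// Assumed behavior of the package's getBucketNameFromARN helper: strip
// the "arn:aws:s3:::" prefix of a standard S3 bucket ARN.
func getBucketNameFromARN(arn string) string {
	return strings.TrimPrefix(arn, "arn:aws:s3:::")
}

// GetBucketName resolves the backup destination for both AWS and
// non-AWS sources, preferring the ARN when it is set.
func (c *backupConfig) GetBucketName() string {
	if c.BackupToBucketArn != "" {
		return getBucketNameFromARN(c.BackupToBucketArn)
	}
	return c.NonAWSBackupToBucketName
}

func main() {
	aws := backupConfig{BackupToBucketArn: "arn:aws:s3:::mybucket"}
	minio := backupConfig{NonAWSBackupToBucketName: "mybucket"}
	fmt.Println(aws.GetBucketName(), minio.GetBucketName())
}
```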

// fileSelectorConfig defines reader configuration that applies to a subset
// of S3 objects whose URL matches the given regex.
type fileSelectorConfig struct {
127 changes: 125 additions & 2 deletions x-pack/filebeat/input/awss3/config_test.go
@@ -361,7 +361,7 @@ func TestConfig(t *testing.T) {
"number_of_workers": 5,
"provider": "asdf",
},
"provider can only be overriden when polling non-AWS S3 services",
"provider can only be overridden when polling non-AWS S3 services",
nil,
},
{
@@ -374,7 +374,130 @@
"number_of_workers": 5,
"provider": "asdf",
},
"provider can only be overriden when polling non-AWS S3 services",
"provider can only be overridden when polling non-AWS S3 services",
nil,
},
{
"backup_to_bucket with AWS",
"",
s3Bucket,
"",
mapstr.M{
"bucket_arn": s3Bucket,
"backup_to_bucket_arn": "arn:aws:s3:::bBucket",
"backup_to_bucket_prefix": "backup",
"number_of_workers": 5,
},
"",
func(queueURL, s3Bucket string, nonAWSS3Bucket string) config {
c := makeConfig("", s3Bucket, "")
c.BackupConfig.BackupToBucketArn = "arn:aws:s3:::bBucket"
c.BackupConfig.BackupToBucketPrefix = "backup"
c.NumberOfWorkers = 5
return c
},
},
{
"backup_to_bucket with non-AWS",
"",
"",
nonAWSS3Bucket,
mapstr.M{
"non_aws_bucket_name": nonAWSS3Bucket,
"non_aws_backup_to_bucket_name": "bBucket",
"backup_to_bucket_prefix": "backup",
"number_of_workers": 5,
},
"",
func(queueURL, s3Bucket string, nonAWSS3Bucket string) config {
c := makeConfig("", "", nonAWSS3Bucket)
c.NonAWSBucketName = nonAWSS3Bucket
c.BackupConfig.NonAWSBackupToBucketName = "bBucket"
c.BackupConfig.BackupToBucketPrefix = "backup"
c.NumberOfWorkers = 5
return c
},
},
{
"error with non-AWS backup and AWS source",
"",
s3Bucket,
"",
mapstr.M{
"bucket_arn": s3Bucket,
"non_aws_backup_to_bucket_name": "bBucket",
"number_of_workers": 5,
},
"backup to non-AWS bucket can only be used for non-AWS sources",
nil,
},
{
"error with AWS backup and non-AWS source",
"",
"",
nonAWSS3Bucket,
mapstr.M{
"non_aws_bucket_name": nonAWSS3Bucket,
"backup_to_bucket_arn": "arn:aws:s3:::bBucket",
"number_of_workers": 5,
},
"backup to AWS bucket can only be used for AWS sources",
nil,
},
{
"error with same bucket backup and empty backup prefix",
"",
s3Bucket,
"",
mapstr.M{
"bucket_arn": s3Bucket,
"backup_to_bucket_arn": s3Bucket,
"number_of_workers": 5,
},
"backup_to_bucket_prefix is a required property when source and backup bucket are the same",
nil,
},
{
"error with same bucket backup (non-AWS) and empty backup prefix",
"",
"",
nonAWSS3Bucket,
mapstr.M{
"non_aws_bucket_name": nonAWSS3Bucket,
"non_aws_backup_to_bucket_name": nonAWSS3Bucket,
"number_of_workers": 5,
},
"backup_to_bucket_prefix is a required property when source and backup bucket are the same",
nil,
},
{
"error with same bucket backup and backup prefix equal to list prefix",
"",
s3Bucket,
"",
mapstr.M{
"bucket_arn": s3Bucket,
"backup_to_bucket_arn": s3Bucket,
"number_of_workers": 5,
"backup_to_bucket_prefix": "processed_",
"bucket_list_prefix": "processed_",
},
"backup_to_bucket_prefix cannot be the same as bucket_list_prefix, this will create an infinite loop",
nil,
},
{
"error with same bucket backup (non-AWS) and backup prefix equal to list prefix",
"",
"",
nonAWSS3Bucket,
mapstr.M{
"non_aws_bucket_name": nonAWSS3Bucket,
"non_aws_backup_to_bucket_name": nonAWSS3Bucket,
"number_of_workers": 5,
"backup_to_bucket_prefix": "processed_",
"bucket_list_prefix": "processed_",
},
"backup_to_bucket_prefix cannot be the same as bucket_list_prefix, this will create an infinite loop",
nil,
},
}
45 changes: 33 additions & 12 deletions x-pack/filebeat/input/awss3/input.go
@@ -50,26 +50,38 @@ func (im *s3InputManager) Create(cfg *conf.C) (v2.Input, error) {
return nil, err
}

return newInput(config, im.store)
return newInput(config, im.store, true)
}

func (im *s3InputManager) CreateWithoutClosingMetrics(cfg *conf.C) (v2.Input, error) {
// This smells, but it is needed because we call metrics.Close() on metrics that are not injectable in integration tests
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, err
}

return newInput(config, im.store, false)
}

// s3Input is a input for reading logs from S3 when triggered by an SQS message.
type s3Input struct {
config config
awsConfig awssdk.Config
store beater.StateStore
closeMetrics bool
config config
awsConfig awssdk.Config
store beater.StateStore
}

func newInput(config config, store beater.StateStore) (*s3Input, error) {
func newInput(config config, store beater.StateStore, closeMetrics bool) (*s3Input, error) {
awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig)
if err != nil {
return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err)
}

return &s3Input{
config: config,
awsConfig: awsConfig,
store: store,
closeMetrics: closeMetrics,
config: config,
awsConfig: awsConfig,
store: store,
}, nil
}

@@ -120,7 +132,9 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
if err != nil {
return fmt.Errorf("failed to initialize sqs receiver: %w", err)
}
defer receiver.metrics.Close()
if in.closeMetrics {
defer receiver.metrics.Close()
}

if err := receiver.Receive(ctx); err != nil {
return err
@@ -148,7 +162,9 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
if err != nil {
return fmt.Errorf("failed to initialize s3 poller: %w", err)
}
defer poller.metrics.Close()
if in.closeMetrics {
defer poller.metrics.Close()
}

if err := poller.Poll(ctx); err != nil {
return err
@@ -185,6 +201,11 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s
log.Infof("AWS SQS visibility_timeout is set to %v.", in.config.VisibilityTimeout)
log.Infof("AWS SQS max_number_of_messages is set to %v.", in.config.MaxNumberOfMessages)

if in.config.BackupConfig.GetBucketName() != "" {
log.Warnf("You have the backup_to_bucket functionality activated with SQS. Please make sure to set appropriate destination buckets " +
"or prefixes to avoid an infinite loop.")
}

fileSelectors := in.config.FileSelectors
if len(in.config.FileSelectors) == 0 {
fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}}
@@ -194,7 +215,7 @@
return nil, err
}
metrics := newInputMetrics(ctx.ID, nil)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, fileSelectors)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, fileSelectors, in.config.BackupConfig)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory)
sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler)

@@ -267,7 +288,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}}
}
metrics := newInputMetrics(ctx.ID, nil)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, fileSelectors)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, fileSelectors, in.config.BackupConfig)
s3Poller := newS3Poller(log.Named("s3_poller"),
metrics,
s3API,