diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c96b19f6685..5b8350bcd6e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -37,8 +37,6 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix a race condition in the TCP input when close the client socket. {pull}13038[13038] - cisco/asa fileset: Renamed log.original to event.original and cisco.asa.list_id to cisco.asa.rule_name. {pull}13286[13286] - cisco/asa fileset: Fix parsing of 302021 message code. {pull}13476[13476] -- Add support for gzipped files in S3 input {pull}13980[13980] -- Add support for all the ObjectCreated events in S3 input. {pull}14077[14077] *Heartbeat* @@ -181,6 +179,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix calculation of `network.bytes` and `network.packets` for bi-directional netflow events. {pull}14111[14111] - Accept '-' as http.response.body.bytes in apache module. {pull}14137[14137] - Fix timezone parsing of MySQL module ingest pipelines. {pull}14130[14130] +- Improve error message in s3 input when handleSQSMessage failed. {pull}14113[14113] *Heartbeat* @@ -368,6 +367,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add ELB fileset to AWS module. {pull}14020[14020] - Add module for MISP (Malware Information Sharing Platform). {pull}13805[13805] - Add `source.bytes` and `source.packets` for uni-directional netflow events. {pull}14111[14111] +- Add support for gzipped files in S3 input. {pull}13980[13980] +- Add support for all the ObjectCreated events in S3 input. {pull}14077[14077] - Add Kibana Dashboard for MISP module. {pull}14147[14147] - Add JSON options to autodiscover hints {pull}14208[14208] - Add more filesets to Zeek module. {pull}14150[14150] diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index e5e2175cb9d..aaa17dc2b92 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -248,7 +248,7 @@ func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, w s3Infos, err := handleSQSMessage(message) if err != nil { - p.logger.Error(errors.Wrap(err, "handleMessage failed")) + p.logger.Error(errors.Wrap(err, "handleSQSMessage failed")) return } @@ -256,8 +256,8 @@ func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, w err = p.handleS3Objects(svcS3, s3Infos, errC) if err != nil { err = errors.Wrap(err, "handleS3Objects failed") - errC <- err p.logger.Error(err) + errC <- err } } @@ -276,7 +276,10 @@ func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Mess } p.logger.Warnf("Message visibility timeout updated to %v", visibilityTimeout) } else { - p.logger.Debug("ACK done, deleting message from SQS") + // When ACK done, message will be deleted. Or when message is + // not s3 ObjectCreated event related(handleSQSMessage function + // failed), it will be removed as well. + p.logger.Debug("Deleting message from SQS: ", message.MessageId) // only delete sqs message when errC is closed with no error err := p.deleteMessage(queueURL, *message.ReceiptHandle, svcSQS) if err != nil { @@ -347,6 +350,8 @@ func handleSQSMessage(m sqs.Message) ([]s3Info, error) { key: record.S3.object.Key, arn: record.S3.bucket.Arn, }) + } else { + return nil, errors.New("this SQS queue should be dedicated to s3 ObjectCreated event notifications") } } return s3Infos, nil diff --git a/x-pack/filebeat/input/s3/input_test.go b/x-pack/filebeat/input/s3/input_test.go index 8b897de6729..9290b3f664d 100644 --- a/x-pack/filebeat/input/s3/input_test.go +++ b/x-pack/filebeat/input/s3/input_test.go @@ -74,7 +74,7 @@ func TestGetRegionFromQueueURL(t *testing.T) { } func TestHandleMessage(t *testing.T) { - cases := []struct { + casesPositive := []struct { title string message sqs.Message expectedS3Infos []s3Info @@ -91,6 +91,37 @@ func TestHandleMessage(t *testing.T) { }, }, }, + { + "sqs message with event source aws:s3 and event name ObjectCreated:CompleteMultipartUpload", + sqs.Message{ + Body: awssdk.String("{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"ap-southeast-1\",\"eventTime\":\"2019-06-21T16:16:54.629Z\",\"eventName\":\"ObjectCreated:CompleteMultipartUpload\",\"s3\":{\"configurationId\":\"object-created-event\",\"bucket\":{\"name\":\"test-s3-ks-2\",\"arn\":\"arn:aws:s3:::test-s3-ks-2\"},\"object\":{\"key\":\"server-access-logging2019-06-21-16-16-54-E68E4316CEB285AA\"}}}]}"), + }, + []s3Info{ + { + name: "test-s3-ks-2", + key: "server-access-logging2019-06-21-16-16-54-E68E4316CEB285AA", + }, + }, + }, + } + + for _, c := range casesPositive { + t.Run(c.title, func(t *testing.T) { + s3Info, err := handleSQSMessage(c.message) + assert.NoError(t, err) + assert.Equal(t, len(c.expectedS3Infos), len(s3Info)) + if len(s3Info) > 0 { + assert.Equal(t, c.expectedS3Infos[0].key, s3Info[0].key) + assert.Equal(t, c.expectedS3Infos[0].name, s3Info[0].name) + } + }) + } + + casesNegative := []struct { + title string + message sqs.Message + expectedS3Infos []s3Info + }{ { "sqs message with event source aws:s3 and event name ObjectRemoved:Delete", sqs.Message{ @@ -107,17 +138,14 @@ func TestHandleMessage(t *testing.T) { }, } - for _, c := range cases { + for _, c := range casesNegative { t.Run(c.title, func(t *testing.T) { s3Info, err := handleSQSMessage(c.message) - assert.NoError(t, err) - assert.Equal(t, len(c.expectedS3Infos), len(s3Info)) - if len(s3Info) > 0 { - assert.Equal(t, c.expectedS3Infos[0].key, s3Info[0].key) - assert.Equal(t, c.expectedS3Infos[0].name, s3Info[0].name) - } + assert.Error(t, err) + assert.Nil(t, s3Info) }) } + } func TestNewS3BucketReader(t *testing.T) {