Skip to content

Commit

Permalink
Revert "[Filebeat] Get AWS credentials again when token expired (elas…
Browse files Browse the repository at this point in the history
…tic#17205)" (elastic#17548)

This reverts commit 07def82.
  • Loading branch information
kaiyan-sheng committed Apr 7, 2020
1 parent 4d906ec commit c71b424
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 28 deletions.
1 change: 0 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix Elasticsearch `_id` field set by S3 and Google Pub/Sub inputs. {pull}17026[17026]
- Fixed various Cisco FTD parsing issues. {issue}16863[16863] {pull}16889[16889]
- Fix default index pattern in IBM MQ filebeat dashboard. {pull}17146[17146]
- Re-obtain AWS credentials if they expired in s3 input. {pull}17205[17205]
- Fix `elasticsearch.gc` fileset to not collect _all_ logs when Elasticsearch is running in Docker. {issue}13164[13164] {issue}16583[16583] {pull}17164[17164]
- Fixed a mapping exception when ingesting CEF logs that used the spriv or dpriv extensions. {issue}17216[17216] {pull}17220[17220]
- CEF: Fixed decoding errors caused by trailing spaces in messages. {pull}17253[17253]
Expand Down
37 changes: 10 additions & 27 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,46 +178,30 @@ func (p *s3Input) Run() {
if err != nil {
p.logger.Errorf("failed to get region name from queueURL: %v", p.config.QueueURL)
}
p.awsConfig.Region = regionName

awsConfig := p.awsConfig.Copy()
awsConfig.Region = regionName

svcSQS := sqs.New(awscommon.EnrichAWSConfigWithEndpoint(p.config.AwsConfig.Endpoint, "sqs", regionName, awsConfig))
svcS3 := s3.New(awscommon.EnrichAWSConfigWithEndpoint(p.config.AwsConfig.Endpoint, "s3", regionName, awsConfig))

p.workerWg.Add(1)
go p.run(visibilityTimeout)
go p.run(svcSQS, svcS3, visibilityTimeout)
p.workerWg.Done()
})
}

func (p *s3Input) run(visibilityTimeout int64) {
func (p *s3Input) run(svcSQS sqsiface.ClientAPI, svcS3 s3iface.ClientAPI, visibilityTimeout int64) {
defer p.logger.Infof("s3 input worker for '%v' has stopped.", p.config.QueueURL)

p.logger.Infof("s3 input worker has started. with queueURL: %v", p.config.QueueURL)
svcSQS := sqs.New(awscommon.EnrichAWSConfigWithEndpoint(p.config.AwsConfig.Endpoint, "sqs", p.awsConfig.Region, p.awsConfig))

for p.context.Err() == nil {
// receive messages from sqs
output, err := p.receiveMessage(svcSQS, visibilityTimeout)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == awssdk.ErrCodeRequestCanceled {
continue
}

// When AWS credentials are expired, re-obtain AWS credentials again
if awssdk.IsErrorExpiredCreds(awsErr) {
p.logger.Warn(errors.Wrap(err, "credentials are expired, please update the credentials"))

awsConfig, err := awscommon.GetAWSCredentials(p.config.AwsConfig)
if err != nil {
p.logger.Error(errors.Wrap(err, "getAWSCredentials failed"))
continue
}

awsConfig.Region = p.awsConfig.Region
p.awsConfig = awsConfig
svcSQS = sqs.New(awscommon.EnrichAWSConfigWithEndpoint(p.config.AwsConfig.Endpoint, "sqs", p.awsConfig.Region, p.awsConfig))
continue
}
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == awssdk.ErrCodeRequestCanceled {
continue
}

p.logger.Error("failed to receive message from SQS: ", err)
time.Sleep(time.Duration(waitTimeSecond) * time.Second)
continue
Expand All @@ -229,7 +213,6 @@ func (p *s3Input) run(visibilityTimeout int64) {
}

// process messages received from sqs, get logs from s3 and create events
svcS3 := s3.New(awscommon.EnrichAWSConfigWithEndpoint(p.config.AwsConfig.Endpoint, "s3", p.awsConfig.Region, p.awsConfig))
p.processor(p.config.QueueURL, output.Messages, visibilityTimeout, svcS3, svcSQS)
}
}
Expand Down

0 comments on commit c71b424

Please sign in to comment.