Skip to content

Commit

Permalink
Adding aws-s3 metric for lag time (#34306)
Browse files Browse the repository at this point in the history
  • Loading branch information
kgeller authored and chrisberkhout committed Jun 1, 2023
1 parent 691d09f commit 56e6e50
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Allow user configuration of keep-alive behaviour for HTTPJSON and CEL inputs. {issue}33951[33951] {pull}34014[34014]
- Add support for polling system UDP stats for UDP input metrics. {pull}34070[34070]
- Add support for recognizing the log level in Elasticsearch JVM logs {pull}34159[34159]
- Added metric `sqs_lag_time` for aws-s3 input. {pull}34306[34306]
- Add metrics for TCP packet processing. {pull}34333[34333]
- Add metrics for unix socket packet processing. {pull}34335[34335]
- Add beta `take over` mode for `filestream` for simple migration from `log` inputs {pull}34292[34292]
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ observe the activity of the input.
| `sqs_messages_returned_total` | Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes).
| `sqs_messages_deleted_total` | Number of SQS messages deleted.
| `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
| `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.
| `s3_objects_requested_total` | Number of S3 objects downloaded.
| `s3_objects_listed_total` | Number of S3 objects returned by list operations.
| `s3_objects_processed_total` | Number of S3 objects that matched file_selectors rules.
Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func TestInputRunSQS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0)
}

func TestInputRunS3(t *testing.T) {
Expand Down Expand Up @@ -426,4 +427,5 @@ func TestInputRunSNS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0)
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (a *awsSQSAPI) ReceiveMessage(ctx context.Context, maxMessages int) ([]type
MaxNumberOfMessages: int32(min(maxMessages, sqsMaxNumberOfMessagesLimit)),
VisibilityTimeout: int32(a.visibilityTimeout.Seconds()),
WaitTimeSeconds: int32(a.longPollWaitTime.Seconds()),
AttributeNames: []types.QueueAttributeName{sqsApproximateReceiveCountAttribute},
AttributeNames: []types.QueueAttributeName{sqsApproximateReceiveCountAttribute, sqsSentTimestampAttribute},
})
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
Expand Down
4 changes: 4 additions & 0 deletions x-pack/filebeat/input/awss3/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type inputMetrics struct {
sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes).
sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted.
sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.

s3ObjectsRequestedTotal *monitoring.Uint // Number of S3 objects downloaded.
s3ObjectsAckedTotal *monitoring.Uint // Number of S3 objects processed that were fully ACKed.
Expand Down Expand Up @@ -50,6 +51,7 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetri
sqsMessagesReturnedTotal: monitoring.NewUint(reg, "sqs_messages_returned_total"),
sqsMessagesDeletedTotal: monitoring.NewUint(reg, "sqs_messages_deleted_total"),
sqsMessageProcessingTime: metrics.NewUniformSample(1024),
sqsLagTime: metrics.NewUniformSample(1024),
s3ObjectsRequestedTotal: monitoring.NewUint(reg, "s3_objects_requested_total"),
s3ObjectsAckedTotal: monitoring.NewUint(reg, "s3_objects_acked_total"),
s3ObjectsListedTotal: monitoring.NewUint(reg, "s3_objects_listed_total"),
Expand All @@ -61,6 +63,8 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetri
}
adapter.NewGoMetrics(reg, "sqs_message_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.sqsMessageProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
adapter.NewGoMetrics(reg, "sqs_lag_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.sqsLagTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
adapter.NewGoMetrics(reg, "s3_object_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.s3ObjectProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
return out
Expand Down
34 changes: 28 additions & 6 deletions x-pack/filebeat/input/awss3/sqs_s3_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

const (
sqsApproximateReceiveCountAttribute = "ApproximateReceiveCount"
sqsSentTimestampAttribute = "SentTimestamp"
sqsInvalidParameterValueErrorCode = "InvalidParameterValue"
sqsReceiptHandleIsInvalidErrCode = "ReceiptHandleIsInvalid"
)
Expand Down Expand Up @@ -133,6 +134,18 @@ func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *types.Message
keepaliveWg.Add(1)
go p.keepalive(keepaliveCtx, log, &keepaliveWg, msg)

receiveCount := getSQSReceiveCount(msg.Attributes)
if receiveCount == 1 {
// Only contribute to the sqs_lag_time histogram on the first message
// to avoid skewing the metric when processing retries.
if s, found := msg.Attributes[sqsSentTimestampAttribute]; found {
if sentTimeMillis, err := strconv.ParseInt(s, 10, 64); err == nil {
sentTime := time.UnixMilli(sentTimeMillis)
p.metrics.sqsLagTime.Update(time.Since(sentTime).Nanoseconds())
}
}
}

handles, processingErr := p.processS3Events(ctx, log, *msg.Body)

// Stop keepalive routine before changing visibility.
Expand All @@ -155,12 +168,10 @@ func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *types.Message
if p.maxReceiveCount > 0 && !errors.Is(processingErr, &nonRetryableError{}) {
// Prevent poison pill messages from consuming all workers. Check how
// many times this message has been received before making a disposition.
if v, found := msg.Attributes[sqsApproximateReceiveCountAttribute]; found {
if receiveCount, err := strconv.Atoi(v); err == nil && receiveCount >= p.maxReceiveCount {
processingErr = nonRetryableErrorWrap(fmt.Errorf(
"sqs ApproximateReceiveCount <%v> exceeds threshold %v: %w",
receiveCount, p.maxReceiveCount, processingErr))
}
if receiveCount >= p.maxReceiveCount {
processingErr = nonRetryableErrorWrap(fmt.Errorf(
"sqs ApproximateReceiveCount <%v> exceeds threshold %v: %w",
receiveCount, p.maxReceiveCount, processingErr))
}
}

Expand Down Expand Up @@ -350,3 +361,14 @@ func (p *sqsS3EventProcessor) finalizeS3Objects(handles []s3ObjectHandler) error
}
return multierr.Combine(errs...)
}

// getSQSReceiveCount returns the SQS ApproximateReceiveCount attribute. If the value
// cannot be read then -1 is returned.
func getSQSReceiveCount(attributes map[string]string) int {
if s, found := attributes[sqsApproximateReceiveCountAttribute]; found {
if receiveCount, err := strconv.Atoi(s); err == nil {
return receiveCount
}
}
return -1
}

0 comments on commit 56e6e50

Please sign in to comment.