Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x](backport #41755) [AWS] [S3] fix: improve object size metric calculation #41778

Merged
merged 3 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix aws region in aws-s3 input s3 polling mode. {pull}41572[41572]
- Add support for Access Points in the `aws-s3` input. {pull}41495[41495]
- Fix the "No such input type exist: 'salesforce'" error on the Windows/AIX platform. {pull}41664[41664]
- Improve S3 object size metric calculation to support situations where Content-Length is not available. {pull}41755[41755]

*Heartbeat*

Expand Down
20 changes: 14 additions & 6 deletions x-pack/filebeat/input/awss3/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func init() {
// currentTime returns the current time. This exists to allow unit tests
// simulate the passage of time.
func currentTime() time.Time {
	// Comma-ok assertion so an unexpected stored type cannot panic the
	// assertion itself; clockValue is seeded in init (per the hunk header
	// above), so the assertion is expected to succeed in practice.
	// NOTE(review): if it ever failed, clock would be the zero value and
	// clock.Now() would still panic — confirm init always runs first.
	clock, _ := clockValue.Load().(clock)
	return clock.Now()
}

Expand Down Expand Up @@ -206,18 +206,26 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers
return out
}

// monitoredReader implements io.Reader and wraps byte read tracking fields for S3 bucket objects.
// Following are the tracked metrics,
//   - totalBytesReadMetric - a total metric tracking bytes read throughout the runtime from all processed objects
//   - totalBytesReadCurrent - total bytes read from the currently tracked object
//
// See newMonitoredReader for initialization considerations.
type monitoredReader struct {
	reader                io.Reader        // underlying object body being metered
	totalBytesReadMetric  *monitoring.Uint // shared, cumulative across all objects
	totalBytesReadCurrent int64            // per-object running total
}

// newMonitoredReader initializes the monitoredReader with a shared metric that
// tracks all bytes read; the per-object counter starts at zero and only grows
// through Read calls on this instance.
func newMonitoredReader(r io.Reader, metric *monitoring.Uint) *monitoredReader {
	return &monitoredReader{reader: r, totalBytesReadMetric: metric}
}

// Read forwards to the wrapped reader and records the bytes read in both the
// shared runtime metric and the per-object counter. Per io.Reader semantics n
// is recorded even when err is non-nil, since bytes may be returned with an
// error.
func (m *monitoredReader) Read(p []byte) (int, error) {
	n, err := m.reader.Read(p)
	m.totalBytesReadMetric.Add(uint64(n))
	m.totalBytesReadCurrent += int64(n)
	return n, err
}
9 changes: 5 additions & 4 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ type s3ObjectProcessor struct {

// s3DownloadedObject carries a downloaded S3 object's streamed body together
// with its content type and user metadata. The object length is intentionally
// not stored here; size is derived from bytes actually read through
// monitoredReader, since Content-Length is not always available.
type s3DownloadedObject struct {
	body        io.ReadCloser
	contentType string
	metadata    map[string]interface{}
}
Expand Down Expand Up @@ -142,9 +141,9 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
defer s3Obj.body.Close()

p.s3Metadata = s3Obj.metadata
p.metrics.s3ObjectSizeInBytes.Update(s3Obj.length)

reader, err := p.addGzipDecoderIfNeeded(newMonitoredReader(s3Obj.body, p.metrics.s3BytesProcessedTotal))
mReader := newMonitoredReader(s3Obj.body, p.metrics.s3BytesProcessedTotal)
reader, err := p.addGzipDecoderIfNeeded(mReader)
if err != nil {
return fmt.Errorf("failed checking for gzip content: %w", err)
}
Expand Down Expand Up @@ -213,6 +212,9 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
time.Since(start).Nanoseconds(), err)
}

// finally obtain total bytes of the object through metered reader
p.metrics.s3ObjectSizeInBytes.Update(mReader.totalBytesReadCurrent)

return nil
}

Expand Down Expand Up @@ -241,7 +243,6 @@ func (p *s3ObjectProcessor) download() (obj *s3DownloadedObject, err error) {

s := &s3DownloadedObject{
body: getObjectOutput.Body,
length: *getObjectOutput.ContentLength,
contentType: ctType,
metadata: meta,
}
Expand Down
70 changes: 70 additions & 0 deletions x-pack/filebeat/input/awss3/s3_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,76 @@ func TestS3ObjectProcessor(t *testing.T) {
})
}

// TestProcessObjectMetricCollection checks that processing a single S3 object
// records the request/inflight counters and derives the object-size metric
// from the bytes actually read, for plain, JSON, and gzipped fixtures.
func TestProcessObjectMetricCollection(t *testing.T) {
	logger := logp.NewLogger("testing-s3-processor-metrics")

	cases := []struct {
		name        string
		filename    string
		contentType string
		objectSize  int64
	}{
		{
			name:        "simple text - octet-stream",
			filename:    "testdata/log.txt",
			contentType: "application/octet-stream",
			objectSize:  18,
		},
		{
			name:        "json text",
			filename:    "testdata/log.json",
			contentType: "application/json",
			objectSize:  199,
		},
		{
			name:        "gzip with json text",
			filename:    "testdata/multiline.json.gz",
			contentType: "application/x-gzip",
			objectSize:  175,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// given: a mocked S3 API serving the fixture object exactly once
			ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
			defer cancel()

			ctrl, ctx := gomock.WithContext(ctx, t)
			defer ctrl.Finish()

			event, resp := newS3Object(t, tc.filename, tc.contentType)
			s3API := NewMockS3API(ctrl)
			gomock.InOrder(
				s3API.EXPECT().
					GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(event.S3.Bucket.Name), gomock.Eq(event.S3.Object.Key)).
					Return(resp, nil),
			)

			// metric recorder with zero workers
			recorder := newInputMetrics(tc.name, nil, 0)
			factory := newS3ObjectProcessorFactory(recorder, s3API, nil, backupConfig{})
			handler := factory.Create(ctx, event)

			// when: the object is processed (events are discarded)
			err := handler.ProcessS3Object(logger, func(_ beat.Event) {})

			// then
			require.NoError(t, err)

			require.Equal(t, uint64(1), recorder.s3ObjectsRequestedTotal.Get())
			require.Equal(t, uint64(0), recorder.s3ObjectsInflight.Get())

			sizes := recorder.s3ObjectSizeInBytes.Values()
			require.Len(t, sizes, 1)

			// since we processed a single object, total and current process size is same
			require.Equal(t, tc.objectSize, sizes[0])
			require.Equal(t, uint64(tc.objectSize), recorder.s3BytesProcessedTotal.Get())
		})
	}
}

// testProcessS3Object runs the S3 object processing pipeline against the given
// fixture file and returns the produced events, delegating to
// _testProcessS3Object with its boolean flag set to false.
// NOTE(review): the meaning of the false flag is not visible here — confirm
// against _testProcessS3Object, which is outside this view.
func testProcessS3Object(t testing.TB, file, contentType string, numEvents int, selectors ...fileSelectorConfig) []beat.Event {
	return _testProcessS3Object(t, file, contentType, numEvents, false, selectors)
}
Expand Down
Loading