diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 4cbe798909e1..8f439b0e5fcd 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -116,6 +116,19 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015]
 - Fix streaming input handling of invalid or empty websocket messages. {pull}42036[42036]
 - Fix awss3 document ID construction when using the CSV decoder. {pull}42019[42019]
+- Fixed failed job handling and removed false-positive error logs in the GCS input. {pull}41142[41142]
+- Bump github.com/elastic/go-sfdc dependency used by x-pack/filebeat/input/salesforce. {pull}41192[41192]
+- Log bad handshake details when websocket connection fails {pull}41300[41300]
+- Improve modification time handling for entities and entity deletion logic in the Active Directory entityanalytics input. {pull}41179[41179]
+- Journald input can now read events from all boots {issue}41083[41083] {pull}41244[41244]
+- Fix double encoding of client_secret in the Entity Analytics input's Azure Active Directory provider {pull}41393[41393]
+- Fix AWS region handling in the aws-s3 input's S3 polling mode. {pull}41572[41572]
+- Fix errors in SQS host resolution in the `aws-s3` input when using custom (non-AWS) endpoints. {pull}41504[41504]
+- The azure-eventhub input now correctly reports its status to the Elastic Agent on fatal errors {pull}41469[41469]
+- Add support for Access Points in the `aws-s3` input. {pull}41495[41495]
+- Fix the "No such input type exist: 'salesforce'" error on the Windows/AIX platform. {pull}41664[41664]
+- Fix missing key in streaming input logging. {pull}41600[41600]
+- Improve S3 object size metric calculation to support situations where Content-Length is not available. {pull}41755[41755]

 *Heartbeat*

diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go
index 3be07437a50e..0ebcaeb1a92c 100644
--- a/x-pack/filebeat/input/awss3/metrics.go
+++ b/x-pack/filebeat/input/awss3/metrics.go
@@ -36,7 +36,7 @@ func init() {
 // currentTime returns the current time. This exists to allow unit tests
 // simulate the passage of time.
 func currentTime() time.Time {
-	clock := clockValue.Load().(clock)
+	clock, _ := clockValue.Load().(clock)
 	return clock.Now()
 }
@@ -206,18 +206,26 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers
 	return out
 }

-// monitoredReader implements io.Reader and counts the number of bytes read.
+// monitoredReader implements io.Reader and tracks bytes read from S3 bucket objects.
+// The following metrics are tracked:
+// - totalBytesReadMetric - a shared metric that accumulates bytes read from all processed objects over the input's runtime
+// - totalBytesReadCurrent - total bytes read from the currently tracked object
+//
+// See newMonitoredReader for initialization considerations.
 type monitoredReader struct {
-	reader         io.Reader
-	totalBytesRead *monitoring.Uint
+	reader                io.Reader
+	totalBytesReadMetric  *monitoring.Uint
+	totalBytesReadCurrent int64
 }

+// newMonitoredReader initializes a monitoredReader with a shared metric that tracks all bytes read.
 func newMonitoredReader(r io.Reader, metric *monitoring.Uint) *monitoredReader {
-	return &monitoredReader{reader: r, totalBytesRead: metric}
+	return &monitoredReader{reader: r, totalBytesReadMetric: metric}
 }

 func (m *monitoredReader) Read(p []byte) (int, error) {
 	n, err := m.reader.Read(p)
-	m.totalBytesRead.Add(uint64(n))
+	m.totalBytesReadMetric.Add(uint64(n))
+	m.totalBytesReadCurrent += int64(n)
 	return n, err
 }
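The metrics.go change above splits byte accounting in two: a shared, input-wide counter and a per-object running total. Below is a minimal, stdlib-only sketch of that wrapper pattern; the type name and the atomic counter are illustrative stand-ins, as the real code uses *monitoring.Uint from elastic-agent-libs for the shared metric.

package main

import (
	"fmt"
	"io"
	"strings"
	"sync/atomic"
)

// countingReader mirrors the monitoredReader pattern above: every Read adds to
// a shared counter that spans all objects, and to a per-object running total.
type countingReader struct {
	reader       io.Reader
	sharedTotal  *atomic.Uint64 // illustrative stand-in for metrics.s3BytesProcessedTotal
	currentTotal int64          // bytes read from this object only
}

func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.reader.Read(p)
	c.sharedTotal.Add(uint64(n))
	c.currentTotal += int64(n)
	return n, err
}

func main() {
	var shared atomic.Uint64
	a := &countingReader{reader: strings.NewReader("object one"), sharedTotal: &shared}
	b := &countingReader{reader: strings.NewReader("object two, longer"), sharedTotal: &shared}
	_, _ = io.Copy(io.Discard, a)
	_, _ = io.Copy(io.Discard, b)
	fmt.Println(a.currentTotal, b.currentTotal, shared.Load()) // 10 18 28
}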
diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go
index 2fafb2e4ef60..c36bd7858f96 100644
--- a/x-pack/filebeat/input/awss3/s3_objects.go
+++ b/x-pack/filebeat/input/awss3/s3_objects.go
@@ -51,7 +51,6 @@ type s3ObjectProcessor struct {

 type s3DownloadedObject struct {
 	body        io.ReadCloser
-	length      int64
 	contentType string
 	metadata    map[string]interface{}
 }
@@ -142,9 +141,9 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
 	defer s3Obj.body.Close()
 	p.s3Metadata = s3Obj.metadata

-	p.metrics.s3ObjectSizeInBytes.Update(s3Obj.length)
-
-	reader, err := p.addGzipDecoderIfNeeded(newMonitoredReader(s3Obj.body, p.metrics.s3BytesProcessedTotal))
+	mReader := newMonitoredReader(s3Obj.body, p.metrics.s3BytesProcessedTotal)
+	reader, err := p.addGzipDecoderIfNeeded(mReader)
 	if err != nil {
 		return fmt.Errorf("failed checking for gzip content: %w", err)
 	}
@@ -213,6 +212,9 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
 			time.Since(start).Nanoseconds(), err)
 	}

+	// Finally, obtain the object's total size through the metered reader.
+	p.metrics.s3ObjectSizeInBytes.Update(mReader.totalBytesReadCurrent)
+
 	return nil
 }

@@ -241,7 +243,6 @@ func (p *s3ObjectProcessor) download() (obj *s3DownloadedObject, err error) {

 	s := &s3DownloadedObject{
 		body:        getObjectOutput.Body,
-		length:      *getObjectOutput.ContentLength,
 		contentType: ctType,
 		metadata:    meta,
 	}
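With the s3_objects.go change, s3ObjectSizeInBytes is updated from the bytes actually read rather than from the Content-Length header, which is not always available. Because the monitored reader wraps the body before gzip decoding, a compressed object is recorded at its stored (compressed) size, not its decompressed size. The following self-contained sketch shows that layering, reusing the illustrative countingReader type from the previous sketch (again, not the Filebeat types themselves):

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"sync/atomic"
)

// countingReader is the illustrative wrapper from the previous sketch,
// repeated here so the example compiles on its own.
type countingReader struct {
	reader       io.Reader
	sharedTotal  *atomic.Uint64
	currentTotal int64
}

func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.reader.Read(p)
	c.sharedTotal.Add(uint64(n))
	c.currentTotal += int64(n)
	return n, err
}

func main() {
	// Build a gzipped payload in memory to stand in for an S3 object body.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	_, _ = zw.Write([]byte("hello, filebeat"))
	_ = zw.Close()
	storedSize := buf.Len() // size of the object as stored in the bucket

	// Wrap the raw body first, then layer the gzip decoder on top, mirroring
	// newMonitoredReader + addGzipDecoderIfNeeded: the counter observes the
	// compressed (stored) bytes, not the decompressed ones.
	var shared atomic.Uint64
	counted := &countingReader{reader: &buf, sharedTotal: &shared}
	zr, err := gzip.NewReader(counted)
	if err != nil {
		panic(err)
	}
	plain, _ := io.ReadAll(zr)

	fmt.Println(len(plain))                                 // decompressed length (15)
	fmt.Println(counted.currentTotal == int64(storedSize))  // true: stored size recorded
}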
diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go
index d20d81ced6c8..432c5209d258 100644
--- a/x-pack/filebeat/input/awss3/s3_objects_test.go
+++ b/x-pack/filebeat/input/awss3/s3_objects_test.go
@@ -289,6 +289,76 @@ func TestS3ObjectProcessor(t *testing.T) {
 	})
 }

+func TestProcessObjectMetricCollection(t *testing.T) {
+	logger := logp.NewLogger("testing-s3-processor-metrics")
+
+	tests := []struct {
+		name        string
+		filename    string
+		contentType string
+		objectSize  int64
+	}{
+		{
+			name:        "simple text - octet-stream",
+			filename:    "testdata/log.txt",
+			contentType: "application/octet-stream",
+			objectSize:  18,
+		},
+		{
+			name:        "json text",
+			filename:    "testdata/log.json",
+			contentType: "application/json",
+			objectSize:  199,
+		},
+		{
+			name:        "gzip with json text",
+			filename:    "testdata/multiline.json.gz",
+			contentType: "application/x-gzip",
+			objectSize:  175,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			// given
+			ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
+			defer cancel()
+
+			ctrl, ctx := gomock.WithContext(ctx, t)
+			defer ctrl.Finish()
+
+			s3Event, s3Resp := newS3Object(t, test.filename, test.contentType)
+			mockS3API := NewMockS3API(ctrl)
+			gomock.InOrder(
+				mockS3API.EXPECT().
+					GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
+					Return(s3Resp, nil),
+			)
+
+			// metric recorder with zero workers
+			metricRecorder := newInputMetrics(test.name, nil, 0)
+			objFactory := newS3ObjectProcessorFactory(metricRecorder, mockS3API, nil, backupConfig{})
+			objHandler := objFactory.Create(ctx, s3Event)
+
+			// when
+			err := objHandler.ProcessS3Object(logger, func(_ beat.Event) {})
+
+			// then
+			require.NoError(t, err)
+
+			require.Equal(t, uint64(1), metricRecorder.s3ObjectsRequestedTotal.Get())
+			require.Equal(t, uint64(0), metricRecorder.s3ObjectsInflight.Get())
+
+			values := metricRecorder.s3ObjectSizeInBytes.Values()
+			require.Equal(t, 1, len(values))
+
+			// Since we processed a single object, the total and the per-object size are the same.
+			require.Equal(t, test.objectSize, values[0])
+			require.Equal(t, uint64(test.objectSize), metricRecorder.s3BytesProcessedTotal.Get())
+		})
+	}
+}
+
 func testProcessS3Object(t testing.TB, file, contentType string, numEvents int, selectors ...fileSelectorConfig) []beat.Event {
 	return _testProcessS3Object(t, file, contentType, numEvents, false, selectors)
 }
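To exercise only the new metrics test locally (assuming a checkout of the beats repository with the gomock stubs already generated), something like the following should work:

go test -run TestProcessObjectMetricCollection ./x-pack/filebeat/input/awss3/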