diff --git a/.chloggen/fix-kafkametric-receiver-type.yaml b/.chloggen/fix-kafkametric-receiver-type.yaml
new file mode 100644
index 000000000000..e5445b617f60
--- /dev/null
+++ b/.chloggen/fix-kafkametric-receiver-type.yaml
@@ -0,0 +1,20 @@
+# Use this changelog template to create an entry for release notes.
+# If your change doesn't affect end users, such as a test fix or a tooling change,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: receiver/kafkametricsreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Update the `kafka.brokers` metric in kafkametricsreceiver to be reported as a non-monotonic sum rather than a gauge.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [4327]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: The metric type of `kafka.brokers` changes from "gauge" to "non-monotonic sum" with delta aggregation temporality.
\ No newline at end of file
diff --git a/receiver/kafkametricsreceiver/broker_scraper_test.go b/receiver/kafkametricsreceiver/broker_scraper_test.go
index 5be948ad1340..37079cb16c97 100644
--- a/receiver/kafkametricsreceiver/broker_scraper_test.go
+++ b/receiver/kafkametricsreceiver/broker_scraper_test.go
@@ -106,7 +106,7 @@ func TestBrokerScraper_scrape(t *testing.T) {
     assert.NoError(t, err)
     expectedDp := int64(len(testBrokers))
     receivedMetrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)
-    receivedDp := receivedMetrics.Gauge().DataPoints().At(0).IntValue()
+    receivedDp := receivedMetrics.Sum().DataPoints().At(0).IntValue()
     assert.Equal(t, expectedDp, receivedDp)
 }
 
diff --git a/receiver/kafkametricsreceiver/documentation.md b/receiver/kafkametricsreceiver/documentation.md
index 58157c3eb81e..c831edcdfe53 100644
--- a/receiver/kafkametricsreceiver/documentation.md
+++ b/receiver/kafkametricsreceiver/documentation.md
@@ -16,9 +16,9 @@ metrics:
 
 Number of brokers in the cluster.
 
-| Unit | Metric Type | Value Type |
-| ---- | ----------- | ---------- |
-| {brokers} | Gauge | Int |
+| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
+| ---- | ----------- | ---------- | ----------------------- | --------- |
+| {brokers} | Sum | Int | Delta | false |
 
 ### kafka.consumer_group.lag
 
diff --git a/receiver/kafkametricsreceiver/internal/metadata/generated_metrics.go b/receiver/kafkametricsreceiver/internal/metadata/generated_metrics.go
index cabb3f8548a0..40663a91216f 100644
--- a/receiver/kafkametricsreceiver/internal/metadata/generated_metrics.go
+++ b/receiver/kafkametricsreceiver/internal/metadata/generated_metrics.go
@@ -22,14 +22,16 @@ func (m *metricKafkaBrokers) init() {
     m.data.SetName("kafka.brokers")
     m.data.SetDescription("Number of brokers in the cluster.")
     m.data.SetUnit("{brokers}")
-    m.data.SetEmptyGauge()
+    m.data.SetEmptySum()
+    m.data.Sum().SetIsMonotonic(false)
+    m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
 }
 
 func (m *metricKafkaBrokers) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
     if !m.config.Enabled {
         return
     }
-    dp := m.data.Gauge().DataPoints().AppendEmpty()
+    dp := m.data.Sum().DataPoints().AppendEmpty()
     dp.SetStartTimestamp(start)
     dp.SetTimestamp(ts)
     dp.SetIntValue(val)
@@ -37,14 +39,14 @@ func (m *metricKafkaBrokers) recordDataPoint(start pcommon.Timestamp, ts pcommon
 
 // updateCapacity saves max length of data point slices that will be used for the slice capacity.
 func (m *metricKafkaBrokers) updateCapacity() {
-    if m.data.Gauge().DataPoints().Len() > m.capacity {
-        m.capacity = m.data.Gauge().DataPoints().Len()
+    if m.data.Sum().DataPoints().Len() > m.capacity {
+        m.capacity = m.data.Sum().DataPoints().Len()
     }
 }
 
 // emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
 func (m *metricKafkaBrokers) emit(metrics pmetric.MetricSlice) {
-    if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 {
+    if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
         m.updateCapacity()
         m.data.MoveTo(metrics.AppendEmpty())
         m.init()
diff --git a/receiver/kafkametricsreceiver/internal/metadata/generated_metrics_test.go b/receiver/kafkametricsreceiver/internal/metadata/generated_metrics_test.go
index bdbc21b3428d..84f7e3ecff3a 100644
--- a/receiver/kafkametricsreceiver/internal/metadata/generated_metrics_test.go
+++ b/receiver/kafkametricsreceiver/internal/metadata/generated_metrics_test.go
@@ -126,11 +126,13 @@ func TestMetricsBuilder(t *testing.T) {
             case "kafka.brokers":
                 assert.False(t, validatedMetrics["kafka.brokers"], "Found a duplicate in the metrics slice: kafka.brokers")
                 validatedMetrics["kafka.brokers"] = true
-                assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type())
-                assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len())
+                assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type())
+                assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len())
                 assert.Equal(t, "Number of brokers in the cluster.", ms.At(i).Description())
                 assert.Equal(t, "{brokers}", ms.At(i).Unit())
-                dp := ms.At(i).Gauge().DataPoints().At(0)
+                assert.Equal(t, false, ms.At(i).Sum().IsMonotonic())
+                assert.Equal(t, pmetric.AggregationTemporalityDelta, ms.At(i).Sum().AggregationTemporality())
+                dp := ms.At(i).Sum().DataPoints().At(0)
                 assert.Equal(t, start, dp.StartTimestamp())
                 assert.Equal(t, ts, dp.Timestamp())
                 assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType())
diff --git a/receiver/kafkametricsreceiver/metadata.yaml b/receiver/kafkametricsreceiver/metadata.yaml
index 4f531c8ed8ce..5e998d12f750 100644
--- a/receiver/kafkametricsreceiver/metadata.yaml
+++ b/receiver/kafkametricsreceiver/metadata.yaml
@@ -18,14 +18,16 @@ attributes:
       type: string
 
 metrics:
-# brokers scraper
+  # brokers scraper
   kafka.brokers:
     enabled: true
     description: Number of brokers in the cluster.
     unit: "{brokers}"
-    gauge:
+    sum:
+      monotonic: false
       value_type: int
-# topics scraper
+      aggregation: delta
+  # topics scraper
   kafka.topic.partitions:
     enabled: true
     description: Number of partitions in topic.
@@ -61,7 +63,7 @@ metrics:
       gauge:
         value_type: int
       attributes: [topic, partition]
-# consumers scraper
+  # consumers scraper
   kafka.consumer_group.members:
     enabled: true
     description: Count of members in the consumer group
@@ -96,4 +98,4 @@ metrics:
       unit: 1
       gauge:
         value_type: int
-      attributes: [group, topic]
+      attributes: [group, topic]
\ No newline at end of file
diff --git a/receiver/kafkametricsreceiver/testdata/integration/expected.yaml b/receiver/kafkametricsreceiver/testdata/integration/expected.yaml
index baf81546fd9e..af55af7f89a9 100644
--- a/receiver/kafkametricsreceiver/testdata/integration/expected.yaml
+++ b/receiver/kafkametricsreceiver/testdata/integration/expected.yaml
@@ -3,7 +3,8 @@ resourceMetrics:
     scopeMetrics:
       - metrics:
          - description: Number of brokers in the cluster.
-            gauge:
+            sum:
+              aggregationTemporality: 1
              dataPoints:
                - asInt: "1"
                  startTimeUnixNano: "1685063120199110000"
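For reviewers, a minimal self-contained sketch (not part of this diff) of what the change means for pdata consumers. It builds `kafka.brokers` the way the regenerated code above now emits it and reads it back by switching on the metric type; the `main` package and printed output are illustrative only, but the pdata calls mirror those in `generated_metrics.go`.

```go
package main

import (
	"fmt"
	"time"

	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
	// Construct kafka.brokers as the regenerated receiver now emits it:
	// a non-monotonic Sum with delta aggregation temporality.
	metric := pmetric.NewMetrics().
		ResourceMetrics().AppendEmpty().
		ScopeMetrics().AppendEmpty().
		Metrics().AppendEmpty()
	metric.SetName("kafka.brokers")
	metric.SetUnit("{brokers}")
	sum := metric.SetEmptySum()
	sum.SetIsMonotonic(false)
	sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
	dp := sum.DataPoints().AppendEmpty()
	dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
	dp.SetIntValue(1) // one broker, matching expected.yaml

	// pdata accessors are type-specific, so code that previously called
	// Gauge() on this metric (e.g. broker_scraper_test.go) must now
	// dispatch on Type() or call Sum() directly.
	switch metric.Type() {
	case pmetric.MetricTypeSum:
		s := metric.Sum()
		fmt.Printf("%s: Sum, monotonic=%t, temporality=%v, value=%d\n",
			metric.Name(), s.IsMonotonic(), s.AggregationTemporality(),
			s.DataPoints().At(0).IntValue())
	case pmetric.MetricTypeGauge:
		fmt.Printf("%s: Gauge, value=%d\n",
			metric.Name(), metric.Gauge().DataPoints().At(0).IntValue())
	}
}
```

With this change applied, the sketch should print something like `kafka.brokers: Sum, monotonic=false, temporality=Delta, value=1`, which is what the updated integration expectation (`aggregationTemporality: 1`, i.e. delta) encodes.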