diff --git a/.chloggen/kafka-exporter-key-by-metric-resources.yaml b/.chloggen/kafka-exporter-key-by-metric-resources.yaml new file mode 100644 index 000000000000..21207a81524c --- /dev/null +++ b/.chloggen/kafka-exporter-key-by-metric-resources.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkaexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: add an ability to publish kafka messages with message key based on metric resource attributes - it will allow partitioning metrics in Kafka. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29433, 30666, 31675] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user, api] diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index 4788c7831b21..2ac969d72edf 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -36,6 +36,7 @@ The following settings can be optionally configured: - The following encodings are valid *only* for **logs**. - `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded. - `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default. +- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka. - `auth` - `plain_text` - `username`: The username to use. diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index daa387bfb91f..baefca3ce23a 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -48,6 +48,8 @@ type Config struct { // trace ID as the message key by default. PartitionTracesByID bool `mapstructure:"partition_traces_by_id"` + PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"` + // Metadata is the namespace for metadata management properties used by the // Client, and shared by the Producer/Consumer. Metadata Metadata `mapstructure:"metadata"` diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go index ba1dfd7bef91..4b43f948ebe3 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -55,11 +55,12 @@ func TestLoadConfig(t *testing.T) { NumConsumers: 2, QueueSize: 10, }, - Topic: "spans", - Encoding: "otlp_proto", - PartitionTracesByID: true, - Brokers: []string{"foo:123", "bar:456"}, - ClientID: "test_client_id", + Topic: "spans", + Encoding: "otlp_proto", + PartitionTracesByID: true, + PartitionMetricsByResourceAttributes: true, + Brokers: []string{"foo:123", "bar:456"}, + ClientID: "test_client_id", Authentication: kafka.Authentication{ PlainText: &kafka.PlainTextConfig{ Username: "jdoe", @@ -109,11 +110,12 @@ func TestLoadConfig(t *testing.T) { NumConsumers: 2, QueueSize: 10, }, - Topic: "spans", - Encoding: "otlp_proto", - PartitionTracesByID: true, - Brokers: []string{"foo:123", "bar:456"}, - ClientID: "test_client_id", + Topic: "spans", + Encoding: "otlp_proto", + PartitionTracesByID: true, + PartitionMetricsByResourceAttributes: true, + Brokers: []string{"foo:123", "bar:456"}, + ClientID: "test_client_id", Authentication: kafka.Authentication{ PlainText: &kafka.PlainTextConfig{ Username: "jdoe", @@ -165,6 +167,7 @@ func TestLoadConfig(t *testing.T) { Topic: "spans", Encoding: "otlp_proto", PartitionTracesByID: true, + PartitionMetricsByResourceAttributes: true, Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", ResolveCanonicalBootstrapServersOnly: true, diff --git a/exporter/kafkaexporter/factory.go b/exporter/kafkaexporter/factory.go index e822b0a005f4..d990a17dab8d 100644 --- a/exporter/kafkaexporter/factory.go +++ b/exporter/kafkaexporter/factory.go @@ -38,6 +38,8 @@ const ( defaultCompression = "none" // default from sarama.NewConfig() defaultFluxMaxMessages = 0 + // partitioning metrics by resource attributes is disabled by default + defaultPartitionMetricsByResourceAttributesEnabled = false ) // FactoryOption applies changes to kafkaExporterFactory. @@ -97,8 +99,9 @@ func createDefaultConfig() component.Config { Brokers: []string{defaultBroker}, ClientID: defaultClientID, // using an empty topic to track when it has not been set by user, default is based on traces or metrics. - Topic: "", - Encoding: defaultEncoding, + Topic: "", + Encoding: defaultEncoding, + PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled, Metadata: Metadata{ Full: defaultMetadataFull, Retry: MetadataRetry{ diff --git a/exporter/kafkaexporter/go.mod b/exporter/kafkaexporter/go.mod index 35b6d602464f..1ec42f4660ff 100644 --- a/exporter/kafkaexporter/go.mod +++ b/exporter/kafkaexporter/go.mod @@ -10,6 +10,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.99.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.99.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.99.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.99.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.99.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.99.0 github.com/openzipkin/zipkin-go v0.4.2 diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index c98abdc674c0..c6b82dcfc66a 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -211,6 +211,12 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m if marshaler == nil { return nil, errUnrecognizedEncoding } + if config.PartitionMetricsByResourceAttributes { + if keyableMarshaler, ok := marshaler.(KeyableMetricsMarshaler); ok { + keyableMarshaler.Key() + } + } + return &kafkaMetricsProducer{ cfg: config, topic: config.Topic, diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index 0d0cfba637f5..5f40379d75a2 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" ) @@ -71,6 +72,69 @@ func TestDefaultLogsMarshalers(t *testing.T) { } } +func TestOTLPMetricsJsonMarshaling(t *testing.T) { + tests := []struct { + name string + keyEnabled bool + messagePartitionKeys []sarama.Encoder + }{ + { + name: "partitioning_disabled", + keyEnabled: false, + messagePartitionKeys: []sarama.Encoder{nil}, + }, + { + name: "partitioning_enabled", + keyEnabled: true, + messagePartitionKeys: []sarama.Encoder{ + sarama.ByteEncoder{0x62, 0x7f, 0x20, 0x34, 0x85, 0x49, 0x55, 0x2e, 0xfa, 0x93, 0xae, 0xd7, 0xde, 0x91, 0xd7, 0x16}, + sarama.ByteEncoder{0x75, 0x6b, 0xb4, 0xd6, 0xff, 0xeb, 0x92, 0x22, 0xa, 0x68, 0x65, 0x48, 0xe0, 0xd3, 0x94, 0x44}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metric := pmetric.NewMetrics() + r := pcommon.NewResource() + r.Attributes().PutStr("service.name", "my_service_name") + r.Attributes().PutStr("service.instance.id", "kek_x_1") + r.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource()) + + rm := metric.ResourceMetrics().At(0) + rm.SetSchemaUrl(conventions.SchemaURL) + + sm := rm.ScopeMetrics().AppendEmpty() + pmetric.NewScopeMetrics() + m := sm.Metrics().AppendEmpty() + m.SetEmptyGauge() + m.Gauge().DataPoints().AppendEmpty().SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(1, 0))) + m.Gauge().DataPoints().At(0).Attributes().PutStr("gauage_attribute", "attr") + m.Gauge().DataPoints().At(0).SetDoubleValue(1.0) + + r1 := pcommon.NewResource() + r1.Attributes().PutStr("service.instance.id", "kek_x_2") + r1.Attributes().PutStr("service.name", "my_service_name") + r1.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource()) + + standardMarshaler := metricsMarshalers()["otlp_json"] + keyableMarshaler, ok := standardMarshaler.(KeyableMetricsMarshaler) + require.True(t, ok, "Must be a KeyableMetricsMarshaler") + if tt.keyEnabled { + keyableMarshaler.Key() + } + + msgs, err := standardMarshaler.Marshal(metric, "KafkaTopicX") + require.NoError(t, err, "Must have marshaled the data without error") + + require.Len(t, msgs, len(tt.messagePartitionKeys), "Number of messages must be %d, but was %d", len(tt.messagePartitionKeys), len(msgs)) + + for i := 0; i < len(tt.messagePartitionKeys); i++ { + require.Equal(t, tt.messagePartitionKeys[i], msgs[i].Key, "message %d has incorrect key", i) + } + }) + } +} + func TestOTLPTracesJsonMarshaling(t *testing.T) { t.Parallel() diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index d9e38dd52caf..3429cdd8316e 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -11,6 +11,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" ) type pdataLogsMarshaler struct { @@ -42,22 +43,58 @@ func newPdataLogsMarshaler(marshaler plog.Marshaler, encoding string) LogsMarsha } } +// KeyableMetricsMarshaler is an extension of the MetricsMarshaler interface intended to provide partition key capabilities +// for metrics messages +type KeyableMetricsMarshaler interface { + MetricsMarshaler + Key() +} + type pdataMetricsMarshaler struct { marshaler pmetric.Marshaler encoding string + keyed bool +} + +// Key configures the pdataMetricsMarshaler to set the message key on the kafka messages +func (p *pdataMetricsMarshaler) Key() { + p.keyed = true } func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sarama.ProducerMessage, error) { - bts, err := p.marshaler.MarshalMetrics(ld) - if err != nil { - return nil, err - } - return []*sarama.ProducerMessage{ - { + var msgs []*sarama.ProducerMessage + if p.keyed { + metrics := ld.ResourceMetrics() + + for i := 0; i < metrics.Len(); i++ { + resourceMetrics := metrics.At(i) + var hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes()) + + newMetrics := pmetric.NewMetrics() + resourceMetrics.CopyTo(newMetrics.ResourceMetrics().AppendEmpty()) + + bts, err := p.marshaler.MarshalMetrics(newMetrics) + if err != nil { + return nil, err + } + msgs = append(msgs, &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(bts), + Key: sarama.ByteEncoder(hash[:]), + }) + } + } else { + bts, err := p.marshaler.MarshalMetrics(ld) + if err != nil { + return nil, err + } + msgs = append(msgs, &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(bts), - }, - }, nil + }) + } + + return msgs, nil } func (p pdataMetricsMarshaler) Encoding() string { @@ -65,13 +102,13 @@ func (p pdataMetricsMarshaler) Encoding() string { } func newPdataMetricsMarshaler(marshaler pmetric.Marshaler, encoding string) MetricsMarshaler { - return pdataMetricsMarshaler{ + return &pdataMetricsMarshaler{ marshaler: marshaler, encoding: encoding, } } -// KeyableTracesMarshaler is an extension of the TracesMarshaler interface inteded to provide partition key capabilities +// KeyableTracesMarshaler is an extension of the TracesMarshaler interface intended to provide partition key capabilities // for trace messages type KeyableTracesMarshaler interface { TracesMarshaler diff --git a/exporter/kafkaexporter/testdata/config.yaml b/exporter/kafkaexporter/testdata/config.yaml index 15624b521b10..7c89bea74ade 100644 --- a/exporter/kafkaexporter/testdata/config.yaml +++ b/exporter/kafkaexporter/testdata/config.yaml @@ -13,6 +13,7 @@ kafka: required_acks: -1 # WaitForAll timeout: 10s partition_traces_by_id: true + partition_metrics_by_resource_attributes: true auth: plain_text: username: jdoe diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod index fa4562a1811e..adcb79ddec9a 100644 --- a/receiver/kafkareceiver/go.mod +++ b/receiver/kafkareceiver/go.mod @@ -64,6 +64,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.99.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.99.0 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.19.0 // indirect