Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kafka] Add option to supply destination topic through context #34503

Merged
merged 13 commits into from
Sep 17, 2024
Merged
27 changes: 27 additions & 0 deletions .chloggen/kafka-topic-context.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkaexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add option to supply destination topic through context.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34503, 34432]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
codeboten marked this conversation as resolved.
Show resolved Hide resolved
# Default: '[user]'
change_logs: [api]
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ pkg/batchperresourceattr/ @open-teleme
pkg/batchpersignal/ @open-telemetry/collector-contrib-approvers @jpkrohling
pkg/experimentalmetricmetadata/ @open-telemetry/collector-contrib-approvers @rmfitzpatrick
pkg/golden/ @open-telemetry/collector-contrib-approvers @djaglowski @atoulme
pkg/kafka/topic/ @open-telemetry/collector-contrib-approvers @pavolloffay @MovieStoreGuy
tylerbenson marked this conversation as resolved.
Show resolved Hide resolved
pkg/ottl/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @kentquirk @bogdandrutu @evan-bradley
pkg/pdatatest/ @open-telemetry/collector-contrib-approvers @djaglowski @fatsheep9146
pkg/pdatautil/ @open-telemetry/collector-contrib-approvers @dmitryax
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ body:
- pkg/batchpersignal
- pkg/experimentalmetricmetadata
- pkg/golden
- pkg/kafka/topic
- pkg/ottl
- pkg/pdatatest
- pkg/pdatautil
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ body:
- pkg/batchpersignal
- pkg/experimentalmetricmetadata
- pkg/golden
- pkg/kafka/topic
- pkg/ottl
- pkg/pdatatest
- pkg/pdatautil
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ body:
- pkg/batchpersignal
- pkg/experimentalmetricmetadata
- pkg/golden
- pkg/kafka/topic
- pkg/ottl
- pkg/pdatatest
- pkg/pdatautil
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/unmaintained.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ body:
- pkg/batchpersignal
- pkg/experimentalmetricmetadata
- pkg/golden
- pkg/kafka/topic
- pkg/ottl
- pkg/pdatatest
- pkg/pdatautil
Expand Down
1 change: 1 addition & 0 deletions cmd/otelcontribcol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ replaces:
- github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter => ../../exporter/fileexporter
- github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry => ../../pkg/resourcetotelemetry
- github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
- github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic => ../../pkg/kafka/topic
- github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opencensusexporter => ../../exporter/opencensusexporter
- github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter => ../../exporter/opensearchexporter
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders => ../../internal/metadataproviders
Expand Down
3 changes: 3 additions & 0 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.108.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.108.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.108.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.108.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.108.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.108.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.108.0 // indirect
Expand Down Expand Up @@ -1179,6 +1180,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourceto

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic => ../../pkg/kafka/topic

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opencensusexporter => ../../exporter/opencensusexporter

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter => ../../exporter/opensearchexporter
Expand Down
10 changes: 8 additions & 2 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ The following settings can be optionally configured:
- `brokers` (default = localhost:9092): The list of kafka brokers.
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup.
- `client_id` (default = "sarama"): The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests.
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to export to.
- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. This option, when set, will take precedence over the default topic. If `topic_from_attribute` is not set, the message's topic will be set to the value of the configuration option `topic` instead.
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the default kafka topic to export to. See [Destination Topic](#destination-topic) below for more details.
- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. See [Destination Topic](#destination-topic) below for more details.
- `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
- `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
- `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
Expand Down Expand Up @@ -105,3 +105,9 @@ exporters:
- localhost:9092
protocol_version: 2.0.0
```

## Destination Topic
The destination topic can be defined in a few different ways and takes priority in the following order:
1. When `topic_from_attribute` is configured, and the corresponding attribute is found on the ingested data, the value of this attribute is used.
2. If a prior component in the collector pipeline sets the topic on the context via the `topic.WithTopic` function (from the `github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic` package), the value set in the context is used.
3. Finally, the `topic` configuration is used as a default/fallback destination.
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.108.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.108.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.108.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.108.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.108.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.108.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.108.0
Expand Down Expand Up @@ -106,6 +107,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal => ../../pkg/batchpersignal

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic => ../../pkg/kafka/topic

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger => ../../pkg/translator/jaeger

retract (
Expand Down
32 changes: 18 additions & 14 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic"
)

var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")
Expand All @@ -40,8 +41,8 @@ func (ke kafkaErrors) Error() string {
return fmt.Sprintf("Failed to deliver %d messages due to %s", ke.count, ke.err)
}

func (e *kafkaTracesProducer) tracesPusher(_ context.Context, td ptrace.Traces) error {
messages, err := e.marshaler.Marshal(td, getTopic(&e.cfg, td.ResourceSpans()))
func (e *kafkaTracesProducer) tracesPusher(ctx context.Context, td ptrace.Traces) error {
messages, err := e.marshaler.Marshal(td, getTopic(ctx, &e.cfg, td.ResourceSpans()))
if err != nil {
return consumererror.NewPermanent(err)
}
Expand Down Expand Up @@ -82,8 +83,8 @@ type kafkaMetricsProducer struct {
logger *zap.Logger
}

func (e *kafkaMetricsProducer) metricsDataPusher(_ context.Context, md pmetric.Metrics) error {
messages, err := e.marshaler.Marshal(md, getTopic(&e.cfg, md.ResourceMetrics()))
func (e *kafkaMetricsProducer) metricsDataPusher(ctx context.Context, md pmetric.Metrics) error {
messages, err := e.marshaler.Marshal(md, getTopic(ctx, &e.cfg, md.ResourceMetrics()))
if err != nil {
return consumererror.NewPermanent(err)
}
Expand Down Expand Up @@ -124,8 +125,8 @@ type kafkaLogsProducer struct {
logger *zap.Logger
}

func (e *kafkaLogsProducer) logsDataPusher(_ context.Context, ld plog.Logs) error {
messages, err := e.marshaler.Marshal(ld, getTopic(&e.cfg, ld.ResourceLogs()))
func (e *kafkaLogsProducer) logsDataPusher(ctx context.Context, ld plog.Logs) error {
messages, err := e.marshaler.Marshal(ld, getTopic(ctx, &e.cfg, ld.ResourceLogs()))
if err != nil {
return consumererror.NewPermanent(err)
}
Expand Down Expand Up @@ -259,15 +260,18 @@ type resource interface {
Resource() pcommon.Resource
}

func getTopic[T resource](cfg *Config, resources resourceSlice[T]) string {
if cfg.TopicFromAttribute == "" {
return cfg.Topic
}
for i := 0; i < resources.Len(); i++ {
rv, ok := resources.At(i).Resource().Attributes().Get(cfg.TopicFromAttribute)
if ok && rv.Str() != "" {
return rv.Str()
func getTopic[T resource](ctx context.Context, cfg *Config, resources resourceSlice[T]) string {
if cfg.TopicFromAttribute != "" {
for i := 0; i < resources.Len(); i++ {
rv, ok := resources.At(i).Resource().Attributes().Get(cfg.TopicFromAttribute)
if ok && rv.Str() != "" {
return rv.Str()
}
}
}
contextTopic, ok := topic.FromContext(ctx)
if ok {
return contextTopic
}
return cfg.Topic
}
105 changes: 101 additions & 4 deletions exporter/kafkaexporter/kafka_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic"
)

func TestNewExporter_err_version(t *testing.T) {
Expand Down Expand Up @@ -168,6 +169,22 @@ func TestTracesPusher_attr(t *testing.T) {
require.NoError(t, err)
}

func TestTracesPusher_ctx(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
producer.ExpectSendMessageAndSucceed()

p := kafkaTracesProducer{
producer: producer,
marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
err := p.tracesPusher(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateTraces(2))
require.NoError(t, err)
}

func TestTracesPusher_err(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
Expand Down Expand Up @@ -234,6 +251,22 @@ func TestMetricsDataPusher_attr(t *testing.T) {
require.NoError(t, err)
}

func TestMetricsDataPusher_ctx(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
producer.ExpectSendMessageAndSucceed()

p := kafkaMetricsProducer{
producer: producer,
marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding, false),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
err := p.metricsDataPusher(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateMetrics(2))
require.NoError(t, err)
}

func TestMetricsDataPusher_err(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
Expand Down Expand Up @@ -300,6 +333,22 @@ func TestLogsDataPusher_attr(t *testing.T) {
require.NoError(t, err)
}

func TestLogsDataPusher_ctx(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
producer.ExpectSendMessageAndSucceed()

p := kafkaLogsProducer{
producer: producer,
marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding, false),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
err := p.logsDataPusher(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateLogs(1))
require.NoError(t, err)
}

func TestLogsDataPusher_err(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
Expand Down Expand Up @@ -373,6 +422,7 @@ func Test_GetTopic(t *testing.T) {
tests := []struct {
name string
cfg Config
ctx context.Context
resource any
wantTopic string
}{
Expand All @@ -382,6 +432,7 @@ func Test_GetTopic(t *testing.T) {
TopicFromAttribute: "resource-attr",
Topic: "defaultTopic",
},
ctx: topic.WithTopic(context.Background(), "context-topic"),
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
wantTopic: "resource-attr-val-1",
},
Expand All @@ -391,6 +442,7 @@ func Test_GetTopic(t *testing.T) {
TopicFromAttribute: "resource-attr",
Topic: "defaultTopic",
},
ctx: topic.WithTopic(context.Background(), "context-topic"),
resource: testdata.GenerateTraces(1).ResourceSpans(),
wantTopic: "resource-attr-val-1",
},
Expand All @@ -400,6 +452,7 @@ func Test_GetTopic(t *testing.T) {
TopicFromAttribute: "resource-attr",
Topic: "defaultTopic",
},
ctx: topic.WithTopic(context.Background(), "context-topic"),
resource: testdata.GenerateLogs(1).ResourceLogs(),
wantTopic: "resource-attr-val-1",
},
Expand All @@ -409,14 +462,58 @@ func Test_GetTopic(t *testing.T) {
TopicFromAttribute: "nonexistent_attribute",
Topic: "defaultTopic",
},
ctx: context.Background(),
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
wantTopic: "defaultTopic",
},

{
name: "Valid metric context, return topic name",
cfg: Config{
TopicFromAttribute: "nonexistent_attribute",
Topic: "defaultTopic",
},
ctx: topic.WithTopic(context.Background(), "context-topic"),
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
wantTopic: "context-topic",
},
{
name: "Valid trace context, return topic name",
cfg: Config{
TopicFromAttribute: "nonexistent_attribute",
Topic: "defaultTopic",
},
ctx: topic.WithTopic(context.Background(), "context-topic"),
resource: testdata.GenerateTraces(1).ResourceSpans(),
wantTopic: "context-topic",
},
{
name: "Valid log context, return topic name",
cfg: Config{
TopicFromAttribute: "nonexistent_attribute",
Topic: "defaultTopic",
},
ctx: topic.WithTopic(context.Background(), "context-topic"),
resource: testdata.GenerateLogs(1).ResourceLogs(),
wantTopic: "context-topic",
},

{
name: "Attribute not found",
cfg: Config{
TopicFromAttribute: "nonexistent_attribute",
Topic: "defaultTopic",
},
ctx: context.Background(),
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
wantTopic: "defaultTopic",
},
{
name: "TopicFromAttribute not set, return default topic",
name: "TopicFromAttribute, return default topic",
cfg: Config{
Topic: "defaultTopic",
},
ctx: context.Background(),
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
wantTopic: "defaultTopic",
},
Expand All @@ -427,11 +524,11 @@ func Test_GetTopic(t *testing.T) {
topic := ""
switch r := tests[i].resource.(type) {
case pmetric.ResourceMetricsSlice:
topic = getTopic(&tests[i].cfg, r)
topic = getTopic(tests[i].ctx, &tests[i].cfg, r)
case ptrace.ResourceSpansSlice:
topic = getTopic(&tests[i].cfg, r)
topic = getTopic(tests[i].ctx, &tests[i].cfg, r)
case plog.ResourceLogsSlice:
topic = getTopic(&tests[i].cfg, r)
topic = getTopic(tests[i].ctx, &tests[i].cfg, r)
}
assert.Equal(t, tests[i].wantTopic, topic)
})
Expand Down
1 change: 1 addition & 0 deletions pkg/kafka/topic/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../../Makefile.Common
4 changes: 4 additions & 0 deletions pkg/kafka/topic/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Kafka Topic Context Accessor

This module is used for accessing the topic within a context.
See the [kafka exporter readme](../../../exporter/kafkaexporter/README.md#destination-topic) for more details.
3 changes: 3 additions & 0 deletions pkg/kafka/topic/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic

go 1.22
Loading
Loading