diff --git a/sinker/otel/kafkafanoutexporter/kafka_exporter.go b/sinker/otel/kafkafanoutexporter/kafka_exporter.go index ffdd5a550..3b14fee24 100644 --- a/sinker/otel/kafkafanoutexporter/kafka_exporter.go +++ b/sinker/otel/kafkafanoutexporter/kafka_exporter.go @@ -82,7 +82,7 @@ type kafkaMetricsProducer struct { func (e *kafkaMetricsProducer) metricsDataPusher(ctx context.Context, md pmetric.Metrics) error { e.logger.Info("logging context", zap.Any("context", ctx)) sinkId := ctx.Value("sink_id").(string) - topic := e.topic + "_" + sinkId + topic := e.topic + "-" + sinkId messages, err := e.marshaler.Marshal(md, topic) if err != nil { return consumererror.NewPermanent(err) @@ -114,7 +114,7 @@ type kafkaLogsProducer struct { func (e *kafkaLogsProducer) logsDataPusher(ctx context.Context, ld plog.Logs) error { sinkId := ctx.Value("sink-id").(string) - topic := e.topic + "_" + sinkId + topic := e.topic + "-" + sinkId messages, err := e.marshaler.Marshal(ld, topic) if err != nil { return consumererror.NewPermanent(err)