Skip to content

Commit

Permalink
[FIX] kafka metrics topic (#1946)
Browse files Browse the repository at this point in the history
[FIX] kafka metrics topic (#1946)
  • Loading branch information
etaques authored Nov 1, 2022
1 parent 470b457 commit f556aba
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions sinker/otel/kafkafanoutexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type kafkaMetricsProducer struct {
func (e *kafkaMetricsProducer) metricsDataPusher(ctx context.Context, md pmetric.Metrics) error {
e.logger.Info("logging context", zap.Any("context", ctx))
sinkId := ctx.Value("sink_id").(string)
topic := e.topic + "_" + sinkId
topic := e.topic + "-" + sinkId
messages, err := e.marshaler.Marshal(md, topic)
if err != nil {
return consumererror.NewPermanent(err)
Expand Down Expand Up @@ -114,7 +114,7 @@ type kafkaLogsProducer struct {

func (e *kafkaLogsProducer) logsDataPusher(ctx context.Context, ld plog.Logs) error {
sinkId := ctx.Value("sink-id").(string)
topic := e.topic + "_" + sinkId
topic := e.topic + "-" + sinkId
messages, err := e.marshaler.Marshal(ld, topic)
if err != nil {
return consumererror.NewPermanent(err)
Expand Down

0 comments on commit f556aba

Please sign in to comment.