kafka: permit specifying custom propagation implementation #1292

Merged
@@ -17,7 +17,7 @@
package kamon.instrumentation.kafka.client

import com.typesafe.config.Config
-import kamon.Kamon
+import kamon.{ClassLoading, Kamon}
import kamon.context.Context
import kamon.instrumentation.context.HasContext
import kamon.trace.Span
@@ -50,7 +50,10 @@ object KafkaInstrumentation {
          log.warn("W3C TraceContext propagation should be used only with identifier-scheme = double")
        }
        SpanPropagation.W3CTraceContext()
-     case other => sys.error(s"Unrecognized option [$other] for the kamon.instrumentation.kafka.client.tracing.propagator config.")
+     case fqcn => try ClassLoading.createInstance[KafkaPropagator](fqcn) catch {
+       case t: Throwable =>
+         sys.error(s"Failed to create kafka propagator instance from FQCN [$fqcn]. Reason: ${t.getMessage}")
+     }
    }
  )
}
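
With this change, any value of `kamon.instrumentation.kafka.client.tracing.propagator` that is not one of the built-in keywords is treated as a fully qualified class name and instantiated via `ClassLoading.createInstance`. As a rough sketch of how a user could opt in (assuming the configured class has a public no-argument constructor; `Kamon.reconfigure` and `ConfigFactory.parseString` are ordinary Kamon/Typesafe Config calls, not part of this PR):

```scala
import com.typesafe.config.ConfigFactory
import kamon.Kamon

// Select a custom propagator by FQCN. The class must implement
// kamon.instrumentation.kafka.client.KafkaPropagator (here: the test utility
// added in this PR) and be available on the classpath.
val propagatorConfig = ConfigFactory.parseString(
  "kamon.instrumentation.kafka.client.tracing.propagator = kamon.instrumentation.kafka.testutil.CustomPropagationImplementation"
)

Kamon.reconfigure(propagatorConfig.withFallback(Kamon.config()))
```

The same setting can also live in `application.conf`; the string above mirrors the one used by `applyConfig` in the test below.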
@@ -243,6 +243,45 @@ class KafkaClientsTracingInstrumentationSpec extends AnyWordSpec with Matchers
        span.parentId shouldBe sendingSpan.get.id
      }
    }

    "create a Producer/Consumer Span when publish/consume a message with custom format" in new SpanReportingTestScope(reporter) {
      applyConfig("kamon.instrumentation.kafka.client.tracing.propagator = kamon.instrumentation.kafka.testutil.CustomPropagationImplementation")

      val testTopicName = "custom-context-propagation"
      publishStringMessageToKafka(testTopicName, "Hello world!!!")

      val consumedRecord = consumeFirstRawRecord(testTopicName)

      consumedRecord.headers().lastHeader("x-trace-id").value() should not be empty
      consumedRecord.headers().lastHeader("traceparent") shouldBe null
      consumedRecord.headers().lastHeader("kctx") shouldBe null
      consumedRecord.value() shouldBe "Hello world!!!"

      awaitNumReportedSpans(2)

      var sendingSpan: Option[Span.Finished] = None
      assertReportedSpan(_.operationName == "producer.send") { span =>
        span.metricTags.get(plain("component")) shouldBe "kafka.producer"
        span.metricTags.get(plain("span.kind")) shouldBe "producer"
        span.tags.get(plain("kafka.topic")) shouldBe testTopicName
        span.tags.get(plain("kafka.key")) shouldBe KafkaInstrumentation.Keys.Null
        span.tags.get(plainLong("kafka.partition")) shouldBe 0L
        sendingSpan = Some(span)
      }

      assertReportedSpan(_.operationName == "consumer.process") { span =>
        span.metricTags.get(plain("component")) shouldBe "kafka.consumer"
        span.metricTags.get(plain("span.kind")) shouldBe "consumer"
        span.tags.get(plain("kafka.topic")) shouldBe testTopicName
        span.tags.get(plain("kafka.client-id")) should not be empty
        span.tags.get(plain("kafka.group-id")) should not be empty
        span.tags.get(plainLong("kafka.partition")) shouldBe 0L
        span.tags.get(plainLong("kafka.timestamp")) shouldBe consumedRecord.timestamp()
        span.tags.get(plain("kafka.timestamp-type")) shouldBe consumedRecord.timestampType().name
        span.trace.id shouldBe sendingSpan.get.trace.id
        span.parentId shouldBe sendingSpan.get.id
      }
    }
  }

private def publishStringMessageToKafka(topicName: String, message: String): Unit = {
@@ -0,0 +1,39 @@
package kamon.instrumentation.kafka.testutil

import kamon.context.Context
import kamon.instrumentation.kafka.client.KafkaPropagator
import kamon.trace.Trace.SamplingDecision
import kamon.trace.{Identifier, Span, Trace}

import org.apache.kafka.common.header.{Headers => KafkaHeaders}

class CustomPropagationImplementation extends KafkaPropagator {

  override def read(medium: KafkaHeaders, context: Context): Context = {

    val contextWithParent = for {
      traceId <- Option(medium.lastHeader("x-trace-id")).map(_.value())
      traceIdStr = new String(traceId, "utf-8")
      spanId <- Option(medium.lastHeader("x-span-id")).map(_.value())
      spanIdStr = new String(spanId, "utf-8")
      sampled <- Option(medium.lastHeader("x-trace-sampled")).map(_.value()).map{
        case Array(1) => SamplingDecision.Sample
        case Array(0) => SamplingDecision.DoNotSample
        case _ => SamplingDecision.Unknown
      }
      span = Span.Remote(Identifier(spanIdStr, spanId), Identifier.Empty, Trace(Identifier(traceIdStr, traceId), sampled))
    } yield context.withEntry(Span.Key, span)

    contextWithParent.getOrElse(context)
  }

  override def write(context: Context, medium: KafkaHeaders): Unit = {
    val span = context.get(Span.Key)

    if (span != Span.Empty) {
      medium.add("x-trace-id", span.trace.id.string.getBytes("utf-8"))
      medium.add("x-span-id", span.id.string.getBytes("utf-8"))
      medium.add("x-trace-sampled", if (span.trace.samplingDecision == SamplingDecision.Sample) Array(1) else Array(0))
    }
  }
}
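
For orientation, this is the shape of the `KafkaPropagator` contract that the class above fulfils, inferred from its overrides (the real trait lives in `kamon.instrumentation.kafka.client` and may declare additional members):

```scala
import kamon.context.Context
import org.apache.kafka.common.header.{Headers => KafkaHeaders}

// Inferred SPI: write the current Context into the record headers on the
// producer side, and read it back out on the consumer side. Implementations
// presumably need a public no-arg constructor so that ClassLoading.createInstance
// can build them from the configured FQCN.
trait KafkaPropagator {
  def read(medium: KafkaHeaders, context: Context): Context
  def write(context: Context, medium: KafkaHeaders): Unit
}
```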