Skip to content

Commit

Permalink
Replace tracing kafka wrappers with jdk proxies (open-telemetry#6457)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored and LironKS committed Oct 31, 2022
1 parent 5e3da77 commit d1c85cb
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 382 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaHeadersSetter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -75,13 +76,46 @@ private TextMapPropagator propagator() {
}

/** Returns a decorated {@link Producer} that emits spans for each sent message. */
@SuppressWarnings("unchecked")
public <K, V> Producer<K, V> wrap(Producer<K, V> producer) {
return new TracingProducer<>(producer, this);
return (Producer<K, V>)
Proxy.newProxyInstance(
KafkaTelemetry.class.getClassLoader(),
new Class<?>[] {Producer.class},
(proxy, method, args) -> {
// Future<RecordMetadata> send(ProducerRecord<K, V> record)
// Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
if ("send".equals(method.getName())
&& method.getParameterCount() >= 1
&& method.getParameterTypes()[0] == ProducerRecord.class) {
ProducerRecord<K, V> record = (ProducerRecord<K, V>) args[0];
Callback callback =
method.getParameterCount() >= 2
&& method.getParameterTypes()[1] == Callback.class
? (Callback) args[1]
: null;
return buildAndInjectSpan(record, callback, producer::send);
}
return method.invoke(producer, args);
});
}

/** Returns a decorated {@link Consumer} that consumes spans for each received message. */
@SuppressWarnings("unchecked")
public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
return new TracingConsumer<>(consumer, this);
return (Consumer<K, V>)
Proxy.newProxyInstance(
KafkaTelemetry.class.getClassLoader(),
new Class<?>[] {Consumer.class},
(proxy, method, args) -> {
Object result = method.invoke(consumer, args);
// ConsumerRecords<K, V> poll(long timeout)
// ConsumerRecords<K, V> poll(Duration duration)
if ("poll".equals(method.getName()) && result instanceof ConsumerRecords) {
buildAndFinishSpan((ConsumerRecords) result);
}
return result;
});
}

/**
Expand Down

This file was deleted.

Loading

0 comments on commit d1c85cb

Please sign in to comment.