Skip to content

Commit

Permalink
Upgrade libs, and consume test
Browse files Browse the repository at this point in the history
Remove OTel span propagation hack.

Signed-off-by: Ales Justin <ales.justin@gmail.com>
  • Loading branch information
alesj committed Jul 11, 2022
1 parent 29c2bb9 commit 9bd61e1
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 115 deletions.
4 changes: 2 additions & 2 deletions config/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
bridge.id=my-bridge
# uncomment one of the following lines (bridge.tracing) to enable Jaeger tracing, check the documentation on how to configure the tracer
# OpenTracing support
bridge.tracing=jaeger
#bridge.tracing=jaeger
# OpenTelemetry support
#bridge.tracing=opentelemetry
bridge.tracing=opentelemetry

#Apache Kafka common
kafka.bootstrap.servers=localhost:9092
Expand Down
27 changes: 14 additions & 13 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@
<jaeger.version>1.8.1</jaeger.version>
<opentracing.version>0.33.0</opentracing.version>
<opentracing-kafka-client.version>0.1.15</opentracing-kafka-client.version>
<opentelemetry.version>1.9.0-alpha</opentelemetry.version>
<opentelemetry-stable.version>1.9.0</opentelemetry-stable.version>
<grpc.version>1.44.0</grpc.version>
<opentelemetry.alpha-version>1.15.0-alpha</opentelemetry.alpha-version>
<opentelemetry.version>1.15.0</opentelemetry.version>
<grpc.version>1.47.0</grpc.version>
<micrometer.version>1.3.9</micrometer.version>
<jmx-prometheus-collector.version>0.12.0</jmx-prometheus-collector.version>
<prometheus-simpleclient.version>0.7.0</prometheus-simpleclient.version>
Expand Down Expand Up @@ -273,47 +273,47 @@
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
<version>${opentelemetry.version}</version>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>${opentelemetry-stable.version}</version>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
<version>${opentelemetry-stable.version}</version>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>${opentelemetry.version}</version>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<version>${opentelemetry-stable.version}</version>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
<version>${opentelemetry.version}</version>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api</artifactId>
<version>${opentelemetry.version}</version>
<artifactId>opentelemetry-instrumentation-api-semconv</artifactId>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-common</artifactId>
<version>${opentelemetry.version}</version>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-jaeger</artifactId>
<version>${opentelemetry-stable.version}</version>
<version>${opentelemetry.version}</version>
</dependency>
<!-- Use gRPC as the transport -->
<dependency>
Expand Down Expand Up @@ -568,6 +568,7 @@
<!-- OpenTelemetry - used via classpath configuration -->
<ignoredUnusedDeclaredDependency>io.opentelemetry:opentelemetry-sdk-trace</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>io.opentelemetry.instrumentation:opentelemetry-kafka-clients-common</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>io.opentelemetry.instrumentation:opentelemetry-instrumentation-api</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>io.opentelemetry:opentelemetry-exporter-jaeger</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>io.grpc:grpc-netty-shaded</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/io/strimzi/kafka/bridge/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.strimzi.kafka.bridge.amqp.AmqpBridge;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.config.BridgeExecutorServiceFactory;
import io.strimzi.kafka.bridge.http.HttpBridge;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.config.ConfigRetriever;
Expand All @@ -18,6 +19,7 @@
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.impl.VertxBuilder;
import io.vertx.core.json.JsonObject;
import io.vertx.micrometer.Label;
import io.vertx.micrometer.MetricsDomain;
Expand Down Expand Up @@ -72,7 +74,9 @@ public static void main(String[] args) {
vertxOptions.setMetricsOptions(metricsOptions());
jmxCollectorRegistry = getJmxCollectorRegistry();
}
Vertx vertx = Vertx.vertx(vertxOptions);
VertxBuilder vertxBuilder = new VertxBuilder(vertxOptions)
.executorServiceFactory(new BridgeExecutorServiceFactory());
Vertx vertx = vertxBuilder.init().vertx();
// MeterRegistry default instance is just null if metrics are not enabled in the VertxOptions instance
MeterRegistry meterRegistry = BackendRegistries.getDefaultNow();
MetricsReporter metricsReporter = new MetricsReporter(jmxCollectorRegistry, meterRegistry);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/

package io.strimzi.kafka.bridge.config;

import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.core.spi.ExecutorServiceFactory;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Custom Vertx ExecutorServiceFactory - delegate to tracing impl to provide one.
* Initially it could be NoopTracingHandle that provides it - before Application is fully initialized,
* then it should be actual tracing implementation - if there is one.
* Shutdown should be done OK, since tracing delegate will also delegate shutdown to original,
* or original itself will be used.
*/
public class BridgeExecutorServiceFactory implements ExecutorServiceFactory {

    /**
     * Creates the Vert.x worker executor, wrapped so that the tracing implementation
     * may substitute its own (context-propagating) executor at call time.
     *
     * @param threadFactory factory used by the default Vert.x executor
     * @param concurrency core concurrency hint from Vert.x
     * @param maxConcurrency max concurrency hint from Vert.x
     * @return an {@link ExecutorService} that defers to the tracing-provided executor when available
     */
    @Override
    public ExecutorService createExecutor(ThreadFactory threadFactory, Integer concurrency, Integer maxConcurrency) {
        ExecutorService fallback = ExecutorServiceFactory.INSTANCE.createExecutor(threadFactory, concurrency, maxConcurrency);
        return new TracingAwareExecutorService(fallback);
    }

    /**
     * Delegating wrapper around the default Vert.x executor.
     * The actual target is resolved on EVERY call: early in startup the tracing handle
     * is the no-op one (which returns null, so the fallback is used), and once the
     * application is fully initialized the real tracing implementation can supply a
     * wrapping executor. Shutdown is safe either way: the tracing-provided executor
     * wraps the fallback, so shutting down either reaches the underlying pool.
     */
    private static final class TracingAwareExecutorService implements ExecutorService {

        // the executor produced by the default Vert.x factory; used when tracing provides none
        private final ExecutorService fallback;

        TracingAwareExecutorService(ExecutorService fallback) {
            this.fallback = fallback;
        }

        // resolve the current delegate; null from the tracing handle means "use the fallback"
        private ExecutorService target() {
            ExecutorService wrapped = TracingUtil.getTracing().get(fallback);
            return wrapped != null ? wrapped : fallback;
        }

        @Override
        public void shutdown() {
            target().shutdown();
        }

        @Override
        public List<Runnable> shutdownNow() {
            return target().shutdownNow();
        }

        @Override
        public boolean isShutdown() {
            return target().isShutdown();
        }

        @Override
        public boolean isTerminated() {
            return target().isTerminated();
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return target().awaitTermination(timeout, unit);
        }

        @Override
        public <T> Future<T> submit(Callable<T> task) {
            return target().submit(task);
        }

        @Override
        public <T> Future<T> submit(Runnable task, T result) {
            return target().submit(task, result);
        }

        @Override
        public Future<?> submit(Runnable task) {
            return target().submit(task);
        }

        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
            return target().invokeAll(tasks);
        }

        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
            return target().invokeAll(tasks, timeout, unit);
        }

        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
            return target().invokeAny(tasks);
        }

        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return target().invokeAny(tasks, timeout, unit);
        }

        @Override
        public void execute(Runnable command) {
            target().execute(command);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,7 @@ public void handle(Endpoint<?> endpoint) {
if (isAsync) {
// if async is specified, return immediately once records are sent
for (KafkaProducerRecord<K, V> record : records) {
span.prepare(record);
Promise<RecordMetadata> promise = Promise.promise();
promise.future().onComplete(ar -> span.clean(record));
this.send(record, promise);
this.send(record, null);
}
span.finish(HttpResponseStatus.NO_CONTENT.code());
HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(),
Expand All @@ -136,9 +133,8 @@ public void handle(Endpoint<?> endpoint) {
List<Future> sendHandlers = new ArrayList<>(records.size());
for (KafkaProducerRecord<K, V> record : records) {
Promise<RecordMetadata> promise = Promise.promise();
Future<RecordMetadata> future = promise.future().onComplete(ar -> span.clean(record));
Future<RecordMetadata> future = promise.future();
sendHandlers.add(future);
span.prepare(record);
this.send(record, promise);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

final class NoopTracingHandle implements TracingHandle {
@Override
public String envName() {
public String envServiceName() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,14 @@
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.RoutingContext;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;

import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

import static io.strimzi.kafka.bridge.tracing.TracingConstants.COMPONENT;
import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER;
Expand All @@ -52,6 +46,7 @@
class OpenTelemetryHandle implements TracingHandle {

private Tracer tracer;
private ExecutorService service;

static void setCommonAttributes(SpanBuilder builder, RoutingContext routingContext) {
builder.setAttribute(SemanticAttributes.PEER_SERVICE, KAFKA_SERVICE);
Expand All @@ -60,13 +55,13 @@ static void setCommonAttributes(SpanBuilder builder, RoutingContext routingConte
}

@Override
public String envName() {
public String envServiceName() {
return OPENTELEMETRY_SERVICE_NAME_ENV_KEY;
}

@Override
public String serviceName(BridgeConfig config) {
String serviceName = System.getenv(envName());
String serviceName = System.getenv(envServiceName());
if (serviceName == null) {
// legacy purpose, use previous JAEGER_SERVICE_NAME as OTEL_SERVICE_NAME (if not explicitly set)
serviceName = System.getenv(Configuration.JAEGER_SERVICE_NAME);
Expand All @@ -84,9 +79,18 @@ public String serviceName(BridgeConfig config) {

@Override
public void initialize() {
System.setProperty("otel.metrics.exporter", "none"); // disable metrics
AutoConfiguredOpenTelemetrySdk.initialize();
}

@Override
public synchronized ExecutorService get(ExecutorService provided) {
if (service == null) {
service = Context.taskWrapping(provided);
}
return service;
}

private Tracer get() {
if (tracer == null) {
tracer = GlobalOpenTelemetry.getTracer(COMPONENT);
Expand Down Expand Up @@ -188,7 +192,7 @@ public SpanHandle<K, V> span(RoutingContext routingContext) {

@Override
public void addTracingPropsToProducerConfig(Properties props) {
TracingUtil.addProperty(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ContextAwareTracingProducerInterceptor.class.getName());
TracingUtil.addProperty(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
}

private static final class OTelSpanHandle<K, V> implements SpanHandle<K, V> {
Expand All @@ -200,32 +204,6 @@ public OTelSpanHandle(Span span) {
this.scope = span.makeCurrent();
}

/**
* See ContextAwareTracingProducerInterceptor for more info.
*
* @param record Kafka producer record to use as payload
*/
@Override
public void prepare(KafkaProducerRecord<K, V> record) {
String uuid = UUID.randomUUID().toString();
SPANS.put(uuid, span);
record.addHeader(X_UUID, uuid);
}

/**
* See ContextAwareTracingProducerInterceptor for more info.
*
* @param record Kafka producer record to use as payload
*/
@Override
public void clean(KafkaProducerRecord<K, V> record) {
Optional<KafkaHeader> oh = record.headers().stream().filter(h -> h.key().equals(X_UUID)).findFirst();
oh.ifPresent(h -> {
String uuid = h.value().toString();
SPANS.remove(uuid);
});
}

@Override
public void inject(KafkaProducerRecord<K, V> record) {
propagator().inject(Context.current(), record, KafkaProducerRecord::addHeader);
Expand All @@ -247,31 +225,4 @@ public void finish(int code) {
}
}
}

static final String X_UUID = "_UUID";
static final Map<String, Span> SPANS = new ConcurrentHashMap<>();

/**
* This interceptor is a workaround for async message send.
* OpenTelemetry propagates current span via ThreadLocal,
* where we have an async send - different thread.
* So we need to pass-in the current span info via SPANS map.
* ProducerRecord is a bit abused as a payload / info carrier,
* it holds an unique UUID, which maps to current span in SPANS map.
*
* @param <K> key type
* @param <V> value type
*/
public static class ContextAwareTracingProducerInterceptor<K, V> extends TracingProducerInterceptor<K, V> {
@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
Headers headers = record.headers();
String key = Buffer.buffer(headers.lastHeader(X_UUID).value()).toString();
headers.remove(X_UUID);
Span span = SPANS.remove(key);
try (Scope ignored = span.makeCurrent()) {
return super.onSend(record);
}
}
}
}
Loading

0 comments on commit 9bd61e1

Please sign in to comment.