implement kafka-exporter
naser-ayat committed Jul 20, 2023
1 parent e02149c commit 4a5d04c
Showing 12 changed files with 645 additions and 1 deletion.
1 change: 1 addition & 0 deletions README.md
@@ -17,6 +17,7 @@ feature or via instrumentation, this project is hopefully for you.
* [OpenTelemetry Maven Extension](./maven-extension/README.md)
* [Runtime Attach](./runtime-attach/README.md)
* [Samplers](./samplers/README.md)
* [Kafka Support](./kafka-exporter/README.md)

## Getting Started

4 changes: 3 additions & 1 deletion dependencyManagement/build.gradle.kts
@@ -58,7 +58,9 @@ val DEPENDENCIES = listOf(
"org.awaitility:awaitility:4.2.0",
"org.bouncycastle:bcpkix-jdk15on:1.70",
"org.junit-pioneer:junit-pioneer:1.9.1",
"org.skyscreamer:jsonassert:1.5.1"
"org.skyscreamer:jsonassert:1.5.1",
"org.apache.kafka:kafka-clients:3.5.0",
"org.testcontainers:kafka:1.18.3"
)

javaPlatform {
11 changes: 11 additions & 0 deletions kafka-exporter/README.md
@@ -0,0 +1,11 @@
# Exporting SpanData to Kafka

This module contains `KafkaSpanExporter`, an implementation of the `io.opentelemetry.sdk.trace.export.SpanExporter` interface.

`KafkaSpanExporter` publishes `SpanData` to a configurable Kafka topic.

## Usage

To instantiate a `KafkaSpanExporter`, pass either a pre-configured Kafka `Producer`, or a producer configuration together with key and value serializers.
In both cases you must also pass the name of the topic to which the `SpanData` should be sent.
For sample usage, see `KafkaSpanExporterIntegrationTest`; a minimal sketch follows.
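
A minimal sketch of constructing the exporter from a producer configuration, based on the constructors this module provides (the broker address, client id, and topic name are placeholder values):

```java
import io.opentelemetry.contrib.kafka.KafkaSpanExporter;
import io.opentelemetry.contrib.kafka.SpanDataSerializer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "otel-kafka-exporter");    // placeholder

KafkaSpanExporter exporter =
    new KafkaSpanExporter(
        "otel-spans", // placeholder topic name
        producerConfig,
        Executors.newCachedThreadPool(),
        new StringSerializer(),
        new SpanDataSerializer());
```

Alternatively, pass a ready-made `Producer<String, Collection<SpanData>>` directly via the two-argument constructor.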
35 changes: 35 additions & 0 deletions kafka-exporter/build.gradle.kts
@@ -0,0 +1,35 @@
plugins {
id("otel.java-conventions")
id("otel.publish-conventions")
}

description = "SpanExporter based on Kafka"
otelJava.moduleName.set("io.opentelemetry.contrib.kafka")

dependencies {
api("io.opentelemetry:opentelemetry-sdk-trace")
api("io.opentelemetry:opentelemetry-sdk-common")
api("io.opentelemetry.proto:opentelemetry-proto:0.20.0-alpha")
api("org.apache.kafka:kafka-clients")

compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
compileOnly("com.google.auto.service:auto-service-annotations")
compileOnly("com.google.auto.value:auto-value-annotations")
compileOnly("org.slf4j:slf4j-api")

runtimeOnly("com.fasterxml.jackson.core:jackson-core")
runtimeOnly("com.fasterxml.jackson.core:jackson-databind")

implementation("io.opentelemetry:opentelemetry-exporter-otlp-common")
implementation("com.google.protobuf:protobuf-java")

testImplementation("io.opentelemetry:opentelemetry-api")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
testImplementation("com.google.guava:guava")
testImplementation("org.testcontainers:junit-jupiter")
testImplementation("org.testcontainers:kafka")
testImplementation("org.rnorth.duct-tape:duct-tape")
testImplementation("org.testcontainers:testcontainers")

testRuntimeOnly("org.slf4j:slf4j-simple")
}
@@ -0,0 +1,188 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.kafka;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
@SuppressWarnings("FutureReturnValueIgnored")
public class KafkaSpanExporter implements SpanExporter {
private static final Logger logger = LoggerFactory.getLogger(KafkaSpanExporter.class);
private static final long DEFAULT_TIMEOUT_IN_SECONDS = 5L;
private final String topicName;
private final Producer<String, Collection<SpanData>> producer;
private final ExecutorService executorService;
private final long timeoutInSeconds;
private final AtomicBoolean isShutdown = new AtomicBoolean();

public KafkaSpanExporter(
String topicName,
Producer<String, Collection<SpanData>> producer,
ExecutorService executorService,
long timeoutInSeconds) {
this.topicName = topicName;
this.producer = producer;
this.executorService = executorService;
this.timeoutInSeconds = timeoutInSeconds;
}

public KafkaSpanExporter(
String topicName,
Producer<String, Collection<SpanData>> producer,
ExecutorService executorService) {
this(topicName, producer, executorService, DEFAULT_TIMEOUT_IN_SECONDS);
}

public KafkaSpanExporter(String topicName, Producer<String, Collection<SpanData>> producer) {
this(topicName, producer, Executors.newCachedThreadPool(), DEFAULT_TIMEOUT_IN_SECONDS);
}

public KafkaSpanExporter(
String topicName, Map<String, Object> configs, ExecutorService executorService) {
this(topicName, new KafkaProducer<>(configs), executorService, DEFAULT_TIMEOUT_IN_SECONDS);
}

public KafkaSpanExporter(
String topicName,
Map<String, Object> configs,
ExecutorService executorService,
Serializer<String> keySerializer,
Serializer<Collection<SpanData>> valueSerializer) {
this(
topicName,
new KafkaProducer<>(configs, keySerializer, valueSerializer),
executorService,
DEFAULT_TIMEOUT_IN_SECONDS);
}

public KafkaSpanExporter(
String topicName, Properties properties, ExecutorService executorService) {
this(topicName, new KafkaProducer<>(properties), executorService, DEFAULT_TIMEOUT_IN_SECONDS);
}

public KafkaSpanExporter(
String topicName,
Properties properties,
ExecutorService executorService,
Serializer<String> keySerializer,
Serializer<Collection<SpanData>> valueSerializer) {
this(
topicName,
new KafkaProducer<>(properties, keySerializer, valueSerializer),
executorService,
DEFAULT_TIMEOUT_IN_SECONDS);
}

@Override
public CompletableResultCode export(@Nonnull Collection<SpanData> spans) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}
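    // The entire batch of spans is published as a single Kafka record; the producer's
    // configured value serializer (e.g. SpanDataSerializer) encodes the payload.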
ProducerRecord<String, Collection<SpanData>> producerRecord =
new ProducerRecord<>(topicName, spans);

    CompletableResultCode result = new CompletableResultCode();
    CompletableFuture.runAsync(
            () ->
                producer.send(
                    producerRecord,
                    (metadata, exception) -> {
                      if (exception == null) {
                        result.succeed();
                      } else {
                        logger.error(
                            String.format("Error while sending spans to Kafka topic %s", topicName),
                            exception);
                        result.fail();
                      }
                    }),
            executorService)
        .exceptionally(
            throwable -> {
              // If send() itself throws (e.g. the record cannot be serialized), the callback
              // above never runs; fail the result here rather than leaving it pending forever.
              logger.error(
                  String.format("Error while sending spans to Kafka topic %s", topicName),
                  throwable);
              result.fail();
              return null;
            });
    return result;
}

@Override
public CompletableResultCode flush() {
CompletableResultCode result = new CompletableResultCode();
CompletableFuture.runAsync(producer::flush, executorService)
.handle(
(unused, exception) -> {
if (exception == null) {
result.succeed();
} else {
logger.error(
String.format(
"Error while performing the flush operation on topic %s", topicName),
exception);
result.fail();
}
return true;
});
return result;
}

private CompletableResultCode shutdownExecutorService() {
try {
executorService.shutdown();
boolean terminated = executorService.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS);
if (!terminated) {
List<Runnable> interrupted = executorService.shutdownNow();
if (!interrupted.isEmpty()) {
logger.error(
"Shutting down KafkaSpanExporter forced {} tasks to be cancelled.",
interrupted.size());
}
}
return CompletableResultCode.ofSuccess();
} catch (InterruptedException e) {
logger.error("Error when trying to shutdown KafkaSpanExporter executorService.", e);
return CompletableResultCode.ofFailure();
}
}

private CompletableResultCode shutdownProducer() {
try {
producer.close(Duration.ofSeconds(timeoutInSeconds));
return CompletableResultCode.ofSuccess();
} catch (KafkaException e) {
logger.error("Error when trying to shutdown KafkaSpanExporter Producer.", e);
return CompletableResultCode.ofFailure();
}
}

@Override
public CompletableResultCode shutdown() {
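    // compareAndSet guarantees the executor and producer are closed at most once,
    // even when shutdown() is invoked concurrently.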
if (!isShutdown.compareAndSet(false, true)) {
logger.warn("Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
}
List<CompletableResultCode> codes = new ArrayList<>(2);
codes.add(shutdownExecutorService());
codes.add(shutdownProducer());
return CompletableResultCode.ofAll(codes);
}
}
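For context beyond this diff, a sketch of how the exporter slots into the SDK pipeline using the standard `SdkTracerProvider` and `BatchSpanProcessor` APIs (the `exporter` variable is assumed to be constructed as in the README example above):

```java
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;

SdkTracerProvider tracerProvider =
    SdkTracerProvider.builder()
        .addSpanProcessor(BatchSpanProcessor.builder(exporter).build())
        .build();
// Spans ended under this provider are batched and handed to KafkaSpanExporter.export().
```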
@@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.kafka;

import com.google.protobuf.InvalidProtocolBufferException;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import java.util.Objects;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class SpanDataDeserializer implements Deserializer<ExportTraceServiceRequest> {
@Override
public ExportTraceServiceRequest deserialize(String topic, byte[] data) {
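    // Kafka hands the deserializer a null payload when a record has no value; map that
    // to an empty request rather than failing.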
if (Objects.isNull(data)) {
return ExportTraceServiceRequest.getDefaultInstance();
}
try {
return ExportTraceServiceRequest.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new SerializationException("Error while deserializing data", e);
}
}
}
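A minimal consumer-side sketch pairing this deserializer with a plain `KafkaConsumer` (the broker address, group id, and topic name are placeholder values):

```java
import io.opentelemetry.contrib.kafka.SpanDataDeserializer;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "span-readers");            // placeholder

try (KafkaConsumer<String, ExportTraceServiceRequest> consumer =
    new KafkaConsumer<>(props, new StringDeserializer(), new SpanDataDeserializer())) {
  consumer.subscribe(Collections.singletonList("otel-spans")); // placeholder topic
  ConsumerRecords<String, ExportTraceServiceRequest> records = consumer.poll(Duration.ofSeconds(5));
  for (ConsumerRecord<String, ExportTraceServiceRequest> record : records) {
    // Each record value is one OTLP request; print how many resource-span groups it holds.
    System.out.println(record.value().getResourceSpansCount());
  }
}
```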
@@ -0,0 +1,48 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.kafka;

import static java.util.stream.Collectors.toList;

import io.opentelemetry.exporter.internal.otlp.traces.ResourceSpansMarshaler;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class SpanDataSerializer implements Serializer<Collection<SpanData>> {
@Override
public byte[] serialize(String topic, Collection<SpanData> data) {
if (Objects.isNull(data)) {
throw new SerializationException("Cannot serialize null");
}
return convertSpansToRequest(data).toByteArray();
}

ExportTraceServiceRequest convertSpansToRequest(Collection<SpanData> spans) {
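    // Delegate the SpanData -> OTLP mapping to the SDK's ResourceSpansMarshaler, then parse
    // the marshaled bytes back into the generated protobuf type so the request can be built.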
List<ResourceSpans> resourceSpansList =
Arrays.stream(ResourceSpansMarshaler.create(spans))
.map(
resourceSpansMarshaler -> {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
resourceSpansMarshaler.writeBinaryTo(baos);
return ResourceSpans.parseFrom(baos.toByteArray());
} catch (IOException e) {
throw new SerializationException(e);
}
})
.collect(toList());

return ExportTraceServiceRequest.newBuilder().addAllResourceSpans(resourceSpansList).build();
}
}
