-
Notifications
You must be signed in to change notification settings - Fork 127
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
6ecf34d
commit 8f22e5a
Showing
15 changed files
with
954 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,3 +60,6 @@ components: | |
- trask | ||
static-instrumenter: | ||
- anosek-an | ||
kafka-exporter: | ||
- spockz | ||
- vincentfree |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
# Exporting SpanData to Kafka | ||
|
||
This module contains `KafkaSpanExporter`, which is an implementation of the `io.opentelemetry.sdk.trace.export.SpanExporter` interface. | ||
|
||
`KafkaSpanExporter` can be used for sending `SpanData` to a Kafka topic. | ||
|
||
## Usage | ||
|
||
In order to instantiate a `KafkaSpanExporter`, you either need to pass a Kafka `Producer` or the configuration of a Kafka `Producer` together with key and value serializers. | ||
You also need to pass the topic to which the SpanData need to be sent. | ||
For a sample usage, see `KafkaSpanExporterIntegrationTest`. | ||
|
||
## Component owners | ||
|
||
- [Alessandro Vermeulen](https://github.com/spockz) | ||
- [Vincent Free](https://github.com/vincentfree) | ||
|
||
Learn more about component owners in [component_owners.yml](../.github/component_owners.yml). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
plugins { | ||
id("otel.java-conventions") | ||
id("otel.publish-conventions") | ||
} | ||
|
||
description = "SpanExporter based on Kafka" | ||
otelJava.moduleName.set("io.opentelemetry.contrib.kafka") | ||
|
||
dependencies { | ||
api("io.opentelemetry:opentelemetry-sdk-trace") | ||
api("io.opentelemetry:opentelemetry-sdk-common") | ||
api("io.opentelemetry.proto:opentelemetry-proto:0.20.0-alpha") | ||
api("org.apache.kafka:kafka-clients") | ||
|
||
compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure") | ||
compileOnly("com.google.auto.service:auto-service-annotations") | ||
compileOnly("com.google.auto.value:auto-value-annotations") | ||
compileOnly("org.slf4j:slf4j-api") | ||
|
||
runtimeOnly("com.fasterxml.jackson.core:jackson-core") | ||
runtimeOnly("com.fasterxml.jackson.core:jackson-databind") | ||
|
||
implementation("io.opentelemetry:opentelemetry-exporter-otlp-common") | ||
implementation("com.google.protobuf:protobuf-java") | ||
|
||
testImplementation("io.opentelemetry:opentelemetry-api") | ||
testImplementation("io.opentelemetry:opentelemetry-sdk-testing") | ||
testImplementation("com.google.guava:guava") | ||
testImplementation("org.testcontainers:junit-jupiter") | ||
testImplementation("org.testcontainers:kafka") | ||
testImplementation("org.rnorth.duct-tape:duct-tape") | ||
testImplementation("org.testcontainers:testcontainers") | ||
|
||
testRuntimeOnly("org.slf4j:slf4j-simple") | ||
} |
139 changes: 139 additions & 0 deletions
139
kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaSpanExporter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.kafka; | ||
|
||
import io.opentelemetry.sdk.common.CompletableResultCode; | ||
import io.opentelemetry.sdk.trace.data.SpanData; | ||
import io.opentelemetry.sdk.trace.export.SpanExporter; | ||
import java.time.Duration; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import javax.annotation.Nonnull; | ||
import javax.annotation.concurrent.ThreadSafe; | ||
import org.apache.kafka.clients.producer.Producer; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.apache.kafka.common.KafkaException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
@ThreadSafe | ||
@SuppressWarnings("FutureReturnValueIgnored") | ||
public class KafkaSpanExporter implements SpanExporter { | ||
private static final Logger logger = LoggerFactory.getLogger(KafkaSpanExporter.class); | ||
private final String topicName; | ||
private final Producer<String, Collection<SpanData>> producer; | ||
private final ExecutorService executorService; | ||
private final long timeoutInSeconds; | ||
private final AtomicBoolean isShutdown = new AtomicBoolean(); | ||
|
||
public static KafkaSpanExporterBuilder newBuilder() { | ||
return new KafkaSpanExporterBuilder(); | ||
} | ||
|
||
KafkaSpanExporter( | ||
String topicName, | ||
Producer<String, Collection<SpanData>> producer, | ||
ExecutorService executorService, | ||
long timeoutInSeconds) { | ||
this.topicName = topicName; | ||
this.producer = producer; | ||
this.executorService = executorService; | ||
this.timeoutInSeconds = timeoutInSeconds; | ||
} | ||
|
||
@Override | ||
public CompletableResultCode export(@Nonnull Collection<SpanData> spans) { | ||
if (isShutdown.get()) { | ||
return CompletableResultCode.ofFailure(); | ||
} | ||
ProducerRecord<String, Collection<SpanData>> producerRecord = | ||
new ProducerRecord<>(topicName, spans); | ||
|
||
CompletableResultCode result = new CompletableResultCode(); | ||
CompletableFuture.runAsync( | ||
() -> | ||
producer.send( | ||
producerRecord, | ||
(metadata, exception) -> { | ||
if (exception == null) { | ||
result.succeed(); | ||
} else { | ||
logger.error( | ||
String.format("Error while sending spans to Kafka topic %s", topicName), | ||
exception); | ||
result.fail(); | ||
} | ||
}), | ||
executorService); | ||
return result; | ||
} | ||
|
||
@Override | ||
public CompletableResultCode flush() { | ||
CompletableResultCode result = new CompletableResultCode(); | ||
CompletableFuture.runAsync(producer::flush, executorService) | ||
.handle( | ||
(unused, exception) -> { | ||
if (exception == null) { | ||
result.succeed(); | ||
} else { | ||
logger.error( | ||
String.format( | ||
"Error while performing the flush operation on topic %s", topicName), | ||
exception); | ||
result.fail(); | ||
} | ||
return true; | ||
}); | ||
return result; | ||
} | ||
|
||
private CompletableResultCode shutdownExecutorService() { | ||
try { | ||
executorService.shutdown(); | ||
boolean terminated = executorService.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS); | ||
if (!terminated) { | ||
List<Runnable> interrupted = executorService.shutdownNow(); | ||
if (!interrupted.isEmpty()) { | ||
logger.error( | ||
"Shutting down KafkaSpanExporter forced {} tasks to be cancelled.", | ||
interrupted.size()); | ||
} | ||
} | ||
return CompletableResultCode.ofSuccess(); | ||
} catch (InterruptedException e) { | ||
logger.error("Error when trying to shutdown KafkaSpanExporter executorService.", e); | ||
return CompletableResultCode.ofFailure(); | ||
} | ||
} | ||
|
||
private CompletableResultCode shutdownProducer() { | ||
try { | ||
producer.close(Duration.ofSeconds(timeoutInSeconds)); | ||
return CompletableResultCode.ofSuccess(); | ||
} catch (KafkaException e) { | ||
logger.error("Error when trying to shutdown KafkaSpanExporter Producer.", e); | ||
return CompletableResultCode.ofFailure(); | ||
} | ||
} | ||
|
||
@Override | ||
public CompletableResultCode shutdown() { | ||
if (!isShutdown.compareAndSet(false, true)) { | ||
logger.warn("Calling shutdown() multiple times."); | ||
return CompletableResultCode.ofSuccess(); | ||
} | ||
List<CompletableResultCode> codes = new ArrayList<>(2); | ||
codes.add(shutdownExecutorService()); | ||
codes.add(shutdownProducer()); | ||
return CompletableResultCode.ofAll(codes); | ||
} | ||
} |
120 changes: 120 additions & 0 deletions
120
kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaSpanExporterBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.kafka; | ||
|
||
import static java.util.Objects.isNull; | ||
import static java.util.Objects.nonNull; | ||
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; | ||
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; | ||
|
||
import com.google.errorprone.annotations.CanIgnoreReturnValue; | ||
import io.opentelemetry.sdk.trace.data.SpanData; | ||
import java.util.Collection; | ||
import java.util.Map; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import org.apache.kafka.clients.producer.KafkaProducer; | ||
import org.apache.kafka.clients.producer.Producer; | ||
import org.apache.kafka.common.serialization.Serializer; | ||
|
||
public class KafkaSpanExporterBuilder { | ||
private static final long DEFAULT_TIMEOUT_IN_SECONDS = 5L; | ||
private String topicName; | ||
private Producer<String, Collection<SpanData>> producer; | ||
private ExecutorService executorService; | ||
private long timeoutInSeconds = DEFAULT_TIMEOUT_IN_SECONDS; | ||
|
||
@SuppressWarnings(value = {"NullAway"}) | ||
public KafkaSpanExporterBuilder() {} | ||
|
||
@CanIgnoreReturnValue | ||
public KafkaSpanExporterBuilder setTopicName(String topicName) { | ||
this.topicName = topicName; | ||
return this; | ||
} | ||
|
||
@CanIgnoreReturnValue | ||
public KafkaSpanExporterBuilder setProducer(Producer<String, Collection<SpanData>> producer) { | ||
this.producer = producer; | ||
return this; | ||
} | ||
|
||
@CanIgnoreReturnValue | ||
public KafkaSpanExporterBuilder setExecutorService(ExecutorService executorService) { | ||
this.executorService = executorService; | ||
return this; | ||
} | ||
|
||
@CanIgnoreReturnValue | ||
public KafkaSpanExporterBuilder setTimeoutInSeconds(long timeoutInSeconds) { | ||
this.timeoutInSeconds = timeoutInSeconds; | ||
return this; | ||
} | ||
|
||
public KafkaSpanExporter build() { | ||
if (isNull(topicName)) { | ||
throw new IllegalArgumentException("topicName cannot be null"); | ||
} | ||
if (isNull(producer)) { | ||
throw new IllegalArgumentException("producer cannot be null"); | ||
} | ||
if (isNull(executorService)) { | ||
executorService = Executors.newCachedThreadPool(); | ||
} | ||
return new KafkaSpanExporter(topicName, producer, executorService, timeoutInSeconds); | ||
} | ||
|
||
public static class ProducerBuilder { | ||
private Map<String, Object> config; | ||
private Serializer<String> keySerializer; | ||
private Serializer<Collection<SpanData>> valueSerializer; | ||
|
||
public static ProducerBuilder newInstance() { | ||
return new ProducerBuilder(); | ||
} | ||
|
||
@SuppressWarnings(value = {"NullAway"}) | ||
public ProducerBuilder() {} | ||
|
||
@CanIgnoreReturnValue | ||
public ProducerBuilder setConfig(Map<String, Object> config) { | ||
this.config = config; | ||
return this; | ||
} | ||
|
||
@CanIgnoreReturnValue | ||
public ProducerBuilder setKeySerializer(Serializer<String> keySerializer) { | ||
this.keySerializer = keySerializer; | ||
return this; | ||
} | ||
|
||
@CanIgnoreReturnValue | ||
public ProducerBuilder setValueSerializer(Serializer<Collection<SpanData>> valueSerializer) { | ||
this.valueSerializer = valueSerializer; | ||
return this; | ||
} | ||
|
||
public Producer<String, Collection<SpanData>> build() { | ||
if (isNull(config)) { | ||
throw new IllegalArgumentException("producer configuration cannot be null"); | ||
} | ||
boolean correctConfig = | ||
((config.containsKey(KEY_SERIALIZER_CLASS_CONFIG) | ||
&& config.containsKey(VALUE_SERIALIZER_CLASS_CONFIG)) | ||
^ (nonNull(keySerializer) && nonNull(valueSerializer))) | ||
&& (config.containsKey(KEY_SERIALIZER_CLASS_CONFIG) ^ nonNull(valueSerializer)) | ||
&& (config.containsKey(VALUE_SERIALIZER_CLASS_CONFIG) ^ nonNull(keySerializer)); | ||
if (!correctConfig) { | ||
throw new IllegalArgumentException( | ||
"Both the key and value serializers should be provided either in the configuration or by using the corresponding setters"); | ||
} | ||
if (config.containsKey(KEY_SERIALIZER_CLASS_CONFIG)) { | ||
return new KafkaProducer<>(config); | ||
} | ||
return new KafkaProducer<>(config, keySerializer, valueSerializer); | ||
} | ||
} | ||
} |
27 changes: 27 additions & 0 deletions
27
kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/SpanDataDeserializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.kafka; | ||
|
||
import com.google.protobuf.InvalidProtocolBufferException; | ||
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; | ||
import java.util.Objects; | ||
import org.apache.kafka.common.errors.SerializationException; | ||
import org.apache.kafka.common.serialization.Deserializer; | ||
|
||
public class SpanDataDeserializer implements Deserializer<ExportTraceServiceRequest> { | ||
@SuppressWarnings("NullAway") | ||
@Override | ||
public ExportTraceServiceRequest deserialize(String topic, byte[] data) { | ||
if (Objects.isNull(data)) { | ||
return null; | ||
} | ||
try { | ||
return ExportTraceServiceRequest.parseFrom(data); | ||
} catch (InvalidProtocolBufferException e) { | ||
throw new SerializationException("Error while deserializing data", e); | ||
} | ||
} | ||
} |
Oops, something went wrong.