Skip to content

Commit

Permalink
Add integration test for Vert.x gRPC exporter
Browse files Browse the repository at this point in the history
Also add TLS support to the exporter
  • Loading branch information
geoand committed Jul 17, 2023
1 parent 21845e3 commit 8889a88
Show file tree
Hide file tree
Showing 17 changed files with 694 additions and 26 deletions.
9 changes: 9 additions & 0 deletions bom/test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

<rxjava1.version>1.3.8</rxjava1.version>
<strimzi-test-container.version>0.100.0</strimzi-test-container.version>

<opentelemetry-proto.version>0.20.0-alpha</opentelemetry-proto.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -66,6 +68,13 @@
<artifactId>jaxb-api</artifactId>
<version>${jaxb-api.version}</version>
</dependency>

<dependency>
<groupId>io.opentelemetry.proto</groupId>
<artifactId>opentelemetry-proto</artifactId>
<version>${opentelemetry-proto.version}</version>
</dependency>

</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,6 @@ public interface OtlpExporterTracesConfig {
@WithDefault(DEFAULT_GRPC_BASE_URI)
Optional<String> legacyEndpoint();

// /**
// * Sets the certificate chain to use for verifying servers when TLS is enabled. The {@code byte[]}
// * should contain an X.509 certificate collection in PEM format. If not set, TLS connections will
// * use the system default trusted certificates.
// */
// @ConfigItem()
// public Optional<byte[]> certificate;

// /**
// * Sets ths client key and the certificate chain to use for verifying client when TLS is enabled.
// * The key must be PKCS8, and both must be in PEM format.
// */
// @ConfigItem()
// public Optional<OtlpExporterRuntimeConfig.ClientTlsConfig> client;

/**
* Key-value pairs to be used as headers associated with gRPC requests.
* The format is similar to the {@code OTEL_EXPORTER_OTLP_HEADERS} environment variable,
Expand Down Expand Up @@ -73,6 +58,37 @@ public interface OtlpExporterTracesConfig {
@WithDefault(Protocol.HTTP_PROTOBUF)
Optional<String> protocol();

/**
* Key/cert configuration in the PEM format.
*/
@WithName("key-cert")
KeyCert keyCert();

/**
* Trust configuration in the PEM format.
*/
@WithName("trust-cert")
TrustCert trustCert();

interface KeyCert {
/**
* Comma-separated list of the path to the key files (Pem format).
*/
Optional<List<String>> keys();

/**
* Comma-separated list of the path to the certificate files (Pem format).
*/
Optional<List<String>> certs();
}

interface TrustCert {
/**
* Comma-separated list of the trust certificate files (Pem format).
*/
Optional<List<String>> certs();
}

public static class Protocol {
public static final String GRPC = "grpc";
public static final String HTTP_PROTOBUF = "http/protobuf";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.spi.CDI;
Expand All @@ -19,10 +20,15 @@
import io.quarkus.opentelemetry.runtime.config.runtime.OTelRuntimeConfig;
import io.quarkus.opentelemetry.runtime.config.runtime.exporter.CompressionType;
import io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterRuntimeConfig;
import io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterTracesConfig;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.PemTrustOptions;

@Recorder
public class OtlpRecorder {
Expand Down Expand Up @@ -86,26 +92,23 @@ public void installBatchSpanProcessorForOtlp(
private SpanExporter createOtlpGrpcSpanExporter(OtlpExporterRuntimeConfig exporterRuntimeConfig, String endpoint,
Vertx vertx) {

if (exporterRuntimeConfig.traces().protocol().isPresent()) {
if (!exporterRuntimeConfig.traces().protocol().get().equals(HTTP_PROTOBUF)) {
OtlpExporterTracesConfig tracesConfig = exporterRuntimeConfig.traces();
if (tracesConfig.protocol().isPresent()) {
if (!tracesConfig.protocol().get().equals(HTTP_PROTOBUF)) {
throw new IllegalStateException("Only the GRPC Exporter is currently supported. " +
"Please check `quarkus.otel.exporter.otlp.traces.protocol` property");
}
}

boolean compressionEnabled = false;
if (exporterRuntimeConfig.traces().compression().isPresent()) {
compressionEnabled = (exporterRuntimeConfig.traces().compression().get() == CompressionType.GZIP);
if (tracesConfig.compression().isPresent()) {
compressionEnabled = (tracesConfig.compression().get() == CompressionType.GZIP);
}

// FIXME TLS Support. Was not available before but will be available soon.
// exporterRuntimeConfig.traces.certificate.ifPresent(exporterBuilder::setTrustedCertificates);
// exporterRuntimeConfig.client.ifPresent(exporterBuilder::setClientTls);

Map<String, String> headersMap = new HashMap<>();
OtlpUserAgent.addUserAgentHeader(headersMap::put);
if (exporterRuntimeConfig.traces().headers().isPresent()) {
List<String> headers = exporterRuntimeConfig.traces().headers().get();
if (tracesConfig.headers().isPresent()) {
List<String> headers = tracesConfig.headers().get();
if (!headers.isEmpty()) {
for (String header : headers) {
if (header.isEmpty()) {
Expand All @@ -125,8 +128,54 @@ private SpanExporter createOtlpGrpcSpanExporter(OtlpExporterRuntimeConfig export
MeterProvider::noop,
ExporterBuilderUtil.validateEndpoint(endpoint),
compressionEnabled,
exporterRuntimeConfig.traces().timeout(),
tracesConfig.timeout(),
headersMap,
new Consumer<>() {
@Override
public void accept(HttpClientOptions options) {
configureTLS(options);
}

private void configureTLS(HttpClientOptions options) {
// TODO: this can reuse existing stuff when https://github.com/quarkusio/quarkus/pull/33228 is in
options.setKeyCertOptions(toPemKeyCertOptions(tracesConfig));
options.setPemTrustOptions(toPemTrustOptions(tracesConfig));

if (!options.getPemTrustOptions().getCertPaths().isEmpty()
|| !options.getPemKeyCertOptions().getCertPaths().isEmpty()
|| !options.getPemKeyCertOptions().getKeyPaths().isEmpty()) {
options.setSsl(true);
options.setUseAlpn(true);
}
}

private KeyCertOptions toPemKeyCertOptions(OtlpExporterTracesConfig configuration) {
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions();
OtlpExporterTracesConfig.KeyCert keyCert = configuration.keyCert();
if (keyCert.certs().isPresent()) {
for (String cert : keyCert.certs().get()) {
pemKeyCertOptions.addCertPath(cert);
}
}
if (keyCert.keys().isPresent()) {
for (String cert : keyCert.keys().get()) {
pemKeyCertOptions.addKeyPath(cert);
}
}
return pemKeyCertOptions;
}

private PemTrustOptions toPemTrustOptions(OtlpExporterTracesConfig configuration) {
PemTrustOptions pemTrustOptions = new PemTrustOptions();
OtlpExporterTracesConfig.TrustCert trustCert = configuration.trustCert();
if (trustCert.certs().isPresent()) {
for (String cert : trustCert.certs().get()) {
pemTrustOptions.addCertPath(cert);
}
}
return pemTrustOptions;
}
},
vertx);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -63,6 +64,7 @@ final class VertxGrpcExporter implements SpanExporter {
URI grpcBaseUri, boolean compressionEnabled,
Duration timeout,
Map<String, String> headersMap,
Consumer<HttpClientOptions> clientOptionsCustomizer,
Vertx vertx) {
this.type = type;
this.exporterMetrics = ExporterMetrics.createGrpcOkHttp(exporterName, type, meterProviderSupplier);
Expand All @@ -73,6 +75,7 @@ final class VertxGrpcExporter implements SpanExporter {
.setHttp2ClearTextUpgrade(false) // needed otherwise connections get closed immediately
.setReadIdleTimeout((int) timeout.getSeconds())
.setTracingPolicy(TracingPolicy.IGNORE); // needed to avoid tracing the calls from this gRPC client
clientOptionsCustomizer.accept(httpClientOptions);
this.client = GrpcClient.client(vertx, httpClientOptions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,31 @@ public Duration timeout() {
public Optional<String> protocol() {
return Optional.empty();
}

@Override
public KeyCert keyCert() {
return new KeyCert() {
@Override
public Optional<List<String>> keys() {
return Optional.empty();
}

@Override
public Optional<List<String>> certs() {
return Optional.empty();
}
};
}

@Override
public TrustCert trustCert() {
return new TrustCert() {
@Override
public Optional<List<String>> certs() {
return Optional.empty();
}
};
}
};
}
};
Expand Down
Loading

0 comments on commit 8889a88

Please sign in to comment.