Skip to content

Commit

Permalink
Add OTLP log endpoint to convert log events to span annotations
Browse files Browse the repository at this point in the history
fixes gh-21
  • Loading branch information
making committed Dec 23, 2024
1 parent a8e8da0 commit 9a89355
Show file tree
Hide file tree
Showing 16 changed files with 616 additions and 46 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ target/
.classpath
.project
.settings/
.venv
18 changes: 18 additions & 0 deletions collector-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,23 @@
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package zipkin2.collector.otel.http;

import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.common.v1.KeyValueList;
import io.opentelemetry.proto.logs.v1.LogRecord;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.logs.v1.ScopeLogs;
import io.opentelemetry.proto.logs.v1.SeverityNumber;
import zipkin2.Span;
import zipkin2.internal.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static zipkin2.collector.otel.http.SpanTranslator.bytesToLong;
import static zipkin2.collector.otel.http.SpanTranslator.nanoToMills;

/**
* LogEventTranslator converts OpenTelemetry Log Events to Zipkin Spans
* <p>
* See <a href="https://opentelemetry.io/docs/specs/otel/logs/api/#emit-an-event">https://opentelemetry.io/docs/specs/otel/logs/api/#emit-an-event</a>
*/
final class LogEventTranslator {
final OtelResourceMapper resourceMapper;

LogEventTranslator(OtelResourceMapper resourceMapper) {
this.resourceMapper = resourceMapper;
}

LogEventTranslator() {
this(DefaultOtelResourceMapper.create());
}

List<Span> translate(ExportLogsServiceRequest logs) {
ArrayList<Span> spans = new ArrayList<>();
List<ResourceLogs> resourceLogsList = logs.getResourceLogsList();
for (ResourceLogs resourceLogs : resourceLogsList) {
for (ScopeLogs scopeLogs : resourceLogs.getScopeLogsList()) {
for (LogRecord logRecord : scopeLogs.getLogRecordsList()) {
Span span = generateSpan(logRecord);
if (span != null) {
spans.add(span);
}
}
}
}
return spans;
}

@Nullable
Span generateSpan(LogRecord logRecord) {
// the log record must have both trace id and span id
if (logRecord.getTraceId().isEmpty() || logRecord.getSpanId().isEmpty()) {
return null;
}
Optional<String> eventNameOptional = logRecord.getAttributesList().stream()
.filter(attribute -> attribute.getKey().equals(SemanticConventionsAttributes.EVENT_NAME))
.findAny()
.map(kv -> ProtoUtils.valueToString(kv.getValue()));
// TODO Should it be restricted to log records that have an 'event.name' attribute?
if (!eventNameOptional.isPresent()) {
return null;
}
String eventName = eventNameOptional.get();
long timestamp = nanoToMills(logRecord.getTimeUnixNano());
KeyValueList.Builder kvListBuilder = KeyValueList.newBuilder();
if (logRecord.getSeverityNumber() != SeverityNumber.SEVERITY_NUMBER_UNSPECIFIED) {
kvListBuilder.addValues(KeyValue.newBuilder()
.setKey("severity_number")
.setValue(AnyValue.newBuilder().setIntValue(logRecord.getSeverityNumberValue())));
}
if (!logRecord.getSeverityText().isEmpty()) {
kvListBuilder.addValues(KeyValue.newBuilder()
.setKey("severity_text")
.setValue(AnyValue.newBuilder().setStringValue(logRecord.getSeverityText())));
}
int droppedAttributesCount = logRecord.getDroppedAttributesCount();
if (droppedAttributesCount > 0) {
kvListBuilder.addValues(KeyValue.newBuilder()
.setKey("dropped_attributes_count")
.setValue(AnyValue.newBuilder().setIntValue(droppedAttributesCount)));
}
String annotationValue = "\"" + eventName + "\":" + ProtoUtils.valueToJson(AnyValue.newBuilder()
.setKvlistValue(kvListBuilder.addValues(KeyValue.newBuilder()
.setKey("body")
.setValue(logRecord.getBody()))).build());
byte[] traceIdBytes = logRecord.getTraceId().toByteArray();
long high = bytesToLong(traceIdBytes, 0);
long low = bytesToLong(traceIdBytes, 8);
return Span.newBuilder()
.traceId(high, low)
.id(bytesToLong(logRecord.getSpanId().toByteArray(), 0))
.addAnnotation(timestamp, annotationValue)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,6 @@
*/
package zipkin2.collector.otel.http;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.protobuf.UnsafeByteOperations;
import com.linecorp.armeria.common.AggregationOptions;
import com.linecorp.armeria.common.HttpData;
Expand All @@ -23,6 +17,7 @@
import com.linecorp.armeria.server.ServerConfigurator;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.encoding.DecodingService;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.trace.v1.ScopeSpans;
import zipkin2.Callback;
Expand All @@ -33,6 +28,12 @@
import zipkin2.collector.CollectorSampler;
import zipkin2.storage.StorageComponent;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class OpenTelemetryHttpCollector extends CollectorComponent
implements ServerConfigurator {

Expand Down Expand Up @@ -115,17 +116,18 @@ public OtelResourceMapper getOtelResourceMapper() {
@Override
public void reconfigure(ServerBuilder sb) {
sb.decorator(DecodingService.newDecorator(StreamDecoderFactory.gzip()));
sb.service("/v1/traces", new HttpService(this));
sb.service("/v1/traces", new OtlpProtoV1TracesHttpService(this));
sb.service("/v1/logs", new OtlpProtoV1LogsHttpService(this));
}

static final class HttpService extends AbstractHttpService {
private static final Logger LOG = Logger.getLogger(HttpService.class.getName());
static final class OtlpProtoV1TracesHttpService extends AbstractHttpService {
private static final Logger LOG = Logger.getLogger(OtlpProtoV1TracesHttpService.class.getName());

final OpenTelemetryHttpCollector collector;

final SpanTranslator spanTranslator;

HttpService(OpenTelemetryHttpCollector collector) {
OtlpProtoV1TracesHttpService(OpenTelemetryHttpCollector collector) {
this.collector = collector;
this.spanTranslator = new SpanTranslator(collector.otelResourceMapper);
}
Expand All @@ -152,8 +154,7 @@ protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) {
try {
List<Span> spans = spanTranslator.translate(request);
collector.collector.accept(spans, result);
}
catch (RuntimeException e) {
} catch (RuntimeException e) {
// If the span is invalid, an exception such as IllegalArgumentException will be thrown.
int spanSize = request.getResourceSpansList().stream()
.flatMap(rs -> rs.getScopeSpansList().stream())
Expand All @@ -162,8 +163,56 @@ protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) {
LOG.log(Level.WARNING, "Unable to translate the spans:", e);
result.onError(e);
}
} catch (IOException e) {
collector.metrics.incrementMessagesDropped();
LOG.log(Level.WARNING, "Unable to parse the request:", e);
result.onError(e);
}
return null;
}
});
return HttpResponse.of(result);
}
}

static final class OtlpProtoV1LogsHttpService extends AbstractHttpService {
private static final Logger LOG = Logger.getLogger(OtlpProtoV1LogsHttpService.class.getName());
final OpenTelemetryHttpCollector collector;
final LogEventTranslator logEventTranslator;

OtlpProtoV1LogsHttpService(OpenTelemetryHttpCollector collector) {
this.collector = collector;
this.logEventTranslator = new LogEventTranslator(collector.otelResourceMapper);
}

@Override
protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) throws Exception {
CompletableCallback result = new CompletableCallback();
req.aggregate(AggregationOptions.usePooledObjects(ctx.alloc(), ctx.eventLoop()
)).handle((msg, t) -> {
if (t != null) {
collector.metrics.incrementMessagesDropped();
result.onError(t);
return null;
}
try (HttpData content = msg.content()) {
if (content.isEmpty()) {
result.onSuccess(null);
return null;
}
catch (IOException e) {
collector.metrics.incrementBytes(content.length());
try {
ExportLogsServiceRequest request = ExportLogsServiceRequest.parseFrom(UnsafeByteOperations.unsafeWrap(content.byteBuf().nioBuffer()).newCodedInput());
collector.metrics.incrementMessages();
try {
List<Span> spans = logEventTranslator.translate(request);
collector.collector.accept(spans, result);
} catch (RuntimeException e) {
// TODO count dropped spans
LOG.log(Level.WARNING, "Unable to translate the logs:", e);
result.onError(e);
}
} catch (IOException e) {
collector.metrics.incrementMessagesDropped();
LOG.log(Level.WARNING, "Unable to parse the request:", e);
result.onError(e);
Expand All @@ -190,5 +239,4 @@ public void onError(Throwable t) {
completeExceptionally(t);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ static String valueToString(AnyValue value) {
// Also Brave doesn't use the json encoding.
// So follow the comma separator here.
return value.getArrayValue().getValuesList().stream()
.map(ProtoUtils::valueToString)
.collect(joining(","));
.map(ProtoUtils::valueToString)
.collect(joining(","));
}
return valueToJson(value);
}

static String valueToJson(AnyValue value) {
if (value.hasStringValue()) {
return "\"" + value.getStringValue() + "\"";
return quote(value.getStringValue());
}
if (value.hasArrayValue()) {
return value.getArrayValue().getValuesList().stream()
Expand All @@ -59,8 +59,12 @@ static String valueToJson(AnyValue value) {
}
if (value.hasBytesValue()) {
// TODO
return TextFormat.escapeBytes(value.getBytesValue());
return quote(TextFormat.escapeBytes(value.getBytesValue()));
}
return value.toString();
return quote(value.toString());
}

static String quote(String value) {
return "\"" + value.replace("\n", "\\n") + "\"";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,26 @@
* OpenTelemetry Semantic Conventions Attributes
*/
final class SemanticConventionsAttributes {
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.28.0/semconv/src/main/java/io/opentelemetry/semconv/NetworkAttributes.java#L18
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.29.0/semconv/src/main/java/io/opentelemetry/semconv/NetworkAttributes.java#L18
static final String NETWORK_LOCAL_ADDRESS = "network.local.address";
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.28.0/semconv/src/main/java/io/opentelemetry/semconv/NetworkAttributes.java#L22
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.29.0/semconv/src/main/java/io/opentelemetry/semconv/NetworkAttributes.java#L22
static final String NETWORK_LOCAL_PORT = "network.local.port";
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.28.0/semconv/src/main/java/io/opentelemetry/semconv/NetworkAttributes.java#L25
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.29.0/semconv/src/main/java/io/opentelemetry/semconv/NetworkAttributes.java#L25
static final String NETWORK_PEER_ADDRESS = "network.peer.address";
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.28.0/semconv/src/main/java/io/opentelemetry/semconv/NetworkAttributes.java#L28
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.29.0/semconv/src/main/java/io/opentelemetry/semconv/NetworkAttributes.java#L28
static final String NETWORK_PEER_PORT = "network.peer.port";
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.28.0/semconv/src/main/java/io/opentelemetry/semconv/ServerAttributes.java#L27
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.29.0/semconv/src/main/java/io/opentelemetry/semconv/ServerAttributes.java#L27
static final String SERVER_ADDRESS = "server.address";
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.28.0/semconv/src/main/java/io/opentelemetry/semconv/ServiceAttributes.java#L27
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.29.0/semconv/src/main/java/io/opentelemetry/semconv/ServiceAttributes.java#L27
static final String SERVICE_NAME = "service.name";
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.28.0/semconv/src/main/java/io/opentelemetry/semconv/OtelAttributes.java#L17
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.29.0/semconv/src/main/java/io/opentelemetry/semconv/OtelAttributes.java#L17
static final String OTEL_SCOPE_NAME = "otel.scope.name";
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.28.0/semconv/src/main/java/io/opentelemetry/semconv/OtelAttributes.java#L20
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.29.0/semconv/src/main/java/io/opentelemetry/semconv/OtelAttributes.java#L20
static final String OTEL_SCOPE_VERSION = "otel.scope.version";
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.28.0/semconv/src/main/java/io/opentelemetry/semconv/OtelAttributes.java#L23
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.29.0/semconv/src/main/java/io/opentelemetry/semconv/OtelAttributes.java#L23
static final String OTEL_STATUS_CODE = "otel.status_code";
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.28.0/semconv-incubating/src/main/java/io/opentelemetry/semconv/incubating/PeerIncubatingAttributes.java#L21
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.29.0/semconv-incubating/src/main/java/io/opentelemetry/semconv/incubating/EventIncubatingAttributes.java#L26
static final String EVENT_NAME = "event.name";
// https://github.com/open-telemetry/semantic-conventions-java/blob/v1.29.0/semconv-incubating/src/main/java/io/opentelemetry/semconv/incubating/PeerIncubatingAttributes.java#L21
static final String PEER_SERVICE = "peer.service";
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ List<zipkin2.Span> translate(ExportTraceServiceRequest otelSpans) {
List<zipkin2.Span> spans = new ArrayList<>();
List<ResourceSpans> spansList = otelSpans.getResourceSpansList();
for (ResourceSpans resourceSpans : spansList) {
Resource resource = resourceSpans.getResource();
for (ScopeSpans scopeSpans : resourceSpans.getScopeSpansList()) {
InstrumentationScope scope = scopeSpans.getScope();
for (io.opentelemetry.proto.trace.v1.Span span : scopeSpans.getSpansList()) {
spans.add(generateSpan(span, scope, resourceSpans.getResource()));
spans.add(generateSpan(span, scope, resource));
}
}
}
Expand Down
Loading

0 comments on commit 9a89355

Please sign in to comment.