Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LogRecordProcessor to record event log records as span events #1551

Merged
merged 6 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/component_owners.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ components:
processors:
- LikeTheSalad
- breedx-splk
- jack-berg
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the help!

prometheus-collector:
- jkwatson
resource-providers:
Expand Down
31 changes: 30 additions & 1 deletion processors/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,39 @@
# Processors

This module provides tools to intercept and process signals globally.
## Interceptable exporters

This module provides tools to intercept and process signals globally:

* `InterceptableSpanExporter`
* `InterceptableMetricExporter`
* `InterceptableLogRecordExporter`

## Event to SpanEvent Bridge

`EventToSpanEventBridge` is a `LogRecordProcessor` which records events (i.e. log records with an `event.name` attribute) as span events for the current span if:

* The log record has a valid span context
* `Span.current()` returns a span where `Span.isRecording()` is true

For details of how the event log record is translated to span event, see [EventToSpanEventBridge Javadoc](./src/main/java/io/opentelemetry/contrib/eventbridge/EventToSpanEventBridge.java).

`EventToSpanEventBridge` can be referenced in [declarative configuration](https://opentelemetry.io/docs/languages/java/configuration/#declarative-configuration) as follows:

```yaml
// Configure tracer provider as usual, omitted for brevity
tracer_provider: ...

logger_provider:
processors:
- event_to_span_event_bridge: {}
```

// TODO(jack-berg): remove "{}" after merging / rle https://github.com/open-telemetry/opentelemetry-java/pull/6891/files
jack-berg marked this conversation as resolved.
Show resolved Hide resolved

## Component owners

- [Cesar Munoz](https://github.com/LikeTheSalad), Elastic
- [Jack Berg](https://github.com/jack-berg), New Relic
- [Jason Plumb](https://github.com/breedx-splk), Splunk

Learn more about component owners in [component_owners.yml](../.github/component_owners.yml).
8 changes: 8 additions & 0 deletions processors/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,13 @@ java {

dependencies {
api("io.opentelemetry:opentelemetry-sdk")
compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi")

// For EventToSpanEventBridge
implementation("io.opentelemetry:opentelemetry-exporter-otlp-common")
implementation("com.fasterxml.jackson.core:jackson-core")
jack-berg marked this conversation as resolved.
Show resolved Hide resolved

testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-incubator")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.eventbridge;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.common.Value;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
import io.opentelemetry.exporter.internal.otlp.AnyValueMarshaler;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* A processor that records events (i.e. log records with an {@code event.name} attribute) as span
* events for the current span if:
*
* <ul>
* <li>The log record has a valid span context
* <li>{@link Span#current()} returns a span where {@link Span#isRecording()} is true
* </ul>
*
* <p>The event {@link LogRecordData} is converted to attributes on the span event as follows:
*
* <ul>
* <li>{@code event.name} attribute is mapped to span event name
* <li>{@link LogRecordData#getTimestampEpochNanos()} is mapped to span event timestamp
* <li>{@link LogRecordData#getAttributes()} are mapped to span event attributes, excluding {@code
* event.name}
* <li>{@link LogRecordData#getObservedTimestampEpochNanos()} is mapped to span event attribute
* with key {@code log.record.observed_timestamp}
* <li>{@link LogRecordData#getSeverity()} is mapped to span event attribute with key {@code
* log.record.severity_number}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you like log.record.severity_number better than simply log.record.severity? Maybe you're just trying to maintain consistency with the protos?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you're just trying to maintain consistency with the protos?

Exactly... if / when the spec comes out and tell us how to translate between events and span events, we'll of course follow those instructions. But until then, the less opinionated we can be the better, and following the proto field naming is a way to avoid having an opinion.

https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/logs/v1/logs.proto#L161

* <li>{@link LogRecordData#getBodyValue()} is mapped to span event attribute with key {@code
* log.record.body}, as an escaped JSON string following the standard protobuf JSON encoding
* <li>{@link LogRecordData#getTotalAttributeCount()} - {@link
* LogRecordData#getAttributes()}.size() is mapped to span event attribute with key {@code
* log.record.dropped_attributes_count}
* </ul>
*/
public final class EventToSpanEventBridge implements LogRecordProcessor {

private static final Logger LOGGER = Logger.getLogger(EventToSpanEventBridge.class.getName());
jack-berg marked this conversation as resolved.
Show resolved Hide resolved

private static final AttributeKey<String> EVENT_NAME = AttributeKey.stringKey("event.name");
private static final AttributeKey<Long> LOG_RECORD_OBSERVED_TIME_UNIX_NANO =
AttributeKey.longKey("log.record.observed_time_unix_nano");
private static final AttributeKey<Long> LOG_RECORD_SEVERITY_NUMBER =
AttributeKey.longKey("log.record.severity_number");
private static final AttributeKey<String> LOG_RECORD_BODY =
AttributeKey.stringKey("log.record.body");
private static final AttributeKey<Long> LOG_RECORD_DROPPED_ATTRIBUTES_COUNT =
AttributeKey.longKey("log.record.dropped_attributes_count");

private EventToSpanEventBridge() {}

/** Create an instance. */
public static EventToSpanEventBridge create() {
return new EventToSpanEventBridge();
}

@Override
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
LogRecordData logRecordData = logRecord.toLogRecordData();
String eventName = logRecordData.getAttributes().get(EVENT_NAME);
if (eventName == null) {
return;
}
jack-berg marked this conversation as resolved.
Show resolved Hide resolved
if (!logRecordData.getSpanContext().isValid()) {
return;
}
Span currentSpan = Span.current();
if (!currentSpan.isRecording()) {
return;
}
currentSpan.addEvent(
jack-berg marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jack-berg I just found this in the otep about events

https://github.com/open-telemetry/oteps/blob/main/text/0265-event-vision.md#relationship-to-span-events

It says span events will be deprecated in the API. I suppose it will still stay in SDK / proto because many backends only support trace signal. Is there a non-API approach for this processor? Or is it not worth reading too much into that for now?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the purpose of deprecating won't be to remove, but rather to clearly signal when it is no longer the preferred approach

I don't think we will ever remove it since there's no plan for breaking changes

eventName,
toSpanEventAttributes(logRecordData),
logRecordData.getTimestampEpochNanos(),
TimeUnit.NANOSECONDS);
}

private static Attributes toSpanEventAttributes(LogRecordData logRecord) {
AttributesBuilder builder =
logRecord.getAttributes().toBuilder().removeIf(key -> key.equals(EVENT_NAME));

builder.put(LOG_RECORD_OBSERVED_TIME_UNIX_NANO, logRecord.getObservedTimestampEpochNanos());

builder.put(LOG_RECORD_SEVERITY_NUMBER, logRecord.getSeverity().getSeverityNumber());

// Add bridging for logRecord.getSeverityText() if EventBuilder adds severity text setter

Value<?> body = logRecord.getBodyValue();
if (body != null) {
MarshalerWithSize marshaler = AnyValueMarshaler.create(body);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
jack-berg marked this conversation as resolved.
Show resolved Hide resolved
try {
marshaler.writeJsonTo(baos);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Error converting log record body to JSON", e);
}
builder.put(LOG_RECORD_BODY, new String(baos.toByteArray(), StandardCharsets.UTF_8));
jack-berg marked this conversation as resolved.
Show resolved Hide resolved
}

int droppedAttributesCount =
logRecord.getTotalAttributeCount() - logRecord.getAttributes().size();
if (droppedAttributesCount > 0) {
builder.put(LOG_RECORD_DROPPED_ATTRIBUTES_COUNT, droppedAttributesCount);
}

return builder.build();
}

@Override
public String toString() {
return "EventToSpanEventBridge{}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.eventbridge.internal;

import io.opentelemetry.contrib.eventbridge.EventToSpanEventBridge;
import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider;
import io.opentelemetry.sdk.autoconfigure.spi.internal.StructuredConfigProperties;
import io.opentelemetry.sdk.logs.LogRecordProcessor;

/**
* Declarative configuration SPI implementation for {@link EventToSpanEventBridge}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class EventToSpanEventBridgeComponentProvider
implements ComponentProvider<LogRecordProcessor> {

@Override
public Class<LogRecordProcessor> getType() {
return LogRecordProcessor.class;
}

@Override
public String getName() {
return "event_to_span_event_bridge";
}

@Override
public LogRecordProcessor create(StructuredConfigProperties config) {
return EventToSpanEventBridge.create();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.opentelemetry.contrib.eventbridge.internal.EventToSpanEventBridgeComponentProvider
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.eventbridge;

import static io.opentelemetry.api.common.AttributeKey.longKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.Value;
import io.opentelemetry.api.incubator.events.EventLogger;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
import io.opentelemetry.sdk.logs.internal.SdkEventLoggerProvider;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.testing.time.TestClock;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

class EventToSpanEventBridgeTest {

private final InMemorySpanExporter spanExporter = InMemorySpanExporter.create();
private final SdkTracerProvider tracerProvider =
SdkTracerProvider.builder()
.setSampler(onlyServerSpans())
.addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
.build();
private final TestClock testClock = TestClock.create();
private final SdkEventLoggerProvider eventLoggerProvider =
SdkEventLoggerProvider.create(
SdkLoggerProvider.builder()
.setClock(testClock)
.addLogRecordProcessor(EventToSpanEventBridge.create())
.build());
private final Tracer tracer = tracerProvider.get("tracer");
private final EventLogger eventLogger = eventLoggerProvider.get("event-logger");

private static Sampler onlyServerSpans() {
return new Sampler() {
@Override
public SamplingResult shouldSample(
Context parentContext,
String traceId,
String name,
SpanKind spanKind,
Attributes attributes,
List<LinkData> parentLinks) {
return SpanKind.SERVER.equals(spanKind)
? SamplingResult.recordAndSample()
: SamplingResult.drop();
}

@Override
public String getDescription() {
return "description";
}
};
}

@Test
void withRecordingSpan_BridgesEvent() {
testClock.setTime(Instant.ofEpochMilli(1));

Span span = tracer.spanBuilder("span").setSpanKind(SpanKind.SERVER).startSpan();
try (Scope unused = span.makeCurrent()) {
eventLogger
.builder("my.event-name")
.setTimestamp(100, TimeUnit.NANOSECONDS)
.setSeverity(Severity.DEBUG)
.put("foo", "bar")
.put("number", 1)
.put("map", Value.of(Collections.singletonMap("key", Value.of("value"))))
.setAttributes(Attributes.builder().put("color", "red").build())
.setAttributes(Attributes.builder().put("shape", "square").build())
.emit();
} finally {
span.end();
}

assertThat(spanExporter.getFinishedSpanItems())
.satisfiesExactly(
spanData ->
assertThat(spanData)
.hasName("span")
.hasEventsSatisfyingExactly(
spanEvent ->
spanEvent
.hasName("my.event-name")
.hasTimestamp(100, TimeUnit.NANOSECONDS)
.hasAttributesSatisfying(
attributes -> {
assertThat(attributes.get(stringKey("color")))
.isEqualTo("red");
assertThat(attributes.get(stringKey("shape")))
.isEqualTo("square");
assertThat(
attributes.get(
longKey("log.record.observed_time_unix_nano")))
.isEqualTo(1000000L);
assertThat(
attributes.get(longKey("log.record.severity_number")))
.isEqualTo(Severity.DEBUG.getSeverityNumber());
assertThat(attributes.get(stringKey("log.record.body")))
.isEqualTo(
"{\"kvlistValue\":{\"values\":[{\"key\":\"number\",\"value\":{\"intValue\":\"1\"}},{\"key\":\"foo\",\"value\":{\"stringValue\":\"bar\"}},{\"key\":\"map\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"key\",\"value\":{\"stringValue\":\"value\"}}]}}}]}}");
})));
}

@Test
void noSpan_doesNotBridgeEvent() {
eventLogger
.builder("my.event-name")
.setTimestamp(100, TimeUnit.NANOSECONDS)
.setSeverity(Severity.DEBUG)
.put("foo", "bar")
.put("number", 1)
.put("map", Value.of(Collections.singletonMap("key", Value.of("value"))))
.setAttributes(Attributes.builder().put("color", "red").build())
.setAttributes(Attributes.builder().put("shape", "square").build())
.emit();

assertThat(spanExporter.getFinishedSpanItems()).isEmpty();
}

@Test
void nonRecordingSpan_doesNotBridgeEvent() {
Span span = tracer.spanBuilder("span").setSpanKind(SpanKind.INTERNAL).startSpan();
try (Scope unused = span.makeCurrent()) {
eventLogger
.builder("my.event-name")
.setTimestamp(100, TimeUnit.NANOSECONDS)
.setSeverity(Severity.DEBUG)
.put("foo", "bar")
.put("number", 1)
.put("map", Value.of(Collections.singletonMap("key", Value.of("value"))))
.setAttributes(Attributes.builder().put("color", "red").build())
.setAttributes(Attributes.builder().put("shape", "square").build())
.emit();
} finally {
span.end();
}

assertThat(spanExporter.getFinishedSpanItems()).isEmpty();
jack-berg marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading
Loading