-
Notifications
You must be signed in to change notification settings - Fork 132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add LogRecordProcessor to record event log records as span events #1551
Changes from 3 commits
20f4ddd
02d0789
6dcedc0
3b2e9d5
c2580f3
e7ce50b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,39 @@ | ||
# Processors | ||
|
||
This module provides tools to intercept and process signals globally. | ||
## Interceptable exporters | ||
|
||
This module provides tools to intercept and process signals globally: | ||
|
||
* `InterceptableSpanExporter` | ||
* `InterceptableMetricExporter` | ||
* `InterceptableLogRecordExporter` | ||
|
||
## Event to SpanEvent Bridge | ||
|
||
`EventToSpanEventBridge` is a `LogRecordProcessor` which records events (i.e. log records with an `event.name` attribute) as span events for the current span if: | ||
|
||
* The log record has a valid span context | ||
* `Span.current()` returns a span where `Span.isRecording()` is true | ||
|
||
For details of how the event log record is translated to span event, see [EventToSpanEventBridge Javadoc](./src/main/java/io/opentelemetry/contrib/eventbridge/EventToSpanEventBridge.java). | ||
|
||
`EventToSpanEventBridge` can be referenced in [declarative configuration](https://opentelemetry.io/docs/languages/java/configuration/#declarative-configuration) as follows: | ||
|
||
```yaml | ||
// Configure tracer provider as usual, omitted for brevity | ||
tracer_provider: ... | ||
|
||
logger_provider: | ||
processors: | ||
- event_to_span_event_bridge: {} | ||
``` | ||
|
||
// TODO(jack-berg): remove "{}" after merging / rle https://github.com/open-telemetry/opentelemetry-java/pull/6891/files | ||
jack-berg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
## Component owners | ||
|
||
- [Cesar Munoz](https://github.com/LikeTheSalad), Elastic | ||
- [Jack Berg](https://github.com/jack-berg), New Relic | ||
- [Jason Plumb](https://github.com/breedx-splk), Splunk | ||
|
||
Learn more about component owners in [component_owners.yml](../.github/component_owners.yml). |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.eventbridge; | ||
|
||
import io.opentelemetry.api.common.AttributeKey; | ||
import io.opentelemetry.api.common.Attributes; | ||
import io.opentelemetry.api.common.AttributesBuilder; | ||
import io.opentelemetry.api.common.Value; | ||
import io.opentelemetry.api.trace.Span; | ||
import io.opentelemetry.context.Context; | ||
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; | ||
import io.opentelemetry.exporter.internal.otlp.AnyValueMarshaler; | ||
import io.opentelemetry.sdk.logs.LogRecordProcessor; | ||
import io.opentelemetry.sdk.logs.ReadWriteLogRecord; | ||
import io.opentelemetry.sdk.logs.data.LogRecordData; | ||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.logging.Level; | ||
import java.util.logging.Logger; | ||
|
||
/** | ||
* A processor that records events (i.e. log records with an {@code event.name} attribute) as span | ||
* events for the current span if: | ||
* | ||
* <ul> | ||
* <li>The log record has a valid span context | ||
* <li>{@link Span#current()} returns a span where {@link Span#isRecording()} is true | ||
* </ul> | ||
* | ||
* <p>The event {@link LogRecordData} is converted to attributes on the span event as follows: | ||
* | ||
* <ul> | ||
* <li>{@code event.name} attribute is mapped to span event name | ||
* <li>{@link LogRecordData#getTimestampEpochNanos()} is mapped to span event timestamp | ||
* <li>{@link LogRecordData#getAttributes()} are mapped to span event attributes, excluding {@code | ||
* event.name} | ||
* <li>{@link LogRecordData#getObservedTimestampEpochNanos()} is mapped to span event attribute | ||
* with key {@code log.record.observed_timestamp} | ||
* <li>{@link LogRecordData#getSeverity()} is mapped to span event attribute with key {@code | ||
* log.record.severity_number} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Exactly... if / when the spec comes out and tell us how to translate between events and span events, we'll of course follow those instructions. But until then, the less opinionated we can be the better, and following the proto field naming is a way to avoid having an opinion. |
||
* <li>{@link LogRecordData#getBodyValue()} is mapped to span event attribute with key {@code | ||
* log.record.body}, as an escaped JSON string following the standard protobuf JSON encoding | ||
* <li>{@link LogRecordData#getTotalAttributeCount()} - {@link | ||
* LogRecordData#getAttributes()}.size() is mapped to span event attribute with key {@code | ||
* log.record.dropped_attributes_count} | ||
* </ul> | ||
*/ | ||
public final class EventToSpanEventBridge implements LogRecordProcessor { | ||
|
||
private static final Logger LOGGER = Logger.getLogger(EventToSpanEventBridge.class.getName()); | ||
jack-berg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private static final AttributeKey<String> EVENT_NAME = AttributeKey.stringKey("event.name"); | ||
private static final AttributeKey<Long> LOG_RECORD_OBSERVED_TIME_UNIX_NANO = | ||
AttributeKey.longKey("log.record.observed_time_unix_nano"); | ||
private static final AttributeKey<Long> LOG_RECORD_SEVERITY_NUMBER = | ||
AttributeKey.longKey("log.record.severity_number"); | ||
private static final AttributeKey<String> LOG_RECORD_BODY = | ||
AttributeKey.stringKey("log.record.body"); | ||
private static final AttributeKey<Long> LOG_RECORD_DROPPED_ATTRIBUTES_COUNT = | ||
AttributeKey.longKey("log.record.dropped_attributes_count"); | ||
|
||
private EventToSpanEventBridge() {} | ||
|
||
/** Create an instance. */ | ||
public static EventToSpanEventBridge create() { | ||
return new EventToSpanEventBridge(); | ||
} | ||
|
||
@Override | ||
public void onEmit(Context context, ReadWriteLogRecord logRecord) { | ||
LogRecordData logRecordData = logRecord.toLogRecordData(); | ||
String eventName = logRecordData.getAttributes().get(EVENT_NAME); | ||
if (eventName == null) { | ||
return; | ||
} | ||
jack-berg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (!logRecordData.getSpanContext().isValid()) { | ||
return; | ||
} | ||
Span currentSpan = Span.current(); | ||
if (!currentSpan.isRecording()) { | ||
return; | ||
} | ||
currentSpan.addEvent( | ||
jack-berg marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jack-berg I just found this in the otep about events It says span events will be deprecated in the API. I suppose it will still stay in SDK / proto because many backends only support trace signal. Is there a non-API approach for this processor? Or is it not worth reading too much into that for now? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the purpose of deprecating won't be to remove, but rather to clearly signal when it is no longer the preferred approach I don't think we will ever remove it since there's no plan for breaking changes |
||
eventName, | ||
toSpanEventAttributes(logRecordData), | ||
logRecordData.getTimestampEpochNanos(), | ||
TimeUnit.NANOSECONDS); | ||
} | ||
|
||
private static Attributes toSpanEventAttributes(LogRecordData logRecord) { | ||
AttributesBuilder builder = | ||
logRecord.getAttributes().toBuilder().removeIf(key -> key.equals(EVENT_NAME)); | ||
|
||
builder.put(LOG_RECORD_OBSERVED_TIME_UNIX_NANO, logRecord.getObservedTimestampEpochNanos()); | ||
|
||
builder.put(LOG_RECORD_SEVERITY_NUMBER, logRecord.getSeverity().getSeverityNumber()); | ||
|
||
// Add bridging for logRecord.getSeverityText() if EventBuilder adds severity text setter | ||
|
||
Value<?> body = logRecord.getBodyValue(); | ||
if (body != null) { | ||
MarshalerWithSize marshaler = AnyValueMarshaler.create(body); | ||
ByteArrayOutputStream baos = new ByteArrayOutputStream(); | ||
jack-berg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try { | ||
marshaler.writeJsonTo(baos); | ||
} catch (IOException e) { | ||
LOGGER.log(Level.WARNING, "Error converting log record body to JSON", e); | ||
} | ||
builder.put(LOG_RECORD_BODY, new String(baos.toByteArray(), StandardCharsets.UTF_8)); | ||
jack-berg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
int droppedAttributesCount = | ||
logRecord.getTotalAttributeCount() - logRecord.getAttributes().size(); | ||
if (droppedAttributesCount > 0) { | ||
builder.put(LOG_RECORD_DROPPED_ATTRIBUTES_COUNT, droppedAttributesCount); | ||
} | ||
|
||
return builder.build(); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "EventToSpanEventBridge{}"; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.eventbridge.internal; | ||
|
||
import io.opentelemetry.contrib.eventbridge.EventToSpanEventBridge; | ||
import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider; | ||
import io.opentelemetry.sdk.autoconfigure.spi.internal.StructuredConfigProperties; | ||
import io.opentelemetry.sdk.logs.LogRecordProcessor; | ||
|
||
/** | ||
* Declarative configuration SPI implementation for {@link EventToSpanEventBridge}. | ||
* | ||
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change | ||
* at any time. | ||
*/ | ||
public class EventToSpanEventBridgeComponentProvider | ||
implements ComponentProvider<LogRecordProcessor> { | ||
|
||
@Override | ||
public Class<LogRecordProcessor> getType() { | ||
return LogRecordProcessor.class; | ||
} | ||
|
||
@Override | ||
public String getName() { | ||
return "event_to_span_event_bridge"; | ||
} | ||
|
||
@Override | ||
public LogRecordProcessor create(StructuredConfigProperties config) { | ||
return EventToSpanEventBridge.create(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
io.opentelemetry.contrib.eventbridge.internal.EventToSpanEventBridgeComponentProvider |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.eventbridge; | ||
|
||
import static io.opentelemetry.api.common.AttributeKey.longKey; | ||
import static io.opentelemetry.api.common.AttributeKey.stringKey; | ||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; | ||
|
||
import io.opentelemetry.api.common.Attributes; | ||
import io.opentelemetry.api.common.Value; | ||
import io.opentelemetry.api.incubator.events.EventLogger; | ||
import io.opentelemetry.api.logs.Severity; | ||
import io.opentelemetry.api.trace.Span; | ||
import io.opentelemetry.api.trace.SpanKind; | ||
import io.opentelemetry.api.trace.Tracer; | ||
import io.opentelemetry.context.Context; | ||
import io.opentelemetry.context.Scope; | ||
import io.opentelemetry.sdk.logs.SdkLoggerProvider; | ||
import io.opentelemetry.sdk.logs.internal.SdkEventLoggerProvider; | ||
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; | ||
import io.opentelemetry.sdk.testing.time.TestClock; | ||
import io.opentelemetry.sdk.trace.SdkTracerProvider; | ||
import io.opentelemetry.sdk.trace.data.LinkData; | ||
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; | ||
import io.opentelemetry.sdk.trace.samplers.Sampler; | ||
import io.opentelemetry.sdk.trace.samplers.SamplingResult; | ||
import java.time.Instant; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.concurrent.TimeUnit; | ||
import org.junit.jupiter.api.Test; | ||
|
||
class EventToSpanEventBridgeTest { | ||
|
||
private final InMemorySpanExporter spanExporter = InMemorySpanExporter.create(); | ||
private final SdkTracerProvider tracerProvider = | ||
SdkTracerProvider.builder() | ||
.setSampler(onlyServerSpans()) | ||
.addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) | ||
.build(); | ||
private final TestClock testClock = TestClock.create(); | ||
private final SdkEventLoggerProvider eventLoggerProvider = | ||
SdkEventLoggerProvider.create( | ||
SdkLoggerProvider.builder() | ||
.setClock(testClock) | ||
.addLogRecordProcessor(EventToSpanEventBridge.create()) | ||
.build()); | ||
private final Tracer tracer = tracerProvider.get("tracer"); | ||
private final EventLogger eventLogger = eventLoggerProvider.get("event-logger"); | ||
|
||
private static Sampler onlyServerSpans() { | ||
return new Sampler() { | ||
@Override | ||
public SamplingResult shouldSample( | ||
Context parentContext, | ||
String traceId, | ||
String name, | ||
SpanKind spanKind, | ||
Attributes attributes, | ||
List<LinkData> parentLinks) { | ||
return SpanKind.SERVER.equals(spanKind) | ||
? SamplingResult.recordAndSample() | ||
: SamplingResult.drop(); | ||
} | ||
|
||
@Override | ||
public String getDescription() { | ||
return "description"; | ||
} | ||
}; | ||
} | ||
|
||
@Test | ||
void withRecordingSpan_BridgesEvent() { | ||
testClock.setTime(Instant.ofEpochMilli(1)); | ||
|
||
Span span = tracer.spanBuilder("span").setSpanKind(SpanKind.SERVER).startSpan(); | ||
try (Scope unused = span.makeCurrent()) { | ||
eventLogger | ||
.builder("my.event-name") | ||
.setTimestamp(100, TimeUnit.NANOSECONDS) | ||
.setSeverity(Severity.DEBUG) | ||
.put("foo", "bar") | ||
.put("number", 1) | ||
.put("map", Value.of(Collections.singletonMap("key", Value.of("value")))) | ||
.setAttributes(Attributes.builder().put("color", "red").build()) | ||
.setAttributes(Attributes.builder().put("shape", "square").build()) | ||
.emit(); | ||
} finally { | ||
span.end(); | ||
} | ||
|
||
assertThat(spanExporter.getFinishedSpanItems()) | ||
.satisfiesExactly( | ||
spanData -> | ||
assertThat(spanData) | ||
.hasName("span") | ||
.hasEventsSatisfyingExactly( | ||
spanEvent -> | ||
spanEvent | ||
.hasName("my.event-name") | ||
.hasTimestamp(100, TimeUnit.NANOSECONDS) | ||
.hasAttributesSatisfying( | ||
attributes -> { | ||
assertThat(attributes.get(stringKey("color"))) | ||
.isEqualTo("red"); | ||
assertThat(attributes.get(stringKey("shape"))) | ||
.isEqualTo("square"); | ||
assertThat( | ||
attributes.get( | ||
longKey("log.record.observed_time_unix_nano"))) | ||
.isEqualTo(1000000L); | ||
assertThat( | ||
attributes.get(longKey("log.record.severity_number"))) | ||
.isEqualTo(Severity.DEBUG.getSeverityNumber()); | ||
assertThat(attributes.get(stringKey("log.record.body"))) | ||
.isEqualTo( | ||
"{\"kvlistValue\":{\"values\":[{\"key\":\"number\",\"value\":{\"intValue\":\"1\"}},{\"key\":\"foo\",\"value\":{\"stringValue\":\"bar\"}},{\"key\":\"map\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"key\",\"value\":{\"stringValue\":\"value\"}}]}}}]}}"); | ||
}))); | ||
} | ||
|
||
@Test | ||
void noSpan_doesNotBridgeEvent() { | ||
eventLogger | ||
.builder("my.event-name") | ||
.setTimestamp(100, TimeUnit.NANOSECONDS) | ||
.setSeverity(Severity.DEBUG) | ||
.put("foo", "bar") | ||
.put("number", 1) | ||
.put("map", Value.of(Collections.singletonMap("key", Value.of("value")))) | ||
.setAttributes(Attributes.builder().put("color", "red").build()) | ||
.setAttributes(Attributes.builder().put("shape", "square").build()) | ||
.emit(); | ||
|
||
assertThat(spanExporter.getFinishedSpanItems()).isEmpty(); | ||
} | ||
|
||
@Test | ||
void nonRecordingSpan_doesNotBridgeEvent() { | ||
Span span = tracer.spanBuilder("span").setSpanKind(SpanKind.INTERNAL).startSpan(); | ||
try (Scope unused = span.makeCurrent()) { | ||
eventLogger | ||
.builder("my.event-name") | ||
.setTimestamp(100, TimeUnit.NANOSECONDS) | ||
.setSeverity(Severity.DEBUG) | ||
.put("foo", "bar") | ||
.put("number", 1) | ||
.put("map", Value.of(Collections.singletonMap("key", Value.of("value")))) | ||
.setAttributes(Attributes.builder().put("color", "red").build()) | ||
.setAttributes(Attributes.builder().put("shape", "square").build()) | ||
.emit(); | ||
} finally { | ||
span.end(); | ||
} | ||
|
||
assertThat(spanExporter.getFinishedSpanItems()).isEmpty(); | ||
jack-berg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the help!