Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make @RabbitListener propagate context properly #3339

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apply from: "$rootDir/gradle/instrumentation.gradle"

muzzle {
pass {
group = 'org.springframework.amqp'
module = 'spring-rabbit'
versions = "(,)"
}
}

dependencies {
library 'org.springframework.amqp:spring-rabbit:1.0.0.RELEASE'

testInstrumentation project(':instrumentation:rabbitmq-2.7:javaagent')

// 2.1.7 adds the @RabbitListener annotation, we need that for tests
testLibrary 'org.springframework.amqp:spring-rabbit:2.1.7.RELEASE'
testLibrary "org.springframework.boot:spring-boot-starter-test:1.5.22.RELEASE"
testLibrary "org.springframework.boot:spring-boot-starter:1.5.22.RELEASE"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.rabbit;

import static io.opentelemetry.javaagent.instrumentation.spring.rabbit.SpringRabbitSingletons.instrumenter;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.springframework.amqp.core.Message;

public class AbstractMessageListenerContainerInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("invokeListener")
.and(
takesArguments(2)
.and(
takesArgument(1, Object.class)
.or(takesArgument(1, named("org.springframework.amqp.core.Message"))))),
getClass().getName() + "$InvokeListenerAdvice");
}

@SuppressWarnings("unused")
public static class InvokeListenerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(1) Object data,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
if (!(data instanceof Message)) {
return;
}

Context parentContext = Java8BytecodeBridge.currentContext();
Message message = (Message) data;
if (instrumenter().shouldStart(parentContext, message)) {
context = instrumenter().start(parentContext, message);
scope = context.makeCurrent();
}
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void onEnter(
@Advice.Argument(1) Object data,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope,
@Advice.Thrown Throwable throwable) {
if (scope == null || !(data instanceof Message)) {
return;
}
scope.close();
instrumenter().end(context, (Message) data, null, throwable);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.rabbit;

import io.opentelemetry.context.propagation.TextMapGetter;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.amqp.core.Message;

final class MessageHeaderGetter implements TextMapGetter<Message> {
@Override
public Iterable<String> keys(Message carrier) {
return carrier.getMessageProperties().getHeaders().keySet();
}

@Nullable
@Override
public String get(Message carrier, String key) {
Object value = carrier.getMessageProperties().getHeaders().get(key);
return value == null ? null : value.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.rabbit;

import static java.util.Collections.singletonList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class SpringRabbitInstrumentationModule extends InstrumentationModule {
public SpringRabbitInstrumentationModule() {
super("spring-rabbit", "spring-rabbit-1.0");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new AbstractMessageListenerContainerInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.rabbit;

import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.amqp.core.Message;

final class SpringRabbitMessageAttributesExtractor
extends MessagingAttributesExtractor<Message, Void> {
@Override
protected String system(Message message) {
return "rabbitmq";
}

@Override
protected String destinationKind(Message message) {
return "queue";
}

@Override
protected @Nullable String destination(Message message) {
return message.getMessageProperties().getReceivedRoutingKey();
}

@Override
protected boolean temporaryDestination(Message message) {
return false;
}

@Override
protected @Nullable String protocol(Message message) {
return null;
}

@Override
protected @Nullable String protocolVersion(Message message) {
return null;
}

@Override
protected @Nullable String url(Message message) {
return null;
}

@Override
protected @Nullable String conversationId(Message message) {
return null;
}

@Override
protected Long messagePayloadSize(Message message) {
return message.getMessageProperties().getContentLength();
}

@Override
protected @Nullable Long messagePayloadCompressedSize(Message message) {
return null;
}

@Override
protected MessageOperation operation(Message message) {
return MessageOperation.PROCESS;
}

@Override
protected @Nullable String messageId(Message message, @Nullable Void unused) {
return message.getMessageProperties().getMessageId();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.rabbit;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import org.springframework.amqp.core.Message;

public final class SpringRabbitSingletons {

private static final String INSTRUMENTATION_NAME = "io.opentelemetry.javaagent.spring-rabbit-1.0";

private static final Instrumenter<Message, Void> INSTRUMENTER;

static {
SpringRabbitMessageAttributesExtractor attributesExtractor =
new SpringRabbitMessageAttributesExtractor();
SpanNameExtractor<Message> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);

INSTRUMENTER =
Instrumenter.<Message, Void>newBuilder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.newConsumerInstrumenter(new MessageHeaderGetter());
}

public static Instrumenter<Message, Void> instrumenter() {
return INSTRUMENTER;
}

private SpringRabbitSingletons() {}
}
Loading