Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make @RabbitListener propagate context properly #3339

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apply from: "$rootDir/gradle/instrumentation.gradle"

muzzle {
pass {
group = 'org.springframework.amqp'
module = 'spring-amqp'
versions = "(,)"
}
}

dependencies {
library 'org.springframework.amqp:spring-amqp:1.0.0.RELEASE'

testInstrumentation project(':instrumentation:rabbitmq-2.7:javaagent')

// 2.1.7 adds the @RabbitListener annotation, we need that for tests
testLibrary 'org.springframework.amqp:spring-amqp:2.1.7.RELEASE'
testLibrary 'org.springframework.amqp:spring-rabbit:2.1.7.RELEASE'
testLibrary "org.springframework.boot:spring-boot-starter-test:1.5.22.RELEASE"
testLibrary "org.springframework.boot:spring-boot-starter:1.5.22.RELEASE"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.amqp;

import static io.opentelemetry.javaagent.instrumentation.spring.amqp.SpringAmqpSingletons.instrumenter;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.springframework.amqp.core.Message;

public class AbstractMessageListenerInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("invokeListener")
.and(
takesArguments(2)
.and(
takesArgument(1, Object.class)
.or(takesArgument(1, named("org.springframework.amqp.core.Message"))))),
getClass().getName() + "$InvokeListenerAdvice");
}

@SuppressWarnings("unused")
public static class InvokeListenerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(1) Object data,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
if (!(data instanceof Message)) {
return;
}

Context parentContext = Java8BytecodeBridge.currentContext();
Message message = (Message) data;
if (instrumenter().shouldStart(parentContext, message)) {
context = instrumenter().start(parentContext, message);
scope = context.makeCurrent();
}
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void onEnter(
@Advice.Argument(1) Object data,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope,
@Advice.Thrown Throwable throwable) {
if (scope == null || !(data instanceof Message)) {
return;
}
scope.close();
instrumenter().end(context, (Message) data, null, throwable);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.amqp;

import io.opentelemetry.context.propagation.TextMapGetter;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.amqp.core.Message;

final class MessageHeaderGetter implements TextMapGetter<Message> {
@Override
public Iterable<String> keys(Message carrier) {
return carrier.getMessageProperties().getHeaders().keySet();
}

@Nullable
@Override
public String get(Message carrier, String key) {
Object value = carrier.getMessageProperties().getHeaders().get(key);
return value == null ? null : value.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.amqp;

import static java.util.Collections.singletonList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class SpringAmqpInstrumentationModule extends InstrumentationModule {
public SpringAmqpInstrumentationModule() {
super("spring-amqp", "spring-amqp-1.0");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new AbstractMessageListenerInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.amqp;

import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.amqp.core.Message;

final class SpringAmqpMessageAttributesExtractor
extends MessagingAttributesExtractor<Message, Void> {
@Override
protected String system(Message message) {
return "rabbitmq";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should rename this module to spring-amqp-rabbitmq?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'll rename it to spring-rabbit - this is the name of the library that actually contains AbstractMessageListenerContainer

}

@Override
protected String destinationKind(Message message) {
return "queue";
}

@Override
protected @Nullable String destination(Message message) {
return message.getMessageProperties().getReceivedRoutingKey();
}

@Override
protected boolean temporaryDestination(Message message) {
return false;
}

@Override
protected @Nullable String protocol(Message message) {
return null;
}

@Override
protected @Nullable String protocolVersion(Message message) {
return null;
}

@Override
protected @Nullable String url(Message message) {
return null;
}

@Override
protected @Nullable String conversationId(Message message) {
return null;
}

@Override
protected Long messagePayloadSize(Message message) {
return message.getMessageProperties().getContentLength();
}

@Override
protected @Nullable Long messagePayloadCompressedSize(Message message) {
return null;
}

@Override
protected MessageOperation operation(Message message) {
return MessageOperation.PROCESS;
}

@Override
protected @Nullable String messageId(Message message, @Nullable Void unused) {
return message.getMessageProperties().getMessageId();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.amqp;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import org.springframework.amqp.core.Message;

public final class SpringAmqpSingletons {

private static final String INSTRUMENTATION_NAME = "io.opentelemetry.javaagent.spring-amqp-1.0";

private static final Instrumenter<Message, Void> INSTRUMENTER;

static {
SpringAmqpMessageAttributesExtractor attributesExtractor =
new SpringAmqpMessageAttributesExtractor();
SpanNameExtractor<Message> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);

INSTRUMENTER =
Instrumenter.<Message, Void>newBuilder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.newConsumerInstrumenter(new MessageHeaderGetter());
}

public static Instrumenter<Message, Void> instrumenter() {
return INSTRUMENTER;
}

private SpringAmqpSingletons() {}
}
Loading