Skip to content

Commit

Permalink
adds RocketMQTracing
Browse files Browse the repository at this point in the history
Signed-off-by: Adrian Cole <adrian@tetrate.io>
  • Loading branch information
JoeKerouac authored and Adrian Cole committed Jan 29, 2024
1 parent 21779e3 commit 01266d6
Show file tree
Hide file tree
Showing 33 changed files with 1,174 additions and 33 deletions.
10 changes: 10 additions & 0 deletions brave-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,16 @@
<artifactId>brave-instrumentation-okhttp3</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-rocketmq-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-rocketmq-clients</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-rpc</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2023 The OpenZipkin Authors
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -19,7 +19,7 @@
import org.junit.jupiter.api.Test;

import static brave.propagation.CurrentTraceContext.Default.INHERITABLE;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;

class InheritableDefaultCurrentTraceContextTest extends CurrentTraceContextTest {
@Override protected Class<? extends Supplier<CurrentTraceContext.Builder>> builderSupplier() {
Expand All @@ -33,7 +33,7 @@ static class BuilderSupplier implements Supplier<CurrentTraceContext.Builder> {
}

@Test protected void isnt_inheritable() {
assertThrows(AssertionError.class, () -> {
assertThatExceptionOfType(AssertionError.class).isThrownBy(() -> {
super.isnt_inheritable();
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2023 The OpenZipkin Authors
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -17,7 +17,7 @@
import org.junit.jupiter.api.Test;

import static brave.test.util.ClassLoaders.assertRunIsUnloadable;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;

class ThreadLocalCurrentTraceContextClassLoaderTest {

Expand Down Expand Up @@ -64,7 +64,7 @@ static class LeakedNullScope implements Runnable {
* prior to the classloader unload, which would be an odd thing to rely on.
*/
@Test void leakedScope_preventsUnloading() {
assertThrows(AssertionError.class, () -> {
assertThatExceptionOfType(AssertionError.class).isThrownBy(() -> {
assertRunIsUnloadable(LeakedScope.class, getClass().getClassLoader());
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2023 The OpenZipkin Authors
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -17,7 +17,7 @@
import org.junit.jupiter.api.Test;

import static brave.test.util.ClassLoaders.assertRunIsUnloadable;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;

class ThreadLocalSpanClassLoaderTest {

Expand Down Expand Up @@ -66,7 +66,7 @@ static class CurrentTracerBasicUsage implements Runnable {
* tolerant, for example considering weak references or similar.
*/
@Test void unfinishedSpan_preventsUnloading() {
assertThrows(AssertionError.class, () -> {
assertThatExceptionOfType(AssertionError.class).isThrownBy(() -> {
assertRunIsUnloadable(CurrentTracerDoesntFinishSpan.class, getClass().getClassLoader());
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2023 The OpenZipkin Authors
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -19,7 +19,7 @@

import static brave.test.util.ClassLoaders.assertRunIsUnloadable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;

class ClassLoadersTest {
static class Foo {
Expand Down Expand Up @@ -66,7 +66,7 @@ static class PresentThreadLocalWithApplicationType implements Runnable {
}

@Test void assertRunIsUnloadable_threadLocalWithOurClassIsntUnloadable() {
assertThrows(AssertionError.class, () -> {
assertThatExceptionOfType(AssertionError.class).isThrownBy(() -> {
assertRunIsUnloadable(PresentThreadLocalWithApplicationType.class, getClass().getClassLoader());
});
}
Expand Down
6 changes: 3 additions & 3 deletions brave/src/test/java/brave/internal/collect/ListsTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2023 The OpenZipkin Authors
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -21,7 +21,7 @@
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;

class ListsTest {

Expand All @@ -41,7 +41,7 @@ class ListsTest {
}

@Test void ensureImmutable_returnsImmutableEmptyList() {
assertThrows(UnsupportedOperationException.class, () -> {
assertThatExceptionOfType(UnsupportedOperationException.class).isThrownBy(() -> {
Lists.ensureImmutable(new ArrayList<>()).add("foo");
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
import static org.mockito.Mockito.mock;

class StrictScopeDecoratorTest {
Expand Down Expand Up @@ -179,7 +179,7 @@ Closeable businessMethodThreadLocalSpan() {

// ThreadLocalSpan by definition can't be closed on another thread
@Test void scope_close_onWrongThread_threadLocalSpan() throws Exception {
assertThrows(AssertionError.class, () -> {
assertThatExceptionOfType(AssertionError.class).isThrownBy(() -> {
scope_close_onWrongThread(businessClass::businessMethodThreadLocalSpan,
"businessMethodThreadLocalSpan");
});
Expand Down
6 changes: 3 additions & 3 deletions brave/src/test/java/brave/sampler/CountingSamplerTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2023 The OpenZipkin Authors
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -16,8 +16,8 @@
import org.assertj.core.data.Percentage;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
import static org.assertj.core.data.Percentage.withPercentage;
import static org.junit.jupiter.api.Assertions.assertThrows;

class CountingSamplerTest extends SamplerTest {
@Override Sampler newSampler(float probability) {
Expand All @@ -29,7 +29,7 @@ class CountingSamplerTest extends SamplerTest {
}

@Test void probabilityMinimumOnePercent() {
assertThrows(IllegalArgumentException.class, () -> {
assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> {
newSampler(0.0001f);
});
}
Expand Down
8 changes: 4 additions & 4 deletions brave/src/test/java/brave/sampler/SamplerTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2023 The OpenZipkin Authors
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -20,7 +20,7 @@
import org.junit.jupiter.params.provider.ValueSource;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;

abstract class SamplerTest {
/**
Expand Down Expand Up @@ -59,13 +59,13 @@ void retainsPerSampleProbability(float sampleProbability) {
}

@Test void probabilityCantBeNegative() {
assertThrows(IllegalArgumentException.class, () -> {
assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> {
newSampler(-1.0f);
});
}

@Test void probabilityCantBeOverOne() {
assertThrows(IllegalArgumentException.class, () -> {
assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> {
newSampler(1.1f);
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2023 The OpenZipkin Authors
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -24,8 +24,8 @@
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

class MDCScopeDecoratorTest extends CurrentTraceContextTest {
public MDCScopeDecoratorTest() {
Expand Down Expand Up @@ -60,7 +60,7 @@ static class BuilderSupplier implements Supplier<CurrentTraceContext.Builder> {

@Test // Log4J 1.2.x MDC is inheritable by default
public void isnt_inheritable() throws Exception {
assertThrows(AssertionError.class, () -> {
assertThatExceptionOfType(AssertionError.class).isThrownBy(() -> {
super.isnt_inheritable();
});
}
Expand Down
1 change: 1 addition & 0 deletions instrumentation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Here's a brief overview of what's packaged here:
* [mysql8](mysql8/README.md) - Tracing MySQL v8 statement interceptor
* [netty-codec-http](netty-codec-http/README.md) - Tracing handler for [Netty](http://netty.io/) 4.x http servers
* [okhttp3](okhttp3/README.md) - Tracing decorators for [OkHttp](https://github.com/square/okhttp) 3.x
* [rocketmq-client](rocketmq-client/README.md) - Tracing decorators for RocketMQ producers and consumers.
* [servlet](servlet/README.md) - Tracing filter for Servlet 2.5+ (including Async)
* [spring-rabbit](spring-rabbit/README.md) - Tracing MessagePostProcessor and ListenerAdvice for [Spring Rabbit](https://spring.io/guides/gs/messaging-rabbitmq/)
* [spring-web](spring-web/README.md) - Tracing interceptor for [Spring RestTemplate](https://spring.io/guides/gs/consuming-rest/)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;

class TracingKafkaClientSupplierTests {

Expand All @@ -33,7 +33,7 @@ class TracingKafkaClientSupplierTests {
}

@Test void shouldThrowException() {
assertThrows(UnsupportedOperationException.class, () -> {
assertThatExceptionOfType(UnsupportedOperationException.class).isThrownBy(() -> {
FakeKafkaClientSupplier fake = new FakeKafkaClientSupplier();
fake.getAdmin(props);
});
Expand Down
1 change: 1 addition & 0 deletions instrumentation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
<module>mysql6</module>
<module>mysql8</module>
<module>okhttp3</module>
<module>rocketmq-client</module>
<module>rpc</module>
<module>servlet</module>
<module>servlet-jakarta</module>
Expand Down
100 changes: 100 additions & 0 deletions instrumentation/rocketmq-client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# brave-instrumentation-rocketmq-client

## Tracing for RocketMQ Client

This module provides instrumentation for RocketMQ based services.

## example

### producer

The key is to register our hook to the producer

```java
package brave.rocketmq.client;

import brave.Tracing;
import brave.messaging.MessagingRequest;
import brave.messaging.MessagingTracing;
import brave.sampler.SamplerFunction;
import brave.sampler.SamplerFunctions;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ProducerExample {

public static void main(String[] args) throws Exception {
// todo Replaced with actual tracing construct
Tracing tracing = Tracing.newBuilder().build();
SamplerFunction<MessagingRequest> producerSampler = SamplerFunctions.deferDecision();
RocketMQTracing producerTracing = RocketMQTracing.create(
MessagingTracing.newBuilder(tracing).producerSampler(producerSampler).build());

String topic = "testSend";
Message message = new Message(topic, "JoeKerouac", "hello".getBytes());
DefaultMQProducer producer = new DefaultMQProducer("testSend");
// todo This is the key, register the hook to the producer
producer.getDefaultMQProducerImpl()
.registerSendMessageHook(new SendMessageBraveHookImpl(producerTracing));
// Replace with actual address
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
producer.send(message);

producer.shutdown();
}
}

```

### consumer

Replace `org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently`
with `brave.rocketmq.client.TracingMessageListenerConcurrently`
or `org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly`
with `brave.rocketmq.client.TracingMessageListenerOrderly`;

```java
package brave.rocketmq.client;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.messaging.MessagingRequest;
import brave.messaging.MessagingTracing;
import brave.sampler.SamplerFunction;
import brave.sampler.SamplerFunctions;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.Optional;

public class ProducerExample {

public static void main(String[] args) throws Exception {
// todo Replaced with actual tracing construct
Tracing tracing = Tracing.newBuilder().build();
SamplerFunction<MessagingRequest> producerSampler = SamplerFunctions.deferDecision();
RocketMQTracing producerTracing = RocketMQTracing.create(
MessagingTracing.newBuilder(tracing).producerSampler(producerSampler).build());

String topic = "testPushConsumer";
String nameserverAddr = "127.0.0.1:9876";

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testPushConsumer");
consumer.setNamesrvAddr(nameserverAddr);
consumer.subscribe(topic, "*");
consumer.registerMessageListener(new TraceableMessageListenerConcurrently(0, producerTracing) {
@Override
protected void handleMessage(MessageExt messageExt) {
Span span =
Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null);
// do something
}
});
consumer.start();
}
}

```

6 changes: 6 additions & 0 deletions instrumentation/rocketmq-client/bnd.bnd
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# We use brave.internal.Nullable, but it is not used at runtime.
Import-Package: \
!brave.internal*,\
*
Export-Package: \
brave.rocketmq.client
Loading

0 comments on commit 01266d6

Please sign in to comment.