Skip to content

Commit

Permalink
fix todo
Browse files Browse the repository at this point in the history
  • Loading branch information
JoeKerouac committed Feb 6, 2024
1 parent 9c72c70 commit 054034e
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 58 deletions.
32 changes: 21 additions & 11 deletions instrumentation/rocketmq-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,33 +49,39 @@ public class ProducerExample {

### consumer

Replace `org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently`
with `brave.rocketmq.client.TracingMessageListenerConcurrently`
or `org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly`
with `brave.rocketmq.client.TracingMessageListenerOrderly`;
wrap `org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently`
using `brave.rocketmq.client.RocketMQTracing.wrap(long, org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly)`,
or alternatively, wrap `org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly`
using `brave.rocketmq.client.RocketMQTracing.wrap(int, org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently)`;

```java
package brave.rocketmq.client;

import java.util.List;
import java.util.Optional;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.messaging.MessagingRequest;
import brave.messaging.MessagingTracing;
import brave.rocketmq.client.RocketMQTracing;
import brave.sampler.SamplerFunction;
import brave.sampler.SamplerFunctions;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.Optional;

public class ProducerExample {

public static void main(String[] args) throws Exception {
// todo Replaced with actual tracing construct
Tracing tracing = Tracing.newBuilder().build();
SamplerFunction<MessagingRequest> producerSampler = SamplerFunctions.deferDecision();
RocketMQTracing producerTracing = RocketMQTracing.create(
RocketMQTracing rocketMQTracing = RocketMQTracing.create(
MessagingTracing.newBuilder(tracing).producerSampler(producerSampler).build());

String topic = "testPushConsumer";
Expand All @@ -84,16 +90,20 @@ public class ProducerExample {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testPushConsumer");
consumer.setNamesrvAddr(nameserverAddr);
consumer.subscribe(topic, "*");
consumer.registerMessageListener(new TraceableMessageListenerConcurrently(0, producerTracing) {
MessageListenerConcurrently messageListenerConcurrently = rocketMQTracing.wrap(new MessageListenerConcurrently() {
@Override
protected void handleMessage(MessageExt messageExt) {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
Span span =
Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null);
// do something
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.registerMessageListener(messageListenerConcurrently);

consumer.start();
}

}

```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@
import brave.propagation.TraceContext.Injector;
import brave.propagation.TraceContextOrSamplingFlags;
import brave.sampler.SamplerFunction;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;

import java.util.Map;

public class RocketMQTracing {

private static final long defaultSuspendCurrentQueueTimeMillis = 1000;
private static final int defaultDelayLevelWhenNextConsume = 0;

public static RocketMQTracing create(Tracing tracing) {
return new RocketMQTracing(MessagingTracing.create(tracing), RocketMQTags.ROCKETMQ_SERVICE);
}
Expand Down Expand Up @@ -106,4 +113,35 @@ public Tracing tracing() {
public Tracer tracer() {
return tracer;
}

public MessageListenerOrderly wrap(MessageListenerOrderly messageListenerOrderly) {
return new TracingMessageListenerOrderly(defaultSuspendCurrentQueueTimeMillis, this, messageListenerOrderly);
}

public MessageListenerOrderly wrap(long suspendCurrentQueueTimeMillis, MessageListenerOrderly messageListenerOrderly) {
return new TracingMessageListenerOrderly(suspendCurrentQueueTimeMillis, this, messageListenerOrderly);
}

public MessageListenerConcurrently wrap(MessageListenerConcurrently messageListenerConcurrently) {
return new TracingMessageListenerConcurrently(defaultDelayLevelWhenNextConsume, this, messageListenerConcurrently);
}

public MessageListenerConcurrently wrap(int delayLevelWhenNextConsume, MessageListenerConcurrently messageListenerConcurrently) {
return new TracingMessageListenerConcurrently(delayLevelWhenNextConsume, this, messageListenerConcurrently);
}

public MessageListenerOrderly unwrap(MessageListenerOrderly messageListenerOrderly) {
if (messageListenerOrderly instanceof TracingMessageListenerOrderly) {
return ((TracingMessageListenerOrderly)messageListenerOrderly).messageListenerOrderly;
}
return null;
}

public MessageListenerConcurrently unwrap(MessageListenerConcurrently messageListenerConcurrently) {
if (messageListenerConcurrently instanceof TracingMessageListenerConcurrently) {
return ((TracingMessageListenerConcurrently)messageListenerConcurrently).messageListenerConcurrently;
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,28 @@
*/
package brave.rocketmq.client;

import brave.Span;
import brave.Tracer;
import brave.Tracer.SpanInScope;
import java.util.Collections;
import java.util.List;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

// TODO: I think we don't want to expose a custom class rather wrap in context and prove a user can
// do custom tagging via their own MessageListenerConcurrently.
// Maybe expose RocketMQTracing.messageListenerConcurrently() to wrap theirs or make spans default
// and not expose this.
public abstract class TracingMessageListenerConcurrently implements MessageListenerConcurrently {
import brave.Span;
import brave.Tracer.SpanInScope;

class TracingMessageListenerConcurrently implements MessageListenerConcurrently {

private final int delayLevelWhenNextConsume;

private final RocketMQTracing tracing;
final MessageListenerConcurrently messageListenerConcurrently;

public TracingMessageListenerConcurrently(int delayLevelWhenNextConsume,
RocketMQTracing tracing) {
TracingMessageListenerConcurrently(int delayLevelWhenNextConsume,
RocketMQTracing tracing, MessageListenerConcurrently messageListenerConcurrently) {
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
this.tracing = tracing;
this.messageListenerConcurrently = messageListenerConcurrently;
}

@Override
Expand All @@ -50,7 +49,7 @@ public final ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

ConsumeConcurrentlyStatus result;
try (SpanInScope scope = tracing.tracer().withSpanInScope(span)) {
result = handleMessage(msg, context);
result = messageListenerConcurrently.consumeMessage(Collections.singletonList(msg), context);
} catch (Exception e) {
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
result = ConsumeConcurrentlyStatus.RECONSUME_LATER;
Expand All @@ -66,7 +65,4 @@ public final ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

protected abstract ConsumeConcurrentlyStatus handleMessage(MessageExt messageExt,
ConsumeConcurrentlyContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,27 @@
*/
package brave.rocketmq.client;

import brave.Span;
import brave.Tracer;
import brave.Tracer.SpanInScope;
import java.util.Collections;
import java.util.List;

import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

// TODO: I think we don't want to expose a custom class rather wrap in context and prove a user can
// do custom tagging via their own MessageListenerOrderly.
// Maybe expose RocketMQTracing.messageListenerOrderly() to wrap theirs or make spans default
// and not expose this.
public abstract class TracingMessageListenerOrderly implements MessageListenerOrderly {
import brave.Span;
import brave.Tracer.SpanInScope;

class TracingMessageListenerOrderly implements MessageListenerOrderly {
private final long suspendCurrentQueueTimeMillis;
private final RocketMQTracing tracing;
final MessageListenerOrderly messageListenerOrderly;

public TracingMessageListenerOrderly(long suspendCurrentQueueTimeMillis,
RocketMQTracing tracing) {
TracingMessageListenerOrderly(long suspendCurrentQueueTimeMillis,
RocketMQTracing tracing, MessageListenerOrderly messageListenerOrderly) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
this.tracing = tracing;
this.messageListenerOrderly = messageListenerOrderly;
}

@Override
Expand All @@ -48,7 +48,7 @@ public final ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,

ConsumeOrderlyStatus result;
try (SpanInScope scope = tracing.tracer().withSpanInScope(span)) {
result = handleMessage(msg, context);
result = messageListenerOrderly.consumeMessage(Collections.singletonList(msg), context);
} catch (Exception e) {
span.error(e);
context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
Expand All @@ -65,7 +65,4 @@ public final ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,

return ConsumeOrderlyStatus.SUCCESS;
}

protected abstract ConsumeOrderlyStatus handleMessage(MessageExt messageExt,
ConsumeOrderlyContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ public void sendMessageBefore(SendMessageContext context) {
request,
msg.getProperties());
span.name(RocketMQTags.TO_PREFIX + msg.getTopic());
span.tag(RocketMQTags.ROCKETMQ_TAGS, Util.getOrEmpty(msg.getTags()));
if (msg.getTags() != null && !msg.getTags().isEmpty()) {
span.tag(RocketMQTags.ROCKETMQ_TAGS, msg.getTags());
}
context.setMqTraceContext(span);
tracing.producerInjector.inject(span.context(), request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,4 @@ static <T extends MessagingRequest> Span createAndStartSpan(RocketMQTracing trac
return span;
}

// TODO: we shouldn't add tags with empty values!
static String getOrEmpty(String obj) {
if (obj == null) {
return "";
} else {
return obj;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import brave.sampler.SamplerFunctions;
import brave.test.ITRemote;
import brave.test.IntegrationTestSpanHandler;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -33,6 +35,8 @@
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
Expand Down Expand Up @@ -145,17 +149,17 @@ class ITRocketMQTracingTest extends ITRemote {
consumer.subscribe(topic, "*");
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Span> reference = new AtomicReference<>();
consumer.registerMessageListener(new TracingMessageListenerConcurrently(0, consumerTracing) {
MessageListenerConcurrently messageListenerConcurrently = consumerTracing.wrap(new MessageListenerConcurrently() {
@Override
protected ConsumeConcurrentlyStatus handleMessage(MessageExt messageExt,
ConsumeConcurrentlyContext context) {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
Span span =
Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null);
reference.set(span);
latch.countDown();
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.registerMessageListener(messageListenerConcurrently);
producer.send(message);
consumer.start();

Expand Down Expand Up @@ -185,17 +189,17 @@ protected ConsumeConcurrentlyStatus handleMessage(MessageExt messageExt,
consumer.subscribe(topic, "*");
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Span> reference = new AtomicReference<>();
consumer.registerMessageListener(new TracingMessageListenerOrderly(0, consumerTracing) {
MessageListenerOrderly messageListenerOrderly = consumerTracing.wrap(new MessageListenerOrderly() {
@Override
protected ConsumeOrderlyStatus handleMessage(MessageExt messageExt,
ConsumeOrderlyContext context) {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
Span span =
Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null);
reference.set(span);
latch.countDown();
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.registerMessageListener(messageListenerOrderly);
producer.send(message);
consumer.start();

Expand Down Expand Up @@ -226,17 +230,17 @@ protected ConsumeOrderlyStatus handleMessage(MessageExt messageExt,
consumer.subscribe(topic, "*");
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Span> reference = new AtomicReference<>();
consumer.registerMessageListener(new TracingMessageListenerOrderly(0, consumerTracing) {
MessageListenerOrderly messageListenerOrderly = consumerTracing.wrap(new MessageListenerOrderly() {
@Override
protected ConsumeOrderlyStatus handleMessage(MessageExt messageExt,
ConsumeOrderlyContext context) {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
Span span =
Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null);
reference.set(span);
latch.countDown();
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.registerMessageListener(messageListenerOrderly);

producer.send(message);
consumer.start();
Expand Down

0 comments on commit 054034e

Please sign in to comment.