Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Prepare tracing methods for message processor and scheduleMessage #16524

Merged
merged 7 commits into from
Oct 28, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,21 @@
import com.azure.core.amqp.implementation.ErrorContextProvider;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.ProcessKind;
import reactor.core.publisher.Signal;

import java.nio.BufferOverflowException;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;

import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME;
import static com.azure.messaging.servicebus.implementation.MessageUtils.traceMessageSpan;

/**
* A class for aggregating {@link ServiceBusMessage messages} into a single, size-limited, batch. It is treated as a
* single AMQP message when sent to the Azure Service Bus service.
*/
public final class ServiceBusMessageBatch {
private static final String AZ_TRACING_NAMESPACE_VALUE = "Microsoft.ServiceBus";
private final ClientLogger logger = new ClientLogger(ServiceBusMessageBatch.class);
private final Object lock = new Object();
private final int maxMessageSize;
Expand Down Expand Up @@ -102,7 +92,10 @@ public boolean tryAdd(final ServiceBusMessage serviceBusMessage) {
throw logger.logExceptionAsWarning(new IllegalArgumentException("message cannot be null"));
}
ServiceBusMessage serviceBusMessageUpdated =
tracerProvider.isEnabled() ? traceMessageSpan(serviceBusMessage) : serviceBusMessage;
tracerProvider.isEnabled()
? traceMessageSpan(serviceBusMessage, serviceBusMessage.getContext(), hostname, entityPath,
tracerProvider)
: serviceBusMessage;

final int size;
try {
Expand Down Expand Up @@ -135,39 +128,6 @@ List<ServiceBusMessage> getMessages() {
return serviceBusMessageList;
}

/**
* Method to start and end a "Azure.EventHubs.message" span and add the "DiagnosticId" as a property of the
* message.
*
* @param serviceBusMessage The Message to add tracing span for.
*
* @return the updated Message data object.
*/
private ServiceBusMessage traceMessageSpan(ServiceBusMessage serviceBusMessage) {
Optional<Object> eventContextData = serviceBusMessage.getContext().getData(SPAN_CONTEXT_KEY);
if (eventContextData.isPresent()) {
// if message has context (in case of retries), don't start a message span or add a new context
return serviceBusMessage;
} else {
// Starting the span makes the sampling decision (nothing is logged at this time)
Context messageContext = serviceBusMessage.getContext()
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE)
.addData(ENTITY_PATH_KEY, entityPath)
.addData(HOST_NAME_KEY, hostname);
Context eventSpanContext = tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, messageContext,
ProcessKind.MESSAGE);
Optional<Object> eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY);
if (eventDiagnosticIdOptional.isPresent()) {
serviceBusMessage.getApplicationProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get()
.toString());
tracerProvider.endSpan(eventSpanContext, Signal.complete());
serviceBusMessage.addContext(SPAN_CONTEXT_KEY, eventSpanContext);
}
}

return serviceBusMessage;
}

private int getSize(final ServiceBusMessage serviceBusMessage, final boolean isFirst) {
Objects.requireNonNull(serviceBusMessage, "'serviceBusMessage' cannot be null.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
Expand All @@ -15,16 +21,32 @@
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import reactor.core.publisher.Signal;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME;
import static com.azure.core.util.tracing.Tracer.SCOPE_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE;


/**
* Contains helper methods for message conversions, reading status codes, and getting delivery state.
*/
Expand Down Expand Up @@ -253,4 +275,76 @@ private static TransactionalState getTransactionState(ByteBuffer transactionId,
transactionalState.setOutcome(outcome);
return transactionalState;
}

public static ServiceBusMessage traceMessageSpan(ServiceBusMessage serviceBusMessage,
YijunXieMS marked this conversation as resolved.
Show resolved Hide resolved
Context messageContext, String hostname, String entityPath, TracerProvider tracerProvider) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@YijunXieMS YijunXieMS Oct 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProcessKind.MESSAGE should be used to create a message. Just like in EventHubs.

Context eventSpanContext = tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, eventContext,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ProcessKind helps the tracer apply specific attributes on the span. For example, in the case of MESSAGE, the tracer will attribute the spanType=Producer, here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samvaity is this javadoc misleading? It looks like MESSAGE is used for receiving.

    /**
     * Amqp message process call to receive data.
     */
    MESSAGE,

Copy link
Member

@samvaity samvaity Oct 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think we initially had this as RECEIVE and later updated to MESSAGE so the left over javadoc. But yes it should be updated to suit process kind "message". AMQP process kind for message spans.

Optional<Object> eventContextData = messageContext.getData(SPAN_CONTEXT_KEY);
if (eventContextData.isPresent()) {
// if message has context (in case of retries), don't start a message span or add a new context
return serviceBusMessage;
} else {
// Starting the span makes the sampling decision (nothing is logged at this time)
Context newMessageContext = messageContext
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE)
.addData(ENTITY_PATH_KEY, entityPath)
.addData(HOST_NAME_KEY, hostname);
YijunXieMS marked this conversation as resolved.
Show resolved Hide resolved
Context eventSpanContext = tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, newMessageContext,
ProcessKind.MESSAGE);
Optional<Object> eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY);
if (eventDiagnosticIdOptional.isPresent()) {
serviceBusMessage.getApplicationProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get()
.toString());
tracerProvider.endSpan(eventSpanContext, Signal.complete());
serviceBusMessage.addContext(SPAN_CONTEXT_KEY, eventSpanContext);
}
}
return serviceBusMessage;
}

/*
* Starts a new process tracing span and attaches the returned context to the ServiceBusReceivedMessage object for
* users.
*/
public static Context startProcessTracingSpan(ServiceBusReceivedMessage receivedMessage,
YijunXieMS marked this conversation as resolved.
Show resolved Hide resolved
String hostname, String entityPath, TracerProvider tracerProvider, ProcessKind processKind) {
Object diagnosticId = receivedMessage.getApplicationProperties().get(DIAGNOSTIC_ID_KEY);
if (diagnosticId == null || !tracerProvider.isEnabled()) {
return Context.NONE;
}

Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE)
.addData(ENTITY_PATH_KEY, entityPath)
.addData(HOST_NAME_KEY, hostname)
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);
spanContext = receivedMessage.getEnqueuedTime() == null
? spanContext
: spanContext.addData(MESSAGE_ENQUEUED_TIME, receivedMessage.getEnqueuedTime().toEpochSecond());
return tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, spanContext, processKind);
}

/*
* Ends the process tracing span and the scope of that span.
*/
public static void endProcessTracingSpan(Context processSpanContext, Signal<Void> signal,
YijunXieMS marked this conversation as resolved.
Show resolved Hide resolved
TracerProvider tracerProvider, ClientLogger logger) {
if (processSpanContext != null) {
Optional<Object> spanScope = processSpanContext.getData(SCOPE_KEY);
// Disposes of the scope when the trace span closes.
if (tracerProvider.isEnabled() && spanScope.isPresent()) {
if (spanScope.get() instanceof Closeable) {
Closeable close = (Closeable) spanScope.get();
try {
close.close();
tracerProvider.endSpan(processSpanContext, signal);
} catch (IOException ioException) {
logger.error(Messages.MESSAGE_PROCESSOR_RUN_END, ioException);
}

} else {
logger.warning(String.format(Locale.US,
Messages.PROCESS_SPAN_SCOPE_TYPE_ERROR, spanScope.getClass()));
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public class Messages {
public static final String INVALID_LOCK_TOKEN_STRING = getMessage("INVALID_LOCK_TOKEN_STRING");
public static final String MESSAGE_NOT_OF_TYPE = getMessage("MESSAGE_NOT_OF_TYPE");
public static final String REQUEST_VALUE_NOT_VALID = getMessage("REQUEST_VALUE_NOT_VALID");
public static final String PROCESS_SPAN_SCOPE_TYPE_ERROR = getMessage("PROCESS_SPAN_SCOPE_TYPE_ERROR");
public static final String MESSAGE_PROCESSOR_RUN_END = getMessage("MESSAGE_PROCESSOR_RUN_END");

private static synchronized Properties getProperties() {
if (properties != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ MESSAGE_NOT_OF_TYPE=Message body type is not of type Data, but type: %s. Not set
REQUEST_VALUE_NOT_VALID=Back pressure request value not valid. It must be between {} and {}.
INVALID_OPERATION_DISPOSED_RECEIVER=Cannot perform operation '%s' on a disposed receiver.
INVALID_LOCK_TOKEN_STRING=Invalid lock token '%s'.
PROCESS_SPAN_SCOPE_TYPE_ERROR=Process span scope type is not of type Closeable, but type: %s. Not closing the scope\
and span
MESSAGE_PROCESSOR_RUN_END=MessageProcessor.run() endTracingSpan().close() failed with an error %s