Skip to content

Commit

Permalink
update subscribe calls in servicebus with no-op handlers or handlers …
Browse files Browse the repository at this point in the history
…that logs (#42926)
  • Loading branch information
anuchandy authored Nov 13, 2024
1 parent 0829c72 commit 328c5fb
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -210,7 +211,7 @@ void begin() {
if (getAndSet(true)) {
throw logger.atInfo().log(new IllegalStateException("The streaming cannot begin more than once."));
}
final Disposable d = beginIntern().subscribe();
final Disposable d = MessageUtils.subscribe(beginIntern(), "begin", logger.atWarning());
if (!disposable.add(d)) {
throw logger.atInfo().log(new IllegalStateException("Cannot begin streaming after the disposal."));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.core.util.IterableStream;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusReceiverClientBuilder;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.AbandonOptions;
import com.azure.messaging.servicebus.models.CompleteOptions;
Expand Down Expand Up @@ -639,7 +640,7 @@ public IterableStream<ServiceBusReceivedMessage> receiveMessages(int maxMessages
final Flux<ServiceBusReceivedMessage> messagesFlux
= tracer.traceSyncReceive("ServiceBus.receiveMessages", emitter.asFlux());
// messagesFlux is already a hot publisher, so it's ok to subscribe
messagesFlux.subscribe();
MessageUtils.subscribe(messagesFlux);

return new IterableStream<>(messagesFlux);
}
Expand Down Expand Up @@ -966,7 +967,7 @@ private <T> IterableStream<T> fromFluxAndSubscribe(Flux<T> flux) {
Flux<T> cached = flux.cache();

// Subscribe to message flux so we can kick off this operation
cached.subscribe();
MessageUtils.subscribe(cached);
return new IterableStream<>(cached);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE;

/**
* Package-private class that manages session aware message receiving.
* Package-private class that manages session aware message receiving in v1 stack.
*
*/
class ServiceBusSessionManager implements AutoCloseable, IServiceBusSessionManager {
// Time to delay before trying to accept another session.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.SESSION_ID_KEY;

/**
* Represents a session that is received when "any" session is accepted from the service.
* Represents a session (in v1 stack) that is received when "any" session is accepted from the service.
*/
class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusSessionReceiver.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ private Mono<Void> terminate(TerminalSignalType signalType, Scheduler workerSche
private ServiceBusSessionReactorReceiver nextSessionReceiver(ServiceBusSessionAcquirer.Session nextSession) {
final State<ServiceBusSessionReactorReceiver> lastState = super.get();
if (lastState == TERMINATED) {
nextSession.getLink().closeAsync().subscribe();
MessageUtils.subscribe(nextSession.getLink().closeAsync());
throw new MessagePumpTerminatedException(pumpId, fullyQualifiedNamespace, entityPath,
"session#next-receiver roller_" + rollerId);
}
Expand All @@ -468,7 +468,7 @@ private ServiceBusSessionReactorReceiver nextSessionReceiver(ServiceBusSessionAc
// each creating ServiceBusSessionReactorReceiver and 'compareAndSet' of T1 winning the race with T’s
// 'compareAndSet' resulting in T1 not closing its ServiceBusSessionReactorReceiver.
//
nextSessionReceiver.closeAsync().subscribe();
MessageUtils.subscribe(nextSessionReceiver.closeAsync());
throw new MessagePumpTerminatedException(pumpId, fullyQualifiedNamespace, entityPath,
"session#next-receiver roller_" + rollerId);
}
Expand Down Expand Up @@ -572,7 +572,7 @@ private static final class NextSessionStream extends AtomicBoolean {
}).map(session -> {
final boolean isTerminated = super.get();
if (isTerminated) {
session.getLink().closeAsync().subscribe();
MessageUtils.subscribe(session.getLink().closeAsync());
throw new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace,
this.entityPath, "session#next-link roller_" + this.rollerId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.util.IterableStream;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.amqp.implementation.WindowedSubscriber;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import reactor.core.publisher.Flux;

Expand Down Expand Up @@ -162,12 +163,9 @@ private void messageReleaser(ServiceBusReceivedMessage message) {
*/
private Flux<ServiceBusReceivedMessage> traceDecorator(Flux<ServiceBusReceivedMessage> toDecorate) {
final Flux<ServiceBusReceivedMessage> decorated = tracer.traceSyncReceive(SYNC_RECEIVE_SPAN_NAME, toDecorate);
// TODO (anu) - discuss with Liudmila - do we need decorated.subscribe() here or IterableStream's internal
// TODO (anu) - discuss with Liudmila - do we need subscribe(decorated) here or IterableStream's internal
// subscription to the 'decorated' flux will do?
decorated.subscribe(i -> {
}, e -> {
}, () -> {
});
MessageUtils.subscribe(decorated);
return decorated;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.LoggingEventBuilder;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import com.azure.messaging.servicebus.administration.implementation.EntityHelper;
import com.azure.messaging.servicebus.administration.implementation.models.RuleDescriptionImpl;
Expand All @@ -30,6 +31,9 @@
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -94,6 +98,48 @@ public static Duration getTotalTimeout(AmqpRetryOptions retryOptions) {
return Duration.ofNanos(totalTimeout);
}

/**
* Subscribes to the given Mono by providing empty subscription handlers.
*
* @param source the source Mono to subscribe.
* @return the disposable to the subscription.
* @param <T> the Mono item type.
*/
public static <T> Disposable subscribe(Mono<T> source) {
return source.subscribe(ignored -> {
}, e -> {
}, () -> {
});
}

/**
* Subscribes to the given Flux by providing empty subscription handlers.
*
* @param source the source Flux to subscribe.
* @return the disposable to the subscription.
* @param <T> the Flux item type.
*/
public static <T> Disposable subscribe(Flux<T> source) {
return source.subscribe(ignored -> {
}, e -> {
}, () -> {
});
}

/**
* Subscribes to the given Mono by providing subscription handlers that logs terminal states.
*
* @param source the source Mono to subscribe.
* @param message the message to log.
* @param logger the logger to use.
* @return the disposable to the subscription.
* @param <T> the Mono item type.
*/
public static <T> Disposable subscribe(Mono<T> source, String message, LoggingEventBuilder logger) {
return source.subscribe(ignored -> {
}, e -> logger.log(message + " (terminal-error)", e), () -> logger.log(message + " (terminal-completion)"));
}

/**
* Converts a .NET GUID to its Java UUID representation.
*
Expand Down Expand Up @@ -521,5 +567,4 @@ private static RuleAction decodeRuleAction(DescribedType describedAction) {

return null;
}

}

0 comments on commit 328c5fb

Please sign in to comment.