Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sb track2 schedule multiple message validate batch size #16767

Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
Expand Down Expand Up @@ -338,7 +339,7 @@ public Mono<Long> scheduleMessage(ServiceBusMessage message, OffsetDateTime sche
* Sends a scheduled messages to the Azure Service Bus entity this sender is connected to. A scheduled message is
* enqueued and made available to receivers only at the scheduled enqueue time.
*
* @param messages Message to be sent to the Service Bus Queue.
* @param messages Messages to be sent to the Service Bus Queue.
* @param scheduledEnqueueTime OffsetDateTime at which the message should appear in the Service Bus queue or topic.
*
* @return The sequence number of the scheduled message which can be used to cancel the scheduling of the message.
Expand All @@ -353,7 +354,7 @@ public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetD
* Sends a scheduled messages to the Azure Service Bus entity this sender is connected to. A scheduled message is
* enqueued and made available to receivers only at the scheduled enqueue time.
*
* @param messages Message to be sent to the Service Bus Queue.
* @param messages Messages to be sent to the Service Bus Queue.
* @param scheduledEnqueueTime Instant at which the message should appear in the Service Bus queue or topic.
* @param transactionContext to be set on batch message before scheduling them on Service Bus.
*
Expand All @@ -371,13 +372,36 @@ public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetD
return fluxError(logger, new NullPointerException("'scheduledEnqueueTime' cannot be null."));
}

return createBatch().flatMapMany(messageBatch -> {
messages.forEach(message -> messageBatch.tryAdd(message));
return getSendLink().flatMapMany(link -> connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityName, entityType))
.flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(), scheduledEnqueueTime,
messageBatch.getMaxSizeInBytes(), link.getLinkName(), transactionContext)));
});
return getSendLink().flatMapMany(link -> link.getLinkSize()
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved
.flatMapMany(size -> {
int maxSize = size > 0
? size
: MAX_MESSAGE_LENGTH_BYTES;
final CreateBatchOptions batchOptions = new CreateBatchOptions();
batchOptions.setMaximumSizeInBytes(maxSize);

return createBatch(batchOptions).flatMapMany(messageBatch -> {
AtomicInteger index = new AtomicInteger();
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved
messages.forEach(message -> {
index.incrementAndGet();
boolean added = messageBatch.tryAdd(message);
if (!added) {
final String error = String.format(Locale.US,
"Messages exceed max allowed size '%s' bytes for all the messages together."
+ " Failed to add message at index '%s'.",
maxSize, index.get());
throw logger.logExceptionAsError(new AmqpException(false,
AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, error, link.getErrorContext()));
}
});

return connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityName, entityType))
.flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(),
scheduledEnqueueTime, messageBatch.getMaxSizeInBytes(), link.getLinkName(),
transactionContext));
});
}));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -288,6 +290,31 @@ void createsMessageBatchWithSize() {
.verifyComplete();
}

@Test
void scheduleMessageSizeTooBig() {
// Arrange
int maxLinkSize = 1024;
int batchSize = maxLinkSize + 10;

OffsetDateTime instant = mock(OffsetDateTime.class);
final List<ServiceBusMessage> messages = TestUtils.getServiceBusMessages(batchSize, UUID.randomUUID().toString());

final AmqpSendLink link = mock(AmqpSendLink.class);
when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize));

when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull()))
.thenReturn(Mono.just(link));
when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize));

// Act & Assert
StepVerifier.create(sender.scheduleMessages(messages, instant))
.verifyErrorMatches(throwable -> {
assertTrue(throwable instanceof AmqpException);
assertSame(((AmqpException) throwable).getErrorCondition(), AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED);
return true;
});
}

/**
* Verifies that sending multiple message will result in calling sender.send(MessageBatch, transaction).
*/
Expand Down