Skip to content

Commit

Permalink
feat: track batch size using serialized size of PublishRequest (#2113)
Browse files Browse the repository at this point in the history
* feat: track batch size using serialized size of PublishRequest

* fix: compare against batchedBytes instead of messageSize in flush condition

* fix: also count static overhead in flow control

* fix: adjust thresholds in tests

* fix: clean up merge issue

* fix: revert use of topicNameSize in MessageFlowController

* fix: store topicNameSize as initialBatchedBytes in MessagesBatch

---------

Co-authored-by: Mike Prieto <michaelpri10@gmail.com>
  • Loading branch information
sjvanrossum and michaelpri10 authored Oct 11, 2024
1 parent 6c67798 commit be78e64
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.cloud.pubsub.v1.stub.PublisherStub;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import com.google.common.base.Preconditions;
import com.google.protobuf.CodedOutputStream;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PubsubMessage;
Expand Down Expand Up @@ -99,6 +100,7 @@ public class Publisher implements PublisherInterface {
private static final String OPEN_TELEMETRY_TRACER_NAME = "com.google.cloud.pubsub.v1";

private final String topicName;
private final int topicNameSize;

private final BatchingSettings batchingSettings;
private final boolean enableMessageOrdering;
Expand Down Expand Up @@ -145,6 +147,8 @@ public static long getApiMaxRequestBytes() {

private Publisher(Builder builder) throws IOException {
topicName = builder.topicName;
topicNameSize =
CodedOutputStream.computeStringSize(PublishRequest.TOPIC_FIELD_NUMBER, this.topicName);

this.batchingSettings = builder.batchingSettings;
FlowControlSettings flowControl = this.batchingSettings.getFlowControlSettings();
Expand Down Expand Up @@ -309,7 +313,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
}
MessagesBatch messagesBatch = messagesBatches.get(orderingKey);
if (messagesBatch == null) {
messagesBatch = new MessagesBatch(batchingSettings, orderingKey);
messagesBatch = new MessagesBatch(batchingSettings, topicNameSize, orderingKey);
messagesBatches.put(orderingKey, messagesBatch);
}

Expand Down Expand Up @@ -636,7 +640,9 @@ private static final class OutstandingPublish {
OutstandingPublish(PubsubMessageWrapper messageWrapper) {
this.publishResult = SettableApiFuture.create();
this.messageWrapper = messageWrapper;
this.messageSize = messageWrapper.getPubsubMessage().getSerializedSize();
this.messageSize =
CodedOutputStream.computeMessageSize(
PublishRequest.MESSAGES_FIELD_NUMBER, messageWrapper.getPubsubMessage());
}
}

Expand Down Expand Up @@ -1093,12 +1099,15 @@ void release(long messageSize) {

private class MessagesBatch {
private List<OutstandingPublish> messages;
private int initialBatchedBytes;
private int batchedBytes;
private String orderingKey;
private final BatchingSettings batchingSettings;

private MessagesBatch(BatchingSettings batchingSettings, String orderingKey) {
private MessagesBatch(
BatchingSettings batchingSettings, int initialBatchedBytes, String orderingKey) {
this.batchingSettings = batchingSettings;
this.initialBatchedBytes = initialBatchedBytes;
this.orderingKey = orderingKey;
reset();
}
Expand All @@ -1111,7 +1120,7 @@ private OutstandingBatch popOutstandingBatch() {

private void reset() {
messages = new LinkedList<>();
batchedBytes = 0;
batchedBytes = initialBatchedBytes;
}

private boolean isEmpty() {
Expand Down Expand Up @@ -1150,7 +1159,9 @@ && getBatchedBytes() + outstandingPublish.messageSize >= getMaxBatchBytes()) {
// immediately.
// Alternatively if after adding the message we have reached the batch max messages then we
// have a batch to send.
if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes())
// Note that exceeding {@link Publisher#getApiMaxRequestBytes()} will result in failed
// publishes without compression and may yet fail if a request is not sufficiently compressed.
if ((hasBatchingBytes() && getBatchedBytes() >= getMaxBatchBytes())
|| getMessagesCount() == batchingSettings.getElementCountThreshold()) {
batchesToSend.add(popOutstandingBatch());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ public void testLargeMessagesDoNotReorderBatches() throws Exception {
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(10L)
.setRequestByteThreshold(20L)
.setRequestByteThreshold(64L)
.setDelayThreshold(Duration.ofSeconds(100))
.build())
.setEnableMessageOrdering(true)
Expand Down Expand Up @@ -1150,7 +1150,7 @@ public void testPublishFlowControl_throwException() throws Exception {
.setLimitExceededBehavior(
FlowController.LimitExceededBehavior.ThrowException)
.setMaxOutstandingElementCount(1L)
.setMaxOutstandingRequestBytes(10L)
.setMaxOutstandingRequestBytes(13L)
.build())
.build())
.build();
Expand Down Expand Up @@ -1192,7 +1192,7 @@ public void testPublishFlowControl_throwExceptionWithOrderingKey() throws Except
.setLimitExceededBehavior(
FlowController.LimitExceededBehavior.ThrowException)
.setMaxOutstandingElementCount(1L)
.setMaxOutstandingRequestBytes(10L)
.setMaxOutstandingRequestBytes(13L)
.build())
.build())
.setEnableMessageOrdering(true)
Expand Down Expand Up @@ -1238,7 +1238,7 @@ public void testPublishFlowControl_block() throws Exception {
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.setMaxOutstandingElementCount(2L)
.setMaxOutstandingRequestBytes(10L)
.setMaxOutstandingRequestBytes(13L)
.build())
.build())
.build();
Expand Down

0 comments on commit be78e64

Please sign in to comment.