Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: track batch size using serialized size of PublishRequest #2113

Merged
merged 8 commits into from
Oct 11, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.cloud.pubsub.v1.stub.PublisherStub;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import com.google.common.base.Preconditions;
import com.google.protobuf.CodedOutputStream;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PubsubMessage;
Expand Down Expand Up @@ -94,6 +95,7 @@ public class Publisher implements PublisherInterface {
private static final String GZIP_COMPRESSION = "gzip";

private final String topicName;
private final int topicNameSize;

private final BatchingSettings batchingSettings;
private final boolean enableMessageOrdering;
Expand Down Expand Up @@ -136,6 +138,8 @@ public static long getApiMaxRequestBytes() {

private Publisher(Builder builder) throws IOException {
topicName = builder.topicName;
topicNameSize =
CodedOutputStream.computeStringSize(PublishRequest.TOPIC_FIELD_NUMBER, this.topicName);

this.batchingSettings = builder.batchingSettings;
FlowControlSettings flowControl = this.batchingSettings.getFlowControlSettings();
Expand Down Expand Up @@ -592,7 +596,8 @@ private static final class OutstandingPublish {
OutstandingPublish(PubsubMessage message) {
this.publishResult = SettableApiFuture.create();
this.message = message;
this.messageSize = message.getSerializedSize();
this.messageSize =
CodedOutputStream.computeMessageSize(PublishRequest.MESSAGES_FIELD_NUMBER, message);
}
}

Expand Down Expand Up @@ -890,7 +895,7 @@ public Publisher build() throws IOException {
}
}

private static class MessageFlowController {
private class MessageFlowController {
private final Lock lock;
private final Long messageLimit;
private final Long byteLimit;
Expand All @@ -909,7 +914,7 @@ private static class MessageFlowController {
this.lock = new ReentrantLock();

this.outstandingMessages = 0L;
this.outstandingBytes = 0L;
this.outstandingBytes = (long) topicNameSize;
sjvanrossum marked this conversation as resolved.
Show resolved Hide resolved

this.awaitingMessageAcquires = new LinkedList<CountDownLatch>();
this.awaitingBytesAcquires = new LinkedList<CountDownLatch>();
Expand Down Expand Up @@ -1044,7 +1049,7 @@ private OutstandingBatch popOutstandingBatch() {

private void reset() {
messages = new LinkedList<>();
batchedBytes = 0;
batchedBytes = topicNameSize;
sjvanrossum marked this conversation as resolved.
Show resolved Hide resolved
}

private boolean isEmpty() {
Expand Down Expand Up @@ -1083,7 +1088,9 @@ && getBatchedBytes() + outstandingPublish.messageSize >= getMaxBatchBytes()) {
// immediately.
// Alternatively if after adding the message we have reached the batch max messages then we
// have a batch to send.
if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes())
// Note that exceeding {@link Publisher#getApiMaxRequestBytes()} will result in failed
// publishes without compression and may yet fail if a request is not sufficiently compressed.
if ((hasBatchingBytes() && getBatchedBytes() >= getMaxBatchBytes())
|| getMessagesCount() == batchingSettings.getElementCountThreshold()) {
batchesToSend.add(popOutstandingBatch());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ public void testLargeMessagesDoNotReorderBatches() throws Exception {
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(10L)
.setRequestByteThreshold(20L)
.setRequestByteThreshold(64L)
.setDelayThreshold(Duration.ofSeconds(100))
.build())
.setEnableMessageOrdering(true)
Expand Down Expand Up @@ -1139,7 +1139,7 @@ public void testPublishFlowControl_throwException() throws Exception {
.setLimitExceededBehavior(
FlowController.LimitExceededBehavior.ThrowException)
.setMaxOutstandingElementCount(1L)
.setMaxOutstandingRequestBytes(10L)
.setMaxOutstandingRequestBytes(55L)
.build())
.build())
.build();
Expand Down Expand Up @@ -1181,7 +1181,7 @@ public void testPublishFlowControl_throwExceptionWithOrderingKey() throws Except
.setLimitExceededBehavior(
FlowController.LimitExceededBehavior.ThrowException)
.setMaxOutstandingElementCount(1L)
.setMaxOutstandingRequestBytes(10L)
.setMaxOutstandingRequestBytes(55L)
.build())
.build())
.setEnableMessageOrdering(true)
Expand Down Expand Up @@ -1227,7 +1227,7 @@ public void testPublishFlowControl_block() throws Exception {
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.setMaxOutstandingElementCount(2L)
.setMaxOutstandingRequestBytes(10L)
.setMaxOutstandingRequestBytes(55L)
.build())
.build())
.build();
Expand Down