Skip to content

Commit

Permalink
fix: abstract batch resource and add method to determine if batch sho…
Browse files Browse the repository at this point in the history
…uld be flushed (#1790)
  • Loading branch information
mutianf authored Jun 23, 2023
1 parent 596ebbd commit 4c74107
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 31 deletions.
1 change: 1 addition & 0 deletions gax-java/gax/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
<!-- Ignore any files marked as @InternalApi -->
<excludes>
<exclude>com/google/api/gax/rpc/RequestUrlParamsEncoder</exclude>
<exclude>com/google/api/gax/batching/BatchingDescriptor</exclude>
</excludes>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2023 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import com.google.api.core.InternalApi;

/**
* Represent the resource used by a batch including element and byte. It can also be extended to
* other things to determine if adding a new element needs to be flow controlled or if the current
* batch needs to be flushed.
*/
@InternalApi("For google-cloud-java client use only.")
public interface BatchResource {

/** Adds the additional resource. */
BatchResource add(BatchResource resource);

/** Returns the element count of this resource. */
long getElementCount();

/** Returns the byte count of this resource. */
long getByteCount();

/**
* Checks if the current {@link BatchResource} should be flushed based on the maxElementThreshold
* and maxBytesThreshold.
*/
boolean shouldFlush(long maxElementThreshold, long maxBytesThreshold);
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private final FlowController flowController;
private final ApiCallContext callContext;

// If element threshold or bytes threshold is 0, it means that it'll always flush every element
// without batching
private final long elementThreshold;
private final long bytesThreshold;

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response
Expand Down Expand Up @@ -192,7 +197,7 @@ public BatcherImpl(
+ "#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold");
}
this.flowController = flowController;
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batcherStats);
if (batchingSettings.getDelayThreshold() != null) {
long delay = batchingSettings.getDelayThreshold().toMillis();
PushCurrentBatchRunnable<ElementT, ElementResultT, RequestT, ResponseT> runnable =
Expand All @@ -204,6 +209,11 @@ public BatcherImpl(
}
currentBatcherReference = new BatcherReference(this);
this.callContext = callContext;

Long elementCountThreshold = batchingSettings.getElementCountThreshold();
this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold;
Long requestByteThreshold = batchingSettings.getRequestByteThreshold();
this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold;
}

/** {@inheritDoc} */
Expand All @@ -213,7 +223,7 @@ public ApiFuture<ElementResultT> add(ElementT element) {
// will only be done from a single calling thread.
Preconditions.checkState(closeFuture == null, "Cannot add elements on a closed batcher");

long bytesSize = batchingDescriptor.countBytes(element);
BatchResource newResource = batchingDescriptor.createResource(element);

// This is not the optimal way of throttling. It does not send out partial batches, which
// means that the Batcher might not use up all the resources allowed by FlowController.
Expand All @@ -232,7 +242,7 @@ public ApiFuture<ElementResultT> add(ElementT element) {
// defer it till we decide on if refactoring FlowController is necessary.
Stopwatch stopwatch = Stopwatch.createStarted();
try {
flowController.reserve(1, bytesSize);
flowController.reserve(newResource.getElementCount(), newResource.getByteCount());
} catch (FlowControlException e) {
// This exception will only be thrown if the FlowController is set to ThrowException behavior
throw FlowControlRuntimeException.fromFlowControlException(e);
Expand All @@ -241,12 +251,16 @@ public ApiFuture<ElementResultT> add(ElementT element) {

SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
synchronized (elementLock) {
currentOpenBatch.add(element, result, throttledTimeMs);
}
if (currentOpenBatch
.resource
.add(newResource)
.shouldFlush(elementThreshold, bytesThreshold)) {
sendOutstanding();
}

if (currentOpenBatch.hasAnyThresholdReached()) {
sendOutstanding();
currentOpenBatch.add(element, newResource, result, throttledTimeMs);
}

return result;
}

Expand All @@ -267,7 +281,7 @@ public void sendOutstanding() {
return;
}
accumulatedBatch = currentOpenBatch;
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batcherStats);
}

// This check is for old clients that instantiated the batcher without ApiCallContext
Expand All @@ -291,7 +305,9 @@ public void sendOutstanding() {
@Override
public void onSuccess(ResponseT response) {
try {
flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter);
flowController.release(
accumulatedBatch.resource.getElementCount(),
accumulatedBatch.resource.getByteCount());
accumulatedBatch.onBatchSuccess(response);
} finally {
onBatchCompletion();
Expand All @@ -301,7 +317,9 @@ public void onSuccess(ResponseT response) {
@Override
public void onFailure(Throwable throwable) {
try {
flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter);
flowController.release(
accumulatedBatch.resource.getElementCount(),
accumulatedBatch.resource.getByteCount());
accumulatedBatch.onBatchFailure(throwable);
} finally {
onBatchCompletion();
Expand Down Expand Up @@ -412,34 +430,30 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private final BatchingRequestBuilder<ElementT, RequestT> builder;
private final List<BatchEntry<ElementT, ElementResultT>> entries;
private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor;
private final BatcherStats batcherStats;
private final long elementThreshold;
private final long bytesThreshold;

private long elementCounter = 0;
private long byteCounter = 0;
private final BatcherStats batcherStats;
private long totalThrottledTimeMs = 0;
private BatchResource resource;

private Batch(
RequestT prototype,
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor,
BatchingSettings batchingSettings,
BatcherStats batcherStats) {
this.descriptor = descriptor;
this.builder = descriptor.newRequestBuilder(prototype);
this.entries = new ArrayList<>();
Long elementCountThreshold = batchingSettings.getElementCountThreshold();
this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold;
Long requestByteThreshold = batchingSettings.getRequestByteThreshold();
this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold;
this.batcherStats = batcherStats;
this.resource = descriptor.createEmptyResource();
}

void add(ElementT element, SettableApiFuture<ElementResultT> result, long throttledTimeMs) {
void add(
ElementT element,
BatchResource newResource,
SettableApiFuture<ElementResultT> result,
long throttledTimeMs) {
builder.add(element);
entries.add(BatchEntry.create(element, result));
elementCounter++;
byteCounter += descriptor.countBytes(element);
resource = resource.add(newResource);
totalThrottledTimeMs += throttledTimeMs;
}

Expand All @@ -464,11 +478,7 @@ void onBatchFailure(Throwable throwable) {
}

boolean isEmpty() {
return elementCounter == 0;
}

boolean hasAnyThresholdReached() {
return elementCounter >= elementThreshold || byteCounter >= bytesThreshold;
return resource.getElementCount() == 0;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,17 @@ public interface BatchingDescriptor<ElementT, ElementResultT, RequestT, Response

/** Returns the size of the passed element object in bytes. */
long countBytes(ElementT element);

/** Creates a new {@link BatchResource} with ElementT. */
default BatchResource createResource(ElementT element) {
return DefaultBatchResource.builder()
.setElementCount(1)
.setByteCount(countBytes(element))
.build();
}

/** Create an empty {@link BatchResource}. */
default BatchResource createEmptyResource() {
return DefaultBatchResource.builder().setElementCount(0).setByteCount(0).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2023 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;

/**
* The default implementation of {@link BatchResource} which tracks the elementCount and byteCount.
*/
@AutoValue
abstract class DefaultBatchResource implements BatchResource {

static DefaultBatchResource.Builder builder() {
return new AutoValue_DefaultBatchResource.Builder();
}

@Override
public BatchResource add(BatchResource resource) {
Preconditions.checkArgument(
resource instanceof DefaultBatchResource,
"Expect an instance of DefaultBatchResource, got " + resource.getClass());
DefaultBatchResource defaultResource = (DefaultBatchResource) resource;
return new AutoValue_DefaultBatchResource.Builder()
.setElementCount(getElementCount() + defaultResource.getElementCount())
.setByteCount(getByteCount() + defaultResource.getByteCount())
.build();
}

@Override
public abstract long getElementCount();

@Override
public abstract long getByteCount();

@Override
public boolean shouldFlush(long maxElementThreshold, long maxBytesThreshold) {
return getElementCount() > maxElementThreshold || getByteCount() > maxBytesThreshold;
}

@AutoValue.Builder
abstract static class Builder {
abstract Builder setElementCount(long elementCount);

abstract Builder setByteCount(long byteCount);

abstract DefaultBatchResource build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class BatcherImplTest {
BatchingSettings.newBuilder()
.setElementCountThreshold(1000L)
.setRequestByteThreshold(1000L)
.setDelayThreshold(Duration.ofSeconds(1))
.setDelayThreshold(Duration.ofSeconds(1000))
.build();

@After
Expand Down Expand Up @@ -376,6 +376,7 @@ public void testWhenThresholdIsDisabled() throws Exception {
.build();
underTest = createDefaultBatcherImpl(settings, null);
Future<Integer> result = underTest.add(2);
underTest.add(3);
assertThat(result.isDone()).isTrue();
assertThat(result.get()).isEqualTo(4);
}
Expand Down Expand Up @@ -895,7 +896,7 @@ public void run() {

// Mockito recommends using verify() as the ONLY way to interact with Argument
// captors - otherwise it may incur in unexpected behaviour
Mockito.verify(callContext).withOption(key.capture(), value.capture());
Mockito.verify(callContext, Mockito.timeout(100)).withOption(key.capture(), value.capture());

// Verify that throttled time is recorded in ApiCallContext
assertThat(key.getValue()).isSameInstanceAs(Batcher.THROTTLED_TIME_KEY);
Expand Down Expand Up @@ -1008,12 +1009,37 @@ public ApiFuture<Object> futureCall(Object o, ApiCallContext apiCallContext) {
Assert.assertThrows(RuntimeException.class, batcher::close);
}

@Test
public void testDefaultShouldFlush() {
BatchResource resource =
DefaultBatchResource.builder().setElementCount(2).setByteCount(2).build();

assertThat(resource.shouldFlush(2, 2)).isFalse();
assertThat(resource.shouldFlush(1, 1)).isTrue();
}

@Test
public void testDefaultBatchResourceAdd() {
BatchResource resource =
DefaultBatchResource.builder().setElementCount(1).setByteCount(1).build();

BatchResource newResource =
resource.add(DefaultBatchResource.builder().setElementCount(1).setByteCount(1).build());

// Make sure add doesn't modify the old object
assertThat(resource.getElementCount()).isEqualTo(1);
assertThat(resource.getByteCount()).isEqualTo(1);
assertThat(newResource.getElementCount()).isEqualTo(2);
assertThat(newResource.getByteCount()).isEqualTo(2);
}

private void testElementTriggers(BatchingSettings settings) throws Exception {
underTest = createDefaultBatcherImpl(settings, null);
Future<Integer> result = underTest.add(4);
assertThat(result.isDone()).isFalse();
// After this element is added, the batch triggers sendOutstanding().
Future<Integer> anotherResult = underTest.add(5);
// After this element is added, the batch triggers sendOutstanding().
underTest.add(6);
// Both the elements should be resolved now.
assertThat(result.isDone()).isTrue();
assertThat(result.get()).isEqualTo(16);
Expand Down

0 comments on commit 4c74107

Please sign in to comment.