Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: pass deadline through ExecuteQuery RetrySettings #2355

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1357,11 +1357,21 @@ public Map<String, String> extract(ExecuteQueryRequest executeQueryRequest) {

ServerStreamingCallSettings<ExecuteQueryCallContext, SqlRow> retrySettings =
ServerStreamingCallSettings.<ExecuteQueryCallContext, SqlRow>newBuilder()
// TODO resumption strategy and retry settings
// TODO add resumption strategy and pass through retry settings unchanged
// we pass through retry settings to use the deadlines now but don't
// support retries
.setRetrySettings(
settings
.executeQuerySettings()
.getRetrySettings()
.toBuilder()
// override maxAttempts as a safeguard against changes from user
.setMaxAttempts(1)
.build())
.build();

// Adding RetryingCallable to the callable chain so that client side metrics can be
// measured correctly. Retries are currently disabled.
// measured correctly and deadlines are set. Retries are currently disabled.
ServerStreamingCallable<ExecuteQueryCallContext, SqlRow> retries =
withRetries(withBigtableTracer, retrySettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;

import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.UnavailableException;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.ExecuteQueryRequest;
Expand All @@ -35,18 +36,23 @@
import com.google.cloud.bigtable.data.v2.models.sql.Statement;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable;
import com.google.common.collect.Range;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
public class ExecuteQueryCallableTest {
Expand Down Expand Up @@ -108,13 +114,74 @@ public void testExecuteQueryRequestsAreNotRetried() {
assertThat(fakeService.attempts).isEqualTo(1);
}

@Test
public void testExecuteQueryRequestsIgnoreOverriddenMaxAttempts() throws IOException {
BigtableDataSettings.Builder overrideSettings =
BigtableDataSettings.newBuilderForEmulator(server.getPort())
.setProjectId("fake-project")
.setInstanceId("fake-instance");
overrideSettings
.stubSettings()
.executeQuerySettings()
.setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(10).build());
EnhancedBigtableStub overrideStub =
EnhancedBigtableStub.create(overrideSettings.build().getStubSettings());
SqlServerStream stream =
overrideStub.executeQueryCallable().call(Statement.of("SELECT * FROM table"));

Iterator<SqlRow> iterator = stream.rows().iterator();

assertThrows(UnavailableException.class, iterator::next).getCause();
assertThat(fakeService.attempts).isEqualTo(1);
}

@Test
public void testExecuteQueryRequestsSetDefaultDeadline() {
SqlServerStream stream = stub.executeQueryCallable().call(Statement.of("SELECT * FROM table"));
Iterator<SqlRow> iterator = stream.rows().iterator();
// We don't care about this but are reusing the fake service that tests retries
assertThrows(UnavailableException.class, iterator::next).getCause();
// We have 30s default, we assume less than 1s has been burned when the fake service sets it
assertThat(fakeService.deadlineMillisRemaining).isIn(Range.closed(29000L, 30100L));
}

@Test
public void testExecuteQueryRequestsRespectOverridenDeadline() throws IOException {
BigtableDataSettings.Builder overrideSettings =
BigtableDataSettings.newBuilderForEmulator(server.getPort())
.setProjectId("fake-project")
.setInstanceId("fake-instance");
overrideSettings
jackdingilian marked this conversation as resolved.
Show resolved Hide resolved
.stubSettings()
.executeQuerySettings()
.setRetrySettings(
RetrySettings.newBuilder()
.setInitialRpcTimeout(Duration.ofMinutes(5))
.setMaxRpcTimeout(Duration.ofMinutes(5))
.build());
EnhancedBigtableStub overrideDeadline =
EnhancedBigtableStub.create(overrideSettings.build().getStubSettings());
SqlServerStream streamOverride =
overrideDeadline.executeQueryCallable().call(Statement.of("SELECT * FROM table"));
Iterator<SqlRow> overrideIterator = streamOverride.rows().iterator();
// We don't care about this but are reusing the fake service that tests retries
assertThrows(UnavailableException.class, overrideIterator::next).getCause();
// We have 30s default, we assume less than 1s has been burned when the fake service sets it
assertThat(fakeService.deadlineMillisRemaining).isIn(Range.closed(299000L, 300100L));
}

private static class FakeService extends BigtableGrpc.BigtableImplBase {

private int attempts = 0;
private long deadlineMillisRemaining;

@Override
public void executeQuery(
ExecuteQueryRequest request, StreamObserver<ExecuteQueryResponse> responseObserver) {
Deadline deadline = Context.current().getDeadline();
if (deadline != null) {
deadlineMillisRemaining = deadline.timeRemaining(TimeUnit.MILLISECONDS);
}
attempts++;
responseObserver.onNext(metadata(columnMetadata("test", stringType())));
responseObserver.onError(new StatusRuntimeException(Status.UNAVAILABLE));
Expand Down
Loading