From 94ce1d0ac26c69ae934b70854388f0092624ed41 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Fri, 10 Nov 2023 21:42:57 +0000 Subject: [PATCH 01/15] Advance retry count; if you don't, retry starts at 0. --- .../com/google/cloud/bigquery/storage/v1/ConnectionWorker.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 52373596ce..5df490961b 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -974,6 +974,8 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r // Trigger exponential backoff in append loop when request is resent for quota errors if (requestWrapper.attemptSettings == null) { requestWrapper.attemptSettings = requestWrapper.retryAlgorithm.createFirstAttempt(); + requestWrapper.attemptSettings = + requestWrapper.retryAlgorithm.createNextAttempt(requestWrapper.attemptSettings); } else { requestWrapper.attemptSettings = requestWrapper.retryAlgorithm.createNextAttempt(requestWrapper.attemptSettings); From aee53e2137113bd030cde8f2f9327f6f9a8c8a9b Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Fri, 10 Nov 2023 21:52:45 +0000 Subject: [PATCH 02/15] Add comment --- .../com/google/cloud/bigquery/storage/v1/ConnectionWorker.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 5df490961b..57663fc8fa 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -974,6 +974,8 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r // Trigger exponential backoff in append loop when request is resent for quota errors if (requestWrapper.attemptSettings == null) { requestWrapper.attemptSettings = requestWrapper.retryAlgorithm.createFirstAttempt(); + // createNextAttempt correctly initiales the retry delay; createfirstAttempt does not + // include a positive delay, just 0. requestWrapper.attemptSettings = requestWrapper.retryAlgorithm.createNextAttempt(requestWrapper.attemptSettings); } else { From 1891f6ee7cf4b49a71329ac0620631eb6bf9053b Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Fri, 10 Nov 2023 23:12:38 +0000 Subject: [PATCH 03/15] Add test to verify backoff of at least initial retry millis --- .../bigquery/storage/v1/ConnectionWorker.java | 4 +- .../storage/v1/FakeBigQueryWrite.java | 6 +++ .../storage/v1/FakeBigQueryWriteImpl.java | 7 ++++ .../bigquery/storage/v1/StreamWriterTest.java | 39 ++++++++++++++++++- 4 files changed, 53 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 57663fc8fa..f435549a64 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -973,11 +973,11 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r if (this.retrySettings != null && errorCode == Code.RESOURCE_EXHAUSTED) { // Trigger exponential backoff in append loop when request is resent for quota errors if (requestWrapper.attemptSettings == null) { - requestWrapper.attemptSettings = requestWrapper.retryAlgorithm.createFirstAttempt(); // createNextAttempt correctly initiales the retry delay; createfirstAttempt does not // include a positive delay, just 0. requestWrapper.attemptSettings = - requestWrapper.retryAlgorithm.createNextAttempt(requestWrapper.attemptSettings); + requestWrapper.retryAlgorithm.createNextAttempt( + requestWrapper.retryAlgorithm.createFirstAttempt()); } else { requestWrapper.attemptSettings = requestWrapper.retryAlgorithm.createNextAttempt(requestWrapper.attemptSettings); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java index a31cc145a6..120e004b7d 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java @@ -19,6 +19,8 @@ import com.google.protobuf.AbstractMessage; import io.grpc.ServerServiceDefinition; import io.grpc.Status; +import java.time.Instant; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -128,4 +130,8 @@ public void setReturnErrorDuringExclusiveStreamRetry(boolean retryOnError) { public void setVerifyOffset(boolean verifyOffset) { serviceImpl.setVerifyOffset(verifyOffset); } + + public ArrayList getLatestRequestReceivedInstants() { + return serviceImpl.getLatestRequestReceivedInstants(); + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java index 16f3feea3c..abf08bd0e1 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java @@ -20,6 +20,7 @@ import com.google.rpc.Code; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -73,6 +74,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase { private final Map, Boolean> connectionToFirstRequest = new ConcurrentHashMap<>(); private Status failedStatus = Status.ABORTED; + private ArrayList requestReceivedInstants = new ArrayList<>(); /** Class used to save the state of a possible response. */ public static class Response { @@ -111,6 +113,10 @@ public String toString() { } } + public ArrayList getLatestRequestReceivedInstants() { + return requestReceivedInstants; + } + @Override public void getWriteStream( GetWriteStreamRequest request, StreamObserver responseObserver) { @@ -197,6 +203,7 @@ public StreamObserver appendRows( new StreamObserver() { @Override public void onNext(AppendRowsRequest value) { + requestReceivedInstants.add(Instant.now()); recordCount++; requests.add(value); long offset = value.getOffset().getValue(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index ee18e9e68d..9c125bcb55 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -54,6 +54,8 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -74,6 +76,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.threeten.bp.Duration; +import org.threeten.bp.temporal.TemporalUnit; @RunWith(JUnit4.class) public class StreamWriterTest { @@ -86,9 +89,10 @@ public class StreamWriterTest { private static final String EXPLICIT_STREAM = "projects/p/datasets/d1/tables/t1/streams/s1"; private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; private static final int MAX_RETRY_NUM_ATTEMPTS = 3; + private static final long INITIAL_RETRY_MILLIS = 500; private static final RetrySettings retrySettings = RetrySettings.newBuilder() - .setInitialRetryDelay(Duration.ofMillis(500)) + .setInitialRetryDelay(Duration.ofMillis(INITIAL_RETRY_MILLIS)) .setRetryDelayMultiplier(1.1) .setMaxAttempts(MAX_RETRY_NUM_ATTEMPTS) .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(5)) @@ -2002,6 +2006,39 @@ public void testExclusiveAppendSuccessAndQuotaErrorRetryMaxRetry() throws Except ((StatusRuntimeException) ex.getCause()).getStatus().getCode()); } + @Test + public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Exception { + testBigQueryWrite.setReturnErrorDuringExclusiveStreamRetry(true); + StreamWriter writer = getTestStreamWriterExclusiveRetryEnabled(); + + testBigQueryWrite.addResponse( + new DummyResponseSupplierWillFailThenSucceed( + new FakeBigQueryWriteImpl.Response(createAppendResponse(0)), + /* totalFailCount= */ MAX_RETRY_NUM_ATTEMPTS + 1, + com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build())); + + ApiFuture future = writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0); + + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> { + future.get(); + }); + assertEquals( + Status.Code.RESOURCE_EXHAUSTED, + ((StatusRuntimeException) ex.getCause()).getStatus().getCode()); + + ArrayList instants = testBigQueryWrite.getLatestRequestReceivedInstants(); + Instant previousInstant = instants.get(0); + for (int i = 1; i < instants.size(); i++) { + Instant currentInstant = instants.get(i); + assertThat(previousInstant.plus(INITIAL_RETRY_MILLIS, ChronoUnit.MILLIS)).isLessThan(currentInstant); + previousInstant = currentInstant; + } + + } + @Test public void testAppendSuccessAndNonRetryableError() throws Exception { StreamWriter writer = getTestStreamWriterRetryEnabled(); From 290e4d7ed50ef7d884e2415b7062a879bcff72f8 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Fri, 10 Nov 2023 23:14:50 +0000 Subject: [PATCH 04/15] Verify attempt retry count --- .../com/google/cloud/bigquery/storage/v1/StreamWriterTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 9c125bcb55..cfb5ef44ec 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -2031,6 +2031,8 @@ public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Except ArrayList instants = testBigQueryWrite.getLatestRequestReceivedInstants(); Instant previousInstant = instants.get(0); + // Include initial attempt + assertEquals(instants.size(), MAX_RETRY_NUM_ATTEMPTS + 1); for (int i = 1; i < instants.size(); i++) { Instant currentInstant = instants.get(i); assertThat(previousInstant.plus(INITIAL_RETRY_MILLIS, ChronoUnit.MILLIS)).isLessThan(currentInstant); From 2a05efd9a20839bf28b7a1278d7c4ad206019b61 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Fri, 10 Nov 2023 23:23:13 +0000 Subject: [PATCH 05/15] format --- .../cloud/bigquery/storage/v1/StreamWriterTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index cfb5ef44ec..98785a5581 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -76,7 +76,6 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.threeten.bp.Duration; -import org.threeten.bp.temporal.TemporalUnit; @RunWith(JUnit4.class) public class StreamWriterTest { @@ -2017,7 +2016,8 @@ public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Except /* totalFailCount= */ MAX_RETRY_NUM_ATTEMPTS + 1, com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build())); - ApiFuture future = writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0); + ApiFuture future = + writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0); ExecutionException ex = assertThrows( @@ -2035,10 +2035,10 @@ public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Except assertEquals(instants.size(), MAX_RETRY_NUM_ATTEMPTS + 1); for (int i = 1; i < instants.size(); i++) { Instant currentInstant = instants.get(i); - assertThat(previousInstant.plus(INITIAL_RETRY_MILLIS, ChronoUnit.MILLIS)).isLessThan(currentInstant); + assertThat(previousInstant.plus(INITIAL_RETRY_MILLIS, ChronoUnit.MILLIS)) + .isLessThan(currentInstant); previousInstant = currentInstant; } - } @Test From f7ca245888ec1417d1d141dbb8d2f3cb46ff8962 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Fri, 10 Nov 2023 23:48:51 +0000 Subject: [PATCH 06/15] Add check for expoenntial aspect of the backoff. improve setting attemptSettings --- .../bigquery/storage/v1/ConnectionWorker.java | 19 ++++++++----------- .../bigquery/storage/v1/StreamWriterTest.java | 13 +++++++++---- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index f435549a64..a342268562 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -971,17 +971,14 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r try { requestWrapper.retryCount++; if (this.retrySettings != null && errorCode == Code.RESOURCE_EXHAUSTED) { - // Trigger exponential backoff in append loop when request is resent for quota errors - if (requestWrapper.attemptSettings == null) { - // createNextAttempt correctly initiales the retry delay; createfirstAttempt does not - // include a positive delay, just 0. - requestWrapper.attemptSettings = - requestWrapper.retryAlgorithm.createNextAttempt( - requestWrapper.retryAlgorithm.createFirstAttempt()); - } else { - requestWrapper.attemptSettings = - requestWrapper.retryAlgorithm.createNextAttempt(requestWrapper.attemptSettings); - } + // Trigger exponential backoff in append loop when request is resent for quota errors. + // createNextAttempt correctly initiales the retry delay; createfirstAttempt does not + // include a positive delay, just 0. + requestWrapper.attemptSettings = requestWrapper.attemptSettings == null ? + requestWrapper.retryAlgorithm.createFirstAttempt() : + requestWrapper.attemptSettings; + requestWrapper.attemptSettings = + requestWrapper.retryAlgorithm.createNextAttempt(requestWrapper.attemptSettings); requestWrapper.blockMessageSendDeadline = Instant.now().plusMillis(requestWrapper.attemptSettings.getRetryDelay().toMillis()); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 98785a5581..ce054c17c5 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -89,12 +89,14 @@ public class StreamWriterTest { private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; private static final int MAX_RETRY_NUM_ATTEMPTS = 3; private static final long INITIAL_RETRY_MILLIS = 500; + private static final double RETRY_MULTIPLIER = 1.1; + private static final int MAX_RETRY_DELAY_MINUTES = 5; private static final RetrySettings retrySettings = RetrySettings.newBuilder() .setInitialRetryDelay(Duration.ofMillis(INITIAL_RETRY_MILLIS)) - .setRetryDelayMultiplier(1.1) + .setRetryDelayMultiplier(RETRY_MULTIPLIER) .setMaxAttempts(MAX_RETRY_NUM_ATTEMPTS) - .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(5)) + .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(MAX_RETRY_DELAY_MINUTES)) .build(); private FakeScheduledExecutorService fakeExecutor; private FakeBigQueryWrite testBigQueryWrite; @@ -2033,10 +2035,13 @@ public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Except Instant previousInstant = instants.get(0); // Include initial attempt assertEquals(instants.size(), MAX_RETRY_NUM_ATTEMPTS + 1); + double minExpectedDelay = INITIAL_RETRY_MILLIS; for (int i = 1; i < instants.size(); i++) { Instant currentInstant = instants.get(i); - assertThat(previousInstant.plus(INITIAL_RETRY_MILLIS, ChronoUnit.MILLIS)) - .isLessThan(currentInstant); + double differenceInMillis = java.time.Duration.between(previousInstant, currentInstant).toMillis(); + minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER * .9; + assertThat(differenceInMillis) + .isGreaterThan(minExpectedDelay); previousInstant = currentInstant; } } From b7875a07bd4df4df706154456194e0a46c0d6369 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Fri, 10 Nov 2023 23:49:21 +0000 Subject: [PATCH 07/15] Format --- .../google/cloud/bigquery/storage/v1/ConnectionWorker.java | 7 ++++--- .../google/cloud/bigquery/storage/v1/StreamWriterTest.java | 7 +++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index a342268562..c963bc9453 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -974,9 +974,10 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r // Trigger exponential backoff in append loop when request is resent for quota errors. // createNextAttempt correctly initiales the retry delay; createfirstAttempt does not // include a positive delay, just 0. - requestWrapper.attemptSettings = requestWrapper.attemptSettings == null ? - requestWrapper.retryAlgorithm.createFirstAttempt() : - requestWrapper.attemptSettings; + requestWrapper.attemptSettings = + requestWrapper.attemptSettings == null + ? requestWrapper.retryAlgorithm.createFirstAttempt() + : requestWrapper.attemptSettings; requestWrapper.attemptSettings = requestWrapper.retryAlgorithm.createNextAttempt(requestWrapper.attemptSettings); requestWrapper.blockMessageSendDeadline = diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index ce054c17c5..98b6dccfe6 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -55,7 +55,6 @@ import io.grpc.StatusRuntimeException; import java.io.IOException; import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -2038,10 +2037,10 @@ public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Except double minExpectedDelay = INITIAL_RETRY_MILLIS; for (int i = 1; i < instants.size(); i++) { Instant currentInstant = instants.get(i); - double differenceInMillis = java.time.Duration.between(previousInstant, currentInstant).toMillis(); + double differenceInMillis = + java.time.Duration.between(previousInstant, currentInstant).toMillis(); minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER * .9; - assertThat(differenceInMillis) - .isGreaterThan(minExpectedDelay); + assertThat(differenceInMillis).isGreaterThan(minExpectedDelay); previousInstant = currentInstant; } } From 3ed77c508b92870ea021b76187ddda22cea4f45e Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Sat, 11 Nov 2023 00:08:29 +0000 Subject: [PATCH 08/15] Fix test logic, format --- .../google/cloud/bigquery/storage/v1/StreamWriterTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 98b6dccfe6..9e08dfdb16 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -88,7 +88,7 @@ public class StreamWriterTest { private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; private static final int MAX_RETRY_NUM_ATTEMPTS = 3; private static final long INITIAL_RETRY_MILLIS = 500; - private static final double RETRY_MULTIPLIER = 1.1; + private static final double RETRY_MULTIPLIER = 1.3; private static final int MAX_RETRY_DELAY_MINUTES = 5; private static final RetrySettings retrySettings = RetrySettings.newBuilder() @@ -2039,8 +2039,8 @@ public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Except Instant currentInstant = instants.get(i); double differenceInMillis = java.time.Duration.between(previousInstant, currentInstant).toMillis(); - minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER * .9; assertThat(differenceInMillis).isGreaterThan(minExpectedDelay); + minExpectedDelay = (minExpectedDelay * RETRY_MULTIPLIER); previousInstant = currentInstant; } } From ce0481958b2553325cea85f2287583e4e3f17e63 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Sat, 11 Nov 2023 00:10:16 +0000 Subject: [PATCH 09/15] Tiny cleanup --- .../com/google/cloud/bigquery/storage/v1/StreamWriterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 9e08dfdb16..44018bf2c9 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -2040,7 +2040,7 @@ public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Except double differenceInMillis = java.time.Duration.between(previousInstant, currentInstant).toMillis(); assertThat(differenceInMillis).isGreaterThan(minExpectedDelay); - minExpectedDelay = (minExpectedDelay * RETRY_MULTIPLIER); + minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER; previousInstant = currentInstant; } } From e7da2e7600d026aa520a16edd8c3de6fc8721b1e Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Mon, 13 Nov 2023 17:37:25 +0000 Subject: [PATCH 10/15] More cleanup, add check for minimum delay --- .../google/cloud/bigquery/storage/v1/ConnectionWorker.java | 7 +++---- .../google/cloud/bigquery/storage/v1/StreamWriterTest.java | 1 + 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index c963bc9453..1b59703648 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -974,12 +974,11 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r // Trigger exponential backoff in append loop when request is resent for quota errors. // createNextAttempt correctly initiales the retry delay; createfirstAttempt does not // include a positive delay, just 0. - requestWrapper.attemptSettings = + requestWrapper.attemptSettings = requestWrapper.retryAlgorithm.createNextAttempt( requestWrapper.attemptSettings == null ? requestWrapper.retryAlgorithm.createFirstAttempt() - : requestWrapper.attemptSettings; - requestWrapper.attemptSettings = - requestWrapper.retryAlgorithm.createNextAttempt(requestWrapper.attemptSettings); + : requestWrapper.attemptSettings + ); requestWrapper.blockMessageSendDeadline = Instant.now().plusMillis(requestWrapper.attemptSettings.getRetryDelay().toMillis()); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 44018bf2c9..02a5f45819 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -2039,6 +2039,7 @@ public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Except Instant currentInstant = instants.get(i); double differenceInMillis = java.time.Duration.between(previousInstant, currentInstant).toMillis(); + assertThat(differenceInMillis).isAtLeast((double)INITIAL_RETRY_MILLIS); assertThat(differenceInMillis).isGreaterThan(minExpectedDelay); minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER; previousInstant = currentInstant; From 0044e641fc06ec5879eb09ec69524d18e9607617 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Mon, 13 Nov 2023 17:39:48 +0000 Subject: [PATCH 11/15] Run formatter, fix typo --- .../cloud/bigquery/storage/v1/ConnectionWorker.java | 12 ++++++------ .../cloud/bigquery/storage/v1/StreamWriterTest.java | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 1b59703648..4ed3b736f9 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -972,13 +972,13 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r requestWrapper.retryCount++; if (this.retrySettings != null && errorCode == Code.RESOURCE_EXHAUSTED) { // Trigger exponential backoff in append loop when request is resent for quota errors. - // createNextAttempt correctly initiales the retry delay; createfirstAttempt does not + // createNextAttempt correctly initializes the retry delay; createfirstAttempt does not // include a positive delay, just 0. - requestWrapper.attemptSettings = requestWrapper.retryAlgorithm.createNextAttempt( - requestWrapper.attemptSettings == null - ? requestWrapper.retryAlgorithm.createFirstAttempt() - : requestWrapper.attemptSettings - ); + requestWrapper.attemptSettings = + requestWrapper.retryAlgorithm.createNextAttempt( + requestWrapper.attemptSettings == null + ? requestWrapper.retryAlgorithm.createFirstAttempt() + : requestWrapper.attemptSettings); requestWrapper.blockMessageSendDeadline = Instant.now().plusMillis(requestWrapper.attemptSettings.getRetryDelay().toMillis()); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 02a5f45819..8632de3e10 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -2039,7 +2039,7 @@ public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Except Instant currentInstant = instants.get(i); double differenceInMillis = java.time.Duration.between(previousInstant, currentInstant).toMillis(); - assertThat(differenceInMillis).isAtLeast((double)INITIAL_RETRY_MILLIS); + assertThat(differenceInMillis).isAtLeast((double) INITIAL_RETRY_MILLIS); assertThat(differenceInMillis).isGreaterThan(minExpectedDelay); minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER; previousInstant = currentInstant; From 6e1561496ff1450b3d1e85be7b9e62de00174936 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Mon, 13 Nov 2023 17:50:51 +0000 Subject: [PATCH 12/15] Add back .05% buffer to account for jitter --- .../com/google/cloud/bigquery/storage/v1/StreamWriterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 8632de3e10..5b8b366dfe 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -2041,7 +2041,7 @@ public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Except java.time.Duration.between(previousInstant, currentInstant).toMillis(); assertThat(differenceInMillis).isAtLeast((double) INITIAL_RETRY_MILLIS); assertThat(differenceInMillis).isGreaterThan(minExpectedDelay); - minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER; + minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER * 0.95; previousInstant = currentInstant; } } From 07a41ab804733fbe04173ae4f47b056cb52937f4 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Mon, 13 Nov 2023 18:05:52 +0000 Subject: [PATCH 13/15] Add room for jitter for initial delay --- .../com/google/cloud/bigquery/storage/v1/StreamWriterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 5b8b366dfe..06494a6d4a 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -2039,7 +2039,7 @@ public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Except Instant currentInstant = instants.get(i); double differenceInMillis = java.time.Duration.between(previousInstant, currentInstant).toMillis(); - assertThat(differenceInMillis).isAtLeast((double) INITIAL_RETRY_MILLIS); + assertThat(differenceInMillis).isAtLeast((INITIAL_RETRY_MILLIS * 0.95)); assertThat(differenceInMillis).isGreaterThan(minExpectedDelay); minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER * 0.95; previousInstant = currentInstant; From 215af9df10980dc80d5a0544179c131c2ba077e2 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Mon, 13 Nov 2023 18:06:26 +0000 Subject: [PATCH 14/15] Add room for jitter for initial delay --- .../google/cloud/bigquery/storage/v1/StreamWriterTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 06494a6d4a..6f9c49d427 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -2034,14 +2034,14 @@ public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Except Instant previousInstant = instants.get(0); // Include initial attempt assertEquals(instants.size(), MAX_RETRY_NUM_ATTEMPTS + 1); - double minExpectedDelay = INITIAL_RETRY_MILLIS; + double minExpectedDelay = INITIAL_RETRY_MILLIS * 0.95; for (int i = 1; i < instants.size(); i++) { Instant currentInstant = instants.get(i); double differenceInMillis = java.time.Duration.between(previousInstant, currentInstant).toMillis(); - assertThat(differenceInMillis).isAtLeast((INITIAL_RETRY_MILLIS * 0.95)); + assertThat(differenceInMillis).isAtLeast((double) INITIAL_RETRY_MILLIS); assertThat(differenceInMillis).isGreaterThan(minExpectedDelay); - minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER * 0.95; + minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER; previousInstant = currentInstant; } } From 723ff540d56e340fe54f606852c834796eba549c Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Mon, 13 Nov 2023 21:33:03 +0000 Subject: [PATCH 15/15] Add logs describing length and deadline for retries --- .../google/cloud/bigquery/storage/v1/ConnectionWorker.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 3298643d4e..f61c2c1417 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -981,6 +981,12 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r : requestWrapper.attemptSettings); requestWrapper.blockMessageSendDeadline = Instant.now().plusMillis(requestWrapper.attemptSettings.getRetryDelay().toMillis()); + log.info( + "Messages blocked for retry for " + + java.time.Duration.between( + java.time.Instant.now(), requestWrapper.blockMessageSendDeadline) + + " until " + + requestWrapper.blockMessageSendDeadline); } Long offset =