From 42a63b046312fde7f1a16b56722fce12bd1f8450 Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Mon, 11 Sep 2023 18:17:15 -0400 Subject: [PATCH] fix: add a sanity check that all bulk mutation entries are accounted for Add a fail safe that marks missing entries in a response as permanent errors. Previously the client assumed that all entries were present and only looked for errors Change-Id: Ie3f294fd6bb19ec17662b58bfe9c75a3eed81097 --- .../mutaterows/MutateRowsAttemptCallable.java | 24 ++++++++++++++ .../MutateRowsAttemptCallableTest.java | 33 +++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java index 36c2930bda..b049219a95 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java @@ -35,6 +35,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; import com.google.common.util.concurrent.MoreExecutors; import com.google.rpc.Code; import java.util.List; @@ -263,9 +264,12 @@ private void handleAttemptSuccess(List responses) { Builder builder = lastRequest.toBuilder().clearEntries(); List newOriginalIndexes = Lists.newArrayList(); + boolean[] seenIndices = new boolean[currentRequest.getEntriesCount()]; for (MutateRowsResponse response : responses) { for (Entry entry : response.getEntriesList()) { + seenIndices[Ints.checkedCast(entry.getIndex())] = true; + if (entry.getStatus().getCode() == Code.OK_VALUE) { continue; } @@ -288,6 +292,26 @@ private void handleAttemptSuccess(List responses) { } } + // Handle missing mutations + for (int i = 0; i < seenIndices.length; i++) { + if (seenIndices[i]) { + continue; + } + + int origIndex = getOriginalIndex(i); + FailedMutation failedMutation = + FailedMutation.create( + origIndex, + ApiExceptionFactory.createException( + "Missing entry response for entry " + origIndex, + null, + GrpcStatusCode.of(io.grpc.Status.Code.INTERNAL), + false)); + + allFailures.add(failedMutation); + permanentFailures.add(failedMutation); + } + currentRequest = builder.build(); originalIndexes = newOriginalIndexes; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallableTest.java index 2df2aaf2b4..358ff01cde 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallableTest.java @@ -41,6 +41,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -92,6 +94,37 @@ public void singleEntrySuccessTest() throws Exception { assertThat(innerCallable.lastRequest).isEqualTo(request); } + @Test + public void missingEntry() { + MutateRowsRequest request = + MutateRowsRequest.newBuilder() + .addEntries(Entry.getDefaultInstance()) + .addEntries(Entry.getDefaultInstance()) + .build(); + innerCallable.response.add( + MutateRowsResponse.newBuilder() + .addEntries(MutateRowsResponse.Entry.newBuilder().setIndex(0)) + .build()); + + MutateRowsAttemptCallable attemptCallable = + new MutateRowsAttemptCallable(innerCallable, request, callContext, retryCodes); + attemptCallable.setExternalFuture(parentFuture); + attemptCallable.call(); + + ExecutionException executionException = + Assert.assertThrows(ExecutionException.class, () -> parentFuture.attemptFuture.get()); + assertThat(executionException).hasCauseThat().isInstanceOf(MutateRowsException.class); + MutateRowsException e = (MutateRowsException) executionException.getCause(); + + assertThat(e).hasMessageThat().contains("Some mutations failed to apply"); + assertThat(e.getFailedMutations()).hasSize(1); + FailedMutation failedMutation = e.getFailedMutations().get(0); + assertThat(failedMutation.getIndex()).isEqualTo(1); + assertThat(failedMutation.getError()) + .hasMessageThat() + .contains("Missing entry response for entry 1"); + } + @Test public void testNoRpcTimeout() { parentFuture.timedAttemptSettings =