Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): support max_commit_delay in Spanner transactions #2854

Merged
merged 17 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,20 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.30.0')
implementation platform('com.google.cloud:libraries-bom:26.31.0')

implementation 'com.google.cloud:google-cloud-spanner'
```
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-spanner:6.56.0'
implementation 'com.google.cloud:google-cloud-spanner:6.57.0'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.56.0"
libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.57.0"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -444,7 +444,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-spanner/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-spanner.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.56.0
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.57.0
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.RequestOptions.Priority;
import java.io.Serializable;
import java.time.Duration;
import java.util.Objects;

/** Specifies options for various spanner operations */
Expand Down Expand Up @@ -140,6 +141,11 @@ public static ReadQueryUpdateTransactionOption priority(RpcPriority priority) {
return new PriorityOption(priority);
}

public static ReadQueryUpdateTransactionOption maxCommitDelay(Duration maxCommitDelay) {
Preconditions.checkArgument(!maxCommitDelay.isNegative(), "maxCommitDelay should be positive");
return new MaxCommitDelayOption(maxCommitDelay);
}

/**
* Specifying this will cause the reads, queries, updates and writes operations statistics
* collection to be grouped by tag.
Expand Down Expand Up @@ -247,6 +253,21 @@ void appendToOptions(Options options) {

static final CommitStatsOption COMMIT_STATS_OPTION = new CommitStatsOption();

/** Option to request {@link MaxCommitDelayOption} for read/write transactions. */
static final class MaxCommitDelayOption extends InternalOption
implements ReadQueryUpdateTransactionOption {
final Duration maxCommitDelay;

MaxCommitDelayOption(Duration maxCommitDelay) {
this.maxCommitDelay = maxCommitDelay;
}

@Override
void appendToOptions(Options options) {
options.maxCommitDelay = maxCommitDelay;
}
}

/** Option to request Optimistic Concurrency Control for read/write transactions. */
static final class OptimisticLockOption extends InternalOption implements TransactionOption {
@Override
Expand Down Expand Up @@ -354,6 +375,9 @@ void appendToOptions(Options options) {
}

private boolean withCommitStats;

private Duration maxCommitDelay;

private Long limit;
private Integer prefetchChunks;
private Integer bufferRows;
Expand All @@ -375,6 +399,14 @@ boolean withCommitStats() {
return withCommitStats;
}

boolean hasMaxCommitDelay() {
return maxCommitDelay != null;
}

Duration maxCommitDelay() {
return maxCommitDelay;
}

boolean hasLimit() {
return limit != null;
}
Expand Down Expand Up @@ -481,6 +513,9 @@ public String toString() {
if (withCommitStats) {
b.append("withCommitStats: ").append(withCommitStats).append(' ');
}
if (maxCommitDelay != null) {
b.append("maxCommitDelay: ").append(maxCommitDelay).append(' ');
}
if (limit != null) {
b.append("limit: ").append(limit).append(' ');
}
Expand Down Expand Up @@ -533,6 +568,7 @@ public boolean equals(Object o) {

Options that = (Options) o;
return Objects.equals(withCommitStats, that.withCommitStats)
&& Objects.equals(maxCommitDelay, that.maxCommitDelay)
&& (!hasLimit() && !that.hasLimit()
|| hasLimit() && that.hasLimit() && Objects.equals(limit(), that.limit()))
&& (!hasPrefetchChunks() && !that.hasPrefetchChunks()
Expand Down Expand Up @@ -562,6 +598,9 @@ public int hashCode() {
if (withCommitStats) {
result = 31 * result + 1231;
}
if (maxCommitDelay != null) {
result = 31 * result + maxCommitDelay.hashCode();
}
if (limit != null) {
result = 31 * result + limit.hashCode();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BatchWriteRequest;
import com.google.spanner.v1.BatchWriteResponse;
Expand All @@ -60,6 +61,7 @@
* users need not be aware of the actual session management, pooling and handling.
*/
class SessionImpl implements Session {

private static final Tracer tracer = Tracing.getTracer();

/** Keep track of running transactions on this session per thread. */
Expand All @@ -86,8 +88,10 @@ static TransactionOptions createReadWriteTransactionOptions(Options options) {
* only have one such transaction active at a time.
*/
interface SessionTransaction {

/** Invalidates the transaction, generally because a new one has been started on the session. */
void invalidate();

/** Registers the current span on the transaction. */
void setSpan(Span span);
}
Expand Down Expand Up @@ -176,16 +180,24 @@ public CommitResponse writeAtLeastOnceWithOptions(
setActive(null);
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutations, mutationsProto);
Options options = Options.fromTransactionOptions(transactionOptions);
final CommitRequest.Builder requestBuilder =
CommitRequest.newBuilder()
.setSession(name)
.setReturnCommitStats(
Options.fromTransactionOptions(transactionOptions).withCommitStats())
.setReturnCommitStats(options.withCommitStats())
.addAllMutations(mutationsProto)
.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
if (options.hasMaxCommitDelay()) {
requestBuilder.setMaxCommitDelay(
Duration.newBuilder()
.setSeconds(options.maxCommitDelay().getSeconds())
.setNanos(options.maxCommitDelay().getNano())
.build());
}
RequestOptions commitRequestOptions = getRequestOptions(transactionOptions);

if (commitRequestOptions != null) {
requestBuilder.setRequestOptions(commitRequestOptions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@

/** Default implementation of {@link TransactionRunner}. */
class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {

private static final Tracer tracer = Tracing.getTracer();
private static final Logger txnLogger = Logger.getLogger(TransactionRunner.class.getName());
/**
Expand All @@ -84,6 +85,7 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {

@VisibleForTesting
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {

static class Builder extends AbstractReadContext.Builder<Builder, TransactionContextImpl> {

private Clock clock = new Clock();
Expand Down Expand Up @@ -131,6 +133,7 @@ static Builder newBuilder() {
*/
private class TransactionContextAsyncResultSetImpl extends ForwardingAsyncResultSet
implements ListenableAsyncResultSet {

private TransactionContextAsyncResultSetImpl(ListenableAsyncResultSet delegate) {
super(delegate);
}
Expand Down Expand Up @@ -339,6 +342,13 @@ ApiFuture<CommitResponse> commitAsync() {
}
builder.setRequestOptions(requestOptionsBuilder.build());
}
if (options.hasMaxCommitDelay()) {
builder.setMaxCommitDelay(
com.google.protobuf.Duration.newBuilder()
.setSeconds(options.maxCommitDelay().getSeconds())
.setNanos(options.maxCommitDelay().getNano())
.build());
}
synchronized (lock) {
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
finishOps = SettableApiFuture.create();
Expand All @@ -354,6 +364,7 @@ ApiFuture<CommitResponse> commitAsync() {
}

private final class CommitRunnable implements Runnable {

private final SettableApiFuture<CommitResponse> res;
private final ApiFuture<Void> prev;
private final CommitRequest.Builder requestBuilder;
Expand Down Expand Up @@ -575,7 +586,9 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude

@Nullable
String getTransactionTag() {
if (this.options.hasTag()) return this.options.tag();
if (this.options.hasTag()) {
return this.options.tag();
}
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@

@RunWith(JUnit4.class)
public class DatabaseClientImplTest {

private static final String TEST_PROJECT = "my-project";
private static final String TEST_INSTANCE = "my-instance";
private static final String TEST_DATABASE = "my-database";
Expand Down Expand Up @@ -3635,6 +3636,112 @@ public void testAsyncTransactionManagerCommitWithPriority() {
assertEquals(Priority.PRIORITY_HIGH, request.getRequestOptions().getPriority());
}

@Test
public void testCommitWithoutMaxCommitDelay() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
TransactionRunner runner = client.readWriteTransaction();
runner.run(
transaction -> {
transaction.buffer(Mutation.delete("TEST", KeySet.all()));
return null;
});

List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(requests).hasSize(1);
CommitRequest request = requests.get(0);
assertFalse(request.hasMaxCommitDelay());
}

@Test
public void testCommitWithMaxCommitDelay() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
TransactionRunner runner =
client.readWriteTransaction(Options.maxCommitDelay(java.time.Duration.ofMillis(100)));
runner.run(
transaction -> {
transaction.buffer(Mutation.delete("TEST", KeySet.all()));
return null;
});

List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(requests).hasSize(1);
CommitRequest request = requests.get(0);
assertNotNull(request.getMaxCommitDelay());
assertEquals(
com.google.protobuf.Duration.newBuilder().setNanos(100000000).build(),
request.getMaxCommitDelay());
}

@Test
public void testTransactionManagerCommitWithMaxCommitDelay() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
TransactionManager manager =
client.transactionManager(Options.maxCommitDelay(java.time.Duration.ofMillis(100)));
TransactionContext transaction = manager.begin();
transaction.buffer(Mutation.delete("TEST", KeySet.all()));
manager.commit();

List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(requests).hasSize(1);
CommitRequest request = requests.get(0);
assertNotNull(request.getMaxCommitDelay());
assertEquals(
com.google.protobuf.Duration.newBuilder().setNanos(100000000).build(),
request.getMaxCommitDelay());
}

@Test
public void testAsyncRunnerCommitWithMaxCommitDelay() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
AsyncRunner runner = client.runAsync(Options.maxCommitDelay(java.time.Duration.ofMillis(100)));
get(
runner.runAsync(
txn -> {
txn.buffer(Mutation.delete("TEST", KeySet.all()));
return ApiFutures.immediateFuture(null);
},
executor));

List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(requests).hasSize(1);
CommitRequest request = requests.get(0);
assertNotNull(request.getMaxCommitDelay());
assertEquals(
com.google.protobuf.Duration.newBuilder().setNanos(100000000).build(),
request.getMaxCommitDelay());
}

@Test
public void testAsyncTransactionManagerCommitWithMaxCommitDelay() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
try (AsyncTransactionManager manager =
client.transactionManagerAsync(Options.maxCommitDelay(java.time.Duration.ofMillis(100)))) {
TransactionContextFuture transaction = manager.beginAsync();
get(
transaction
.then(
(txn, input) -> {
txn.buffer(Mutation.delete("TEST", KeySet.all()));
return ApiFutures.immediateFuture(null);
},
executor)
.commitAsync());
}

List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(requests).hasSize(1);
CommitRequest request = requests.get(0);
assertNotNull(request.getMaxCommitDelay());
assertEquals(
com.google.protobuf.Duration.newBuilder().setNanos(100000000).build(),
request.getMaxCommitDelay());
}

@Test
public void singleUseNoAction_ClearsCheckedOutSession() {
DatabaseClientImpl client =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,22 @@ public void batchWriteAtLeastOnce() {
}
}

@Test
public void testWriteWithMaxCommitDelay() {
CommitResponse response =
client.writeWithOptions(
Collections.singletonList(
Mutation.newInsertOrUpdateBuilder("T")
.set("K")
.to(lastKey = uniqueString())
.set("StringValue")
.to("v1")
.build()),
Options.maxCommitDelay(java.time.Duration.ofMillis(100)));
assertNotNull(response);
assertNotNull(response.getCommitTimestamp());
}

@Test
public void testWriteReturnsCommitStats() {
assumeFalse("Emulator does not return commit statistics", isUsingEmulator());
Expand Down
Loading