Skip to content

Commit

Permalink
chore(spanner): support transaction channel hint for RW mux (#3317)
Browse files Browse the repository at this point in the history
* chore(spanner): add transaction channel hint for RW to support multiplexed sessions

* chore(spanner): lint fix
  • Loading branch information
harshachinta committed Sep 4, 2024
1 parent cce008d commit 6004c9f
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,8 @@ public void close() {
}
}

ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions, boolean routeToLeader) {
ApiFuture<ByteString> beginTransactionAsync(
Options transactionOptions, boolean routeToLeader, Map<SpannerRpc.Option, ?> channelHint) {
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
final BeginTransactionRequest request =
Expand All @@ -436,7 +437,7 @@ ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions, boolean
.build();
final ApiFuture<Transaction> requestFuture;
try (IScope ignore = tracer.withSpan(span)) {
requestFuture = spanner.getRpc().beginTransactionAsync(request, getOptions(), routeToLeader);
requestFuture = spanner.getRpc().beginTransactionAsync(request, channelHint, routeToLeader);
}
requestFuture.addListener(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ ApiFuture<Void> ensureTxnAsync() {

private void createTxnAsync(final SettableApiFuture<Void> res) {
span.addAnnotation("Creating Transaction");
final ApiFuture<ByteString> fut = session.beginTransactionAsync(options, isRouteToLeader());
final ApiFuture<ByteString> fut =
session.beginTransactionAsync(options, isRouteToLeader(), getTransactionChannelHint());
fut.addListener(
() -> {
try {
Expand Down Expand Up @@ -427,7 +428,7 @@ public void run() {
final ApiFuture<com.google.spanner.v1.CommitResponse> commitFuture;
final ISpan opSpan = tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span);
try (IScope ignore = tracer.withSpan(opSpan)) {
commitFuture = rpc.commitAsync(commitRequest, session.getOptions());
commitFuture = rpc.commitAsync(commitRequest, getTransactionChannelHint());
}
session.markUsed(clock.instant());
commitFuture.addListener(
Expand Down Expand Up @@ -525,7 +526,7 @@ ApiFuture<Empty> rollbackAsync() {
.setSession(session.getName())
.setTransactionId(transactionId)
.build(),
session.getOptions());
getTransactionChannelHint());
session.markUsed(clock.instant());
return apiFuture;
} else {
Expand Down Expand Up @@ -800,7 +801,7 @@ private ResultSet internalExecuteUpdate(
statement, queryMode, options, /* withTransactionSelector = */ true);
try {
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), session.getOptions(), isRouteToLeader());
rpc.executeQuery(builder.build(), getTransactionChannelHint(), isRouteToLeader());
session.markUsed(clock.instant());
if (resultSet.getMetadata().hasTransaction()) {
onTransactionMetadata(
Expand Down Expand Up @@ -838,7 +839,8 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... u
// commit.
increaseAsyncOperations();
resultSet =
rpc.executeQueryAsync(builder.build(), session.getOptions(), isRouteToLeader());
rpc.executeQueryAsync(
builder.build(), getTransactionChannelHint(), isRouteToLeader());
session.markUsed(clock.instant());
} catch (Throwable t) {
decreaseAsyncOperations();
Expand Down Expand Up @@ -926,7 +928,7 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... update
getExecuteBatchDmlRequestBuilder(statements, options);
try {
com.google.spanner.v1.ExecuteBatchDmlResponse response =
rpc.executeBatchDml(builder.build(), session.getOptions());
rpc.executeBatchDml(builder.build(), getTransactionChannelHint());
session.markUsed(clock.instant());
long[] results = new long[response.getResultSetsCount()];
for (int i = 0; i < response.getResultSetsCount(); ++i) {
Expand Down Expand Up @@ -983,7 +985,7 @@ public ApiFuture<long[]> batchUpdateAsync(
// Register the update as an async operation that must finish before the transaction may
// commit.
increaseAsyncOperations();
response = rpc.executeBatchDmlAsync(builder.build(), session.getOptions());
response = rpc.executeBatchDmlAsync(builder.build(), getTransactionChannelHint());
session.markUsed(clock.instant());
} catch (Throwable t) {
decreaseAsyncOperations();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1497,7 +1497,7 @@ public void testSessionNotFoundReadWriteTransaction() {
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(closedSession.newTransaction(Options.fromTransactionOptions()))
.thenReturn(closedTransactionContext);
when(closedSession.beginTransactionAsync(any(), eq(true))).thenThrow(sessionNotFound);
when(closedSession.beginTransactionAsync(any(), eq(true), any())).thenThrow(sessionNotFound);
when(closedSession.getTracer()).thenReturn(tracer);
TransactionRunnerImpl closedTransactionRunner = new TransactionRunnerImpl(closedSession);
closedTransactionRunner.setSpan(span);
Expand All @@ -1512,7 +1512,7 @@ public void testSessionNotFoundReadWriteTransaction() {
final TransactionContextImpl openTransactionContext = mock(TransactionContextImpl.class);
when(openSession.newTransaction(Options.fromTransactionOptions()))
.thenReturn(openTransactionContext);
when(openSession.beginTransactionAsync(any(), eq(true)))
when(openSession.beginTransactionAsync(any(), eq(true), any()))
.thenReturn(ApiFutures.immediateFuture(ByteString.copyFromUtf8("open-txn")));
when(openSession.getTracer()).thenReturn(tracer);
TransactionRunnerImpl openTransactionRunner = new TransactionRunnerImpl(openSession);
Expand Down

0 comments on commit 6004c9f

Please sign in to comment.