Skip to content

Commit 24d9343

Browse files
committedApr 10, 2023
feat: add support for Directed Read options
1 parent 6927e06 commit 24d9343

11 files changed

+128
-21
lines changed
 

‎google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,9 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
598598
if (options.hasDataBoostEnabled()) {
599599
builder.setDataBoostEnabled(options.dataBoostEnabled());
600600
}
601+
if (options.hasDirectedReadOptions()) {
602+
builder.setDirectedReadOptions(options.directedReadOptions());
603+
}
601604
builder.setSeqno(getSeqNo());
602605
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
603606
builder.setRequestOptions(buildRequestOptions(options));
@@ -667,7 +670,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
667670
request.setTransaction(selector);
668671
}
669672
SpannerRpc.StreamingCall call =
670-
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
673+
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions(), true);
671674
call.request(prefetchChunks);
672675
stream.setCall(call, request.getTransaction().hasBegin());
673676
return stream;
@@ -779,6 +782,9 @@ ResultSet readInternalWithOptions(
779782
if (readOptions.hasDataBoostEnabled()) {
780783
builder.setDataBoostEnabled(readOptions.dataBoostEnabled());
781784
}
785+
if (readOptions.hasDirectedReadOptions()) {
786+
builder.setDirectedReadOptions(readOptions.directedReadOptions());
787+
}
782788
final int prefetchChunks =
783789
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
784790
ResumableStreamIterator stream =
@@ -798,7 +804,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
798804
}
799805
builder.setRequestOptions(buildRequestOptions(readOptions));
800806
SpannerRpc.StreamingCall call =
801-
rpc.read(builder.build(), stream.consumer(), session.getOptions());
807+
rpc.read(builder.build(), stream.consumer(), session.getOptions(), true);
802808
call.request(prefetchChunks);
803809
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
804810
return stream;

‎google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

+31-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.api.core.BetaApi;
2020
import com.google.common.base.Preconditions;
21+
import com.google.spanner.v1.DirectedReadOptions;
2122
import com.google.spanner.v1.RequestOptions.Priority;
2223
import java.io.Serializable;
2324
import java.util.Objects;
@@ -328,6 +329,19 @@ void appendToOptions(Options options) {
328329
}
329330
}
330331

332+
static final class DirectedReadOption extends InternalOption implements ReadAndQueryOption {
333+
private final DirectedReadOptions directedReadOptions;
334+
335+
DirectedReadOption(DirectedReadOptions directedReadOptions) {
336+
this.directedReadOptions = directedReadOptions;
337+
}
338+
339+
@Override
340+
void appendToOptions(Options options) {
341+
options.directedReadOptions = directedReadOptions;
342+
}
343+
}
344+
331345
private boolean withCommitStats;
332346
private Long limit;
333347
private Integer prefetchChunks;
@@ -341,6 +355,7 @@ void appendToOptions(Options options) {
341355
private Boolean validateOnly;
342356
private Boolean withOptimisticLock;
343357
private Boolean dataBoostEnabled;
358+
private DirectedReadOptions directedReadOptions;
344359

345360
// Construction is via factory methods below.
346361
private Options() {}
@@ -441,6 +456,14 @@ Boolean dataBoostEnabled() {
441456
return dataBoostEnabled;
442457
}
443458

459+
boolean hasDirectedReadOptions() {
460+
return directedReadOptions != null;
461+
}
462+
463+
DirectedReadOptions directedReadOptions() {
464+
return directedReadOptions;
465+
}
466+
444467
@Override
445468
public String toString() {
446469
StringBuilder b = new StringBuilder();
@@ -480,6 +503,9 @@ public String toString() {
480503
if (dataBoostEnabled != null) {
481504
b.append("dataBoostEnabled: ").append(dataBoostEnabled).append(' ');
482505
}
506+
if (directedReadOptions != null) {
507+
b.append("directedReadOptions: ").append(directedReadOptions).append(' ');
508+
}
483509
return b.toString();
484510
}
485511

@@ -515,7 +541,8 @@ public boolean equals(Object o) {
515541
&& Objects.equals(etag(), that.etag())
516542
&& Objects.equals(validateOnly(), that.validateOnly())
517543
&& Objects.equals(withOptimisticLock(), that.withOptimisticLock())
518-
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled());
544+
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled())
545+
&& Objects.equals(directedReadOptions(), that.directedReadOptions());
519546
}
520547

521548
@Override
@@ -560,6 +587,9 @@ public int hashCode() {
560587
if (dataBoostEnabled != null) {
561588
result = 31 * result + dataBoostEnabled.hashCode();
562589
}
590+
if (directedReadOptions != null) {
591+
result = 31 * result + directedReadOptions.hashCode();
592+
}
563593
return result;
564594
}
565595

‎google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java

+14
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import com.google.common.collect.ImmutableMap;
4848
import com.google.common.collect.ImmutableSet;
4949
import com.google.common.util.concurrent.ThreadFactoryBuilder;
50+
import com.google.spanner.v1.DirectedReadOptions;
5051
import com.google.spanner.v1.ExecuteSqlRequest;
5152
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
5253
import com.google.spanner.v1.SpannerGrpc;
@@ -132,6 +133,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
132133
private final CallCredentialsProvider callCredentialsProvider;
133134
private final CloseableExecutorProvider asyncExecutorProvider;
134135
private final String compressorName;
136+
private final DirectedReadOptions directedReadOptions;
135137

136138
/**
137139
* Interface that can be used to provide {@link CallCredentials} instead of {@link Credentials} to
@@ -600,6 +602,7 @@ private SpannerOptions(Builder builder) {
600602
callCredentialsProvider = builder.callCredentialsProvider;
601603
asyncExecutorProvider = builder.asyncExecutorProvider;
602604
compressorName = builder.compressorName;
605+
directedReadOptions = builder.directedReadOptions;
603606
}
604607

605608
/**
@@ -700,6 +703,7 @@ public static class Builder
700703
private CloseableExecutorProvider asyncExecutorProvider;
701704
private String compressorName;
702705
private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");
706+
private DirectedReadOptions directedReadOptions;
703707

704708
private Builder() {
705709
// Manually set retry and polling settings that work.
@@ -1081,6 +1085,12 @@ public Builder setCompressorName(@Nullable String compressorName) {
10811085
return this;
10821086
}
10831087

1088+
public Builder setDirectedReadOptions(DirectedReadOptions directedReadOptions) {
1089+
Preconditions.checkNotNull(directedReadOptions, "DirectedReadOptions cannot be null");
1090+
this.directedReadOptions = directedReadOptions;
1091+
return this;
1092+
}
1093+
10841094
/**
10851095
* Sets the {@link ExecutorProvider} to use for high-level async calls that need an executor,
10861096
* such as fetching results for an {@link AsyncResultSet}.
@@ -1291,6 +1301,10 @@ public String getCompressorName() {
12911301
return compressorName;
12921302
}
12931303

1304+
public DirectedReadOptions getDirectedReadOptions() {
1305+
return directedReadOptions;
1306+
}
1307+
12941308
/** Returns the default query options to use for the specific database. */
12951309
public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
12961310
// Use the specific query options for the database if any have been specified. These have

‎google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -717,7 +717,7 @@ private ResultSet internalExecuteUpdate(
717717
/* withTransactionSelector = */ true);
718718
try {
719719
com.google.spanner.v1.ResultSet resultSet =
720-
rpc.executeQuery(builder.build(), session.getOptions());
720+
rpc.executeQuery(builder.build(), session.getOptions(), false);
721721
if (resultSet.getMetadata().hasTransaction()) {
722722
onTransactionMetadata(
723723
resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin());
@@ -747,7 +747,7 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... o
747747
// Register the update as an async operation that must finish before the transaction may
748748
// commit.
749749
increaseAsyncOperations();
750-
resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions());
750+
resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions(), false);
751751
} catch (Throwable t) {
752752
decreaseAsyncOperations();
753753
throw t;

‎google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

+48-5
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@
160160
import com.google.spanner.v1.CommitResponse;
161161
import com.google.spanner.v1.CreateSessionRequest;
162162
import com.google.spanner.v1.DeleteSessionRequest;
163+
import com.google.spanner.v1.DirectedReadOptions;
163164
import com.google.spanner.v1.ExecuteBatchDmlRequest;
164165
import com.google.spanner.v1.ExecuteBatchDmlResponse;
165166
import com.google.spanner.v1.ExecuteSqlRequest;
@@ -304,6 +305,7 @@ private void awaitTermination() throws InterruptedException {
304305
private static final double ADMINISTRATIVE_REQUESTS_RATE_LIMIT = 1.0D;
305306
private static final ConcurrentMap<String, RateLimiter> ADMINISTRATIVE_REQUESTS_RATE_LIMITERS =
306307
new ConcurrentHashMap<>();
308+
private final DirectedReadOptions directedReadOptions;
307309

308310
public static GapicSpannerRpc create(SpannerOptions options) {
309311
return new GapicSpannerRpc(options);
@@ -354,6 +356,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
354356
internalHeaderProviderBuilder.getResourceHeaderKey());
355357
this.callCredentialsProvider = options.getCallCredentialsProvider();
356358
this.compressorName = options.getCompressorName();
359+
this.directedReadOptions = options.getDirectedReadOptions();
357360

358361
if (initializeStubs) {
359362
// Create a managed executor provider.
@@ -1636,7 +1639,11 @@ public ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Opt
16361639

16371640
@Override
16381641
public StreamingCall read(
1639-
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
1642+
ReadRequest request,
1643+
ResultStreamConsumer consumer,
1644+
@Nullable Map<Option, ?> options,
1645+
boolean readOnly) {
1646+
request = validateReadRequest(request, readOnly);
16401647
GrpcCallContext context =
16411648
newCallContext(options, request.getSession(), request, SpannerGrpc.getReadMethod());
16421649
SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
@@ -1658,13 +1665,15 @@ public void cancel(String message) {
16581665
}
16591666

16601667
@Override
1661-
public ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map<Option, ?> options) {
1662-
return get(executeQueryAsync(request, options));
1668+
public ResultSet executeQuery(
1669+
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, boolean readOnly) {
1670+
return get(executeQueryAsync(request, options, readOnly));
16631671
}
16641672

16651673
@Override
16661674
public ApiFuture<ResultSet> executeQueryAsync(
1667-
ExecuteSqlRequest request, @Nullable Map<Option, ?> options) {
1675+
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, boolean readOnly) {
1676+
request = validateExecuteSqlRequest(request, readOnly);
16681677
GrpcCallContext context =
16691678
newCallContext(options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod());
16701679
return spannerStub.executeSqlCallable().futureCall(request, context);
@@ -1673,6 +1682,7 @@ public ApiFuture<ResultSet> executeQueryAsync(
16731682
@Override
16741683
public ResultSet executePartitionedDml(
16751684
ExecuteSqlRequest request, @Nullable Map<Option, ?> options) {
1685+
request = validateExecuteSqlRequest(request, false);
16761686
GrpcCallContext context =
16771687
newCallContext(options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod());
16781688
return get(partitionedDmlStub.executeSqlCallable().futureCall(request, context));
@@ -1686,6 +1696,7 @@ public RetrySettings getPartitionedDmlRetrySettings() {
16861696
@Override
16871697
public ServerStream<PartialResultSet> executeStreamingPartitionedDml(
16881698
ExecuteSqlRequest request, Map<Option, ?> options, Duration timeout) {
1699+
request = validateExecuteSqlRequest(request, false);
16891700
GrpcCallContext context =
16901701
newCallContext(
16911702
options, request.getSession(), request, SpannerGrpc.getExecuteStreamingSqlMethod());
@@ -1696,7 +1707,11 @@ public ServerStream<PartialResultSet> executeStreamingPartitionedDml(
16961707

16971708
@Override
16981709
public StreamingCall executeQuery(
1699-
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
1710+
ExecuteSqlRequest request,
1711+
ResultStreamConsumer consumer,
1712+
@Nullable Map<Option, ?> options,
1713+
boolean readOnly) {
1714+
request = validateExecuteSqlRequest(request, readOnly);
17001715
GrpcCallContext context =
17011716
newCallContext(
17021717
options, request.getSession(), request, SpannerGrpc.getExecuteStreamingSqlMethod());
@@ -2014,4 +2029,32 @@ private static Duration systemProperty(String name, int defaultValue) {
20142029
String stringValue = System.getProperty(name, "");
20152030
return Duration.ofSeconds(stringValue.isEmpty() ? defaultValue : Integer.parseInt(stringValue));
20162031
}
2032+
2033+
private ExecuteSqlRequest validateExecuteSqlRequest(ExecuteSqlRequest request, boolean readOnly) {
2034+
if (!readOnly) {
2035+
if (request.hasDirectedReadOptions() || (directedReadOptions != null)) {
2036+
throw SpannerExceptionFactory.newSpannerException(
2037+
ErrorCode.FAILED_PRECONDITION,
2038+
"DirectedReadOptions can't be set for Read-Write or Partitioned DML transactions");
2039+
}
2040+
}
2041+
if (directedReadOptions != null) {
2042+
request = request.toBuilder().setDirectedReadOptions(directedReadOptions).build();
2043+
}
2044+
return request;
2045+
}
2046+
2047+
private ReadRequest validateReadRequest(ReadRequest request, boolean readOnly) {
2048+
if (!readOnly) {
2049+
if (request.hasDirectedReadOptions() || (directedReadOptions != null)) {
2050+
throw SpannerExceptionFactory.newSpannerException(
2051+
ErrorCode.FAILED_PRECONDITION,
2052+
"DirectedReadOptions can't be set for Read-Write or Partitioned DML transactions");
2053+
}
2054+
}
2055+
if (directedReadOptions != null) {
2056+
request = request.toBuilder().setDirectedReadOptions(directedReadOptions).build();
2057+
}
2058+
return request;
2059+
}
20172060
}

‎google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -320,12 +320,16 @@ ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?>
320320
throws SpannerException;
321321

322322
StreamingCall read(
323-
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);
323+
ReadRequest request,
324+
ResultStreamConsumer consumer,
325+
@Nullable Map<Option, ?> options,
326+
boolean readOnly);
324327

325-
ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map<Option, ?> options);
328+
ResultSet executeQuery(
329+
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, boolean readOnly);
326330

327331
ApiFuture<ResultSet> executeQueryAsync(
328-
ExecuteSqlRequest request, @Nullable Map<Option, ?> options);
332+
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, boolean readOnly);
329333

330334
ResultSet executePartitionedDml(ExecuteSqlRequest request, @Nullable Map<Option, ?> options);
331335

@@ -335,7 +339,10 @@ ServerStream<PartialResultSet> executeStreamingPartitionedDml(
335339
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout);
336340

337341
StreamingCall executeQuery(
338-
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);
342+
ExecuteSqlRequest request,
343+
ResultStreamConsumer consumer,
344+
@Nullable Map<Option, ?> options,
345+
boolean readOnly);
339346

340347
ExecuteBatchDmlResponse executeBatchDml(ExecuteBatchDmlRequest build, Map<Option, ?> options);
341348

‎google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ public void testCommitReturnsCommitStats() {
4040
new AsyncTransactionManagerImpl(session, mock(Span.class), Options.commitStats())) {
4141
when(session.newTransaction(Options.fromTransactionOptions(Options.commitStats())))
4242
.thenReturn(transaction);
43-
when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null));
4443
Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1);
4544
CommitResponse response = mock(CommitResponse.class);
4645
when(response.getCommitTimestamp()).thenReturn(commitTimestamp);

‎google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,8 @@ public void request(int numMessages) {}
414414
private void mockRead(final PartialResultSet myResultSet) {
415415
final ArgumentCaptor<SpannerRpc.ResultStreamConsumer> consumer =
416416
ArgumentCaptor.forClass(SpannerRpc.ResultStreamConsumer.class);
417-
Mockito.when(rpc.read(Mockito.any(), consumer.capture(), Mockito.eq(options)))
417+
Mockito.when(
418+
rpc.read(Mockito.any(), consumer.capture(), Mockito.eq(options), Mockito.anyBoolean()))
418419
.then(
419420
invocation -> {
420421
consumer.getValue().onPartialResultSet(myResultSet);

‎google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -763,9 +763,12 @@ public void testSessionNotFoundReadWriteTransaction() {
763763
when(rpc.asyncDeleteSession(Mockito.anyString(), Mockito.anyMap()))
764764
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
765765
when(rpc.executeQuery(
766-
any(ExecuteSqlRequest.class), any(ResultStreamConsumer.class), any(Map.class)))
766+
any(ExecuteSqlRequest.class),
767+
any(ResultStreamConsumer.class),
768+
any(Map.class),
769+
any(Boolean.class)))
767770
.thenReturn(closedStreamingCall);
768-
when(rpc.executeQuery(any(ExecuteSqlRequest.class), any(Map.class)))
771+
when(rpc.executeQuery(any(ExecuteSqlRequest.class), any(Map.class), any(Boolean.class)))
769772
.thenThrow(sessionNotFound);
770773
when(rpc.executeBatchDml(any(ExecuteBatchDmlRequest.class), any(Map.class)))
771774
.thenThrow(sessionNotFound);

‎google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,8 @@ public void inlineBegin() {
286286
.setId(ByteString.copyFromUtf8(UUID.randomUUID().toString()))
287287
.build()));
288288
final AtomicInteger transactionsStarted = new AtomicInteger();
289-
when(rpc.executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap()))
289+
when(rpc.executeQuery(
290+
Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap(), Mockito.anyBoolean()))
290291
.thenAnswer(
291292
invocation -> {
292293
ResultSet.Builder builder =
@@ -334,7 +335,9 @@ public void inlineBegin() {
334335
verify(rpc, Mockito.never())
335336
.beginTransaction(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap());
336337
// We should have 2 ExecuteSql requests.
337-
verify(rpc, times(2)).executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap());
338+
verify(rpc, times(2))
339+
.executeQuery(
340+
Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap(), Mockito.anyBoolean());
338341
// But only 1 with a BeginTransaction.
339342
assertThat(transactionsStarted.get()).isEqualTo(1);
340343
}

0 commit comments

Comments
 (0)