Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for Directed Read options #2373

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ void initTransaction() {
@GuardedBy("lock")
protected boolean isClosed = false;

protected boolean readOnly;
rajatbhatta marked this conversation as resolved.
Show resolved Hide resolved

// A per-transaction sequence number used to identify this ExecuteSqlRequests. Required for DML,
// ignored for query by the server.
private AtomicLong seqNo = new AtomicLong();
Expand All @@ -416,6 +418,7 @@ void initTransaction() {
this.defaultQueryOptions = builder.defaultQueryOptions;
this.span = builder.span;
this.executorProvider = builder.executorProvider;
this.readOnly = true;
}

@Override
Expand Down Expand Up @@ -613,6 +616,9 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
if (options.hasDataBoostEnabled()) {
builder.setDataBoostEnabled(options.dataBoostEnabled());
}
if (options.hasDirectedReadOptions()) {
builder.setDirectedReadOptions(options.directedReadOptions());
}
builder.setSeqno(getSeqNo());
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
builder.setRequestOptions(buildRequestOptions(options));
Expand Down Expand Up @@ -688,7 +694,11 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
}
SpannerRpc.StreamingCall call =
rpc.executeQuery(
request.build(), stream.consumer(), session.getOptions(), isRouteToLeader());
request.build(),
stream.consumer(),
session.getOptions(),
isRouteToLeader(),
readOnly);
call.request(prefetchChunks);
stream.setCall(call, request.getTransaction().hasBegin());
return stream;
Expand Down Expand Up @@ -800,6 +810,9 @@ ResultSet readInternalWithOptions(
if (readOptions.hasDataBoostEnabled()) {
builder.setDataBoostEnabled(readOptions.dataBoostEnabled());
}
if (readOptions.hasDirectedReadOptions()) {
builder.setDirectedReadOptions(readOptions.directedReadOptions());
}
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
Expand All @@ -825,7 +838,11 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
builder.setRequestOptions(buildRequestOptions(readOptions));
SpannerRpc.StreamingCall call =
rpc.read(
builder.build(), stream.consumer(), session.getOptions(), isRouteToLeader());
builder.build(),
stream.consumer(),
session.getOptions(),
isRouteToLeader(),
readOnly);
call.request(prefetchChunks);
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
return stream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package com.google.cloud.spanner;

import com.google.cloud.spanner.util.DirectedReadsUtil;
import com.google.common.base.Preconditions;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.RequestOptions.Priority;
import java.io.Serializable;
import java.util.Objects;
Expand Down Expand Up @@ -224,6 +226,18 @@ public static CreateUpdateDeleteAdminApiOption validateOnly(Boolean validateOnly
return new ValidateOnlyOption(validateOnly);
}

/**
* Option to request DirectedRead for ReadOnlyTransaction and SingleUseTransaction.
*
* <p>The DirectedReadOptions can be used to indicate which replicas or regions should be used for
* non-transactional reads or queries. Not all requests can be sent to non-leader replicas. In
* particular, some requests such as reads within read-write transactions must be sent to a
* designated leader replica. These requests ignore DirectedReadOptions.
*/
public static ReadAndQueryOption directedRead(DirectedReadOptions directedReadOptions) {
return new DirectedReadOption(directedReadOptions);
}

/** Option to request {@link CommitStats} for read/write transactions. */
static final class CommitStatsOption extends InternalOption implements TransactionOption {
@Override
Expand Down Expand Up @@ -325,6 +339,23 @@ void appendToOptions(Options options) {
}
}

static final class DirectedReadOption extends InternalOption implements ReadAndQueryOption {
private final DirectedReadOptions directedReadOptions;

DirectedReadOption(DirectedReadOptions directedReadOptions) {
this.directedReadOptions =
Preconditions.checkNotNull(
DirectedReadsUtil.validateDirectedReadOptions(directedReadOptions),
"DirectedReadOptions cannot be null");
;
}

@Override
void appendToOptions(Options options) {
options.directedReadOptions = directedReadOptions;
}
}

private boolean withCommitStats;
private Long limit;
private Integer prefetchChunks;
Expand All @@ -338,6 +369,7 @@ void appendToOptions(Options options) {
private Boolean validateOnly;
private Boolean withOptimisticLock;
private Boolean dataBoostEnabled;
private DirectedReadOptions directedReadOptions;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -438,6 +470,14 @@ Boolean dataBoostEnabled() {
return dataBoostEnabled;
}

boolean hasDirectedReadOptions() {
return directedReadOptions != null;
}

DirectedReadOptions directedReadOptions() {
return directedReadOptions;
}

@Override
public String toString() {
StringBuilder b = new StringBuilder();
Expand Down Expand Up @@ -477,6 +517,9 @@ public String toString() {
if (dataBoostEnabled != null) {
b.append("dataBoostEnabled: ").append(dataBoostEnabled).append(' ');
}
if (directedReadOptions != null) {
b.append("directedReadOptions: ").append(directedReadOptions).append(' ');
}
return b.toString();
}

Expand Down Expand Up @@ -512,7 +555,8 @@ public boolean equals(Object o) {
&& Objects.equals(etag(), that.etag())
&& Objects.equals(validateOnly(), that.validateOnly())
&& Objects.equals(withOptimisticLock(), that.withOptimisticLock())
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled());
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled())
&& Objects.equals(directedReadOptions(), that.directedReadOptions());
}

@Override
Expand Down Expand Up @@ -557,6 +601,9 @@ public int hashCode() {
if (dataBoostEnabled != null) {
result = 31 * result + dataBoostEnabled.hashCode();
}
if (directedReadOptions != null) {
result = 31 * result + directedReadOptions.hashCode();
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.google.cloud.spanner.spi.SpannerRpcFactory;
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.util.DirectedReadsUtil;
import com.google.cloud.spanner.v1.SpannerSettings;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -48,6 +49,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.SpannerGrpc;
Expand Down Expand Up @@ -133,6 +135,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final CallCredentialsProvider callCredentialsProvider;
private final CloseableExecutorProvider asyncExecutorProvider;
private final String compressorName;
private final DirectedReadOptions directedReadOptions;
private final boolean leaderAwareRoutingEnabled;

/** Interface that can be used to provide {@link CallCredentials} to {@link SpannerOptions}. */
Expand Down Expand Up @@ -599,6 +602,7 @@ private SpannerOptions(Builder builder) {
callCredentialsProvider = builder.callCredentialsProvider;
asyncExecutorProvider = builder.asyncExecutorProvider;
compressorName = builder.compressorName;
directedReadOptions = builder.directedReadOptions;
leaderAwareRoutingEnabled = builder.leaderAwareRoutingEnabled;
}

Expand Down Expand Up @@ -700,6 +704,7 @@ public static class Builder
private CloseableExecutorProvider asyncExecutorProvider;
private String compressorName;
private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");
private DirectedReadOptions directedReadOptions;
private boolean leaderAwareRoutingEnabled = true;

private Builder() {
Expand Down Expand Up @@ -1093,6 +1098,21 @@ public Builder setCompressorName(@Nullable String compressorName) {
return this;
}

/**
* Sets the {@link DirectedReadOptions} that specify which replicas or regions should be used
* for non-transactional reads or queries.
*
* <p>DirectedReadOptions set at the request level will take precedence over the options set
* using this method.
*/
public Builder setDirectedReadOptions(DirectedReadOptions directedReadOptions) {
this.directedReadOptions =
Preconditions.checkNotNull(
DirectedReadsUtil.validateDirectedReadOptions(directedReadOptions),
"DirectedReadOptions cannot be null");
return this;
}

/**
* Sets the {@link ExecutorProvider} to use for high-level async calls that need an executor,
* such as fetching results for an {@link AsyncResultSet}.
Expand Down Expand Up @@ -1321,6 +1341,10 @@ public String getCompressorName() {
return compressorName;
}

public DirectedReadOptions getDirectedReadOptions() {
return directedReadOptions;
}

public boolean isLeaderAwareRoutingEnabled() {
return leaderAwareRoutingEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ private TransactionContextImpl(Builder builder) {
this.trackTransactionStarter = builder.trackTransactionStarter;
this.options = builder.options;
this.finishedAsyncOperations.set(null);
this.readOnly = false;
}

@Override
Expand Down Expand Up @@ -722,7 +723,7 @@ private ResultSet internalExecuteUpdate(
/* withTransactionSelector = */ true);
try {
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), session.getOptions(), isRouteToLeader());
rpc.executeQuery(builder.build(), session.getOptions(), isRouteToLeader(), readOnly);
if (resultSet.getMetadata().hasTransaction()) {
onTransactionMetadata(
resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin());
Expand Down Expand Up @@ -752,7 +753,9 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... o
// Register the update as an async operation that must finish before the transaction may
// commit.
increaseAsyncOperations();
resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions(), isRouteToLeader());
resultSet =
rpc.executeQueryAsync(
builder.build(), session.getOptions(), isRouteToLeader(), readOnly);
} catch (Throwable t) {
decreaseAsyncOperations();
throw t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings;
import com.google.cloud.spanner.encryption.EncryptionConfigProtoMapper;
import com.google.cloud.spanner.util.DirectedReadsUtil;
import com.google.cloud.spanner.v1.stub.GrpcSpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
Expand Down Expand Up @@ -162,6 +163,7 @@
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.CreateSessionRequest;
import com.google.spanner.v1.DeleteSessionRequest;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteBatchDmlResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
Expand Down Expand Up @@ -259,6 +261,7 @@ public class GapicSpannerRpc implements SpannerRpc {
private static final double ADMINISTRATIVE_REQUESTS_RATE_LIMIT = 1.0D;
private static final ConcurrentMap<String, RateLimiter> ADMINISTRATIVE_REQUESTS_RATE_LIMITERS =
new ConcurrentHashMap<>();
private final DirectedReadOptions directedReadOptions;
private final boolean leaderAwareRoutingEnabled;

public static GapicSpannerRpc create(SpannerOptions options) {
Expand Down Expand Up @@ -310,6 +313,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
internalHeaderProviderBuilder.getResourceHeaderKey());
this.callCredentialsProvider = options.getCallCredentialsProvider();
this.compressorName = options.getCompressorName();
this.directedReadOptions = options.getDirectedReadOptions();
this.leaderAwareRoutingEnabled = options.isLeaderAwareRoutingEnabled();

if (initializeStubs) {
Expand Down Expand Up @@ -1617,7 +1621,16 @@ public StreamingCall read(
ReadRequest request,
ResultStreamConsumer consumer,
@Nullable Map<Option, ?> options,
boolean routeToLeader) {
boolean routeToLeader,
boolean readOnly) {
DirectedReadOptions preferredDirectedReadOptions =
DirectedReadsUtil.validateAndGetPreferredDirectedReadOptions(
directedReadOptions,
request.hasDirectedReadOptions() ? request.getDirectedReadOptions() : null,
readOnly);
if (preferredDirectedReadOptions != null) {
request = request.toBuilder().setDirectedReadOptions(preferredDirectedReadOptions).build();
}
GrpcCallContext context =
newCallContext(
options, request.getSession(), request, SpannerGrpc.getReadMethod(), routeToLeader);
Expand All @@ -1638,13 +1651,27 @@ public Set<Code> getExecuteQueryRetryableCodes() {

@Override
public ResultSet executeQuery(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, boolean routeToLeader) {
return get(executeQueryAsync(request, options, routeToLeader));
ExecuteSqlRequest request,
@Nullable Map<Option, ?> options,
boolean routeToLeader,
boolean readOnly) {
return get(executeQueryAsync(request, options, routeToLeader, readOnly));
}

@Override
public ApiFuture<ResultSet> executeQueryAsync(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, boolean routeToLeader) {
ExecuteSqlRequest request,
@Nullable Map<Option, ?> options,
boolean routeToLeader,
boolean readOnly) {
DirectedReadOptions preferredDirectedReadOptions =
DirectedReadsUtil.validateAndGetPreferredDirectedReadOptions(
directedReadOptions,
request.hasDirectedReadOptions() ? request.getDirectedReadOptions() : null,
readOnly);
if (preferredDirectedReadOptions != null) {
request = request.toBuilder().setDirectedReadOptions(preferredDirectedReadOptions).build();
}
GrpcCallContext context =
newCallContext(
options,
Expand All @@ -1658,6 +1685,10 @@ public ApiFuture<ResultSet> executeQueryAsync(
@Override
public ResultSet executePartitionedDml(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options) {
DirectedReadsUtil.validateAndGetPreferredDirectedReadOptions(
directedReadOptions,
request.hasDirectedReadOptions() ? request.getDirectedReadOptions() : null,
false);
GrpcCallContext context =
newCallContext(
options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod(), true);
Expand All @@ -1672,6 +1703,10 @@ public RetrySettings getPartitionedDmlRetrySettings() {
@Override
public ServerStream<PartialResultSet> executeStreamingPartitionedDml(
ExecuteSqlRequest request, Map<Option, ?> options, Duration timeout) {
DirectedReadsUtil.validateAndGetPreferredDirectedReadOptions(
directedReadOptions,
request.hasDirectedReadOptions() ? request.getDirectedReadOptions() : null,
false);
GrpcCallContext context =
newCallContext(
options,
Expand All @@ -1689,7 +1724,16 @@ public StreamingCall executeQuery(
ExecuteSqlRequest request,
ResultStreamConsumer consumer,
@Nullable Map<Option, ?> options,
boolean routeToLeader) {
boolean routeToLeader,
boolean readOnly) {
DirectedReadOptions preferredDirectedReadOptions =
DirectedReadsUtil.validateAndGetPreferredDirectedReadOptions(
directedReadOptions,
request.hasDirectedReadOptions() ? request.getDirectedReadOptions() : null,
readOnly);
if (preferredDirectedReadOptions != null) {
request = request.toBuilder().setDirectedReadOptions(preferredDirectedReadOptions).build();
}
GrpcCallContext context =
newCallContext(
options,
Expand Down
Loading