Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up Spanner before merging to master #3362

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
super(
checkNotNull(session),
checkNotNull(bound),
checkNotNull(spanner).getOptions().getGapicSpannerRpc(),
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
spanner.getOptions().getPrefetchChunks());
this.sessionName = session.getName();
this.options = session.getOptions();
Expand All @@ -82,7 +82,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
checkNotNull(session),
checkNotNull(batchTransactionId).getTransactionId(),
batchTransactionId.getTimestamp(),
checkNotNull(spanner).getOptions().getGapicSpannerRpc(),
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
spanner.getOptions().getPrefetchChunks());
this.sessionName = session.getName();
this.options = session.getOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
}

private final Random random = new Random();
private final SpannerRpc rawGrpcRpc;
private final SpannerRpc gapicRpc;
private final int defaultPrefetchChunks;

Expand All @@ -153,12 +152,10 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
private boolean spannerIsClosed = false;

SpannerImpl(
SpannerRpc rawGrpcRpc,
SpannerRpc gapicRpc,
int defaultPrefetchChunks,
SpannerOptions options) {
super(options);
this.rawGrpcRpc = rawGrpcRpc;
this.gapicRpc = gapicRpc;
this.defaultPrefetchChunks = defaultPrefetchChunks;
this.dbAdminClient = new DatabaseAdminClientImpl(options.getProjectId(), gapicRpc);
Expand All @@ -169,7 +166,6 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
SpannerImpl(SpannerOptions options) {
this(
options.getSpannerRpcV1(),
options.getGapicSpannerRpc(),
options.getPrefetchChunks(),
options);
}
Expand Down Expand Up @@ -336,12 +332,10 @@ public void close() {
} catch (InterruptedException | ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
for (ManagedChannel channel : getOptions().getRpcChannels()) {
try {
channel.shutdown();
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Failed to close channel", e);
}
try {
gapicRpc.shutdown();

This comment was marked as spam.

This comment was marked as spam.

} catch (RuntimeException e) {
logger.log(Level.WARNING, "Failed to close channel", e);

This comment was marked as spam.

This comment was marked as spam.

}
}

Expand Down Expand Up @@ -1067,18 +1061,21 @@ ResultSet executeQueryInternalWithOptions(
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, QUERY) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
return new CloseableServerStreamIterator<PartialResultSet>(
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
SpannerRpc.StreamingCall call =
rpc.executeQuery(
resumeToken == null
? request
: request.toBuilder().setResumeToken(resumeToken).build(),
null,
session.options));

// TODO(hzyi): make resume work

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

// Let resume fail for now. Gapic has its own resume, but in order not
// to introduce too much change at a time, we decide to plumb up
// ServerStream first and then figure out how to make resume work
stream.consumer(),
session.options);
// StreamController does not auto-request 1 message. Kick it off mannually
call.request(1);
if (prefetchChunks > 1) {
call.request(prefetchChunks - 1);
}
stream.setCall(call);
return stream;
}
};
return new GrpcResultSet(stream, this, queryMode);
Expand Down Expand Up @@ -1178,18 +1175,21 @@ ResultSet readInternalWithOptions(
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
return new CloseableServerStreamIterator<PartialResultSet>(
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
SpannerRpc.StreamingCall call =
rpc.read(
resumeToken == null
? request
: request.toBuilder().setResumeToken(resumeToken).build(),
null,
session.options));

// TODO(hzyi): make resume work
// Let resume fail for now. Gapic has its own resume, but in order not
// to introduce too much change at a time, we decide to plumb up
// ServerStream first and then figure out how to make resume work
stream.consumer(),
session.options);
// StreamController does not auto-request 1 message. Kick it off mannually
call.request(1);

This comment was marked as spam.

This comment was marked as spam.

if (prefetchChunks > 1) {
call.request(prefetchChunks - 1);
}
stream.setCall(call);
return stream;
}
};
GrpcResultSet resultSet =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

package com.google.cloud.spanner;

import com.google.api.gax.grpc.GrpcInterceptorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.ServiceDefaults;
import com.google.cloud.ServiceOptions;
import com.google.cloud.ServiceRpc;
import com.google.cloud.TransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.spi.SpannerRpcFactory;
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
import com.google.cloud.spanner.spi.v1.GrpcSpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -53,7 +55,8 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
"https://www.googleapis.com/auth/spanner.admin",
"https://www.googleapis.com/auth/spanner.data");
private static final int MAX_CHANNELS = 256;
private static final RpcChannelFactory DEFAULT_RPC_CHANNEL_FACTORY = new NettyRpcChannelFactory();
private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
private static final int MAX_HEADER_LIST_SIZE = 32 * 1024; //bytes

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.


/** Default implementation of {@code SpannerFactory}. */
private static class DefaultSpannerFactory implements SpannerFactory {
Expand All @@ -71,29 +74,28 @@ private static class DefaultSpannerRpcFactory implements SpannerRpcFactory {

@Override
public ServiceRpc create(SpannerOptions options) {
return new GrpcSpannerRpc(options);
return new GapicSpannerRpc(options);
}
}

private final List<ManagedChannel> rpcChannels;
private final TransportChannelProvider channelProvider;
private final GrpcInterceptorProvider interceptorProvider;
private final SessionPoolOptions sessionPoolOptions;
private final int prefetchChunks;
private final int numChannels;
private final ImmutableMap<String, String> sessionLabels;

private SpannerOptions(Builder builder) {
super(SpannerFactory.class, SpannerRpcFactory.class, builder, new SpannerDefaults());
numChannels = builder.numChannels;
String userAgent = getUserAgent();

This comment was marked as spam.

This comment was marked as spam.

RpcChannelFactory defaultRpcChannelFactory =
userAgent == null
? DEFAULT_RPC_CHANNEL_FACTORY
: new NettyRpcChannelFactory(userAgent);
rpcChannels =
createChannels(
getHost(),
MoreObjects.firstNonNull(builder.rpcChannelFactory, defaultRpcChannelFactory),
numChannels);
numChannels = builder.numChannels;
Preconditions.checkArgument(
numChannels >= 1 && numChannels <= MAX_CHANNELS,
"Number of channels must fall in the range [1, %s], found: %s",
MAX_CHANNELS,
numChannels);

channelProvider = builder.channelProvider;
interceptorProvider = builder.interceptorProvider;
sessionPoolOptions =
builder.sessionPoolOptions != null
? builder.sessionPoolOptions
Expand All @@ -107,10 +109,11 @@ public static class Builder
extends ServiceOptions.Builder<
Spanner, SpannerOptions, SpannerOptions.Builder> {
private static final int DEFAULT_PREFETCH_CHUNKS = 4;
private RpcChannelFactory rpcChannelFactory;
private TransportChannelProvider channelProvider;
private GrpcInterceptorProvider interceptorProvider;

/** By default, we create 4 channels per {@link SpannerOptions} */
private int numChannels = 4;

private int prefetchChunks = DEFAULT_PREFETCH_CHUNKS;
private SessionPoolOptions sessionPoolOptions;
private ImmutableMap<String, String> sessionLabels;
Expand All @@ -123,6 +126,8 @@ private Builder() {}
this.sessionPoolOptions = options.sessionPoolOptions;
this.prefetchChunks = options.prefetchChunks;
this.sessionLabels = options.sessionLabels;
this.channelProvider = options.channelProvider;
this.interceptorProvider = options.interceptorProvider;
}

@Override
Expand All @@ -134,9 +139,21 @@ public Builder setTransportOptions(TransportOptions transportOptions) {
return super.setTransportOptions(transportOptions);
}

/** Sets the factory for creating gRPC channels. If not set, a default will be used. */
public Builder setRpcChannelFactory(RpcChannelFactory factory) {
this.rpcChannelFactory = factory;
/**
* Sets the {@code ChannelProvider}. {@link GapicSpannerRpc} would create a default
* one if none is provided.
*/
public Builder setChannelProvider(TransportChannelProvider channelProvider) {
this.channelProvider = channelProvider;
return this;
}

/**
* Sets the {@code GrpcInterceptorProvider}. {@link GapicSpannerRpc} would create
* a default one if none is provided.
*/
public Builder setInterceptorProvider(GrpcInterceptorProvider interceptorProvider) {
this.interceptorProvider = interceptorProvider;
return this;
}

Expand Down Expand Up @@ -197,14 +214,6 @@ public SpannerOptions build() {
}
}

/**
* Interface for gRPC channel creation. Most users won't need to use this, as the default covers
* typical deployment scenarios.
*/
public interface RpcChannelFactory {
ManagedChannel newChannel(String host, int port);
}

/** Returns default instance of {@code SpannerOptions}. */
public static SpannerOptions getDefaultInstance() {
return newBuilder().build();
Expand All @@ -214,8 +223,12 @@ public static Builder newBuilder() {
return new Builder();
}

public List<ManagedChannel> getRpcChannels() {
return rpcChannels;
public TransportChannelProvider getChannelProvider() {
return channelProvider;
}

public GrpcInterceptorProvider getInterceptorProvider() {
return interceptorProvider;
}

public int getNumChannels() {
Expand All @@ -238,88 +251,11 @@ public static GrpcTransportOptions getDefaultGrpcTransportOptions() {
return GrpcTransportOptions.newBuilder().build();
}

/**
* Returns the default RPC channel factory used when none is specified. This may be useful for
* callers that wish to add interceptors to gRPC channels used by the Cloud Spanner client
* library.
*/
public static RpcChannelFactory getDefaultRpcChannelFactory() {
return DEFAULT_RPC_CHANNEL_FACTORY;
}

@Override
protected String getDefaultHost() {
return DEFAULT_HOST;
}

private static List<ManagedChannel> createChannels(
String rootUrl, RpcChannelFactory factory, int numChannels) {
Preconditions.checkArgument(
numChannels >= 1 && numChannels <= MAX_CHANNELS,
"Number of channels must fall in the range [1, %s], found: %s",
MAX_CHANNELS,
numChannels);
ImmutableList.Builder<ManagedChannel> builder = ImmutableList.builder();
for (int i = 0; i < numChannels; i++) {
builder.add(createChannel(rootUrl, factory));
}
return builder.build();
}

private static ManagedChannel createChannel(String rootUrl, RpcChannelFactory factory) {
URL url;
try {
url = new URL(rootUrl);
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Invalid host: " + rootUrl, e);
}
ManagedChannel channel =
factory.newChannel(url.getHost(), url.getPort() > 0 ? url.getPort() : url.getDefaultPort());
return channel;
}

static class NettyRpcChannelFactory implements RpcChannelFactory {
private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
private static final int MAX_HEADER_LIST_SIZE = 32 * 1024; //bytes
private final String userAgent;
private final List<ClientInterceptor> interceptors;

NettyRpcChannelFactory() {
this(null);
}

NettyRpcChannelFactory(String userAgent) {
this(userAgent, ImmutableList.<ClientInterceptor>of());
}

NettyRpcChannelFactory(String userAgent, List<ClientInterceptor> interceptors) {
this.userAgent = userAgent;
this.interceptors = interceptors;
}

@Override
public ManagedChannel newChannel(String host, int port) {
NettyChannelBuilder builder =
NettyChannelBuilder.forAddress(host, port)
.sslContext(newSslContext())

This comment was marked as spam.

This comment was marked as spam.

.intercept(interceptors)
.maxHeaderListSize(MAX_HEADER_LIST_SIZE)
.maxMessageSize(MAX_MESSAGE_SIZE);
if (userAgent != null) {
builder.userAgent(userAgent);
}
return builder.build();
}

private static SslContext newSslContext() {

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

try {
return GrpcSslContexts.forClient().ciphers(null).build();
} catch (SSLException e) {
throw new RuntimeException("SSL configuration failed: " + e.getMessage(), e);
}
}
}

private static class SpannerDefaults implements
ServiceDefaults<Spanner, SpannerOptions> {

Expand Down Expand Up @@ -348,10 +284,6 @@ protected SpannerRpc getSpannerRpcV1() {
return (SpannerRpc) getRpc();
}

protected SpannerRpc getGapicSpannerRpc() {
return GapicSpannerRpc.create(this);
}

@SuppressWarnings("unchecked")
@Override
public Builder toBuilder() {
Expand Down
Loading