Skip to content

Commit

Permalink
Merge branch 'main' into remove-grpclb
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Apr 20, 2024
2 parents 00a2e8d + 01b82fa commit 3a66eb2
Show file tree
Hide file tree
Showing 27 changed files with 318 additions and 72 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/unmanaged_dependency_check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ jobs:
# repository
.kokoro/build.sh
- name: Unmanaged dependency check
uses: googleapis/sdk-platform-java/java-shared-dependencies/unmanaged-dependency-check@google-cloud-shared-dependencies/v3.28.1
uses: googleapis/sdk-platform-java/java-shared-dependencies/unmanaged-dependency-check@google-cloud-shared-dependencies/v3.29.0
with:
bom-path: google-cloud-spanner-bom/pom.xml
2 changes: 1 addition & 1 deletion .kokoro/presubmit/graalvm-native-17.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Configure the docker image for kokoro-trampoline.
env_vars: {
key: "TRAMPOLINE_IMAGE"
value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_b:3.28.1"
value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_b:3.29.0"
}

env_vars: {
Expand Down
2 changes: 1 addition & 1 deletion .kokoro/presubmit/graalvm-native.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Configure the docker image for kokoro-trampoline.
env_vars: {
key: "TRAMPOLINE_IMAGE"
value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_a:3.28.1"
value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_a:3.29.0"
}

env_vars: {
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>26.36.0</version>
<version>26.37.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
2 changes: 1 addition & 1 deletion google-cloud-spanner-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>sdk-platform-java-config</artifactId>
<version>3.28.1</version>
<version>3.29.0</version>
</parent>

<name>Google Cloud Spanner BOM</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadCon
private DecodeMode defaultDecodeMode = SpannerOptions.Builder.DEFAULT_DECODE_MODE;
private DirectedReadOptions defaultDirectedReadOption;
private ExecutorProvider executorProvider;
private Clock clock = new Clock();
private Clock clock = Clock.INSTANCE;

Builder() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
* Clock.
*/
class Clock {
static final Clock INSTANCE = new Clock();

Clock() {}

Instant instant() {
return Instant.now();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,13 @@ interface SessionTransaction {
private final SessionReference sessionReference;
private SessionTransaction activeTransaction;
private ISpan currentSpan;
private final Clock clock;

SessionImpl(SpannerImpl spanner, SessionReference sessionReference) {
this.spanner = spanner;
this.tracer = spanner.getTracer();
this.sessionReference = sessionReference;
this.clock = spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock();
}

@Override
Expand Down Expand Up @@ -292,6 +294,7 @@ public ReadContext singleUse(TimestampBound bound) {
.setSpan(currentSpan)
.setTracer(tracer)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.setClock(clock)
.build());
}

Expand All @@ -314,6 +317,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
.setSpan(currentSpan)
.setTracer(tracer)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.setClock(clock)
.buildSingleUseReadOnlyTransaction());
}

Expand All @@ -336,6 +340,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
.setSpan(currentSpan)
.setTracer(tracer)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.setClock(clock)
.build());
}

Expand Down Expand Up @@ -418,9 +423,6 @@ ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions, boolean
}

TransactionContextImpl newTransaction(Options options) {
// A clock instance is passed in {@code SessionPoolOptions} in order to allow mocking via tests.
final Clock poolMaintainerClock =
spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock();
return TransactionContextImpl.newBuilder()
.setSession(this)
.setOptions(options)
Expand All @@ -434,7 +436,7 @@ TransactionContextImpl newTransaction(Options options) {
.setSpan(currentSpan)
.setTracer(tracer)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.setClock(poolMaintainerClock == null ? new Clock() : poolMaintainerClock)
.setClock(clock)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2349,8 +2349,13 @@ private PooledSession pollUninterruptiblyWithTimeout(
+ acquireSessionTimeout.toMillis()
+ "ms for acquiring session. To mitigate error SessionPoolOptions#setAcquireSessionTimeout(Duration) to set a higher timeout"
+ " or increase the number of sessions in the session pool.");
waiter.setException(exception);
throw exception;
if (waiter.setException(exception)) {
// Only throw the exception if setting it on the waiter was successful. The
// waiter.setException(..) method returns false if some other thread in the meantime
// called waiter.set(..), which means that a session became available between the
// time that the TimeoutException was thrown and now.
throw exception;
}
}
return null;
} catch (ExecutionException e) {
Expand Down Expand Up @@ -3043,6 +3048,13 @@ int getNumberOfSessionsBeingCreated() {
}
}

@VisibleForTesting
int getTotalSessionsPlusNumSessionsBeingCreated() {
synchronized (lock) {
return numSessionsBeingCreated + allSessions.size();
}
}

@VisibleForTesting
boolean isMultiplexedSessionBeingCreated() {
synchronized (lock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ public static class Builder {
private boolean useMultiplexedSession = getUseMultiplexedSessionFromEnvVariable();

private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7);
private Clock poolMaintainerClock;
private Clock poolMaintainerClock = Clock.INSTANCE;

private static Position getReleaseToPositionFromSystemProperty() {
// NOTE: This System property is a beta feature. Support for it can be removed in the future.
Expand Down Expand Up @@ -703,7 +703,7 @@ Builder setCloseIfInactiveTransactions() {

@VisibleForTesting
Builder setPoolMaintainerClock(Clock poolMaintainerClock) {
this.poolMaintainerClock = poolMaintainerClock;
this.poolMaintainerClock = Preconditions.checkNotNull(poolMaintainerClock);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ abstract class AbstractMockServerTest {
protected static Server server;
protected static LocalChannelProvider channelProvider;

private Spanner spanner;
protected Spanner spanner;

@BeforeClass
public static void startMockServer() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ SessionImpl mockMultiplexedSession() {
return session;
}

SessionImpl buildMockSession(ReadContext context) {
SpannerImpl spanner = mock(SpannerImpl.class);
SessionImpl buildMockSession(SpannerImpl spanner, ReadContext context) {
Map options = new HashMap<>();
options.put(Option.CHANNEL_HINT, channelHint.getAndIncrement());
final SessionImpl session =
Expand Down Expand Up @@ -140,8 +139,8 @@ public CommitResponse writeWithOptions(
return session;
}

SessionImpl buildMockMultiplexedSession(ReadContext context, Timestamp creationTime) {
SpannerImpl spanner = mock(SpannerImpl.class);
SessionImpl buildMockMultiplexedSession(
SpannerImpl spanner, ReadContext context, Timestamp creationTime) {
Map options = new HashMap<>();
final SessionImpl session =
new SessionImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public void setUp() {
GrpcTransportOptions transportOptions = mock(GrpcTransportOptions.class);
when(transportOptions.getExecutorFactory()).thenReturn(mock(ExecutorFactory.class));
when(spannerOptions.getTransportOptions()).thenReturn(transportOptions);
SessionPoolOptions sessionPoolOptions = mock(SessionPoolOptions.class);
when(sessionPoolOptions.getPoolMaintainerClock()).thenReturn(Clock.INSTANCE);
when(spannerOptions.getSessionPoolOptions()).thenReturn(sessionPoolOptions);
@SuppressWarnings("resource")
SpannerImpl spanner = new SpannerImpl(gapicRpc, spannerOptions);
client = new BatchClientImpl(spanner.getSessionClient(db));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.opencensus.trace.propagation.PropagationComponent;
import io.opencensus.trace.propagation.TextFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -62,7 +63,8 @@ public class FailOnOverkillTraceComponentImpl extends TraceComponent {
private final Clock clock = ZeroTimeClock.getInstance();
private final ExportComponent exportComponent = new TestExportComponent();
private final TraceConfig traceConfig = new TestTraceConfig();
private static final Map<String, Boolean> spans = new LinkedHashMap<>();
private static final Map<String, Boolean> spans =
Collections.synchronizedMap(new LinkedHashMap<>());

private static final List<String> annotations = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2138,6 +2138,10 @@ public int numSessionsCreated() {
return numSessionsCreated.get();
}

public Map<String, Session> getSessions() {
return sessions;
}

@Override
public List<AbstractMessage> getRequests() {
return new ArrayList<>(this.requests);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ public void setUp() {
.setIncStep(1)
.setKeepAliveIntervalMinutes(2)
.setUseMultiplexedSession(true)
.setPoolMaintainerClock(clock)
.build();
when(spannerOptions.getSessionPoolOptions()).thenReturn(options);
Assume.assumeTrue(options.getUseMultiplexedSession());
multiplexedSessionsRemoved.clear();
}
Expand All @@ -96,7 +98,8 @@ public void testMaintainMultiplexedSession_whenNewSessionCreated_assertThatStale
Instant.ofEpochMilli(clock.currentTimeMillis.get()).getEpochSecond(), 0);
consumer.onSessionReady(
setupMockSession(
buildMockMultiplexedSession(mockContext, timestamp.toProto()), mockContext));
buildMockMultiplexedSession(client, mockContext, timestamp.toProto()),
mockContext));
return null;
})
.when(sessionClient)
Expand All @@ -111,7 +114,8 @@ public void testMaintainMultiplexedSession_whenNewSessionCreated_assertThatStale
Instant.ofEpochMilli(clock.currentTimeMillis.get()).getEpochSecond(), 0);
consumer.onSessionReady(
setupMockSession(
buildMockMultiplexedSession(mockContext, timestamp.toProto()), mockContext));
buildMockMultiplexedSession(client, mockContext, timestamp.toProto()),
mockContext));
return null;
})
.when(sessionClient)
Expand Down Expand Up @@ -162,7 +166,8 @@ public void testMaintainMultiplexedSession_whenNewSessionCreated_assertThatStale
Instant.ofEpochMilli(clock.currentTimeMillis.get()).getEpochSecond(), 0);
consumer.onSessionReady(
setupMockSession(
buildMockMultiplexedSession(mockContext, timestamp.toProto()), mockContext));
buildMockMultiplexedSession(client, mockContext, timestamp.toProto()),
mockContext));
return null;
})
.when(sessionClient)
Expand Down Expand Up @@ -197,7 +202,8 @@ public void testMaintainMultiplexedSession_whenNewSessionCreated_assertThatStale
Instant.ofEpochMilli(clock.currentTimeMillis.get()).getEpochSecond(), 0);
consumer.onSessionReady(
setupMockSession(
buildMockMultiplexedSession(mockContext, timestamp.toProto()), mockContext));
buildMockMultiplexedSession(client, mockContext, timestamp.toProto()),
mockContext));
return null;
})
.when(sessionClient)
Expand Down Expand Up @@ -236,7 +242,8 @@ public void testMaintainMultiplexedSession_whenNewSessionCreated_assertThatStale
Instant.ofEpochMilli(clock.currentTimeMillis.get()).getEpochSecond(), 0);
consumer.onSessionReady(
setupMockSession(
buildMockMultiplexedSession(mockContext, timestamp.toProto()), mockContext));
buildMockMultiplexedSession(client, mockContext, timestamp.toProto()),
mockContext));
return null;
})
.when(sessionClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public void setUp() {
.setMaxSessions(2)
.setUseMultiplexedSession(true)
.build();
when(spannerOptions.getSessionPoolOptions()).thenReturn(options);
Assume.assumeTrue(options.getUseMultiplexedSession());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ public ScheduledExecutorService get() {
doNothing().when(span).end();
doNothing().when(span).addAnnotation("Starting Commit");
when(spanner.getRpc()).thenReturn(rpc);
SessionPoolOptions sessionPoolOptions = mock(SessionPoolOptions.class);
when(sessionPoolOptions.getPoolMaintainerClock()).thenReturn(Clock.INSTANCE);
when(spannerOptions.getSessionPoolOptions()).thenReturn(sessionPoolOptions);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ public void setUp() {
GrpcTransportOptions transportOptions = mock(GrpcTransportOptions.class);
when(transportOptions.getExecutorFactory()).thenReturn(mock(ExecutorFactory.class));
when(spannerOptions.getTransportOptions()).thenReturn(transportOptions);
when(spannerOptions.getSessionPoolOptions()).thenReturn(mock(SessionPoolOptions.class));
SessionPoolOptions sessionPoolOptions = mock(SessionPoolOptions.class);
when(sessionPoolOptions.getPoolMaintainerClock()).thenReturn(Clock.INSTANCE);
when(spannerOptions.getSessionPoolOptions()).thenReturn(sessionPoolOptions);
when(spannerOptions.getOpenTelemetry()).thenReturn(OpenTelemetry.noop());
@SuppressWarnings("resource")
SpannerImpl spanner = new SpannerImpl(rpc, spannerOptions);
Expand Down
Loading

0 comments on commit 3a66eb2

Please sign in to comment.