Skip to content

Commit

Permalink
chore: randomize session pool order based on TPS (#2792)
Browse files Browse the repository at this point in the history
* chore: randomize session pool order based on TPS

* chore: remove unnecessary changes
  • Loading branch information
olavloite authored Mar 19, 2024
1 parent b782725 commit d81da4e
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1751,6 +1751,13 @@ final class PoolMaintainer {
*/
@VisibleForTesting Instant lastExecutionTime;

/**
* The previous numSessionsAcquired seen by the maintainer. This is used to calculate the
* transactions per second, which again is used to determine whether to randomize the order of
* the session pool.
*/
private long prevNumSessionsAcquired;

boolean closed = false;

@GuardedBy("lock")
Expand Down Expand Up @@ -1794,6 +1801,12 @@ void maintainPool() {
return;
}
running = true;
if (loopFrequency >= 1000L) {
SessionPool.this.transactionsPerSecond =
(SessionPool.this.numSessionsAcquired - prevNumSessionsAcquired)
/ (loopFrequency / 1000L);
}
this.prevNumSessionsAcquired = SessionPool.this.numSessionsAcquired;
}
Instant currTime = clock.instant();
removeIdleSessions(currTime);
Expand Down Expand Up @@ -1995,6 +2008,7 @@ enum Position {
private final SettableFuture<Dialect> dialect = SettableFuture.create();
private final String databaseRole;
private final SessionClient sessionClient;
private final int numChannels;
private final ScheduledExecutorService executor;
private final ExecutorFactory<ScheduledExecutorService> executorFactory;

Expand Down Expand Up @@ -2054,6 +2068,9 @@ enum Position {
@GuardedBy("lock")
private long numIdleSessionsRemoved = 0;

@GuardedBy("lock")
private long transactionsPerSecond = 0L;

@GuardedBy("lock")
private long numLeakedSessionsRemoved = 0;

Expand Down Expand Up @@ -2190,6 +2207,7 @@ private SessionPool(
this.executorFactory = executorFactory;
this.executor = executor;
this.sessionClient = sessionClient;
this.numChannels = sessionClient.getSpanner().getOptions().getNumChannels();
this.clock = clock;
this.initialReleasePosition = initialReleasePosition;
this.poolMaintainer = new PoolMaintainer();
Expand Down Expand Up @@ -2493,11 +2511,13 @@ private void releaseSession(
if (closureFuture != null) {
return;
}
if (waiters.size() == 0) {
if (waiters.isEmpty()) {
// There are no pending waiters.
// Add to a random position if the head of the session pool already contains many sessions
// with the same channel as this one.
if (session.releaseToPosition == Position.FIRST && isUnbalanced(session)) {
// Add to a random position if the transactions per second is high or the head of the
// session pool already contains many sessions with the same channel as this one.
if (session.releaseToPosition != Position.RANDOM && shouldRandomize()) {
session.releaseToPosition = Position.RANDOM;
} else if (session.releaseToPosition == Position.FIRST && isUnbalanced(session)) {
session.releaseToPosition = Position.RANDOM;
} else if (session.releaseToPosition == Position.RANDOM
&& !isNewSession
Expand Down Expand Up @@ -2532,6 +2552,25 @@ private void releaseSession(
}
}

/**
* Returns true if the position where we return the session should be random if:
*
* <ol>
* <li>The current TPS is higher than the configured threshold.
* <li>AND the number of sessions checked out is larger than the number of channels.
* </ol>
*
* The second check prevents the session pool from being randomized when the application is
* running many small, quick queries using a small number of parallel threads. This can cause a
* high TPS, without actually having a high degree of parallelism.
*/
@VisibleForTesting
boolean shouldRandomize() {
return this.options.getRandomizePositionQPSThreshold() > 0
&& this.transactionsPerSecond >= this.options.getRandomizePositionQPSThreshold()
&& this.numSessionsInUse >= this.numChannels;
}

private boolean isUnbalanced(PooledSession session) {
int channel = session.getChannel();
int numChannels = sessionClient.getSpanner().getOptions().getNumChannels();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class SessionPoolOptions {
private final Duration waitForMinSessions;
private final Duration acquireSessionTimeout;
private final Position releaseToPosition;
private final long randomizePositionQPSThreshold;

/** Property for allowing mocking of session maintenance clock. */
private final Clock poolMaintainerClock;
Expand All @@ -89,6 +90,7 @@ private SessionPoolOptions(Builder builder) {
this.waitForMinSessions = builder.waitForMinSessions;
this.acquireSessionTimeout = builder.acquireSessionTimeout;
this.releaseToPosition = builder.releaseToPosition;
this.randomizePositionQPSThreshold = builder.randomizePositionQPSThreshold;
this.inactiveTransactionRemovalOptions = builder.inactiveTransactionRemovalOptions;
this.poolMaintainerClock = builder.poolMaintainerClock;
}
Expand Down Expand Up @@ -118,6 +120,7 @@ public boolean equals(Object o) {
&& Objects.equals(this.waitForMinSessions, other.waitForMinSessions)
&& Objects.equals(this.acquireSessionTimeout, other.acquireSessionTimeout)
&& Objects.equals(this.releaseToPosition, other.releaseToPosition)
&& Objects.equals(this.randomizePositionQPSThreshold, other.randomizePositionQPSThreshold)
&& Objects.equals(
this.inactiveTransactionRemovalOptions, other.inactiveTransactionRemovalOptions)
&& Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock);
Expand All @@ -143,6 +146,7 @@ public int hashCode() {
this.waitForMinSessions,
this.acquireSessionTimeout,
this.releaseToPosition,
this.randomizePositionQPSThreshold,
this.inactiveTransactionRemovalOptions,
this.poolMaintainerClock);
}
Expand Down Expand Up @@ -263,6 +267,10 @@ Position getReleaseToPosition() {
return releaseToPosition;
}

long getRandomizePositionQPSThreshold() {
return randomizePositionQPSThreshold;
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down Expand Up @@ -451,6 +459,13 @@ public static class Builder {
private Duration waitForMinSessions = Duration.ZERO;
private Duration acquireSessionTimeout = Duration.ofSeconds(60);
private Position releaseToPosition = getReleaseToPositionFromSystemProperty();
/**
* The session pool will randomize the position of a session that is being returned when this
* threshold is exceeded. That is: If the transactions per second exceeds this threshold, then
* the session pool will use a random order for the sessions instead of LIFO. The default is 0,
* which means that the option is disabled.
*/
private long randomizePositionQPSThreshold = 0L;

private Clock poolMaintainerClock;

Expand Down Expand Up @@ -487,6 +502,7 @@ private Builder(SessionPoolOptions options) {
this.autoDetectDialect = options.autoDetectDialect;
this.waitForMinSessions = options.waitForMinSessions;
this.acquireSessionTimeout = options.acquireSessionTimeout;
this.randomizePositionQPSThreshold = options.randomizePositionQPSThreshold;
this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions;
this.poolMaintainerClock = options.poolMaintainerClock;
}
Expand Down Expand Up @@ -764,6 +780,13 @@ Builder setReleaseToPosition(Position releaseToPosition) {
return this;
}

Builder setRandomizePositionQPSThreshold(long randomizePositionQPSThreshold) {
Preconditions.checkArgument(
randomizePositionQPSThreshold >= 0L, "randomizePositionQPSThreshold must be >= 0");
this.randomizePositionQPSThreshold = randomizePositionQPSThreshold;
return this;
}

/** Build a SessionPoolOption object */
public SessionPoolOptions build() {
validate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand All @@ -29,6 +31,7 @@
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SessionPool.Position;
import com.google.cloud.spanner.SessionPool.SessionConsumerImpl;
import com.google.common.base.Preconditions;
import io.opencensus.trace.Tracing;
import io.opentelemetry.api.OpenTelemetry;
import java.util.ArrayList;
Expand Down Expand Up @@ -116,6 +119,10 @@ private SessionImpl setupMockSession(final SessionImpl session, final ReadContex
}

private SessionPool createPool() throws Exception {
return createPool(this.options);
}

private SessionPool createPool(SessionPoolOptions options) throws Exception {
// Allow sessions to be added to the head of the pool in all cases in this test, as it is
// otherwise impossible to know which session exactly is getting pinged at what point in time.
SessionPool pool =
Expand Down Expand Up @@ -324,4 +331,67 @@ public void testIdleSessions() throws Exception {
}
assertThat(pool.totalSessions()).isEqualTo(options.getMinSessions());
}

@Test
public void testRandomizeThreshold() throws Exception {
SessionPool pool =
createPool(
this.options
.toBuilder()
.setMaxSessions(400)
.setLoopFrequency(1000L)
.setRandomizePositionQPSThreshold(4)
.build());
List<Session> sessions;

// Run a maintenance loop. No sessions have been checked out so far, so the TPS should be 0.
runMaintenanceLoop(clock, pool, 1);
assertFalse(pool.shouldRandomize());

// Get and return one session. This means TPS == 1.
returnSessions(1, useSessions(1, pool));
runMaintenanceLoop(clock, pool, 1);
assertFalse(pool.shouldRandomize());

// Get and return four sessions. This means TPS == 4, and that no sessions are checked out.
returnSessions(4, useSessions(4, pool));
runMaintenanceLoop(clock, pool, 1);
assertFalse(pool.shouldRandomize());

// Get four sessions without returning them.
// This means TPS == 4 and that they are all still checked out.
sessions = useSessions(4, pool);
runMaintenanceLoop(clock, pool, 1);
assertTrue(pool.shouldRandomize());
// Returning one of the sessions reduces the number of checked out sessions enough to stop the
// randomizing.
returnSessions(1, sessions);
runMaintenanceLoop(clock, pool, 1);
assertFalse(pool.shouldRandomize());

// Get three more session and run the maintenance loop.
// The TPS is then 3, as we've only gotten 3 sessions since the last maintenance run.
// That means that we should not randomize.
sessions.addAll(useSessions(3, pool));
runMaintenanceLoop(clock, pool, 1);
assertFalse(pool.shouldRandomize());

returnSessions(sessions.size(), sessions);
}

private List<Session> useSessions(int numSessions, SessionPool pool) {
List<Session> sessions = new ArrayList<>(numSessions);
for (int i = 0; i < numSessions; i++) {
sessions.add(pool.getSession());
sessions.get(sessions.size() - 1).singleUse().executeQuery(Statement.of("SELECT 1")).next();
}
return sessions;
}

private void returnSessions(int numSessions, List<Session> sessions) {
Preconditions.checkArgument(numSessions <= sessions.size());
for (int i = 0; i < numSessions; i++) {
sessions.remove(0).close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -218,4 +219,31 @@ public void verifyDefaultAcquireSessionTimeout() {

assertEquals(Duration.ofSeconds(60), sessionPoolOptions.getAcquireSessionTimeout());
}

@Test
public void testRandomizePositionQPSThreshold() {
assertEquals(0L, SessionPoolOptions.newBuilder().build().getRandomizePositionQPSThreshold());
assertEquals(
4L,
SessionPoolOptions.newBuilder()
.setRandomizePositionQPSThreshold(4L)
.build()
.getRandomizePositionQPSThreshold());
assertEquals(
10L,
SessionPoolOptions.newBuilder()
.setRandomizePositionQPSThreshold(4L)
.setRandomizePositionQPSThreshold(10L)
.build()
.getRandomizePositionQPSThreshold());
assertEquals(
0L,
SessionPoolOptions.newBuilder()
.setRandomizePositionQPSThreshold(0L)
.build()
.getRandomizePositionQPSThreshold());
assertThrows(
IllegalArgumentException.class,
() -> SessionPoolOptions.newBuilder().setRandomizePositionQPSThreshold(-1L));
}
}

0 comments on commit d81da4e

Please sign in to comment.