Skip to content

Commit

Permalink
[grid] keep HttpClient alive until unused #12558 (#12978)
Browse files Browse the repository at this point in the history
* fixed merge conflict

* [grid] use a URI as key to the map

* [java] make the json parsing exception text more helpful

* [java] fixed the get session url endpoint

* [java] use sessions.getUri when only the session uri is needed

* Revert "[java] make the json parsing exception text more helpful"

This reverts commit ce7cfc8.

* [java] format script

---------

Co-authored-by: Diego Molina <diemol@users.noreply.github.com>
  • Loading branch information
joerg1985 and diemol authored Nov 13, 2023
1 parent de22f34 commit 600a614
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 28 deletions.
127 changes: 103 additions & 24 deletions java/src/org/openqa/selenium/grid/router/HandleSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,25 @@
import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST_EVENT;
import static org.openqa.selenium.remote.tracing.Tags.HTTP_RESPONSE;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import java.net.URL;
import java.time.Duration;
import java.io.Closeable;
import java.net.URI;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.openqa.selenium.NoSuchSessionException;
import org.openqa.selenium.concurrent.GuardedRunnable;
import org.openqa.selenium.grid.data.Session;
import org.openqa.selenium.grid.sessionmap.SessionMap;
import org.openqa.selenium.grid.web.ReverseProxyHandler;
import org.openqa.selenium.internal.Require;
import org.openqa.selenium.net.Urls;
import org.openqa.selenium.remote.ErrorCodec;
import org.openqa.selenium.remote.SessionId;
import org.openqa.selenium.remote.http.ClientConfig;
Expand All @@ -58,24 +61,78 @@

class HandleSession implements HttpHandler {

private static final Logger LOG = Logger.getLogger(HandleSession.class.getName());

private static class CacheEntry {
private final HttpClient httpClient;
private final AtomicLong inUse;
// volatile as the ConcurrentMap will not take care of synchronization
private volatile Instant lastUse;

public CacheEntry(HttpClient httpClient, long initialUsage) {
this.httpClient = httpClient;
this.inUse = new AtomicLong(initialUsage);
this.lastUse = Instant.now();
}
}

private static class UsageCountingReverseProxyHandler extends ReverseProxyHandler
implements Closeable {
private final CacheEntry entry;

public UsageCountingReverseProxyHandler(
Tracer tracer, HttpClient httpClient, CacheEntry entry) {
super(tracer, httpClient);

this.entry = entry;
}

@Override
public void close() {
// set the last use here, to ensure we have to calculate the real inactivity of the client
entry.lastUse = Instant.now();
entry.inUse.decrementAndGet();
}
}

private final Tracer tracer;
private final HttpClient.Factory httpClientFactory;
private final SessionMap sessions;
private final Cache<URL, HttpClient> httpClients;
private final ConcurrentMap<URI, CacheEntry> httpClients;

HandleSession(Tracer tracer, HttpClient.Factory httpClientFactory, SessionMap sessions) {
this.tracer = Require.nonNull("Tracer", tracer);
this.httpClientFactory = Require.nonNull("HTTP client factory", httpClientFactory);
this.sessions = Require.nonNull("Sessions", sessions);

this.httpClients =
CacheBuilder.newBuilder()
// this timeout must be bigger than default connection + read timeout, to ensure we do
// not close HttpClients which might have requests waiting for responses
.expireAfterAccess(Duration.ofMinutes(4))
.removalListener(
(RemovalListener<URL, HttpClient>) removal -> removal.getValue().close())
.build();
this.httpClients = new ConcurrentHashMap<>();

Runnable cleanUpHttpClients =
() -> {
Instant staleBefore = Instant.now().minus(2, ChronoUnit.MINUTES);
Iterator<CacheEntry> iterator = httpClients.values().iterator();

while (iterator.hasNext()) {
CacheEntry entry = iterator.next();

if (entry.inUse.get() != 0) {
// the client is currently in use
return;
} else if (!entry.lastUse.isBefore(staleBefore)) {
// the client was recently used
return;
} else {
// the client has not been used for a while, remove it from the cache
iterator.remove();

try {
entry.httpClient.close();
} catch (Exception ex) {
LOG.log(Level.WARNING, "failed to close a stale httpclient", ex);
}
}
}
};

ScheduledExecutorService cleanUpHttpClientsCacheService =
Executors.newSingleThreadScheduledExecutor(
Expand All @@ -86,7 +143,7 @@ class HandleSession implements HttpHandler {
return thread;
});
cleanUpHttpClientsCacheService.scheduleAtFixedRate(
GuardedRunnable.guard(httpClients::cleanUp), 1, 1, TimeUnit.MINUTES);
GuardedRunnable.guard(cleanUpHttpClients), 1, 1, TimeUnit.MINUTES);
}

@Override
Expand Down Expand Up @@ -119,7 +176,10 @@ public HttpResponse execute(HttpRequest req) {

try {
HttpTracing.inject(tracer, span, req);
HttpResponse res = loadSessionId(tracer, span, id).call().execute(req);
HttpResponse res;
try (UsageCountingReverseProxyHandler handler = loadSessionId(tracer, span, id).call()) {
res = handler.execute(req);
}

HTTP_RESPONSE.accept(span, res);

Expand Down Expand Up @@ -154,14 +214,33 @@ public HttpResponse execute(HttpRequest req) {
}
}

private Callable<HttpHandler> loadSessionId(Tracer tracer, Span span, SessionId id) {
private Callable<UsageCountingReverseProxyHandler> loadSessionId(
Tracer tracer, Span span, SessionId id) {
return span.wrap(
() -> {
Session session = sessions.get(id);
URL url = Urls.fromUri(session.getUri());
ClientConfig config = ClientConfig.defaultConfig().baseUrl(url).withRetries();
HttpClient client = httpClients.get(url, () -> httpClientFactory.createClient(config));
return new ReverseProxyHandler(tracer, client);
CacheEntry cacheEntry =
httpClients.compute(
sessions.getUri(id),
(sessionUri, entry) -> {
if (entry != null) {
entry.inUse.incrementAndGet();
return entry;
}

ClientConfig config =
ClientConfig.defaultConfig().baseUri(sessionUri).withRetries();
HttpClient httpClient = httpClientFactory.createClient(config);

return new CacheEntry(httpClient, 1);
});

try {
return new UsageCountingReverseProxyHandler(tracer, cacheEntry.httpClient, cacheEntry);
} catch (Throwable t) {
// ensure we do not keep the http client when an unexpected throwable is raised
cacheEntry.inUse.decrementAndGet();
throw t;
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

import static org.openqa.selenium.remote.http.HttpMethod.GET;

import java.net.URI;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.openqa.selenium.NoSuchSessionException;
import org.openqa.selenium.grid.data.Session;
import org.openqa.selenium.grid.sessionmap.SessionMap;
import org.openqa.selenium.remote.HttpSessionId;
import org.openqa.selenium.remote.SessionId;
Expand Down Expand Up @@ -62,10 +62,10 @@ public Optional<Consumer<Message>> apply(String uri, Consumer<Message> downstrea
}

try {
Session session = sessions.get(sessionId.get());
URI sessionUri = sessions.getUri(sessionId.get());

HttpClient client =
clientFactory.createClient(ClientConfig.defaultConfig().baseUri(session.getUri()));
clientFactory.createClient(ClientConfig.defaultConfig().baseUri(sessionUri));
WebSocket upstream =
client.openSocket(new HttpRequest(GET, uri), new ForwardingListener(downstream));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.openqa.selenium.remote.http.Contents.asJson;

import java.io.UncheckedIOException;
import java.util.Map;
import org.openqa.selenium.internal.Require;
import org.openqa.selenium.remote.SessionId;
import org.openqa.selenium.remote.http.HttpHandler;
Expand All @@ -37,6 +38,6 @@ class GetSessionUri implements HttpHandler {

@Override
public HttpResponse execute(HttpRequest req) throws UncheckedIOException {
return new HttpResponse().setContent(asJson(sessionMap.getUri(sessionId)));
return new HttpResponse().setContent(asJson(Map.of("value", sessionMap.getUri(sessionId))));
}
}

0 comments on commit 600a614

Please sign in to comment.