diff --git a/java/src/org/openqa/selenium/grid/router/HandleSession.java b/java/src/org/openqa/selenium/grid/router/HandleSession.java index 729196d362b99..12eebca1b59aa 100644 --- a/java/src/org/openqa/selenium/grid/router/HandleSession.java +++ b/java/src/org/openqa/selenium/grid/router/HandleSession.java @@ -26,22 +26,25 @@ import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST_EVENT; import static org.openqa.selenium.remote.tracing.Tags.HTTP_RESPONSE; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import java.net.URL; -import java.time.Duration; +import java.io.Closeable; +import java.net.URI; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Iterator; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; import org.openqa.selenium.NoSuchSessionException; import org.openqa.selenium.concurrent.GuardedRunnable; -import org.openqa.selenium.grid.data.Session; import org.openqa.selenium.grid.sessionmap.SessionMap; import org.openqa.selenium.grid.web.ReverseProxyHandler; import org.openqa.selenium.internal.Require; -import org.openqa.selenium.net.Urls; import org.openqa.selenium.remote.ErrorCodec; import org.openqa.selenium.remote.SessionId; import org.openqa.selenium.remote.http.ClientConfig; @@ -58,24 +61,78 @@ class HandleSession implements HttpHandler { + private static final Logger LOG = Logger.getLogger(HandleSession.class.getName()); + + private static class CacheEntry { + private final HttpClient httpClient; + private final AtomicLong inUse; + // volatile as the ConcurrentMap will not take care of synchronization + private volatile Instant lastUse; + + public CacheEntry(HttpClient httpClient, long initialUsage) { + this.httpClient = httpClient; + this.inUse = new AtomicLong(initialUsage); + this.lastUse = Instant.now(); + } + } + + private static class UsageCountingReverseProxyHandler extends ReverseProxyHandler + implements Closeable { + private final CacheEntry entry; + + public UsageCountingReverseProxyHandler( + Tracer tracer, HttpClient httpClient, CacheEntry entry) { + super(tracer, httpClient); + + this.entry = entry; + } + + @Override + public void close() { + // set the last use here, to ensure we have to calculate the real inactivity of the client + entry.lastUse = Instant.now(); + entry.inUse.decrementAndGet(); + } + } + private final Tracer tracer; private final HttpClient.Factory httpClientFactory; private final SessionMap sessions; - private final Cache httpClients; + private final ConcurrentMap httpClients; HandleSession(Tracer tracer, HttpClient.Factory httpClientFactory, SessionMap sessions) { this.tracer = Require.nonNull("Tracer", tracer); this.httpClientFactory = Require.nonNull("HTTP client factory", httpClientFactory); this.sessions = Require.nonNull("Sessions", sessions); - this.httpClients = - CacheBuilder.newBuilder() - // this timeout must be bigger than default connection + read timeout, to ensure we do - // not close HttpClients which might have requests waiting for responses - .expireAfterAccess(Duration.ofMinutes(4)) - .removalListener( - (RemovalListener) removal -> removal.getValue().close()) - .build(); + this.httpClients = new ConcurrentHashMap<>(); + + Runnable cleanUpHttpClients = + () -> { + Instant staleBefore = Instant.now().minus(2, ChronoUnit.MINUTES); + Iterator iterator = httpClients.values().iterator(); + + while (iterator.hasNext()) { + CacheEntry entry = iterator.next(); + + if (entry.inUse.get() != 0) { + // the client is currently in use + return; + } else if (!entry.lastUse.isBefore(staleBefore)) { + // the client was recently used + return; + } else { + // the client has not been used for a while, remove it from the cache + iterator.remove(); + + try { + entry.httpClient.close(); + } catch (Exception ex) { + LOG.log(Level.WARNING, "failed to close a stale httpclient", ex); + } + } + } + }; ScheduledExecutorService cleanUpHttpClientsCacheService = Executors.newSingleThreadScheduledExecutor( @@ -86,7 +143,7 @@ class HandleSession implements HttpHandler { return thread; }); cleanUpHttpClientsCacheService.scheduleAtFixedRate( - GuardedRunnable.guard(httpClients::cleanUp), 1, 1, TimeUnit.MINUTES); + GuardedRunnable.guard(cleanUpHttpClients), 1, 1, TimeUnit.MINUTES); } @Override @@ -119,7 +176,10 @@ public HttpResponse execute(HttpRequest req) { try { HttpTracing.inject(tracer, span, req); - HttpResponse res = loadSessionId(tracer, span, id).call().execute(req); + HttpResponse res; + try (UsageCountingReverseProxyHandler handler = loadSessionId(tracer, span, id).call()) { + res = handler.execute(req); + } HTTP_RESPONSE.accept(span, res); @@ -154,14 +214,33 @@ public HttpResponse execute(HttpRequest req) { } } - private Callable loadSessionId(Tracer tracer, Span span, SessionId id) { + private Callable loadSessionId( + Tracer tracer, Span span, SessionId id) { return span.wrap( () -> { - Session session = sessions.get(id); - URL url = Urls.fromUri(session.getUri()); - ClientConfig config = ClientConfig.defaultConfig().baseUrl(url).withRetries(); - HttpClient client = httpClients.get(url, () -> httpClientFactory.createClient(config)); - return new ReverseProxyHandler(tracer, client); + CacheEntry cacheEntry = + httpClients.compute( + sessions.getUri(id), + (sessionUri, entry) -> { + if (entry != null) { + entry.inUse.incrementAndGet(); + return entry; + } + + ClientConfig config = + ClientConfig.defaultConfig().baseUri(sessionUri).withRetries(); + HttpClient httpClient = httpClientFactory.createClient(config); + + return new CacheEntry(httpClient, 1); + }); + + try { + return new UsageCountingReverseProxyHandler(tracer, cacheEntry.httpClient, cacheEntry); + } catch (Throwable t) { + // ensure we do not keep the http client when an unexpected throwable is raised + cacheEntry.inUse.decrementAndGet(); + throw t; + } }); } } diff --git a/java/src/org/openqa/selenium/grid/router/ProxyWebsocketsIntoGrid.java b/java/src/org/openqa/selenium/grid/router/ProxyWebsocketsIntoGrid.java index 903c9e0f132ec..b39ac1602aedf 100644 --- a/java/src/org/openqa/selenium/grid/router/ProxyWebsocketsIntoGrid.java +++ b/java/src/org/openqa/selenium/grid/router/ProxyWebsocketsIntoGrid.java @@ -19,6 +19,7 @@ import static org.openqa.selenium.remote.http.HttpMethod.GET; +import java.net.URI; import java.util.Objects; import java.util.Optional; import java.util.function.BiFunction; @@ -26,7 +27,6 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.openqa.selenium.NoSuchSessionException; -import org.openqa.selenium.grid.data.Session; import org.openqa.selenium.grid.sessionmap.SessionMap; import org.openqa.selenium.remote.HttpSessionId; import org.openqa.selenium.remote.SessionId; @@ -62,10 +62,10 @@ public Optional> apply(String uri, Consumer downstrea } try { - Session session = sessions.get(sessionId.get()); + URI sessionUri = sessions.getUri(sessionId.get()); HttpClient client = - clientFactory.createClient(ClientConfig.defaultConfig().baseUri(session.getUri())); + clientFactory.createClient(ClientConfig.defaultConfig().baseUri(sessionUri)); WebSocket upstream = client.openSocket(new HttpRequest(GET, uri), new ForwardingListener(downstream)); diff --git a/java/src/org/openqa/selenium/grid/sessionmap/GetSessionUri.java b/java/src/org/openqa/selenium/grid/sessionmap/GetSessionUri.java index 62576dd3bfe5d..39ec88cc7024c 100644 --- a/java/src/org/openqa/selenium/grid/sessionmap/GetSessionUri.java +++ b/java/src/org/openqa/selenium/grid/sessionmap/GetSessionUri.java @@ -20,6 +20,7 @@ import static org.openqa.selenium.remote.http.Contents.asJson; import java.io.UncheckedIOException; +import java.util.Map; import org.openqa.selenium.internal.Require; import org.openqa.selenium.remote.SessionId; import org.openqa.selenium.remote.http.HttpHandler; @@ -37,6 +38,6 @@ class GetSessionUri implements HttpHandler { @Override public HttpResponse execute(HttpRequest req) throws UncheckedIOException { - return new HttpResponse().setContent(asJson(sessionMap.getUri(sessionId))); + return new HttpResponse().setContent(asJson(Map.of("value", sessionMap.getUri(sessionId)))); } }