diff --git a/java/src/org/openqa/selenium/grid/node/Node.java b/java/src/org/openqa/selenium/grid/node/Node.java index 09fe7d02ae9f5..bc2b5c75b5ab7 100644 --- a/java/src/org/openqa/selenium/grid/node/Node.java +++ b/java/src/org/openqa/selenium/grid/node/Node.java @@ -101,6 +101,12 @@ * by {@code sessionId}. This returns a boolean. * * + * POST + * /se/grid/node/connection/{sessionId} + * Allows the node to be ask about whether or not new websocket connections are allowed for the {@link Session} + * identified by {@code sessionId}. This returns a boolean. + * + * * * * /session/{sessionId}/* * The request is forwarded to the {@link Session} identified by {@code sessionId}. When the @@ -172,6 +178,9 @@ protected Node( get("/se/grid/node/owner/{sessionId}") .to(params -> new IsSessionOwner(this, sessionIdFrom(params))) .with(spanDecorator("node.is_session_owner").andThen(requiresSecret)), + post("/se/grid/node/connection/{sessionId}") + .to(params -> new TryAcquireConnection(this, sessionIdFrom(params))) + .with(spanDecorator("node.is_session_owner").andThen(requiresSecret)), delete("/se/grid/node/session/{sessionId}") .to(params -> new StopNodeSession(this, sessionIdFrom(params))) .with(spanDecorator("node.stop_session").andThen(requiresSecret)), @@ -244,6 +253,8 @@ public TemporaryFilesystem getDownloadsFilesystem(UUID uuid) throws IOException public abstract boolean isSessionOwner(SessionId id); + public abstract boolean tryAcquireConnection(SessionId id); + public abstract boolean isSupporting(Capabilities capabilities); public abstract NodeStatus getStatus(); diff --git a/java/src/org/openqa/selenium/grid/node/ProxyNodeWebsockets.java b/java/src/org/openqa/selenium/grid/node/ProxyNodeWebsockets.java index e3f656c069125..eff13dc5a40f5 100644 --- a/java/src/org/openqa/selenium/grid/node/ProxyNodeWebsockets.java +++ b/java/src/org/openqa/selenium/grid/node/ProxyNodeWebsockets.java @@ -94,6 +94,13 @@ public Optional> apply(String uri, Consumer downstrea return Optional.empty(); } + // ensure one session does not open to many connections, this might have a negative impact on + // the grid health + if (!node.tryAcquireConnection(id)) { + LOG.warning("Too many websocket connections initiated by " + id); + return Optional.empty(); + } + Session session = node.getSession(id); Capabilities caps = session.getCapabilities(); LOG.fine("Scanning for endpoint: " + caps); diff --git a/java/src/org/openqa/selenium/grid/node/TryAcquireConnection.java b/java/src/org/openqa/selenium/grid/node/TryAcquireConnection.java new file mode 100644 index 0000000000000..6c8822bea84cd --- /dev/null +++ b/java/src/org/openqa/selenium/grid/node/TryAcquireConnection.java @@ -0,0 +1,45 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.node; + +import static org.openqa.selenium.remote.http.Contents.asJson; + +import com.google.common.collect.ImmutableMap; +import java.io.UncheckedIOException; +import org.openqa.selenium.internal.Require; +import org.openqa.selenium.remote.SessionId; +import org.openqa.selenium.remote.http.HttpHandler; +import org.openqa.selenium.remote.http.HttpRequest; +import org.openqa.selenium.remote.http.HttpResponse; + +class TryAcquireConnection implements HttpHandler { + + private final Node node; + private final SessionId id; + + TryAcquireConnection(Node node, SessionId id) { + this.node = Require.nonNull("Node", node); + this.id = Require.nonNull("Session id", id); + } + + @Override + public HttpResponse execute(HttpRequest req) throws UncheckedIOException { + return new HttpResponse() + .setContent(asJson(ImmutableMap.of("value", node.tryAcquireConnection(id)))); + } +} diff --git a/java/src/org/openqa/selenium/grid/node/config/NodeFlags.java b/java/src/org/openqa/selenium/grid/node/config/NodeFlags.java index 800a0798a4e17..b56e57b3dcb97 100644 --- a/java/src/org/openqa/selenium/grid/node/config/NodeFlags.java +++ b/java/src/org/openqa/selenium/grid/node/config/NodeFlags.java @@ -18,6 +18,7 @@ package org.openqa.selenium.grid.node.config; import static org.openqa.selenium.grid.config.StandardGridRoles.NODE_ROLE; +import static org.openqa.selenium.grid.node.config.NodeOptions.DEFAULT_CONNECTION_LIMIT; import static org.openqa.selenium.grid.node.config.NodeOptions.DEFAULT_DETECT_DRIVERS; import static org.openqa.selenium.grid.node.config.NodeOptions.DEFAULT_DRAIN_AFTER_SESSION_COUNT; import static org.openqa.selenium.grid.node.config.NodeOptions.DEFAULT_ENABLE_BIDI; @@ -77,6 +78,14 @@ public class NodeFlags implements HasRoles { @ConfigValue(section = NODE_SECTION, name = "session-timeout", example = "60") public int sessionTimeout = DEFAULT_SESSION_TIMEOUT; + @Parameter( + names = {"--connection-limit-per-session"}, + description = + "Let X be the maximum number of websocket connections per session.This will ensure one" + + " session is not able to exhaust the connection limit of the host") + @ConfigValue(section = NODE_SECTION, name = "connection-limit-per-session", example = "8") + public int connectionLimitPerSession = DEFAULT_CONNECTION_LIMIT; + @Parameter( names = {"--detect-drivers"}, arity = 1, diff --git a/java/src/org/openqa/selenium/grid/node/config/NodeOptions.java b/java/src/org/openqa/selenium/grid/node/config/NodeOptions.java index 7317f6e7c8870..ff8fc6d76667a 100644 --- a/java/src/org/openqa/selenium/grid/node/config/NodeOptions.java +++ b/java/src/org/openqa/selenium/grid/node/config/NodeOptions.java @@ -73,6 +73,7 @@ public class NodeOptions { public static final int DEFAULT_HEARTBEAT_PERIOD = 60; public static final int DEFAULT_SESSION_TIMEOUT = 300; public static final int DEFAULT_DRAIN_AFTER_SESSION_COUNT = 0; + public static final int DEFAULT_CONNECTION_LIMIT = 10; public static final boolean DEFAULT_ENABLE_CDP = true; public static final boolean DEFAULT_ENABLE_BIDI = true; static final String NODE_SECTION = "node"; @@ -262,6 +263,15 @@ public int getMaxSessions() { return Math.min(maxSessions, DEFAULT_MAX_SESSIONS); } + public int getConnectionLimitPerSession() { + int connectionLimit = + config + .getInt(NODE_SECTION, "connection-limit-per-session") + .orElse(DEFAULT_CONNECTION_LIMIT); + Require.positive("Session connection limit", connectionLimit); + return connectionLimit; + } + public Duration getSessionTimeout() { // If the user sets 10s or less, we default to 10s. int seconds = diff --git a/java/src/org/openqa/selenium/grid/node/k8s/OneShotNode.java b/java/src/org/openqa/selenium/grid/node/k8s/OneShotNode.java index d293d1c6ba78c..af8c05cf7a7c1 100644 --- a/java/src/org/openqa/selenium/grid/node/k8s/OneShotNode.java +++ b/java/src/org/openqa/selenium/grid/node/k8s/OneShotNode.java @@ -34,6 +34,7 @@ import java.util.Optional; import java.util.ServiceLoader; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import java.util.stream.StreamSupport; import org.openqa.selenium.Capabilities; @@ -98,6 +99,8 @@ public class OneShotNode extends Node { private final Duration heartbeatPeriod; private final URI gridUri; private final UUID slotId = UUID.randomUUID(); + private final int connectionLimitPerSession; + private final AtomicInteger connectionCounter = new AtomicInteger(); private RemoteWebDriver driver; private SessionId sessionId; private HttpClient client; @@ -114,7 +117,8 @@ private OneShotNode( URI uri, URI gridUri, Capabilities stereotype, - WebDriverInfo driverInfo) { + WebDriverInfo driverInfo, + int connectionLimitPerSession) { super(tracer, id, uri, registrationSecret, Require.positive(sessionTimeout)); this.heartbeatPeriod = heartbeatPeriod; @@ -122,6 +126,7 @@ private OneShotNode( this.gridUri = Require.nonNull("Public Grid URI", gridUri); this.stereotype = ImmutableCapabilities.copyOf(Require.nonNull("Stereotype", stereotype)); this.driverInfo = Require.nonNull("Driver info", driverInfo); + this.connectionLimitPerSession = connectionLimitPerSession; new JMXHelper().register(this); } @@ -177,7 +182,8 @@ public static Node create(Config config) { .getPublicGridUri() .orElseThrow(() -> new ConfigException("Unable to determine public grid address")), stereotype, - driverInfo); + driverInfo, + nodeOptions.getConnectionLimitPerSession()); } @Override @@ -357,6 +363,11 @@ public boolean isSessionOwner(SessionId id) { return driver != null && sessionId.equals(id); } + @Override + public boolean tryAcquireConnection(SessionId id) { + return sessionId.equals(id) && connectionLimitPerSession > connectionCounter.getAndIncrement(); + } + @Override public boolean isSupporting(Capabilities capabilities) { return driverInfo.isSupporting(capabilities); diff --git a/java/src/org/openqa/selenium/grid/node/local/LocalNode.java b/java/src/org/openqa/selenium/grid/node/local/LocalNode.java index 7304db8a87847..b42c557c91008 100644 --- a/java/src/org/openqa/selenium/grid/node/local/LocalNode.java +++ b/java/src/org/openqa/selenium/grid/node/local/LocalNode.java @@ -59,6 +59,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -127,6 +128,7 @@ public class LocalNode extends Node { private final int configuredSessionCount; private final boolean cdpEnabled; private final boolean managedDownloadsEnabled; + private final int connectionLimitPerSession; private final boolean bidiEnabled; private final AtomicBoolean drainAfterSessions = new AtomicBoolean(); @@ -153,7 +155,8 @@ protected LocalNode( Duration heartbeatPeriod, List factories, Secret registrationSecret, - boolean managedDownloadsEnabled) { + boolean managedDownloadsEnabled, + int connectionLimitPerSession) { super( tracer, new NodeId(UUID.randomUUID()), @@ -176,6 +179,7 @@ protected LocalNode( this.cdpEnabled = cdpEnabled; this.bidiEnabled = bidiEnabled; this.managedDownloadsEnabled = managedDownloadsEnabled; + this.connectionLimitPerSession = connectionLimitPerSession; this.healthCheck = healthCheck == null @@ -579,6 +583,24 @@ public boolean isSessionOwner(SessionId id) { return currentSessions.getIfPresent(id) != null; } + @Override + public boolean tryAcquireConnection(SessionId id) throws NoSuchSessionException { + SessionSlot slot = currentSessions.getIfPresent(id); + + if (slot == null) { + return false; + } + + if (connectionLimitPerSession == -1) { + // no limit + return true; + } + + AtomicLong counter = slot.getConnectionCounter(); + + return connectionLimitPerSession > counter.getAndIncrement(); + } + @Override public Session getSession(SessionId id) throws NoSuchSessionException { Require.nonNull("Session ID", id); @@ -987,6 +1009,7 @@ public static class Builder { private HealthCheck healthCheck; private Duration heartbeatPeriod = Duration.ofSeconds(NodeOptions.DEFAULT_HEARTBEAT_PERIOD); private boolean managedDownloadsEnabled = false; + private int connectionLimitPerSession = -1; private Builder(Tracer tracer, EventBus bus, URI uri, URI gridUri, Secret registrationSecret) { this.tracer = Require.nonNull("Tracer", tracer); @@ -1041,6 +1064,11 @@ public Builder enableManagedDownloads(boolean enable) { return this; } + public Builder connectionLimitPerSession(int connectionLimitPerSession) { + this.connectionLimitPerSession = connectionLimitPerSession; + return this; + } + public LocalNode build() { return new LocalNode( tracer, @@ -1057,7 +1085,8 @@ public LocalNode build() { heartbeatPeriod, factories.build(), registrationSecret, - managedDownloadsEnabled); + managedDownloadsEnabled, + connectionLimitPerSession); } public Advanced advanced() { diff --git a/java/src/org/openqa/selenium/grid/node/local/LocalNodeFactory.java b/java/src/org/openqa/selenium/grid/node/local/LocalNodeFactory.java index 4224b2483f9db..600f516b02992 100644 --- a/java/src/org/openqa/selenium/grid/node/local/LocalNodeFactory.java +++ b/java/src/org/openqa/selenium/grid/node/local/LocalNodeFactory.java @@ -70,7 +70,8 @@ public static Node create(Config config) { .enableCdp(nodeOptions.isCdpEnabled()) .enableBiDi(nodeOptions.isBiDiEnabled()) .enableManagedDownloads(nodeOptions.isManagedDownloadsEnabled()) - .heartbeatPeriod(nodeOptions.getHeartbeatPeriod()); + .heartbeatPeriod(nodeOptions.getHeartbeatPeriod()) + .connectionLimitPerSession(nodeOptions.getConnectionLimitPerSession()); List> builders = new ArrayList<>(); ServiceLoader.load(DriverService.Builder.class).forEach(builders::add); diff --git a/java/src/org/openqa/selenium/grid/node/local/SessionSlot.java b/java/src/org/openqa/selenium/grid/node/local/SessionSlot.java index 5b84accc84c31..3c51b785c13c0 100644 --- a/java/src/org/openqa/selenium/grid/node/local/SessionSlot.java +++ b/java/src/org/openqa/selenium/grid/node/local/SessionSlot.java @@ -21,6 +21,7 @@ import java.util.ServiceLoader; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Predicate; import java.util.logging.Level; @@ -59,6 +60,7 @@ public class SessionSlot private final AtomicBoolean reserved = new AtomicBoolean(false); private final boolean supportingCdp; private final boolean supportingBiDi; + private final AtomicLong connectionCounter; private ActiveSession currentSession; public SessionSlot(EventBus bus, Capabilities stereotype, SessionFactory factory) { @@ -68,6 +70,7 @@ public SessionSlot(EventBus bus, Capabilities stereotype, SessionFactory factory this.factory = Require.nonNull("Session factory", factory); this.supportingCdp = isSlotSupportingCdp(this.stereotype); this.supportingBiDi = isSlotSupportingBiDi(this.stereotype); + this.connectionCounter = new AtomicLong(); } public UUID getId() { @@ -112,6 +115,7 @@ public void stop() { LOG.log(Level.WARNING, "Unable to cleanly close session", e); } currentSession = null; + connectionCounter.set(0); release(); bus.fire(new SessionClosedEvent(id)); LOG.info(String.format("Stopping session %s", id)); @@ -148,6 +152,7 @@ public Either apply(CreateSessionRequest sess if (possibleSession.isRight()) { ActiveSession session = possibleSession.right(); currentSession = session; + connectionCounter.set(0); return Either.right(session); } else { return Either.left(possibleSession.left()); @@ -185,4 +190,8 @@ public boolean hasRelayFactory() { public boolean isRelayServiceUp() { return hasRelayFactory() && ((RelaySessionFactory) factory).isServiceUp(); } + + public AtomicLong getConnectionCounter() { + return connectionCounter; + } } diff --git a/java/src/org/openqa/selenium/grid/node/remote/RemoteNode.java b/java/src/org/openqa/selenium/grid/node/remote/RemoteNode.java index ae7cc8e1af9fb..5df5da5969c42 100644 --- a/java/src/org/openqa/selenium/grid/node/remote/RemoteNode.java +++ b/java/src/org/openqa/selenium/grid/node/remote/RemoteNode.java @@ -174,6 +174,18 @@ public boolean isSessionOwner(SessionId id) { return Boolean.TRUE.equals(Values.get(res, Boolean.class)); } + @Override + public boolean tryAcquireConnection(SessionId id) { + Require.nonNull("Session ID", id); + + HttpRequest req = new HttpRequest(POST, "/se/grid/node/connection/" + id); + HttpTracing.inject(tracer, tracer.getCurrentContext(), req); + + HttpResponse res = client.with(addSecret).execute(req); + + return Boolean.TRUE.equals(Values.get(res, Boolean.class)); + } + @Override public Session getSession(SessionId id) throws NoSuchSessionException { Require.nonNull("Session ID", id); diff --git a/java/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java b/java/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java index 0649e1bbee235..1485d04fca4c6 100644 --- a/java/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java +++ b/java/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java @@ -445,6 +445,11 @@ public boolean isSessionOwner(SessionId id) { return running != null && running.getId().equals(id); } + @Override + public boolean tryAcquireConnection(SessionId id) { + return false; + } + @Override public boolean isSupporting(Capabilities capabilities) { return Objects.equals("cake", capabilities.getCapability("cheese"));