From 11476a9f1ce1e50347dc859bcc57fc6d840c1023 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 26 Sep 2024 18:05:13 +0200 Subject: [PATCH] Fixes #7515 - Connection limit problem for "onAccepting" connections. * Fixed `ManagedSelector.Accept` to emit the event "accept failed" when closed. * Fixed `ConnectionLimit` to close connections that exceed the maximum (may happen when the connector is configured with acceptors=0). * Added test cases. * Added documentation. Signed-off-by: Simone Bordet --- .../server/http/HTTPServerDocs.java | 24 ++- .../pages/modules/standard.adoc | 16 ++ .../programming-guide/pages/server/http.adoc | 21 ++ .../org/eclipse/jetty/io/ManagedSelector.java | 32 +-- .../main/config/modules/connectionlimit.mod | 9 +- .../eclipse/jetty/server/ConnectionLimit.java | 87 +++++--- .../jetty/server/ConnectionLimitTest.java | 188 ++++++++++++++++++ .../jetty/server/NotAcceptingTest.java | 95 --------- 8 files changed, 323 insertions(+), 149 deletions(-) create mode 100644 jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectionLimitTest.java diff --git a/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/server/http/HTTPServerDocs.java b/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/server/http/HTTPServerDocs.java index a56a71fde498..2f6a0b2a66e9 100644 --- a/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/server/http/HTTPServerDocs.java +++ b/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/server/http/HTTPServerDocs.java @@ -35,7 +35,6 @@ import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.ee10.servlet.DefaultServlet; import org.eclipse.jetty.ee10.servlet.ResourceServlet; -import org.eclipse.jetty.ee10.servlet.ResourceServlet; import org.eclipse.jetty.ee10.servlet.ServletContextHandler; import org.eclipse.jetty.ee10.servlet.ServletHolder; import org.eclipse.jetty.ee10.webapp.WebAppContext; @@ -61,6 +60,7 @@ import org.eclipse.jetty.rewrite.handler.RewriteHandler; import org.eclipse.jetty.rewrite.handler.RewriteRegexRule; import org.eclipse.jetty.server.ConnectionFactory; +import org.eclipse.jetty.server.ConnectionLimit; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.CustomRequestLog; import org.eclipse.jetty.server.FormFields; @@ -353,6 +353,28 @@ public void onOpen(NetworkConnector connector) // end::sameRandomPort[] } + public void connectionLimit() + { + // tag::connectionLimit[] + Server server = new Server(); + + // Limit connections to the server, across all connectors. + ConnectionLimit serverConnectionLimit = new ConnectionLimit(1024, server); + server.addBean(serverConnectionLimit); + + ServerConnector connector1 = new ServerConnector(server); + connector1.setPort(8080); + server.addConnector(connector1); + + ServerConnector connector2 = new ServerConnector(server); + connector2.setPort(9090); + server.addConnector(connector2); + // Limit connections for this connector only. + ConnectionLimit connectorConnectionLimit = new ConnectionLimit(64, connector2); + connector2.addBean(connectorConnectionLimit); + // end::connectionLimit[] + } + public void sslHandshakeListener() throws Exception { // tag::sslHandshakeListener[] diff --git a/documentation/jetty/modules/operations-guide/pages/modules/standard.adoc b/documentation/jetty/modules/operations-guide/pages/modules/standard.adoc index 95c020f899c2..f70a516403e4 100644 --- a/documentation/jetty/modules/operations-guide/pages/modules/standard.adoc +++ b/documentation/jetty/modules/operations-guide/pages/modules/standard.adoc @@ -54,6 +54,22 @@ This property allows you to cap the max heap memory retained by the pool. `jetty.byteBufferPool.maxDirectMemory`:: This property allows you to cap the max direct memory retained by the pool. +[[connectionlimit]] +== Module `connectionlimit` + +The `connectionlimit` module limits the number of connections accepted by the server, across all connectors. + +Once the configured maximum number of connections is reached, Jetty will not accept more connections. +Existing, established connections will work normally. +When existing connections are closed, accepting new connections will be resumed. + +NOTE: The number of connections seen at the JVM level may be different from the number of connections seen at the OS level. +For more information, refer to xref:programming-guide:server/http.adoc#connector-limiting[this section]. + +The module file is `$JETTY_HOME/modules/connectionlimit.mod`: + +include::{jetty-home}/modules/connectionlimit.mod[tags=documentation] + [[console-capture]] == Module `console-capture` diff --git a/documentation/jetty/modules/programming-guide/pages/server/http.adoc b/documentation/jetty/modules/programming-guide/pages/server/http.adoc index ebf082631c35..dd79213fbfc0 100644 --- a/documentation/jetty/modules/programming-guide/pages/server/http.adoc +++ b/documentation/jetty/modules/programming-guide/pages/server/http.adoc @@ -394,6 +394,27 @@ For example: include::code:example$src/main/java/org/eclipse/jetty/docs/programming/server/http/HTTPServerDocs.java[tags=sameRandomPort] ---- +[[connector-limiting]] +=== Limiting Connections + +It is possible to limit the number of connections accepted by the whole server (and therefore across all connectors), or by a specific connector. + +This feature is implemented by class `org.eclipse.jetty.server.ConnectionLimit` and you can use it in this way: + +[,java,indent=0,options=nowrap] +---- +include::code:example$src/main/java/org/eclipse/jetty/docs/programming/server/http/HTTPServerDocs.java[tags=connectionLimit] +---- + +[NOTE] +==== +When the maximum number of connections is reached, no more connections will be accepted _at the JVM level_ -- but they could be accepted at the OS level. + +This means that if you are using OS tools (like Linux's `ss`) to count the number of established connections, you may find a number that may be greater than the maximum number of connections configured in a `ConnectionLimit`. + +Note also that different operative systems may behave differently when Jetty is not accepting connections: some OS accepts connections at the TCP level anyway (but does not notify this event to the JVM), some other OS may not accept connections at the TCP level. +==== + [[connector-protocol]] === Configuring Protocols diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 62eb7e24c5c0..9150169e04f3 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -18,6 +18,7 @@ import java.net.ConnectException; import java.net.SocketTimeoutException; import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; @@ -864,14 +865,6 @@ class Accept implements SelectorUpdate, Runnable, Closeable _selectorManager.onAccepting(channel); } - @Override - public void close() - { - if (LOG.isDebugEnabled()) - LOG.debug("closed accept of {}", channel); - IO.close(channel); - } - @Override public void update(Selector selector) { @@ -882,10 +875,9 @@ public void update(Selector selector) } catch (Throwable x) { - IO.close(channel); - _selectorManager.onAcceptFailed(channel, x); if (LOG.isDebugEnabled()) LOG.debug("Could not register channel after accept {}", channel, x); + failed(x); } } @@ -894,22 +886,28 @@ public void run() { try { - createEndPoint(channel, key); _selectorManager.onAccepted(channel); + createEndPoint(channel, key); } catch (Throwable x) { + if (LOG.isDebugEnabled()) + LOG.debug("Could not process accepted channel {}", channel, x); failed(x); } } - protected void failed(Throwable failure) + @Override + public void close() { - IO.close(channel); if (LOG.isDebugEnabled()) - LOG.warn("Could not accept {}", channel, failure); - else - LOG.warn("Could not accept {}: {}", channel, String.valueOf(failure)); + LOG.debug("Closed accept of {}", channel); + failed(new ClosedChannelException()); + } + + private void failed(Throwable failure) + { + IO.close(channel); _selectorManager.onAcceptFailed(channel, failure); } @@ -1028,6 +1026,8 @@ public void update(Selector selector) IO.close((Closeable)attachment); } _selector = null; + if (LOG.isDebugEnabled()) + LOG.debug("Closing {} on {}", selector, ManagedSelector.this); IO.close(selector); } finally diff --git a/jetty-core/jetty-server/src/main/config/modules/connectionlimit.mod b/jetty-core/jetty-server/src/main/config/modules/connectionlimit.mod index a6d489c0f805..86a6dd1f505d 100644 --- a/jetty-core/jetty-server/src/main/config/modules/connectionlimit.mod +++ b/jetty-core/jetty-server/src/main/config/modules/connectionlimit.mod @@ -1,7 +1,7 @@ # DO NOT EDIT THIS FILE - See: https://jetty.org/docs/ [description] -Enables a server-wide connection limit. +Enables a server-wide limit on TCP connections. [tags] connector @@ -13,9 +13,10 @@ server etc/jetty-connectionlimit.xml [ini-template] - -## The limit of connections to apply +#tag::documentation[] +## The maximum number of TCP connections allowed across all connectors. #jetty.connectionlimit.maxConnections=1000 -## The idle timeout to apply (in milliseconds) when connections are limited +## The idle timeout to apply (in milliseconds) to existing connections when the connection limit is reached. #jetty.connectionlimit.idleTimeout=1000 +#end::documentation[] diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/ConnectionLimit.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/ConnectionLimit.java index d7f1793d30e0..325ce89928be 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/ConnectionLimit.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/ConnectionLimit.java @@ -15,14 +15,13 @@ import java.nio.channels.SelectableChannel; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.Connection.Listener; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.Name; @@ -36,18 +35,23 @@ *

A Listener that limits the number of Connections.

*

This listener applies a limit to the number of connections, which when * exceeded results in a call to {@link AbstractConnector#setAccepting(boolean)} - * to prevent further connections being received. It can be applied to an - * entire server or to a specific connector by adding it via {@link Container#addBean(Object)} + * to prevent further connections being received. + * This listener can be applied to an entire {@link Server} or to a specific + * {@link Connector} by adding it via {@link Container#addBean(Object)}. *

+ *

When the number of connections is exceeded, the idle timeout of existing + * connections is changed with the value configured in this listener (typically + * a shorter value).

*

* Usage: *

- *
+ * 
{@code
  *   Server server = new Server();
  *   server.addBean(new ConnectionLimit(5000,server));
  *   ...
  *   server.start();
- * 
+ * }
+ * * * @see LowResourceMonitor * @see Connection.Listener @@ -61,7 +65,7 @@ public class ConnectionLimit extends AbstractLifeCycle implements Listener, Sele private final AutoLock _lock = new AutoLock(); private final Server _server; private final List _connectors = new ArrayList<>(); - private final Set _accepting = new HashSet<>(); + private int _accepting; private int _connections; private int _maxConnections; private long _idleTimeout; @@ -81,12 +85,12 @@ public ConnectionLimit(@Name("maxConnections") int maxConnections, @Name("connec if (c instanceof AbstractConnector) _connectors.add((AbstractConnector)c); else - LOG.warn("Connector {} is not an AbstractConnection. Connections not limited", c); + LOG.warn("Connector {} is not an AbstractConnector: connections will not be limited", c); } } /** - * @return If >= 0, the endpoint idle timeout in ms to apply when the connection limit is reached + * @return the endpoint idle timeout in ms to apply when the connection limit is reached */ @ManagedAttribute("The endpoint idle timeout in ms to apply when the connection limit is reached") public long getIdleTimeout() @@ -95,7 +99,10 @@ public long getIdleTimeout() } /** - * @param idleTimeout If >= 0 the endpoint idle timeout in ms to apply when the connection limit is reached + *

Sets the endpoint idle timeout in ms to apply when the connection limit is reached.

+ *

A value less than or equal to zero will not change the existing idle timeout.

+ * + * @param idleTimeout the endpoint idle timeout in ms to apply when the connection limit is reached */ public void setIdleTimeout(long idleTimeout) { @@ -105,7 +112,7 @@ public void setIdleTimeout(long idleTimeout) @ManagedAttribute("The maximum number of connections allowed") public int getMaxConnections() { - try (AutoLock l = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { return _maxConnections; } @@ -113,25 +120,34 @@ public int getMaxConnections() public void setMaxConnections(int max) { - try (AutoLock l = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { _maxConnections = max; } } - @ManagedAttribute("The current number of connections ") + @ManagedAttribute(value = "The current number of connections", readonly = true) public int getConnections() { - try (AutoLock l = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { return _connections; } } + @ManagedAttribute(value = "The current number of pending connections", readonly = true) + public int getPendingConnections() + { + try (AutoLock ignored = _lock.lock()) + { + return _accepting; + } + } + @Override protected void doStart() throws Exception { - try (AutoLock l = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { if (_server != null) { @@ -144,7 +160,7 @@ protected void doStart() throws Exception } } if (LOG.isDebugEnabled()) - LOG.debug("ConnectionLimit {} for {}", _maxConnections, _connectors); + LOG.debug("Connection limit {} for {}", _maxConnections, _connectors); _connections = 0; _limiting = false; for (AbstractConnector c : _connectors) @@ -157,7 +173,7 @@ protected void doStart() throws Exception @Override protected void doStop() throws Exception { - try (AutoLock l = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { for (AbstractConnector c : _connectors) { @@ -169,25 +185,29 @@ protected void doStop() throws Exception } } - protected void check() + private boolean check() { - if ((_accepting.size() + _connections) >= _maxConnections) + assert _lock.isHeldByCurrentThread(); + int total = _accepting + _connections; + if (total >= _maxConnections) { if (!_limiting) { _limiting = true; - LOG.info("Connection Limit({}) reached for {}", _maxConnections, _connectors); + LOG.info("Connection limit {} reached for {}", _maxConnections, _connectors); limit(); } + return total > _maxConnections; } else { if (_limiting) { _limiting = false; - LOG.info("Connection Limit({}) cleared for {}", _maxConnections, _connectors); + LOG.info("Connection limit {} cleared for {}", _maxConnections, _connectors); unlimit(); } + return false; } } @@ -226,23 +246,24 @@ protected void unlimit() @Override public void onAccepting(SelectableChannel channel) { - try (AutoLock l = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { - _accepting.add(channel); + _accepting++; if (LOG.isDebugEnabled()) - LOG.debug("onAccepting ({}+{}) < {} {}", _accepting.size(), _connections, _maxConnections, channel); - check(); + LOG.debug("Accepting ({}+{}) <= {} {}", _accepting, _connections, _maxConnections, channel); + if (check()) + IO.close(channel); } } @Override public void onAcceptFailed(SelectableChannel channel, Throwable cause) { - try (AutoLock l = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { - _accepting.remove(channel); + _accepting--; if (LOG.isDebugEnabled()) - LOG.debug("onAcceptFailed ({}+{}) < {} {} {}", _accepting.size(), _connections, _maxConnections, channel, cause); + LOG.debug("Accept failed ({}+{}) <= {} {}", _accepting, _connections, _maxConnections, channel, cause); check(); } } @@ -255,12 +276,12 @@ public void onAccepted(SelectableChannel channel) @Override public void onOpened(Connection connection) { - try (AutoLock l = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { - _accepting.remove(connection.getEndPoint().getTransport()); + _accepting--; _connections++; if (LOG.isDebugEnabled()) - LOG.debug("onOpened ({}+{}) < {} {}", _accepting.size(), _connections, _maxConnections, connection); + LOG.debug("Opened ({}+{}) <= {} {}", _accepting, _connections, _maxConnections, connection); check(); } } @@ -268,11 +289,11 @@ public void onOpened(Connection connection) @Override public void onClosed(Connection connection) { - try (AutoLock l = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { _connections--; if (LOG.isDebugEnabled()) - LOG.debug("onClosed ({}+{}) < {} {}", _accepting.size(), _connections, _maxConnections, connection); + LOG.debug("Closed ({}+{}) <= {} {}", _accepting, _connections, _maxConnections, connection); check(); } } diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectionLimitTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectionLimitTest.java new file mode 100644 index 000000000000..769cfdfb100a --- /dev/null +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectionLimitTest.java @@ -0,0 +1,188 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class ConnectionLimitTest +{ + private Server server; + private ServerConnector connector; + + private void prepare(int acceptors, Handler handler) + { + if (server == null) + server = new Server(); + connector = new ServerConnector(server, acceptors, 1); + server.addConnector(connector); + server.setHandler(handler); + } + + @AfterEach + public void dispose() + { + LifeCycle.stop(server); + } + + @ParameterizedTest + @ValueSource(ints = {0, 1}) + public void testConnectionLimitWithConnector(int acceptors) throws Exception + { + prepare(acceptors, new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + callback.succeeded(); + return true; + } + }); + int maxConnections = 2; + ConnectionLimit limiter = new ConnectionLimit(maxConnections, connector); + connector.addBean(limiter); + server.start(); + + List channels = new ArrayList<>(); + for (int i = 0; i < maxConnections; ++i) + { + SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())); + channels.add(channel); + } + // On the client connections may be accepted, but on server not yet. + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(maxConnections, limiter.getConnections())); + // The limit was reached. + assertFalse(connector.isAccepting()); + + // An extra connection is accepted at the TCP level, but not notified to the JVM yet: + // it remains in the connector accept queue, which cannot be configured to be zero. + List extraChannels = new ArrayList<>(); + for (int i = 0; i < 2; ++i) + { + SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())); + extraChannels.add(channel); + } + await().during(1, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(maxConnections, limiter.getConnections())); + + // Closing one existing connection may accept + // all the extra connections when acceptors=0. + channels.remove(0).close(); + // Verify that we are still correctly limited + // and that we have accepted a pending connection. + await().atMost(5, TimeUnit.SECONDS).during(1, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(maxConnections, limiter.getConnections())); + + extraChannels.forEach(IO::close); + channels.forEach(IO::close); + } + + @Test + public void testConnectionLimitWithServer() throws Exception + { + prepare(1, new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + callback.succeeded(); + return true; + } + }); + ServerConnector connector2 = new ServerConnector(server, 0, 1); + server.addConnector(connector2); + int maxConnections = 2; + ConnectionLimit limiter = new ConnectionLimit(maxConnections, server); + server.addBean(limiter); + server.start(); + + // Max out the connections. + SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())); + SocketChannel.open(new InetSocketAddress("localhost", connector2.getLocalPort())); + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(maxConnections, limiter.getConnections())); + + // Try to create more, should not be possible. + SocketChannel extraChannel1 = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())); + await().during(1, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(maxConnections, limiter.getConnections())); + SocketChannel extraChannel2 = SocketChannel.open(new InetSocketAddress("localhost", connector2.getLocalPort())); + await().during(1, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(maxConnections, limiter.getConnections())); + + extraChannel2.close(); + extraChannel1.close(); + } + + @Test + public void testAcceptRejectedByExecutor() throws Exception + { + // One acceptor, one selector, one application. + int maxThreads = 3; + int maxQueue = 1; + QueuedThreadPool serverThreads = new QueuedThreadPool(maxThreads, 0, new ArrayBlockingQueue<>(maxQueue)); + serverThreads.setReservedThreads(0); + serverThreads.setDetailedDump(true); + server = new Server(serverThreads); + prepare(1, new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + callback.succeeded(); + return true; + } + }); + int maxConnections = 2; + ConnectionLimit limiter = new ConnectionLimit(maxConnections, connector); + connector.addBean(limiter); + server.start(); + + // Block the last thread. + CompletableFuture blocker = new CompletableFuture<>(); + serverThreads.execute(blocker::join); + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(maxThreads, serverThreads.getThreads())); + + // Fill the thread pool queue. + IntStream.range(0, maxQueue).forEach(i -> serverThreads.execute(() -> + { + })); + + // Try to connect, the accept task should be rejected. + try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort()))) + { + ByteBuffer byteBuffer = ByteBuffer.allocate(16); + assertEquals(-1, channel.read(byteBuffer)); + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(0, limiter.getPendingConnections())); + } + + // Release the blocked task. + blocker.complete(null); + } +} diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/NotAcceptingTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/NotAcceptingTest.java index d7242ef30b06..e3659921961a 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/NotAcceptingTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/NotAcceptingTest.java @@ -17,21 +17,16 @@ import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.Supplier; import org.eclipse.jetty.http.HttpTester; import org.eclipse.jetty.server.LocalConnector.LocalEndPoint; import org.eclipse.jetty.server.handler.HelloHandler; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.NanoTime; -import org.hamcrest.Matcher; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -40,7 +35,6 @@ @Disabled // TODO public class NotAcceptingTest { - private static final Logger LOG = LoggerFactory.getLogger(NotAcceptingTest.class); private final long idleTimeout = 2000; Server server; LocalConnector localConnector; @@ -362,93 +356,4 @@ public void testAcceptRateLimit() throws Exception assertThat(blockingConnector.isAccepting(), is(true)); assertThat(asyncConnector.isAccepting(), is(true)); } - - @Test - public void testConnectionLimit() throws Exception - { - server.addBean(new ConnectionLimit(9, server)); - server.setHandler(new HelloHandler()); - - server.start(); - - LOG.debug("CONNECT:"); - try ( - LocalEndPoint local0 = localConnector.connect(); - LocalEndPoint local1 = localConnector.connect(); - LocalEndPoint local2 = localConnector.connect(); - Socket blocking0 = new Socket("localhost", blockingConnector.getLocalPort()); - Socket blocking1 = new Socket("localhost", blockingConnector.getLocalPort()); - Socket blocking2 = new Socket("localhost", blockingConnector.getLocalPort()); - Socket async0 = new Socket("localhost", asyncConnector.getLocalPort()); - Socket async1 = new Socket("localhost", asyncConnector.getLocalPort()); - Socket async2 = new Socket("localhost", asyncConnector.getLocalPort()); - ) - { - String expectedContent = "Hello" + System.lineSeparator(); - - LOG.debug("LOCAL:"); - for (LocalEndPoint client : new LocalEndPoint[]{local0, local1, local2}) - { - client.addInputAndExecute(BufferUtil.toBuffer("GET /test HTTP/1.1\r\nHost:localhost\r\n\r\n")); - HttpTester.Response response = HttpTester.parseResponse(client.getResponse()); - assertThat(response.getStatus(), is(200)); - assertThat(response.getContent(), is(expectedContent)); - } - - LOG.debug("NETWORK:"); - for (Socket client : new Socket[]{blocking0, blocking1, blocking2, async0, async1, async2}) - { - HttpTester.Input in = HttpTester.from(client.getInputStream()); - client.getOutputStream().write("GET /test HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes()); - HttpTester.Response response = HttpTester.parseResponse(in); - assertThat(response.getStatus(), is(200)); - assertThat(response.getContent(), is(expectedContent)); - } - - assertThat(localConnector.isAccepting(), is(false)); - assertThat(blockingConnector.isAccepting(), is(false)); - assertThat(asyncConnector.isAccepting(), is(false)); - - { - // Close a async connection - HttpTester.Input in = HttpTester.from(async1.getInputStream()); - async1.getOutputStream().write("GET /test HTTP/1.1\r\nHost:localhost\r\nConnection: close\r\n\r\n".getBytes()); - HttpTester.Response response = HttpTester.parseResponse(in); - assertThat(response.getStatus(), is(200)); - assertThat(response.getContent(), is(expectedContent)); - } - } - - waitFor(localConnector::isAccepting, is(true), 2 * idleTimeout, TimeUnit.MILLISECONDS); - waitFor(blockingConnector::isAccepting, is(true), 2 * idleTimeout, TimeUnit.MILLISECONDS); - waitFor(asyncConnector::isAccepting, is(true), 2 * idleTimeout, TimeUnit.MILLISECONDS); - } - - public static void waitFor(Supplier value, Matcher matcher, long wait, TimeUnit units) - { - long start = NanoTime.now(); - - while (true) - { - try - { - matcher.matches(value.get()); - return; - } - catch (Throwable e) - { - if (NanoTime.since(start) > units.toNanos(wait)) - throw e; - } - - try - { - TimeUnit.MILLISECONDS.sleep(50); - } - catch (InterruptedException e) - { - // no op - } - } - } }