diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index cecb94041075..dbbba13e7429 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -44,13 +44,13 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable * The bottom 32 bits represent the total connections and the top 32 bits represent the pending connections. */ private final AtomicBiInteger connections = new AtomicBiInteger(); - private final Destination destination; + private final HttpDestination destination; private final int maxConnections; private final Callback requester; protected AbstractConnectionPool(Destination destination, int maxConnections, Callback requester) { - this.destination = destination; + this.destination = (HttpDestination)destination; this.maxConnections = maxConnections; this.requester = requester; } @@ -102,7 +102,7 @@ public Connection acquire() Connection connection = activate(); if (connection == null) { - tryCreate(-1); + tryCreate(destination.getQueuedRequestCount()); connection = activate(); } return connection; diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java index f9a635dfe79f..df868749dba2 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java @@ -64,7 +64,11 @@ public Connection acquire() Connection connection = activate(); if (connection == null) { - int maxPending = 1 + destination.getQueuedRequestCount() / getMaxMultiplex(); + int queuedRequests = destination.getQueuedRequestCount(); + int maxMultiplex = getMaxMultiplex(); + int maxPending = queuedRequests / maxMultiplex; + if (maxPending * maxMultiplex != queuedRequests) + ++maxPending; tryCreate(maxPending); connection = activate(); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/BufferingResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/BufferingResponseListener.java index 71a60a3af346..6f633aecd294 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/BufferingResponseListener.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/BufferingResponseListener.java @@ -119,9 +119,10 @@ public void onContent(Response response, ByteBuffer content) int length = content.remaining(); if (length > BufferUtil.space(buffer)) { - int requiredCapacity = buffer == null ? length : buffer.capacity() + length; - if (requiredCapacity > maxLength) + int remaining = buffer == null ? 0 : buffer.remaining(); + if (remaining + length > maxLength) response.abort(new IllegalArgumentException("Buffering capacity " + maxLength + " exceeded")); + int requiredCapacity = buffer == null ? length : buffer.capacity() + length; int newCapacity = Math.min(Integer.highestOneBit(requiredCapacity) << 1, maxLength); buffer = BufferUtil.ensureCapacity(buffer, newCapacity); } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java index 81fce582293e..4bf8b39d4dd2 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java @@ -19,7 +19,7 @@ package org.eclipse.jetty.client; import java.io.IOException; -import java.util.ArrayList; +import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -27,11 +27,11 @@ import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import java.util.stream.Stream; -import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; import org.eclipse.jetty.client.util.BytesContentProvider; @@ -43,53 +43,53 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.SocketAddressResolver; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -@Disabled // Disabled by @gregw on issue #2540 - commit 621b946b10884e7308eacca241dcf8b5d6f6cff2 public class ConnectionPoolTest { private Server server; private ServerConnector connector; private HttpClient client; - public static Stream pools() + public static Stream pools() { - List pools = new ArrayList<>(); - pools.add(new Object[]{ - DuplexConnectionPool.class, - (ConnectionPool.Factory) - destination -> new DuplexConnectionPool(destination, 8, destination) - }); - pools.add(new Object[]{ - RoundRobinConnectionPool.class, - (ConnectionPool.Factory) - destination -> new RoundRobinConnectionPool(destination, 8, destination) - }); - return pools.stream().map(Arguments::of); + return Stream.of( + new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, 8, destination)), + new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, 8, destination)), + new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, 8, destination, 1)) + ); } - private void start(final ConnectionPool.Factory factory, Handler handler) throws Exception + private void start(ConnectionPool.Factory factory, Handler handler) throws Exception { - server = new Server(); - connector = new ServerConnector(server); - server.addConnector(connector); - server.setHandler(handler); + startServer(handler); + startClient(factory); + } + private void startClient(ConnectionPool.Factory factory) throws Exception + { HttpClientTransport transport = new HttpClientTransportOverHTTP(1); transport.setConnectionPoolFactory(factory); - server.start(); - client = new HttpClient(transport, null); client.start(); } + private void startServer(Handler handler) throws Exception + { + server = new Server(); + connector = new ServerConnector(server); + server.addConnector(connector); + server.setHandler(handler); + server.start(); + } + @AfterEach public void disposeServer() throws Exception { @@ -111,14 +111,14 @@ public void disposeClient() throws Exception } } - @ParameterizedTest(name = "[{index}] {0}") + @ParameterizedTest @MethodSource("pools") - public void test(Class connectionPoolClass, ConnectionPool.Factory factory) throws Exception + public void test(ConnectionPoolFactory factory) throws Exception { - start(factory, new EmptyServerHandler() + start(factory.factory, new EmptyServerHandler() { @Override - protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { switch (HttpMethod.fromString(request.getMethod())) { @@ -233,4 +233,74 @@ else if (serverClose) failures.add(x); } } + + @ParameterizedTest + @MethodSource("pools") + public void testQueuedRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception + { + start(factory.factory, new EmptyServerHandler()); + + long delay = 1000; + client.setSocketAddressResolver(new SocketAddressResolver.Sync() + { + @Override + public void resolve(String host, int port, Promise> promise) + { + client.getExecutor().execute(() -> + { + try + { + Thread.sleep(delay); + super.resolve(host, port, promise); + } + catch (InterruptedException x) + { + promise.failed(x); + } + }); + } + }); + + CountDownLatch latch = new CountDownLatch(2); + client.newRequest("localhost", connector.getLocalPort()) + .path("/one") + .send(result -> + { + if (result.isSucceeded()) + latch.countDown(); + }); + Thread.sleep(delay / 2); + client.newRequest("localhost", connector.getLocalPort()) + .path("/two") + .send(result -> + { + if (result.isSucceeded()) + latch.countDown(); + }); + + assertTrue(latch.await(2 * delay, TimeUnit.MILLISECONDS)); + List destinations = client.getDestinations(); + assertEquals(1, destinations.size()); + HttpDestination destination = (HttpDestination)destinations.get(0); + AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool(); + assertEquals(2, connectionPool.getConnectionCount()); + } + + private static class ConnectionPoolFactory + { + private final String name; + private final ConnectionPool.Factory factory; + + private ConnectionPoolFactory(String name, ConnectionPool.Factory factory) + { + this.name = name; + this.factory = factory; + } + + @Override + public String toString() + { + return name; + } + } }