Skip to content

Commit

Permalink
Fixes #4904 - WebsocketClient creates more connections than needed.
Browse files Browse the repository at this point in the history
Fixed connection pool's `acquire()` methods to correctly take into account the number of queued requests.

Also fixed a collateral bug in `BufferingResponseListener` - wrong calculation of the max content length.

Restored `ConnectionPoolTest` that was disabled in #2540, cleaned it up, and let it run for hours without failures.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed May 26, 2020
1 parent 051e102 commit 05f538d
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
* The bottom 32 bits represent the total connections and the top 32 bits represent the pending connections.
*/
private final AtomicBiInteger connections = new AtomicBiInteger();
private final Destination destination;
private final HttpDestination destination;
private final int maxConnections;
private final Callback requester;

protected AbstractConnectionPool(Destination destination, int maxConnections, Callback requester)
{
this.destination = destination;
this.destination = (HttpDestination)destination;
this.maxConnections = maxConnections;
this.requester = requester;
}
Expand Down Expand Up @@ -102,7 +102,7 @@ public Connection acquire()
Connection connection = activate();
if (connection == null)
{
tryCreate(-1);
tryCreate(destination.getQueuedRequestCount());
connection = activate();
}
return connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ public Connection acquire()
Connection connection = activate();
if (connection == null)
{
int maxPending = 1 + destination.getQueuedRequestCount() / getMaxMultiplex();
int queuedRequests = destination.getQueuedRequestCount();
int maxMultiplex = getMaxMultiplex();
int maxPending = queuedRequests / maxMultiplex;
if (maxPending * maxMultiplex != queuedRequests)
++maxPending;
tryCreate(maxPending);
connection = activate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ public void onContent(Response response, ByteBuffer content)
int length = content.remaining();
if (length > BufferUtil.space(buffer))
{
int requiredCapacity = buffer == null ? length : buffer.capacity() + length;
if (requiredCapacity > maxLength)
int remaining = buffer == null ? 0 : buffer.remaining();
if (remaining + length > maxLength)
response.abort(new IllegalArgumentException("Buffering capacity " + maxLength + " exceeded"));
int requiredCapacity = buffer == null ? length : buffer.capacity() + length;
int newCapacity = Math.min(Integer.highestOneBit(requiredCapacity) << 1, maxLength);
buffer = BufferUtil.ensureCapacity(buffer, newCapacity);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@
package org.eclipse.jetty.client;

import java.io.IOException;
import java.util.ArrayList;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.util.BytesContentProvider;
Expand All @@ -43,53 +43,53 @@
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Disabled // Disabled by @gregw on issue #2540 - commit 621b946b10884e7308eacca241dcf8b5d6f6cff2
public class ConnectionPoolTest
{
private Server server;
private ServerConnector connector;
private HttpClient client;

public static Stream<Arguments> pools()
public static Stream<ConnectionPoolFactory> pools()
{
List<Object[]> pools = new ArrayList<>();
pools.add(new Object[]{
DuplexConnectionPool.class,
(ConnectionPool.Factory)
destination -> new DuplexConnectionPool(destination, 8, destination)
});
pools.add(new Object[]{
RoundRobinConnectionPool.class,
(ConnectionPool.Factory)
destination -> new RoundRobinConnectionPool(destination, 8, destination)
});
return pools.stream().map(Arguments::of);
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, 8, destination)),
new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, 8, destination)),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, 8, destination, 1))
);
}

private void start(final ConnectionPool.Factory factory, Handler handler) throws Exception
private void start(ConnectionPool.Factory factory, Handler handler) throws Exception
{
server = new Server();
connector = new ServerConnector(server);
server.addConnector(connector);
server.setHandler(handler);
startServer(handler);
startClient(factory);
}

private void startClient(ConnectionPool.Factory factory) throws Exception
{
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
transport.setConnectionPoolFactory(factory);
server.start();

client = new HttpClient(transport, null);
client.start();
}

private void startServer(Handler handler) throws Exception
{
server = new Server();
connector = new ServerConnector(server);
server.addConnector(connector);
server.setHandler(handler);
server.start();
}

@AfterEach
public void disposeServer() throws Exception
{
Expand All @@ -111,14 +111,14 @@ public void disposeClient() throws Exception
}
}

@ParameterizedTest(name = "[{index}] {0}")
@ParameterizedTest
@MethodSource("pools")
public void test(Class<? extends ConnectionPool> connectionPoolClass, ConnectionPool.Factory factory) throws Exception
public void test(ConnectionPoolFactory factory) throws Exception
{
start(factory, new EmptyServerHandler()
start(factory.factory, new EmptyServerHandler()
{
@Override
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
switch (HttpMethod.fromString(request.getMethod()))
{
Expand Down Expand Up @@ -233,4 +233,74 @@ else if (serverClose)
failures.add(x);
}
}

@ParameterizedTest
@MethodSource("pools")
public void testQueuedRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
{
start(factory.factory, new EmptyServerHandler());

long delay = 1000;
client.setSocketAddressResolver(new SocketAddressResolver.Sync()
{
@Override
public void resolve(String host, int port, Promise<List<InetSocketAddress>> promise)
{
client.getExecutor().execute(() ->
{
try
{
Thread.sleep(delay);
super.resolve(host, port, promise);
}
catch (InterruptedException x)
{
promise.failed(x);
}
});
}
});

CountDownLatch latch = new CountDownLatch(2);
client.newRequest("localhost", connector.getLocalPort())
.path("/one")
.send(result ->
{
if (result.isSucceeded())
latch.countDown();
});
Thread.sleep(delay / 2);
client.newRequest("localhost", connector.getLocalPort())
.path("/two")
.send(result ->
{
if (result.isSucceeded())
latch.countDown();
});

assertTrue(latch.await(2 * delay, TimeUnit.MILLISECONDS));
List<Destination> destinations = client.getDestinations();
assertEquals(1, destinations.size());
HttpDestination destination = (HttpDestination)destinations.get(0);
AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool();
assertEquals(2, connectionPool.getConnectionCount());
}

private static class ConnectionPoolFactory
{
private final String name;
private final ConnectionPool.Factory factory;

private ConnectionPoolFactory(String name, ConnectionPool.Factory factory)
{
this.name = name;
this.factory = factory;
}

@Override
public String toString()
{
return name;
}
}
}

0 comments on commit 05f538d

Please sign in to comment.