diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java index 9ab9745c09..93e20c25a3 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java @@ -20,6 +20,7 @@ import java.io.InputStream; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; import javax.ws.rs.core.Response; @@ -36,6 +37,7 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.timeout.IdleStateEvent; /** * Jersey implementation of Netty channel handler. @@ -51,6 +53,8 @@ class JerseyClientHandler extends SimpleChannelInboundHandler { private NettyInputStream nis; private ClientResponse jerseyResponse; + private boolean readTimedOut; + JerseyClientHandler(ClientRequest request, CompletableFuture responseAvailable, CompletableFuture responseDone) { @@ -67,7 +71,12 @@ public void channelReadComplete(ChannelHandlerContext ctx) { @Override public void channelInactive(ChannelHandlerContext ctx) { // assert: no-op, if channel is closed after LastHttpContent has been consumed - responseDone.completeExceptionally(new IOException("Stream closed")); + + if (readTimedOut) { + responseDone.completeExceptionally(new TimeoutException("Stream closed: read timeout")); + } else { + responseDone.completeExceptionally(new IOException("Stream closed")); + } } protected void notifyResponse() { @@ -145,4 +154,14 @@ public int read() throws IOException { public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) { responseDone.completeExceptionally(cause); } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + readTimedOut = true; + ctx.close(); + } else { + super.userEventTriggered(ctx, evt); + } + } } diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyClientProperties.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyClientProperties.java new file mode 100644 index 0000000000..3ee79d7839 --- /dev/null +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyClientProperties.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.netty.connector; + +import org.glassfish.jersey.internal.util.PropertiesClass; + +/** + * Configuration options specific to the Client API that utilizes {@link NettyConnectorProvider}. + * + * @since 2.32 + */ +@PropertiesClass +public class NettyClientProperties { + + /** + *

+ * This property determines the maximum number of idle connections that will be simultaneously kept alive + * in total, rather than per destination. The default is 60. + *

+ */ + public static final String MAX_CONNECTIONS_TOTAL = "jersey.config.client.maxTotalConnections"; + + /** + *

+ * This property determines the maximum number of idle connections that will be simultaneously kept alive, per destination. + * The default is 5. + *

+ *

+ * This property is a Jersey alternative to System property {@code}http.maxConnections{@code}. The Jersey property takes + * precedence over the system property. + *

+ */ + public static final String MAX_CONNECTIONS = "jersey.config.client.maxConnections"; +} diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java index e3f3cdf390..d13c86dae3 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -37,6 +37,8 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; @@ -58,6 +60,9 @@ import io.netty.handler.ssl.ClientAuth; import io.netty.handler.ssl.JdkSslContext; import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.GenericFutureListener; import org.glassfish.jersey.client.ClientProperties; import org.glassfish.jersey.client.ClientRequest; @@ -79,9 +84,30 @@ class NettyConnector implements Connector { final Client client; final HashMap> connections = new HashMap<>(); + // If HTTP keepalive is enabled the value of "http.maxConnections" determines the maximum number + // of idle connections that will be simultaneously kept alive, per destination. + private static final String HTTP_KEEPALIVE_STRING = System.getProperty("http.keepAlive"); + // http.keepalive (default: true) + private static final Boolean HTTP_KEEPALIVE = + HTTP_KEEPALIVE_STRING == null ? Boolean.TRUE : Boolean.parseBoolean(HTTP_KEEPALIVE_STRING); + + // http.maxConnections (default: 5) + private static final int DEFAULT_MAX_POOL_SIZE = 5; + private static final int MAX_POOL_SIZE = Integer.getInteger("http.maxConnections", DEFAULT_MAX_POOL_SIZE); + private static final int MAX_POOL_IDLE = 60; + + private final Integer maxPoolSize; // either from system property, or from Jersey config, or default + private final Integer maxPoolIdle; // either from Jersey config, or default + + private static final String INACTIVE_POOLED_CONNECTION_HANDLER = "inactive_pooled_connection_handler"; + private static final String PRUNE_INACTIVE_POOL = "prune_inactive_pool"; + private static final String READ_TIMEOUT_HANDLER = "read_timeout_handler"; + private static final String REQUEST_HANDLER = "request_handler"; + NettyConnector(Client client) { - final Object threadPoolSize = client.getConfiguration().getProperties().get(ClientProperties.ASYNC_THREADPOOL_SIZE); + final Map properties = client.getConfiguration().getProperties(); + final Object threadPoolSize = properties.get(ClientProperties.ASYNC_THREADPOOL_SIZE); if (threadPoolSize != null && threadPoolSize instanceof Integer && (Integer) threadPoolSize > 0) { executorService = Executors.newFixedThreadPool((Integer) threadPoolSize); @@ -92,20 +118,31 @@ class NettyConnector implements Connector { } this.client = client; + + final Object maxPoolIdleProperty = properties.get(NettyClientProperties.MAX_CONNECTIONS_TOTAL); + final Object maxPoolSizeProperty = properties.get(NettyClientProperties.MAX_CONNECTIONS); + + maxPoolIdle = maxPoolIdleProperty != null ? (Integer) maxPoolIdleProperty : MAX_POOL_IDLE; + maxPoolSize = maxPoolSizeProperty != null + ? (Integer) maxPoolSizeProperty + : (HTTP_KEEPALIVE ? MAX_POOL_SIZE : DEFAULT_MAX_POOL_SIZE); + + if (maxPoolIdle == null || maxPoolIdle < 0) { + throw new ProcessingException(LocalizationMessages.WRONG_MAX_POOL_IDLE(maxPoolIdle)); + } + + if (maxPoolSize == null || maxPoolSize < 0) { + throw new ProcessingException(LocalizationMessages.WRONG_MAX_POOL_SIZE(maxPoolIdle)); + } } @Override public ClientResponse apply(ClientRequest jerseyRequest) { try { - CompletableFuture resultFuture = execute(jerseyRequest); - - Integer timeout = jerseyRequest.resolveProperty(ClientProperties.READ_TIMEOUT, 0); - - return (timeout != null && timeout > 0) ? resultFuture.get(timeout, TimeUnit.MILLISECONDS) - : resultFuture.get(); - } catch (ExecutionException ex) { - Throwable e = ex.getCause() == null ? ex : ex.getCause(); - throw new ProcessingException(e.getMessage(), e); + return execute(jerseyRequest).join(); + } catch (CompletionException cex) { + final Throwable t = cex.getCause() == null ? cex : cex.getCause(); + throw new ProcessingException(t.getMessage(), t); } catch (Exception ex) { throw new ProcessingException(ex.getMessage(), ex); } @@ -120,6 +157,11 @@ public Future apply(final ClientRequest jerseyRequest, final AsyncConnectorCa } protected CompletableFuture execute(final ClientRequest jerseyRequest) { + Integer timeout = jerseyRequest.resolveProperty(ClientProperties.READ_TIMEOUT, 0); + if (timeout == null || timeout < 0) { + throw new ProcessingException(LocalizationMessages.WRONG_READ_TIMEOUT(timeout)); + } + final CompletableFuture responseAvailable = new CompletableFuture<>(); final CompletableFuture responseDone = new CompletableFuture<>(); @@ -128,6 +170,7 @@ protected CompletableFuture execute(final ClientRequest jerseyRe int port = requestUri.getPort() != -1 ? requestUri.getPort() : "https".equals(requestUri.getScheme()) ? 443 : 80; try { + String key = requestUri.getScheme() + "://" + host + ":" + port; ArrayList conns; synchronized (connections) { @@ -138,9 +181,16 @@ protected CompletableFuture execute(final ClientRequest jerseyRe } } - Channel chan; + Channel chan = null; synchronized (conns) { - chan = conns.size() == 0 ? null : conns.remove(conns.size() - 1); + while (chan == null && !conns.isEmpty()) { + chan = conns.remove(conns.size() - 1); + chan.pipeline().remove(INACTIVE_POOLED_CONNECTION_HANDLER); + chan.pipeline().remove(PRUNE_INACTIVE_POOL); + if (!chan.isOpen()) { + chan = null; + } + } } if (chan == null) { @@ -199,16 +249,30 @@ protected void initChannel(SocketChannel ch) throws Exception { // will leak final Channel ch = chan; JerseyClientHandler clientHandler = new JerseyClientHandler(jerseyRequest, responseAvailable, responseDone); - ch.pipeline().addLast(clientHandler); + // read timeout makes sense really as an inactivity timeout + ch.pipeline().addLast(READ_TIMEOUT_HANDLER, + new IdleStateHandler(0, 0, timeout, TimeUnit.MILLISECONDS)); + ch.pipeline().addLast(REQUEST_HANDLER, clientHandler); responseDone.whenComplete((_r, th) -> { + ch.pipeline().remove(READ_TIMEOUT_HANDLER); ch.pipeline().remove(clientHandler); if (th == null) { + ch.pipeline().addLast(INACTIVE_POOLED_CONNECTION_HANDLER, new IdleStateHandler(0, 0, maxPoolIdle)); + ch.pipeline().addLast(PRUNE_INACTIVE_POOL, new PruneIdlePool(connections, key)); synchronized (connections) { ArrayList conns1 = connections.get(key); - synchronized (conns1) { + if (conns1 == null) { + conns1 = new ArrayList<>(1); conns1.add(ch); + connections.put(key, conns1); + } else { + synchronized (conns1) { + if (conns1.size() < maxPoolSize) { + conns1.add(ch); + } // else do not add the Channel to the idle pool + } } } } else { @@ -331,4 +395,35 @@ private static URI getProxyUri(final Object proxy) { throw new ProcessingException(LocalizationMessages.WRONG_PROXY_URI_TYPE(ClientProperties.PROXY_URI)); } } + + protected static class PruneIdlePool extends ChannelDuplexHandler { + HashMap> connections; + String key; + + public PruneIdlePool(HashMap> connections, String key) { + this.connections = connections; + this.key = key; + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent e = (IdleStateEvent) evt; + if (e.state() == IdleState.ALL_IDLE) { + ctx.close(); + synchronized (connections) { + ArrayList chans = connections.get(key); + synchronized (chans) { + chans.remove(ctx.channel()); + if (chans.isEmpty()) { + connections.remove(key); + } + } + } + } + } else { + super.userEventTriggered(ctx, evt); + } + } + } } diff --git a/connectors/netty-connector/src/main/resources/org/glassfish/jersey/netty/connector/localization.properties b/connectors/netty-connector/src/main/resources/org/glassfish/jersey/netty/connector/localization.properties index 24033076d2..8f9bf8f70e 100644 --- a/connectors/netty-connector/src/main/resources/org/glassfish/jersey/netty/connector/localization.properties +++ b/connectors/netty-connector/src/main/resources/org/glassfish/jersey/netty/connector/localization.properties @@ -1,5 +1,5 @@ # -# Copyright (c) 2016, 2018 Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved. # # This program and the accompanying materials are made available under the # terms of the Eclipse Public License v. 2.0, which is available at @@ -15,3 +15,7 @@ # wrong.proxy.uri.type=The proxy URI ("{0}") property MUST be an instance of String or URI. +wrong.read.timeout=Unexpected ("{0}") READ_TIMEOUT. +wrong.max.pool.size=Unexpected ("{0}") maximum number of connections per destination. +wrong.max.pool.idle=Unexpected ("{0}") maximum number of connections total. + diff --git a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/TimeoutTest.java b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/TimeoutTest.java index 3cd8e8d75b..f88e132622 100644 --- a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/TimeoutTest.java +++ b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/TimeoutTest.java @@ -16,6 +16,7 @@ package org.glassfish.jersey.netty.connector; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeoutException; import javax.ws.rs.GET; @@ -80,6 +81,7 @@ public void testSlow() { target("test/timeout").property(ClientProperties.READ_TIMEOUT, 1_000).request().get(); fail("Timeout expected."); } catch (ProcessingException e) { + assertEquals(e.getMessage(), "Stream closed: read timeout"); assertThat("Unexpected processing exception cause", e.getCause(), instanceOf(TimeoutException.class)); } @@ -91,8 +93,41 @@ public void testTimeoutInRequest() { target("test/timeout").request().property(ClientProperties.READ_TIMEOUT, 1_000).get(); fail("Timeout expected."); } catch (ProcessingException e) { + assertEquals(e.getMessage(), "Stream closed: read timeout"); assertThat("Unexpected processing exception cause", - e.getCause(), instanceOf(TimeoutException.class)); + e.getCause(), instanceOf(TimeoutException.class)); + } + } + + @Test + public void testRxSlow() { + try { + target("test/timeout").property(ClientProperties.READ_TIMEOUT, 1_000).request() + .rx().get().toCompletableFuture().join(); + fail("Timeout expected."); + } catch (CompletionException cex) { + assertThat("Unexpected async cause", + cex.getCause(), instanceOf(ProcessingException.class)); + ProcessingException e = (ProcessingException) cex.getCause(); + assertThat("Unexpected processing exception cause", + e.getCause(), instanceOf(TimeoutException.class)); + assertEquals(e.getCause().getMessage(), "Stream closed: read timeout"); + } + } + + @Test + public void testRxTimeoutInRequest() { + try { + target("test/timeout").request().property(ClientProperties.READ_TIMEOUT, 1_000) + .rx().get().toCompletableFuture().join(); + fail("Timeout expected."); + } catch (CompletionException cex) { + assertThat("Unexpected async cause", + cex.getCause(), instanceOf(ProcessingException.class)); + ProcessingException e = (ProcessingException) cex.getCause(); + assertThat("Unexpected processing exception cause", + e.getCause(), instanceOf(TimeoutException.class)); + assertEquals(e.getCause().getMessage(), "Stream closed: read timeout"); } } } diff --git a/core-common/src/main/java/org/glassfish/jersey/internal/config/SystemPropertiesConfigurationModel.java b/core-common/src/main/java/org/glassfish/jersey/internal/config/SystemPropertiesConfigurationModel.java index f8d40b81ba..31bc349b3a 100644 --- a/core-common/src/main/java/org/glassfish/jersey/internal/config/SystemPropertiesConfigurationModel.java +++ b/core-common/src/main/java/org/glassfish/jersey/internal/config/SystemPropertiesConfigurationModel.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2019, 2020 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -51,6 +51,7 @@ class SystemPropertiesConfigurationModel implements ExternalConfigurationModel