From da9a8289322199b65c89020a50b5e8a66a452221 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Fri, 24 Apr 2020 11:10:47 -0700 Subject: [PATCH 01/10] netty: send WINDOW_UPDATE before BDP ping --- .../io/grpc/netty/AbstractNettyHandler.java | 20 +++++++++++++++++-- .../io/grpc/netty/NettyClientHandler.java | 3 ++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java index 59267c24301..f2f2cbc4402 100644 --- a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java +++ b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController; import io.netty.handler.codec.http2.Http2ConnectionDecoder; import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2Exception; @@ -117,7 +118,10 @@ void setAutoTuneFlowControl(boolean isOn) { */ final class FlowControlPinger { + // NOTE: Up to 16MB guarantees sending WINDOW_UPDATE for every BDP ping private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024; + // A max float value < 1.0f + private static final float MAX_WINDOW_UPDATE_RATIO = 0.99999997f; private int pingCount; private int pingReturn; private boolean pinging; @@ -133,11 +137,24 @@ public int maxWindow() { return MAX_WINDOW_SIZE; } - public void onDataRead(int dataLength, int paddingLength) { + public void onDataRead(int dataLength, int paddingLength) throws Http2Exception { if (!autoTuneFlowControlOn) { return; } if (!isPinging()) { + // if a bdpPing is being sent out we can piggyback connection's window update for the bytes + // we just received. DefaultHttp2LocalFlowController doesn't always send WINDOW_UPDATE. + DefaultHttp2LocalFlowController flowController = + (DefaultHttp2LocalFlowController) decoder().flowController(); + Http2Stream connectionStream = connection().connectionStream(); + decoder() + .flowController() + .incrementWindowSize(connectionStream, dataLength + paddingLength); + float ratio = flowController.windowUpdateRatio(connectionStream); + // To make it always send WINDOW_UPDATE, temporarily increase the windowUpdateRatio. + flowController.windowUpdateRatio(connectionStream, MAX_WINDOW_UPDATE_RATIO); + flowController.windowUpdateRatio(connectionStream, ratio); + setPinging(true); sendPing(ctx()); } @@ -168,7 +185,6 @@ public void updateWindow() throws Http2Exception { settings.initialWindowSize(targetWindow); frameWriter().writeSettings(ctx(), settings, ctx().newPromise()); } - } private boolean isPinging() { diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 677358f4fef..4fae9a49143 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -369,7 +369,8 @@ private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream /** * Handler for an inbound HTTP/2 DATA frame. */ - private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) { + private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) + throws Http2Exception { flowControlPing().onDataRead(data.readableBytes(), padding); NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId)); PerfMark.event("NettyClientHandler.onDataRead", stream.tag()); From fdc6f2e7b7052fcfbebeabaa313e205a2ceb527b Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Fri, 24 Apr 2020 11:24:26 -0700 Subject: [PATCH 02/10] netty: bdp accessor allows to clear handlers --- .../grpc/testing/integration/AbstractInteropTest.java | 2 +- .../testing/integration/AutoWindowSizingOnTest.java | 10 +++++++++- .../grpc/testing/integration/NettyFlowControlTest.java | 5 ++++- .../java/io/grpc/netty/InternalHandlerSettings.java | 4 ++++ .../main/java/io/grpc/netty/NettyHandlerSettings.java | 8 ++++++++ 5 files changed, 26 insertions(+), 3 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index dc8819eaf92..60a72b5f8d3 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -1907,7 +1907,7 @@ protected int operationTimeoutMillis() { * Some tests run on memory constrained environments. Rather than OOM, just give up. 64 is * chosen as a maximum amount of memory a large test would need. */ - private static void assumeEnoughMemory() { + protected static void assumeEnoughMemory() { Runtime r = Runtime.getRuntime(); long usedMem = r.totalMemory() - r.freeMemory(); long actuallyFreeMemory = r.maxMemory() - usedMem; diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java index 462ad480a4a..52f99412626 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java @@ -22,6 +22,7 @@ import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -35,9 +36,16 @@ public static void turnOnAutoWindow() { InternalHandlerSettings.autoWindowOn(true); } + @AfterClass + public static void turnOffAutoWindow() { + InternalHandlerSettings.enable(false); + InternalHandlerSettings.autoWindowOn(false); + InternalHandlerSettings.clearHandlers(); + } + @Override protected AbstractServerImplBuilder getServerBuilder() { - return NettyServerBuilder.forPort(0) + return NettyServerBuilder.forPort(8080) .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java index e976696ff59..286530c3b3f 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java @@ -81,8 +81,11 @@ public static void setUp() { } @AfterClass - public static void shutDownTests() { + public static void shutDown() { executor.shutdown(); + InternalHandlerSettings.enable(false); + InternalHandlerSettings.autoWindowOn(false); + InternalHandlerSettings.clearHandlers(); } @Before diff --git a/netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java b/netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java index ed6a552f8b6..b50659ddd25 100644 --- a/netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java +++ b/netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java @@ -41,4 +41,8 @@ public static synchronized int getLatestClientWindow() { public static synchronized int getLatestServerWindow() { return NettyHandlerSettings.getLatestServerWindow(); } + + public static synchronized void clearHandlers() { + NettyHandlerSettings.clearHandlers(); + } } diff --git a/netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java b/netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java index a977bb7049f..a672ba88d4f 100644 --- a/netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java +++ b/netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java @@ -17,6 +17,7 @@ package io.grpc.netty; import com.google.common.base.Preconditions; +import javax.annotation.Nullable; /** * Allows autoFlowControl to be turned on and off from interop testing and flow control windows to @@ -29,7 +30,9 @@ final class NettyHandlerSettings { private static boolean autoFlowControlOn; // These will be the most recently created handlers created using NettyClientTransport and // NettyServerTransport + @Nullable private static AbstractNettyHandler clientHandler; + @Nullable private static AbstractNettyHandler serverHandler; static void setAutoWindow(AbstractNettyHandler handler) { @@ -64,6 +67,11 @@ public static synchronized int getLatestServerWindow() { return getLatestWindow(serverHandler); } + public static synchronized void clearHandlers() { + clientHandler = null; + serverHandler = null; + } + private static synchronized int getLatestWindow(AbstractNettyHandler handler) { Preconditions.checkNotNull(handler); return handler.decoder().flowController() From 758fa2942dc8c978b1a423e35fcea6aacb9c9551 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Fri, 24 Apr 2020 14:38:20 -0700 Subject: [PATCH 03/10] fix tests and logic --- .../integration/AutoWindowSizingOnTest.java | 21 +------ .../integration/NettyFlowControlTest.java | 20 ++----- .../io/grpc/netty/AbstractNettyHandler.java | 58 ++++++++++++------- .../grpc/netty/InternalHandlerSettings.java | 12 ---- .../io/grpc/netty/NettyChannelBuilder.java | 31 +++++++--- .../io/grpc/netty/NettyClientHandler.java | 11 +++- .../io/grpc/netty/NettyClientTransport.java | 9 ++- .../io/grpc/netty/NettyHandlerSettings.java | 27 +++++---- .../main/java/io/grpc/netty/NettyServer.java | 6 +- .../io/grpc/netty/NettyServerBuilder.java | 24 ++++++-- .../io/grpc/netty/NettyServerHandler.java | 11 +++- .../io/grpc/netty/NettyServerTransport.java | 9 ++- .../io/grpc/netty/NettyClientHandlerTest.java | 1 + .../grpc/netty/NettyClientTransportTest.java | 7 ++- .../io/grpc/netty/NettyHandlerTestBase.java | 1 + .../io/grpc/netty/NettyServerHandlerTest.java | 1 + .../java/io/grpc/netty/NettyServerTest.java | 4 ++ 17 files changed, 148 insertions(+), 105 deletions(-) diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java index 52f99412626..dc19d286881 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java @@ -18,34 +18,18 @@ import io.grpc.ManagedChannel; import io.grpc.internal.AbstractServerImplBuilder; -import io.grpc.netty.InternalHandlerSettings; import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class AutoWindowSizingOnTest extends AbstractInteropTest { - @BeforeClass - public static void turnOnAutoWindow() { - InternalHandlerSettings.enable(true); - InternalHandlerSettings.autoWindowOn(true); - } - - @AfterClass - public static void turnOffAutoWindow() { - InternalHandlerSettings.enable(false); - InternalHandlerSettings.autoWindowOn(false); - InternalHandlerSettings.clearHandlers(); - } - @Override protected AbstractServerImplBuilder getServerBuilder() { - return NettyServerBuilder.forPort(8080) + return NettyServerBuilder.forPort(0) .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); } @@ -53,7 +37,8 @@ protected AbstractServerImplBuilder getServerBuilder() { protected ManagedChannel createChannel() { NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress()) .negotiationType(NegotiationType.PLAINTEXT) - .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); + .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) + .initialFlowControlWindow(NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW); // Disable the default census stats interceptor, use testing interceptor instead. io.grpc.internal.TestingAccessor.setStatsEnabled(builder, false); return builder.intercept(createCensusStatsClientInterceptor()).build(); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java index 286530c3b3f..932b69d11c6 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java @@ -43,7 +43,6 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -74,18 +73,10 @@ public class NettyFlowControlTest { new ThreadPoolExecutor(1, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue(), new DefaultThreadFactory("flowcontrol-test-pool", true)); - @BeforeClass - public static void setUp() { - InternalHandlerSettings.enable(true); - InternalHandlerSettings.autoWindowOn(true); - } @AfterClass - public static void shutDown() { + public static void shutDownTests() { executor.shutdown(); - InternalHandlerSettings.enable(false); - InternalHandlerSettings.autoWindowOn(false); - InternalHandlerSettings.clearHandlers(); } @Before @@ -172,15 +163,16 @@ private void resetConnection(int clientFlowControlWindow) } } channel = NettyChannelBuilder.forAddress(new InetSocketAddress("localhost", proxyPort)) - .flowControlWindow(clientFlowControlWindow) + .initialFlowControlWindow(clientFlowControlWindow) .negotiationType(NegotiationType.PLAINTEXT) .build(); } private void startServer(int serverFlowControlWindow) { ServerBuilder builder = - NettyServerBuilder.forAddress(new InetSocketAddress("localhost", 0)) - .flowControlWindow(serverFlowControlWindow); + NettyServerBuilder + .forAddress(new InetSocketAddress("localhost", 0)) + .initialFlowControlWindow(serverFlowControlWindow); builder.addService(ServerInterceptors.intercept( new TestServiceImpl(Executors.newScheduledThreadPool(2)), ImmutableList.of())); @@ -231,7 +223,7 @@ public long getElapsedTime() { } public int waitFor() throws InterruptedException { - latch.await(); + latch.await(5, TimeUnit.SECONDS); return lastWindow; } } diff --git a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java index f2f2cbc4402..c7bdfc342d4 100644 --- a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java +++ b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java @@ -36,18 +36,22 @@ */ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { private static final long GRACEFUL_SHUTDOWN_NO_TIMEOUT = -1; - private boolean autoTuneFlowControlOn = false; - private int initialConnectionWindow; - private ChannelHandlerContext ctx; + + private final int initialConnectionWindow; private final FlowControlPinger flowControlPing = new FlowControlPinger(); + private boolean autoTuneFlowControlOn; + private ChannelHandlerContext ctx; + private boolean initialWindowSent = false; + private static final long BDP_MEASUREMENT_PING = 1234; AbstractNettyHandler( ChannelPromise channelUnused, Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, - Http2Settings initialSettings) { + Http2Settings initialSettings, + boolean autoFlowControl) { super(channelUnused, decoder, encoder, initialSettings); // During a graceful shutdown, wait until all streams are closed. @@ -56,6 +60,7 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { // Extract the connection window from the settings if it was set. this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 : initialSettings.initialWindowSize(); + this.autoTuneFlowControlOn = autoFlowControl; } @Override @@ -93,16 +98,20 @@ protected final ChannelHandlerContext ctx() { * Sends initial connection window to the remote endpoint if necessary. */ private void sendInitialConnectionWindow() throws Http2Exception { - if (ctx.channel().isActive() && initialConnectionWindow > 0) { + if (!initialWindowSent && ctx.channel().isActive()) { Http2Stream connectionStream = connection().connectionStream(); int currentSize = connection().local().flowController().windowSize(connectionStream); int delta = initialConnectionWindow - currentSize; decoder().flowController().incrementWindowSize(connectionStream, delta); - initialConnectionWindow = -1; + initialWindowSent = true; ctx.flush(); } } + boolean isAutoTuneFlowControlOn() { + return autoTuneFlowControlOn; + } + @VisibleForTesting FlowControlPinger flowControlPing() { return flowControlPing; @@ -121,10 +130,11 @@ final class FlowControlPinger { // NOTE: Up to 16MB guarantees sending WINDOW_UPDATE for every BDP ping private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024; // A max float value < 1.0f - private static final float MAX_WINDOW_UPDATE_RATIO = 0.99999997f; + private static final float MAX_WINDOW_UPDATE_RATIO = 0.99999994f; private int pingCount; private int pingReturn; private boolean pinging; + private int windowSent; private int dataSizeSincePing; private float lastBandwidth; // bytes per second private long lastPingTime; @@ -142,25 +152,29 @@ public void onDataRead(int dataLength, int paddingLength) throws Http2Exception return; } if (!isPinging()) { - // if a bdpPing is being sent out we can piggyback connection's window update for the bytes - // we just received. DefaultHttp2LocalFlowController doesn't always send WINDOW_UPDATE. - DefaultHttp2LocalFlowController flowController = - (DefaultHttp2LocalFlowController) decoder().flowController(); - Http2Stream connectionStream = connection().connectionStream(); - decoder() - .flowController() - .incrementWindowSize(connectionStream, dataLength + paddingLength); - float ratio = flowController.windowUpdateRatio(connectionStream); - // To make it always send WINDOW_UPDATE, temporarily increase the windowUpdateRatio. - flowController.windowUpdateRatio(connectionStream, MAX_WINDOW_UPDATE_RATIO); - flowController.windowUpdateRatio(connectionStream, ratio); - setPinging(true); + sendWindowUpdate(dataLength, paddingLength); sendPing(ctx()); } incrementDataSincePing(dataLength + paddingLength); } + private void sendWindowUpdate(int dataLength, int paddingLength) throws Http2Exception { + // if a bdpPing is being sent out we can piggyback connection's window update for the bytes + // we just received. DefaultHttp2LocalFlowController doesn't always send WINDOW_UPDATE. + DefaultHttp2LocalFlowController flowController = + (DefaultHttp2LocalFlowController) decoder().flowController(); + Http2Stream connectionStream = connection().connectionStream(); + decoder() + .flowController() + .incrementWindowSize(connectionStream, dataLength + paddingLength); + windowSent += (dataLength + paddingLength); + float ratio = flowController.windowUpdateRatio(connectionStream); + // To make it always send WINDOW_UPDATE, temporarily increase the windowUpdateRatio. + flowController.windowUpdateRatio(connectionStream, MAX_WINDOW_UPDATE_RATIO); + flowController.windowUpdateRatio(connectionStream, ratio); + } + public void updateWindow() throws Http2Exception { if (!autoTuneFlowControlOn) { return; @@ -176,7 +190,8 @@ public void updateWindow() throws Http2Exception { int targetWindow = Math.min(getDataSincePing() * 2, MAX_WINDOW_SIZE); setPinging(false); int currentWindow = fc.initialWindowSize(connection().connectionStream()); - if (targetWindow > currentWindow && bandwidth > lastBandwidth) { + int adjustedCurrentWindow = currentWindow - windowSent; + if (targetWindow > adjustedCurrentWindow && bandwidth > lastBandwidth) { lastBandwidth = bandwidth; int increase = targetWindow - currentWindow; fc.incrementWindowSize(connection().connectionStream(), increase); @@ -184,6 +199,7 @@ public void updateWindow() throws Http2Exception { Http2Settings settings = new Http2Settings(); settings.initialWindowSize(targetWindow); frameWriter().writeSettings(ctx(), settings, ctx().newPromise()); + windowSent = 0; } } diff --git a/netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java b/netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java index b50659ddd25..edc963d617c 100644 --- a/netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java +++ b/netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java @@ -26,14 +26,6 @@ @Internal public final class InternalHandlerSettings { - public static void enable(boolean enable) { - NettyHandlerSettings.enable(enable); - } - - public static synchronized void autoWindowOn(boolean autoFlowControl) { - NettyHandlerSettings.autoWindowOn(autoFlowControl); - } - public static synchronized int getLatestClientWindow() { return NettyHandlerSettings.getLatestClientWindow(); } @@ -41,8 +33,4 @@ public static synchronized int getLatestClientWindow() { public static synchronized int getLatestServerWindow() { return NettyHandlerSettings.getLatestServerWindow(); } - - public static synchronized void clearHandlers() { - NettyHandlerSettings.clearHandlers(); - } } diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 85f06119497..fb1ef9fe3dc 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -84,6 +84,7 @@ public final class NettyChannelBuilder private ChannelFactory channelFactory = DEFAULT_CHANNEL_FACTORY; private ObjectPool eventLoopGroupPool = DEFAULT_EVENT_LOOP_GROUP_POOL; private SslContext sslContext; + private boolean autoFlowControl; private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW; private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE; private long keepAliveTimeNanos = KEEPALIVE_TIME_NANOS_DISABLED; @@ -178,10 +179,6 @@ public NettyChannelBuilder channelType(Class channelType) { * *

It's an optional parameter. If the user has not provided an Channel type or ChannelFactory * when the channel is built, the builder will use the default one which is static. - * - *

You must also provide corresponding {@link #eventLoopGroup(EventLoopGroup)}. For example, - * {@link NioSocketChannel} based {@link ChannelFactory} must use {@link - * io.netty.channel.nio.NioEventLoopGroup}, otherwise your application won't start. */ public NettyChannelBuilder channelFactory(ChannelFactory channelFactory) { this.channelFactory = checkNotNull(channelFactory, "channelFactory"); @@ -247,12 +244,26 @@ public NettyChannelBuilder sslContext(SslContext sslContext) { } /** - * Sets the flow control window in bytes. If not called, the default value - * is {@link #DEFAULT_FLOW_CONTROL_WINDOW}). + * Sets the initial flow control window in bytes. Setting initial flow control window enables auto + * flow control tuning using bandwidth-delay product algorithm. To disable auto flow control + * tuning, use {@link #flowControlWindow(int)}. + */ + public NettyChannelBuilder initialFlowControlWindow(int initialFlowControlWindow) { + checkArgument(initialFlowControlWindow > 0, "initialFlowControlWindow must be positive"); + this.flowControlWindow = initialFlowControlWindow; + this.autoFlowControl = true; + return this; + } + + /** + * Sets the flow control window in bytes. Setting flowControlWindow disables auto flow control + * tuning, use {@link #initialFlowControlWindow(int)} to enable auto flow control tuning. If not + * called, the default value is {@link #DEFAULT_FLOW_CONTROL_WINDOW}). */ public NettyChannelBuilder flowControlWindow(int flowControlWindow) { checkArgument(flowControlWindow > 0, "flowControlWindow must be positive"); this.flowControlWindow = flowControlWindow; + this.autoFlowControl = false; return this; } @@ -405,7 +416,7 @@ protected ClientTransportFactory buildTransportFactory() { return new NettyTransportFactory( negotiator, channelFactory, channelOptions, - eventLoopGroupPool, flowControlWindow, maxInboundMessageSize(), + eventLoopGroupPool, autoFlowControl, flowControlWindow, maxInboundMessageSize(), maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls, transportTracerFactory, localSocketPicker, useGetForSafeMethods); } @@ -521,6 +532,7 @@ private static final class NettyTransportFactory implements ClientTransportFacto private final Map, ?> channelOptions; private final ObjectPool groupPool; private final EventLoopGroup group; + private final boolean autoFlowControl; private final int flowControlWindow; private final int maxMessageSize; private final int maxHeaderListSize; @@ -536,7 +548,7 @@ private static final class NettyTransportFactory implements ClientTransportFacto NettyTransportFactory(ProtocolNegotiator protocolNegotiator, ChannelFactory channelFactory, Map, ?> channelOptions, ObjectPool groupPool, - int flowControlWindow, int maxMessageSize, int maxHeaderListSize, + boolean autoFlowControl, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker, boolean useGetForSafeMethods) { @@ -545,6 +557,7 @@ private static final class NettyTransportFactory implements ClientTransportFacto this.channelOptions = new HashMap, Object>(channelOptions); this.groupPool = groupPool; this.group = groupPool.getObject(); + this.autoFlowControl = autoFlowControl; this.flowControlWindow = flowControlWindow; this.maxMessageSize = maxMessageSize; this.maxHeaderListSize = maxHeaderListSize; @@ -584,7 +597,7 @@ public void run() { // TODO(carl-mastrangelo): Pass channelLogger in. NettyClientTransport transport = new NettyClientTransport( serverAddress, channelFactory, channelOptions, group, - localNegotiator, flowControlWindow, + localNegotiator, autoFlowControl, flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(), tooManyPingsRunnable, transportTracerFactory.create(), options.getEagAttributes(), diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 4fae9a49143..b7ef758730d 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -131,6 +131,7 @@ protected void handleNotInUse() { static NettyClientHandler newHandler( ClientTransportLifecycleManager lifecycleManager, @Nullable KeepAliveManager keepAliveManager, + boolean autoFlowControl, int flowControlWindow, int maxHeaderListSize, Supplier stopwatchFactory, @@ -155,6 +156,7 @@ static NettyClientHandler newHandler( frameWriter, lifecycleManager, keepAliveManager, + autoFlowControl, flowControlWindow, maxHeaderListSize, stopwatchFactory, @@ -171,6 +173,7 @@ static NettyClientHandler newHandler( Http2FrameWriter frameWriter, ClientTransportLifecycleManager lifecycleManager, KeepAliveManager keepAliveManager, + boolean autoFlowControl, int flowControlWindow, int maxHeaderListSize, Supplier stopwatchFactory, @@ -230,7 +233,8 @@ public TransportTracer.FlowControlWindows read() { tooManyPingsRunnable, transportTracer, eagAttributes, - authority); + authority, + autoFlowControl); } private NettyClientHandler( @@ -243,8 +247,9 @@ private NettyClientHandler( final Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes, - String authority) { - super(/* channelUnused= */ null, decoder, encoder, settings); + String authority, + boolean autoFlowControl) { + super(/* channelUnused= */ null, decoder, encoder, settings, autoFlowControl); this.lifecycleManager = lifecycleManager; this.keepAliveManager = keepAliveManager; this.stopwatchFactory = stopwatchFactory; diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 81831ac918c..0c495cbcc60 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -79,6 +79,7 @@ class NettyClientTransport implements ConnectionClientTransport { private final String authorityString; private final AsciiString authority; private final AsciiString userAgent; + private final boolean autoFlowControl; private final int flowControlWindow; private final int maxMessageSize; private final int maxHeaderListSize; @@ -106,8 +107,9 @@ class NettyClientTransport implements ConnectionClientTransport { NettyClientTransport( SocketAddress address, ChannelFactory channelFactory, Map, ?> channelOptions, EventLoopGroup group, - ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize, - int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, + ProtocolNegotiator negotiator, boolean autoFlowControl, int flowControlWindow, + int maxMessageSize, int maxHeaderListSize, + long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent, Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes, LocalSocketPicker localSocketPicker, ChannelLogger channelLogger, @@ -118,6 +120,7 @@ class NettyClientTransport implements ConnectionClientTransport { this.group = Preconditions.checkNotNull(group, "group"); this.channelFactory = channelFactory; this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions"); + this.autoFlowControl = autoFlowControl; this.flowControlWindow = flowControlWindow; this.maxMessageSize = maxMessageSize; this.maxHeaderListSize = maxHeaderListSize; @@ -214,6 +217,7 @@ public Runnable start(Listener transportListener) { handler = NettyClientHandler.newHandler( lifecycleManager, keepAliveManager, + autoFlowControl, flowControlWindow, maxHeaderListSize, GrpcUtil.STOPWATCH_SUPPLIER, @@ -306,6 +310,7 @@ public void operationComplete(ChannelFuture future) throws Exception { if (keepAliveManager != null) { keepAliveManager.onTransportStarted(); } + channel.closeFuture().addListener(NettyHandlerSettings.cleanUpTask()); return null; } diff --git a/netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java b/netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java index a672ba88d4f..0c5d2a834ba 100644 --- a/netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java +++ b/netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java @@ -17,6 +17,8 @@ package io.grpc.netty; import com.google.common.base.Preconditions; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import javax.annotation.Nullable; /** @@ -25,9 +27,6 @@ */ final class NettyHandlerSettings { - private static volatile boolean enabled; - - private static boolean autoFlowControlOn; // These will be the most recently created handlers created using NettyClientTransport and // NettyServerTransport @Nullable @@ -36,11 +35,10 @@ final class NettyHandlerSettings { private static AbstractNettyHandler serverHandler; static void setAutoWindow(AbstractNettyHandler handler) { - if (!enabled) { + if (!handler.isAutoTuneFlowControlOn()) { return; } synchronized (NettyHandlerSettings.class) { - handler.setAutoTuneFlowControl(autoFlowControlOn); if (handler instanceof NettyClientHandler) { clientHandler = handler; } else if (handler instanceof NettyServerHandler) { @@ -51,14 +49,6 @@ static void setAutoWindow(AbstractNettyHandler handler) { } } - public static void enable(boolean enable) { - enabled = enable; - } - - public static synchronized void autoWindowOn(boolean autoFlowControl) { - autoFlowControlOn = autoFlowControl; - } - public static synchronized int getLatestClientWindow() { return getLatestWindow(clientHandler); } @@ -67,7 +57,7 @@ public static synchronized int getLatestServerWindow() { return getLatestWindow(serverHandler); } - public static synchronized void clearHandlers() { + private static synchronized void clearHandlers() { clientHandler = null; serverHandler = null; } @@ -77,4 +67,13 @@ private static synchronized int getLatestWindow(AbstractNettyHandler handler) { return handler.decoder().flowController() .initialWindowSize(handler.connection().connectionStream()); } + + static ChannelFutureListener cleanUpTask() { + return new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + NettyHandlerSettings.clearHandlers(); + } + }; + } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index da80a79e481..0cd4e749af5 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -78,6 +78,7 @@ class NettyServer implements InternalServer, InternalWithLogId { private EventLoopGroup workerGroup; private ServerListener listener; private Channel channel; + private final boolean autoFlowControl; private final int flowControlWindow; private final int maxMessageSize; private final int maxHeaderListSize; @@ -106,7 +107,8 @@ class NettyServer implements InternalServer, InternalWithLogId { ProtocolNegotiator protocolNegotiator, List streamTracerFactories, TransportTracer.Factory transportTracerFactory, - int maxStreamsPerConnection, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, + int maxStreamsPerConnection, boolean autoFlowControl, int flowControlWindow, + int maxMessageSize, int maxHeaderListSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, @@ -127,6 +129,7 @@ class NettyServer implements InternalServer, InternalWithLogId { this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories"); this.transportTracerFactory = transportTracerFactory; this.maxStreamsPerConnection = maxStreamsPerConnection; + this.autoFlowControl = autoFlowControl; this.flowControlWindow = flowControlWindow; this.maxMessageSize = maxMessageSize; this.maxHeaderListSize = maxHeaderListSize; @@ -205,6 +208,7 @@ public void initChannel(Channel ch) { streamTracerFactories, transportTracerFactory.create(), maxStreamsPerConnection, + autoFlowControl, flowControlWindow, maxMessageSize, maxHeaderListSize, diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index bb10b2189de..fb1ed3422c9 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -94,6 +94,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder 0, "initialFlowControlWindow must be positive"); + this.flowControlWindow = initialFlowControlWindow; + this.autoFlowControl = true; + return this; + } + + /** + * Sets the flow control window in bytes. Setting flowControlWindow disables auto flow control + * tuning, use {@link #initialFlowControlWindow(int)} to enable auto flow control tuning. If not + * called, the default value is {@link #DEFAULT_FLOW_CONTROL_WINDOW}). */ public NettyServerBuilder flowControlWindow(int flowControlWindow) { checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s", flowControlWindow); this.flowControlWindow = flowControlWindow; + this.autoFlowControl = false; return this; } @@ -564,8 +579,9 @@ protected List buildTransportServers( listenAddress, channelFactory, channelOptions, childChannelOptions, bossEventLoopGroupPool, workerEventLoopGroupPool, forceHeapBuffer, negotiator, streamTracerFactories, getTransportTracerFactory(), maxConcurrentCallsPerConnection, - flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, - keepAliveTimeoutInNanos, maxConnectionIdleInNanos, maxConnectionAgeInNanos, + autoFlowControl, flowControlWindow, maxMessageSize, maxHeaderListSize, + keepAliveTimeInNanos, keepAliveTimeoutInNanos, + maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz()); transportServers.add(transportServer); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index d0354e5e2a6..b7c1240e24c 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -139,6 +139,7 @@ static NettyServerHandler newHandler( List streamTracerFactories, TransportTracer transportTracer, int maxStreams, + boolean autoFlowControl, int flowControlWindow, int maxHeaderListSize, int maxMessageSize, @@ -165,6 +166,7 @@ static NettyServerHandler newHandler( streamTracerFactories, transportTracer, maxStreams, + autoFlowControl, flowControlWindow, maxHeaderListSize, maxMessageSize, @@ -186,6 +188,7 @@ static NettyServerHandler newHandler( List streamTracerFactories, TransportTracer transportTracer, int maxStreams, + boolean autoFlowControl, int flowControlWindow, int maxHeaderListSize, int maxMessageSize, @@ -238,7 +241,8 @@ static NettyServerHandler newHandler( keepAliveTimeInNanos, keepAliveTimeoutInNanos, maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, - keepAliveEnforcer); + keepAliveEnforcer, + autoFlowControl); } private NettyServerHandler( @@ -256,8 +260,9 @@ private NettyServerHandler( long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, - final KeepAliveEnforcer keepAliveEnforcer) { - super(channelUnused, decoder, encoder, settings); + final KeepAliveEnforcer keepAliveEnforcer, + boolean autoFlowControl) { + super(channelUnused, decoder, encoder, settings, autoFlowControl); final MaxConnectionIdleManager maxConnectionIdleManager; if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 82ed972552f..f7bf3983a24 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -64,6 +64,7 @@ class NettyServerTransport implements ServerTransport { private NettyServerHandler grpcHandler; private ServerTransportListener listener; private boolean terminated; + private final boolean autoFlowControl; private final int flowControlWindow; private final int maxMessageSize; private final int maxHeaderListSize; @@ -84,6 +85,7 @@ class NettyServerTransport implements ServerTransport { List streamTracerFactories, TransportTracer transportTracer, int maxStreams, + boolean autoFlowControl, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, @@ -101,6 +103,7 @@ class NettyServerTransport implements ServerTransport { Preconditions.checkNotNull(streamTracerFactories, "streamTracerFactories"); this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer"); this.maxStreams = maxStreams; + this.autoFlowControl = autoFlowControl; this.flowControlWindow = flowControlWindow; this.maxMessageSize = maxMessageSize; this.maxHeaderListSize = maxHeaderListSize; @@ -141,7 +144,10 @@ public void operationComplete(ChannelFuture future) throws Exception { ChannelFutureListener terminationNotifier = new TerminationNotifier(); channelUnused.addListener(terminationNotifier); - channel.closeFuture().addListener(terminationNotifier); + channel + .closeFuture() + .addListener(terminationNotifier) + .addListener(NettyHandlerSettings.cleanUpTask()); channel.pipeline().addLast(bufferingHandler); } @@ -258,6 +264,7 @@ private NettyServerHandler createHandler( streamTracerFactories, transportTracer, maxStreams, + autoFlowControl, flowControlWindow, maxHeaderListSize, maxMessageSize, diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index c5f81325bd5..f7e5f2c94e8 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -790,6 +790,7 @@ public Stopwatch get() { frameWriter(), lifecycleManager, mockKeepAliveManager, + false, flowControlWindow, maxHeaderListSize, stopwatchSupplier, diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 48cd0603672..8762f0b659b 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -191,7 +191,7 @@ public void setSoLingerChannelOption() throws IOException { channelOptions.put(ChannelOption.SO_LINGER, soLinger); NettyClientTransport transport = new NettyClientTransport( address, new ReflectiveChannelFactory<>(NioSocketChannel.class), channelOptions, group, - newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, + newNegotiator(), false, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(), new FakeChannelLogger(), false); @@ -437,7 +437,7 @@ public void failingToConstructChannelShouldFailGracefully() throws Exception { NettyClientTransport transport = new NettyClientTransport( address, new ReflectiveChannelFactory<>(CantConstructChannel.class), new HashMap, Object>(), group, - newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, + newNegotiator(), false, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority, null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(), new FakeChannelLogger(), false); @@ -752,7 +752,7 @@ private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int max } NettyClientTransport transport = new NettyClientTransport( address, channelFactory, new HashMap, Object>(), group, - negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, + negotiator, false, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, keepAliveTimeNano, keepAliveTimeoutNano, false, authority, userAgent, tooManyPingsRunnable, new TransportTracer(), eagAttributes, new SocketPicker(), new FakeChannelLogger(), false); @@ -774,6 +774,7 @@ private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) thr Collections.emptyList(), TransportTracer.getDefaultFactory(), maxStreamsPerConnection, + false, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize, DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS, MAX_CONNECTION_IDLE_NANOS_DISABLED, diff --git a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java index fab3c767923..aab000b9279 100644 --- a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java +++ b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java @@ -101,6 +101,7 @@ protected void manualSetUp() throws Exception {} protected final TransportTracer transportTracer = new TransportTracer(); protected int flowControlWindow = DEFAULT_WINDOW_SIZE; + protected boolean autoFlowControl = false; private final FakeClock fakeClock = new FakeClock(); diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 453d8222f57..cf28cf79315 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -1116,6 +1116,7 @@ protected NettyServerHandler newHandler() { Arrays.asList(streamTracerFactory), transportTracer, maxConcurrentStreams, + autoFlowControl, flowControlWindow, maxHeaderListSize, DEFAULT_MAX_MESSAGE_SIZE, diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java index 0e19b0875e4..3413503b8e2 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java @@ -100,6 +100,7 @@ class TestProtocolNegotiator implements ProtocolNegotiator { Collections.emptyList(), TransportTracer.getDefaultFactory(), 1, // ignore + false, // ignore 1, // ignore 1, // ignore 1, // ignore @@ -146,6 +147,7 @@ public void getPort_notStarted() { Collections.emptyList(), TransportTracer.getDefaultFactory(), 1, // ignore + false, // ignore 1, // ignore 1, // ignore 1, // ignore @@ -186,6 +188,7 @@ public void childChannelOptions() throws Exception { Collections.emptyList(), TransportTracer.getDefaultFactory(), 1, // ignore + false, // ignore 1, // ignore 1, // ignore 1, // ignore @@ -238,6 +241,7 @@ public void channelzListenSocket() throws Exception { Collections.emptyList(), TransportTracer.getDefaultFactory(), 1, // ignore + false, // ignore 1, // ignore 1, // ignore 1, // ignore From 46df843e51c5bae922563e01b5e0fe6f21531af5 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Mon, 27 Apr 2020 15:29:48 -0700 Subject: [PATCH 04/10] disable NettyFlowControlTest#verySmallWindowMakesProgress --- .../io/grpc/testing/integration/NettyFlowControlTest.java | 2 ++ netty/src/main/java/io/grpc/netty/ListeningEncoder.java | 5 +++++ 2 files changed, 7 insertions(+) create mode 100644 netty/src/main/java/io/grpc/netty/ListeningEncoder.java diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java index 932b69d11c6..0a306e9f0dc 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java @@ -43,6 +43,7 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -110,6 +111,7 @@ public void smallBdp() throws InterruptedException, IOException { } @Test + @Ignore("enable once 2 pings between data is no longer necessary") public void verySmallWindowMakesProgress() throws InterruptedException, IOException { proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS); proxy.start(); diff --git a/netty/src/main/java/io/grpc/netty/ListeningEncoder.java b/netty/src/main/java/io/grpc/netty/ListeningEncoder.java new file mode 100644 index 00000000000..2262e7d2203 --- /dev/null +++ b/netty/src/main/java/io/grpc/netty/ListeningEncoder.java @@ -0,0 +1,5 @@ +package io.grpc.netty; + +public class ListeningEncoder { + +} From fd831081e7cca87299558a5374b9e2add4c3c003 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Mon, 27 Apr 2020 15:32:41 -0700 Subject: [PATCH 05/10] netty: Netty{Client,Server}Handler use ListeningEncoder to keep track of ping for bdp --- .../io/grpc/netty/AbstractNettyHandler.java | 69 +++++---- .../java/io/grpc/netty/ListeningEncoder.java | 133 +++++++++++++++++- .../io/grpc/netty/NettyClientHandler.java | 12 +- .../io/grpc/netty/NettyServerHandler.java | 5 +- 4 files changed, 184 insertions(+), 35 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java index c7bdfc342d4..1d188ee4a86 100644 --- a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java +++ b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java @@ -16,12 +16,14 @@ package io.grpc.netty; +import static com.google.common.base.Preconditions.checkArgument; import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception; import com.google.common.annotations.VisibleForTesting; +import io.grpc.netty.ListeningEncoder.Http2OutboundFrameListener; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController; import io.netty.handler.codec.http2.Http2ConnectionDecoder; import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2Exception; @@ -36,9 +38,11 @@ */ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { private static final long GRACEFUL_SHUTDOWN_NO_TIMEOUT = -1; + private static final int MAX_ALLOWED_PING = 2; private final int initialConnectionWindow; - private final FlowControlPinger flowControlPing = new FlowControlPinger(); + private final PingCountingListener pingCountingListener = new PingCountingListener(); + private final FlowControlPinger flowControlPing = new FlowControlPinger(MAX_ALLOWED_PING); private boolean autoTuneFlowControlOn; private ChannelHandlerContext ctx; @@ -61,6 +65,9 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 : initialSettings.initialWindowSize(); this.autoTuneFlowControlOn = autoFlowControl; + if (encoder instanceof ListeningEncoder) { + ((ListeningEncoder) encoder).setListener(pingCountingListener); + } } @Override @@ -129,16 +136,19 @@ final class FlowControlPinger { // NOTE: Up to 16MB guarantees sending WINDOW_UPDATE for every BDP ping private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024; - // A max float value < 1.0f - private static final float MAX_WINDOW_UPDATE_RATIO = 0.99999994f; + private final int maxAllowedPing; private int pingCount; private int pingReturn; private boolean pinging; - private int windowSent; private int dataSizeSincePing; private float lastBandwidth; // bytes per second private long lastPingTime; + public FlowControlPinger(int maxAllowedPing) { + checkArgument(maxAllowedPing > 0, "maxAllowedPing must be positive"); + this.maxAllowedPing = maxAllowedPing; + } + public long payload() { return BDP_MEASUREMENT_PING; } @@ -147,34 +157,17 @@ public int maxWindow() { return MAX_WINDOW_SIZE; } - public void onDataRead(int dataLength, int paddingLength) throws Http2Exception { + public void onDataRead(int dataLength, int paddingLength) { if (!autoTuneFlowControlOn) { return; } - if (!isPinging()) { + if (!isPinging() && pingCountingListener.pingCount < maxAllowedPing) { setPinging(true); - sendWindowUpdate(dataLength, paddingLength); sendPing(ctx()); } incrementDataSincePing(dataLength + paddingLength); } - private void sendWindowUpdate(int dataLength, int paddingLength) throws Http2Exception { - // if a bdpPing is being sent out we can piggyback connection's window update for the bytes - // we just received. DefaultHttp2LocalFlowController doesn't always send WINDOW_UPDATE. - DefaultHttp2LocalFlowController flowController = - (DefaultHttp2LocalFlowController) decoder().flowController(); - Http2Stream connectionStream = connection().connectionStream(); - decoder() - .flowController() - .incrementWindowSize(connectionStream, dataLength + paddingLength); - windowSent += (dataLength + paddingLength); - float ratio = flowController.windowUpdateRatio(connectionStream); - // To make it always send WINDOW_UPDATE, temporarily increase the windowUpdateRatio. - flowController.windowUpdateRatio(connectionStream, MAX_WINDOW_UPDATE_RATIO); - flowController.windowUpdateRatio(connectionStream, ratio); - } - public void updateWindow() throws Http2Exception { if (!autoTuneFlowControlOn) { return; @@ -190,8 +183,7 @@ public void updateWindow() throws Http2Exception { int targetWindow = Math.min(getDataSincePing() * 2, MAX_WINDOW_SIZE); setPinging(false); int currentWindow = fc.initialWindowSize(connection().connectionStream()); - int adjustedCurrentWindow = currentWindow - windowSent; - if (targetWindow > adjustedCurrentWindow && bandwidth > lastBandwidth) { + if (targetWindow > currentWindow && bandwidth > lastBandwidth) { lastBandwidth = bandwidth; int increase = targetWindow - currentWindow; fc.incrementWindowSize(connection().connectionStream(), increase); @@ -199,7 +191,6 @@ public void updateWindow() throws Http2Exception { Http2Settings settings = new Http2Settings(); settings.initialWindowSize(targetWindow); frameWriter().writeSettings(ctx(), settings, ctx().newPromise()); - windowSent = 0; } } @@ -248,4 +239,28 @@ void setDataSizeAndSincePing(int dataSize) { lastPingTime = System.nanoTime() - TimeUnit.SECONDS.toNanos(1); } } + + private static class PingCountingListener extends Http2OutboundFrameListener { + int pingCount = 0; + + @Override + public void onWindowUpdate(int streamId, int windowSizeIncrement) { + pingCount = 0; + super.onWindowUpdate(streamId, windowSizeIncrement); + } + + @Override + public void onPing(boolean ack, long data) { + if (!ack) { + pingCount++; + } + super.onPing(ack, data); + } + + @Override + public void onData(int streamId, ByteBuf data, int padding, boolean endStream) { + pingCount = 0; + super.onData(streamId, data, padding, endStream); + } + } } diff --git a/netty/src/main/java/io/grpc/netty/ListeningEncoder.java b/netty/src/main/java/io/grpc/netty/ListeningEncoder.java index 2262e7d2203..03270d399ae 100644 --- a/netty/src/main/java/io/grpc/netty/ListeningEncoder.java +++ b/netty/src/main/java/io/grpc/netty/ListeningEncoder.java @@ -1,5 +1,136 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.grpc.netty; -public class ListeningEncoder { +import static com.google.common.base.Preconditions.checkNotNull; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2ConnectionEncoder; +import io.netty.handler.codec.http2.Http2FrameWriter; +import io.netty.handler.codec.http2.StreamBufferingEncoder; + +/** A ListeningEncoder notifies {@link Http2OutboundFrameListener} on http2 outbound frame event. */ +interface ListeningEncoder { + + void setListener(Http2OutboundFrameListener listener); + + /** + * Partial implementation of (Listening subset of event) event listener for outbound http2 + * frames. + */ + class Http2OutboundFrameListener { + + /** Notifies on outbound WINDOW_UPDATE frame. */ + public void onWindowUpdate(int streamId, int windowSizeIncrement) {} + + /** Notifies on outbound PING frame. */ + public void onPing(boolean ack, long data) {} + + /** Notifies on outbound DATA frame. */ + public void onData(int streamId, ByteBuf data, int padding, boolean endStream) {} + } + + /** A {@link StreamBufferingEncoder} notifies http2 outbound frame event. */ + final class ListeningStreamBufferingEncoder + extends StreamBufferingEncoder implements ListeningEncoder { + + private Http2OutboundFrameListener listener = new Http2OutboundFrameListener(); + + public ListeningStreamBufferingEncoder(Http2ConnectionEncoder encoder) { + super(encoder); + } + + @Override + public void setListener(Http2OutboundFrameListener listener) { + this.listener = checkNotNull(listener, "listener"); + } + + @Override + public ChannelFuture writePing( + ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) { + listener.onPing(ack, data); + return super.writePing(ctx, ack, data, promise); + } + + @Override + public ChannelFuture writeWindowUpdate( + ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) { + listener.onWindowUpdate(streamId, windowSizeIncrement); + return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise); + } + + @Override + public ChannelFuture writeData( + ChannelHandlerContext ctx, + int streamId, + ByteBuf data, + int padding, + boolean eos, + ChannelPromise promise) { + listener.onData(streamId, data, padding, eos); + return super.writeData(ctx, streamId, data, padding, eos, promise); + } + } + + /** A {@link DefaultHttp2ConnectionEncoder} notifies http2 outbound frame event. */ + final class ListeningDefaultHttp2ConnectionEncoder + extends DefaultHttp2ConnectionEncoder implements ListeningEncoder { + + private Http2OutboundFrameListener listener = new Http2OutboundFrameListener(); + + public ListeningDefaultHttp2ConnectionEncoder( + Http2Connection connection, Http2FrameWriter frameWriter) { + super(connection, frameWriter); + } + + @Override + public void setListener(Http2OutboundFrameListener listener) { + this.listener = checkNotNull(listener, "listener"); + } + + @Override + public ChannelFuture writePing( + ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) { + listener.onPing(ack, data); + return super.writePing(ctx, ack, data, promise); + } + + @Override + public ChannelFuture writeWindowUpdate( + ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) { + listener.onWindowUpdate(streamId, windowSizeIncrement); + return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise); + } + @Override + public ChannelFuture writeData( + ChannelHandlerContext ctx, + int streamId, + ByteBuf data, + int padding, + boolean eos, + ChannelPromise promise) { + listener.onData(streamId, data, padding, eos); + return super.writeData(ctx, streamId, data, padding, eos, promise); + } + } } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index b7ef758730d..f02b885c2b0 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -38,6 +38,7 @@ import io.grpc.internal.KeepAliveManager; import io.grpc.internal.TransportTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder; +import io.grpc.netty.ListeningEncoder.ListeningStreamBufferingEncoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -57,6 +58,7 @@ import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2ConnectionAdapter; import io.netty.handler.codec.http2.Http2ConnectionDecoder; +import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2FlowController; @@ -195,8 +197,9 @@ static NettyClientHandler newHandler( frameReader = new Http2InboundFrameLogger(frameReader, frameLogger); frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger); - StreamBufferingEncoder encoder = new StreamBufferingEncoder( - new DefaultHttp2ConnectionEncoder(connection, frameWriter)); + StreamBufferingEncoder encoder = + new ListeningStreamBufferingEncoder( + new DefaultHttp2ConnectionEncoder(connection, frameWriter)); // Create the local flow controller configured to auto-refill the connection window. connection.local().flowController( @@ -239,7 +242,7 @@ public TransportTracer.FlowControlWindows read() { private NettyClientHandler( Http2ConnectionDecoder decoder, - StreamBufferingEncoder encoder, + Http2ConnectionEncoder encoder, Http2Settings settings, ClientTransportLifecycleManager lifecycleManager, KeepAliveManager keepAliveManager, @@ -374,8 +377,7 @@ private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream /** * Handler for an inbound HTTP/2 DATA frame. */ - private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) - throws Http2Exception { + private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) { flowControlPing().onDataRead(data.readableBytes(), padding); NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId)); PerfMark.event("NettyClientHandler.onDataRead", stream.tag()); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index b7c1240e24c..b40ee7bd453 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -45,6 +45,7 @@ import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TransportTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder; +import io.grpc.netty.ListeningEncoder.ListeningDefaultHttp2ConnectionEncoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelFuture; @@ -54,7 +55,6 @@ import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2Connection; import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; -import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2Headers; @@ -220,7 +220,8 @@ static NettyServerHandler newHandler( connection.local().flowController( new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true)); frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer); - Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter); + Http2ConnectionEncoder encoder = + new ListeningDefaultHttp2ConnectionEncoder(connection, frameWriter); encoder = new Http2ControlFrameLimitEncoder(encoder, 10000); Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader); From 52010010d13642787f40207872058de4d0d88fa0 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Mon, 27 Apr 2020 15:36:38 -0700 Subject: [PATCH 06/10] remove invalid comment --- netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java index 1d188ee4a86..b1b49abc634 100644 --- a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java +++ b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java @@ -134,7 +134,6 @@ void setAutoTuneFlowControl(boolean isOn) { */ final class FlowControlPinger { - // NOTE: Up to 16MB guarantees sending WINDOW_UPDATE for every BDP ping private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024; private final int maxAllowedPing; private int pingCount; From 048f4eac5d37ee84f89c87a831688bae487803e4 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Mon, 27 Apr 2020 17:36:43 -0700 Subject: [PATCH 07/10] fix javadoc, restore accidently removed paragraph in javadoc --- netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java | 6 +++++- netty/src/main/java/io/grpc/netty/NettyServerBuilder.java | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index fb1ef9fe3dc..31c5d64221d 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -179,6 +179,10 @@ public NettyChannelBuilder channelType(Class channelType) { * *

It's an optional parameter. If the user has not provided an Channel type or ChannelFactory * when the channel is built, the builder will use the default one which is static. + * + *

You must also provide corresponding {@link #eventLoopGroup(EventLoopGroup)}. For example, + * {@link NioSocketChannel} based {@link ChannelFactory} must use {@link + * io.netty.channel.nio.NioEventLoopGroup}, otherwise your application won't start. */ public NettyChannelBuilder channelFactory(ChannelFactory channelFactory) { this.channelFactory = checkNotNull(channelFactory, "channelFactory"); @@ -257,7 +261,7 @@ public NettyChannelBuilder initialFlowControlWindow(int initialFlowControlWindow /** * Sets the flow control window in bytes. Setting flowControlWindow disables auto flow control - * tuning, use {@link #initialFlowControlWindow(int)} to enable auto flow control tuning. If not + * tuning; use {@link #initialFlowControlWindow(int)} to enable auto flow control tuning. If not * called, the default value is {@link #DEFAULT_FLOW_CONTROL_WINDOW}). */ public NettyChannelBuilder flowControlWindow(int flowControlWindow) { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index fb1ed3422c9..73d3e964364 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -360,7 +360,7 @@ public NettyServerBuilder initialFlowControlWindow(int initialFlowControlWindow) /** * Sets the flow control window in bytes. Setting flowControlWindow disables auto flow control - * tuning, use {@link #initialFlowControlWindow(int)} to enable auto flow control tuning. If not + * tuning; use {@link #initialFlowControlWindow(int)} to enable auto flow control tuning. If not * called, the default value is {@link #DEFAULT_FLOW_CONTROL_WINDOW}). */ public NettyServerBuilder flowControlWindow(int flowControlWindow) { From a73585c39ab7da11e571bc6aec233bd7535667f7 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Mon, 27 Apr 2020 18:22:49 -0700 Subject: [PATCH 08/10] retire {Internal,Netty}HandlerSettings --- .../integration/NettyFlowControlTest.java | 108 +++++++++++++----- .../grpc/netty/InternalHandlerSettings.java | 36 ------ .../io/grpc/netty/NettyClientTransport.java | 2 - .../io/grpc/netty/NettyHandlerSettings.java | 79 ------------- .../io/grpc/netty/NettyServerTransport.java | 6 +- 5 files changed, 79 insertions(+), 152 deletions(-) delete mode 100644 netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java delete mode 100644 netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java index 0a306e9f0dc..c8296e0bbec 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java @@ -24,7 +24,11 @@ import io.grpc.ServerBuilder; import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptors; -import io.grpc.netty.InternalHandlerSettings; +import io.grpc.netty.GrpcHttp2ConnectionHandler; +import io.grpc.netty.InternalNettyChannelBuilder; +import io.grpc.netty.InternalNettyChannelBuilder.ProtocolNegotiatorFactory; +import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator; +import io.grpc.netty.InternalProtocolNegotiators; import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; @@ -32,6 +36,9 @@ import io.grpc.testing.integration.Messages.ResponseParameters; import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; +import io.netty.channel.ChannelHandler; +import io.netty.handler.codec.http2.Http2Stream; +import io.netty.util.AsciiString; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; import java.net.InetSocketAddress; @@ -40,6 +47,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -63,9 +71,11 @@ public class NettyFlowControlTest { private static final int REGULAR_WINDOW = 64 * 1024; private static final int MAX_WINDOW = 8 * 1024 * 1024; - private static ManagedChannel channel; - private static Server server; - private static TrafficControlProxy proxy; + private final CapturingProtocolNegotiationFactory capturingPnFactory + = new CapturingProtocolNegotiationFactory(); + private ManagedChannel channel; + private Server server; + private TrafficControlProxy proxy; private int proxyPort; private int serverPort; @@ -88,8 +98,13 @@ public void initTest() { @After public void endTest() throws IOException { - proxy.shutDown(); - server.shutdown(); + if (proxy != null) { + proxy.shutDown(); + } + server.shutdownNow(); + if (channel != null) { + channel.shutdownNow(); + } } @Test @@ -97,7 +112,7 @@ public void largeBdp() throws InterruptedException, IOException { proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS); proxy.start(); proxyPort = proxy.getPort(); - resetConnection(REGULAR_WINDOW); + createAndStartChannel(REGULAR_WINDOW); doTest(HIGH_BAND, MED_LAT); } @@ -106,7 +121,7 @@ public void smallBdp() throws InterruptedException, IOException { proxy = new TrafficControlProxy(serverPort, LOW_BAND, MED_LAT, TimeUnit.MILLISECONDS); proxy.start(); proxyPort = proxy.getPort(); - resetConnection(REGULAR_WINDOW); + createAndStartChannel(REGULAR_WINDOW); doTest(LOW_BAND, MED_LAT); } @@ -116,7 +131,7 @@ public void verySmallWindowMakesProgress() throws InterruptedException, IOExcept proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS); proxy.start(); proxyPort = proxy.getPort(); - resetConnection(TINY_WINDOW); + createAndStartChannel(TINY_WINDOW); doTest(HIGH_BAND, MED_LAT); } @@ -138,9 +153,10 @@ private void doTest(int bandwidth, int latency) throws InterruptedException { .addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 2)); StreamingOutputCallRequest request = builder.build(); - TestStreamObserver observer = new TestStreamObserver(expectedWindow); + TestStreamObserver observer = + new TestStreamObserver(capturingPnFactory.grpcHandlerRef, expectedWindow); stub.streamingOutputCall(request, observer); - int lastWindow = observer.waitFor(); + int lastWindow = observer.waitFor(5, TimeUnit.SECONDS); // deal with cases that either don't cause a window update or hit max window expectedWindow = Math.min(MAX_WINDOW, Math.max(expectedWindow, REGULAR_WINDOW)); @@ -156,18 +172,14 @@ private void doTest(int bandwidth, int latency) throws InterruptedException { /** * Resets client/server and their flow control windows. */ - private void resetConnection(int clientFlowControlWindow) - throws InterruptedException { - if (channel != null) { - if (!channel.isShutdown()) { - channel.shutdown(); - channel.awaitTermination(100, TimeUnit.MILLISECONDS); - } - } - channel = NettyChannelBuilder.forAddress(new InetSocketAddress("localhost", proxyPort)) - .initialFlowControlWindow(clientFlowControlWindow) - .negotiationType(NegotiationType.PLAINTEXT) - .build(); + private void createAndStartChannel(int clientFlowControlWindow) { + NettyChannelBuilder channelBuilder = + NettyChannelBuilder + .forAddress(new InetSocketAddress("localhost", proxyPort)) + .initialFlowControlWindow(clientFlowControlWindow) + .negotiationType(NegotiationType.PLAINTEXT); + InternalNettyChannelBuilder.setProtocolNegotiatorFactory(channelBuilder, capturingPnFactory); + channel = channelBuilder.build(); } private void startServer(int serverFlowControlWindow) { @@ -190,20 +202,25 @@ private void startServer(int serverFlowControlWindow) { */ private static class TestStreamObserver implements StreamObserver { - long startRequestNanos; + final AtomicReference grpcHandlerRef; + final long startRequestNanos; long endRequestNanos; - private final CountDownLatch latch = new CountDownLatch(1); - long expectedWindow; + final CountDownLatch latch = new CountDownLatch(1); + final long expectedWindow; int lastWindow; - public TestStreamObserver(long window) { + public TestStreamObserver( + AtomicReference grpcHandlerRef, long window) { + this.grpcHandlerRef = grpcHandlerRef; startRequestNanos = System.nanoTime(); expectedWindow = window; } @Override public void onNext(StreamingOutputCallResponse value) { - lastWindow = InternalHandlerSettings.getLatestClientWindow(); + GrpcHttp2ConnectionHandler grpcHandler = grpcHandlerRef.get(); + Http2Stream connectionStream = grpcHandler.connection().connectionStream(); + lastWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream); if (lastWindow >= expectedWindow) { onCompleted(); } @@ -224,9 +241,40 @@ public long getElapsedTime() { return endRequestNanos - startRequestNanos; } - public int waitFor() throws InterruptedException { - latch.await(5, TimeUnit.SECONDS); + public int waitFor(long duration, TimeUnit unit) throws InterruptedException { + latch.await(duration, unit); return lastWindow; } } + + private static class CapturingProtocolNegotiationFactory implements ProtocolNegotiatorFactory { + + AtomicReference grpcHandlerRef = new AtomicReference<>(); + + @Override + public ProtocolNegotiator buildProtocolNegotiator() { + return new CapturingProtocolNegotiator(); + } + + private class CapturingProtocolNegotiator implements ProtocolNegotiator { + + final ProtocolNegotiator delegate = InternalProtocolNegotiators.plaintext(); + + @Override + public AsciiString scheme() { + return delegate.scheme(); + } + + @Override + public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) { + CapturingProtocolNegotiationFactory.this.grpcHandlerRef.set(grpcHandler); + return delegate.newHandler(grpcHandler); + } + + @Override + public void close() { + delegate.close();; + } + } + } } diff --git a/netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java b/netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java deleted file mode 100644 index edc963d617c..00000000000 --- a/netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2016 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.netty; - -import com.google.common.annotations.VisibleForTesting; -import io.grpc.Internal; - -/** - * Controlled accessor to {@link NettyHandlerSettings}. - */ -@VisibleForTesting // Visible for tests in other packages. -@Internal -public final class InternalHandlerSettings { - - public static synchronized int getLatestClientWindow() { - return NettyHandlerSettings.getLatestClientWindow(); - } - - public static synchronized int getLatestServerWindow() { - return NettyHandlerSettings.getLatestServerWindow(); - } -} diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 0c495cbcc60..bd00da98384 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -225,7 +225,6 @@ public Runnable start(Listener transportListener) { transportTracer, eagAttributes, authorityString); - NettyHandlerSettings.setAutoWindow(handler); ChannelHandler negotiationHandler = negotiator.newHandler(handler); @@ -310,7 +309,6 @@ public void operationComplete(ChannelFuture future) throws Exception { if (keepAliveManager != null) { keepAliveManager.onTransportStarted(); } - channel.closeFuture().addListener(NettyHandlerSettings.cleanUpTask()); return null; } diff --git a/netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java b/netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java deleted file mode 100644 index 0c5d2a834ba..00000000000 --- a/netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2016 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.netty; - -import com.google.common.base.Preconditions; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import javax.annotation.Nullable; - -/** - * Allows autoFlowControl to be turned on and off from interop testing and flow control windows to - * be accessed. - */ -final class NettyHandlerSettings { - - // These will be the most recently created handlers created using NettyClientTransport and - // NettyServerTransport - @Nullable - private static AbstractNettyHandler clientHandler; - @Nullable - private static AbstractNettyHandler serverHandler; - - static void setAutoWindow(AbstractNettyHandler handler) { - if (!handler.isAutoTuneFlowControlOn()) { - return; - } - synchronized (NettyHandlerSettings.class) { - if (handler instanceof NettyClientHandler) { - clientHandler = handler; - } else if (handler instanceof NettyServerHandler) { - serverHandler = handler; - } else { - throw new RuntimeException("Expecting NettyClientHandler or NettyServerHandler"); - } - } - } - - public static synchronized int getLatestClientWindow() { - return getLatestWindow(clientHandler); - } - - public static synchronized int getLatestServerWindow() { - return getLatestWindow(serverHandler); - } - - private static synchronized void clearHandlers() { - clientHandler = null; - serverHandler = null; - } - - private static synchronized int getLatestWindow(AbstractNettyHandler handler) { - Preconditions.checkNotNull(handler); - return handler.decoder().flowController() - .initialWindowSize(handler.connection().connectionStream()); - } - - static ChannelFutureListener cleanUpTask() { - return new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - NettyHandlerSettings.clearHandlers(); - } - }; - } -} diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index f7bf3983a24..3638dd5b0d6 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -124,7 +124,6 @@ public void start(ServerTransportListener listener) { // Create the Netty handler for the pipeline. grpcHandler = createHandler(listener, channelUnused); - NettyHandlerSettings.setAutoWindow(grpcHandler); // Notify when the channel closes. final class TerminationNotifier implements ChannelFutureListener { @@ -144,10 +143,7 @@ public void operationComplete(ChannelFuture future) throws Exception { ChannelFutureListener terminationNotifier = new TerminationNotifier(); channelUnused.addListener(terminationNotifier); - channel - .closeFuture() - .addListener(terminationNotifier) - .addListener(NettyHandlerSettings.cleanUpTask()); + channel.closeFuture().addListener(terminationNotifier); channel.pipeline().addLast(bufferingHandler); } From bce0e9db5d5d3710f1940c2f5ebdec1815e12bf8 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Mon, 27 Apr 2020 18:56:25 -0700 Subject: [PATCH 09/10] remove unused method --- netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java index b1b49abc634..6fcd03fdc42 100644 --- a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java +++ b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java @@ -115,10 +115,6 @@ private void sendInitialConnectionWindow() throws Http2Exception { } } - boolean isAutoTuneFlowControlOn() { - return autoTuneFlowControlOn; - } - @VisibleForTesting FlowControlPinger flowControlPing() { return flowControlPing; From 7ab9d9de2f381e439cb07afc0e2e5c75f4f576a7 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Fri, 1 May 2020 15:13:36 -0700 Subject: [PATCH 10/10] remove extra semicolon --- .../java/io/grpc/testing/integration/NettyFlowControlTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java index c8296e0bbec..4d7c8bdbca7 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java @@ -273,7 +273,7 @@ public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) { @Override public void close() { - delegate.close();; + delegate.close(); } } }