From 5d4b79b7e33274b334516ab1a3689a8cff826cce Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Mon, 5 Dec 2022 16:37:32 -0600 Subject: [PATCH 1/2] Improve Jetty/gRPC shutdown (#3123) This includes a change to the multiplexed websocket format to indicate to the client that the websocket itself will soon stop, so no more streams should be started on it, and existing calls should be finished. This reflects the same basic semantics that http2 uses for GO_AWAY. An earlier change to how the HealthCheck service is stopped is reverted here, by ensuring that gRPC doesn't reuse an existing executor, so that it remains running throughout shutdown. Autocomplete streams are correctly ended by the server during the shutdown process. Fixes #2985 --- .../MultiplexedWebSocketServerStream.java | 83 ++++++++++++++ .../python/server/EmbeddedServer.java | 1 + .../server/jetty/JettyBackedGrpcServer.java | 72 ++++++++++-- .../server/jetty/JettyServerComponent.java | 1 + .../server/jetty/JettyServerModule.java | 21 ++++ .../server/jetty/WebsocketFactory.java | 70 ++++++++++++ .../server/netty/NettyServerComponent.java | 1 + .../console/ConsoleServiceGrpcImpl.java | 11 +- .../completer/JavaAutoCompleteObserver.java | 10 +- .../completer/PythonAutoCompleteObserver.java | 9 +- .../server/healthcheck/HealthCheckModule.java | 7 ++ .../server/runner/DeephavenApiServer.java | 25 ++--- .../runner/DeephavenApiServerModule.java | 6 +- .../deephaven/server/runner/GrpcServer.java | 25 ++++- .../web/client/api/WorkerConnection.java | 2 +- .../grpc/MultiplexedWebsocketTransport.java | 105 ++++++++++++++---- 16 files changed, 379 insertions(+), 70 deletions(-) create mode 100644 server/jetty/src/main/java/io/deephaven/server/jetty/WebsocketFactory.java diff --git a/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/MultiplexedWebSocketServerStream.java b/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/MultiplexedWebSocketServerStream.java index 9e75d9560ab..489937eca88 100644 --- a/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/MultiplexedWebSocketServerStream.java +++ b/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/MultiplexedWebSocketServerStream.java @@ -12,6 +12,7 @@ import io.grpc.internal.ServerTransportListener; import io.grpc.internal.StatsTraceContext; import jakarta.websocket.CloseReason; +import jakarta.websocket.EndpointConfig; import jakarta.websocket.Session; import java.io.IOException; @@ -20,6 +21,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -41,6 +44,16 @@ * runOnTransportThread while in onMessage, as we're already in the transport thread. */ public class MultiplexedWebSocketServerStream extends AbstractWebSocketServerStream { + public static final String GRACEFUL_CLOSE = MultiplexedWebSocketServerStream.class.getName() + ".graceful_close"; + + /** + * Callback to initiate a graceful shutdown of this websocket instance, as an alternative to just closing the + * websocket. Since this websocket behaves like gRPC transport, we give the client a chance to finish up and close + * itself before the server does it. + */ + public interface GracefulClose extends Supplier> { + } + private static final Logger logger = Logger.getLogger(MultiplexedWebSocketServerStream.class.getName()); /** Custom metadata to hold the path requested by the incoming stream */ public static final Metadata.Key PATH = @@ -54,12 +67,55 @@ public class MultiplexedWebSocketServerStream extends AbstractWebSocketServerStr private final Map streams = new HashMap<>(); private final boolean isTextRequest = false;// not supported yet + /** + * Enum to describe the process of closing a transport. After the server has begun to close but the client hasn't + * yet acknowledged, it is permitted for the client to start a new stream, but after the server acknowledges, no new + * streams can be started. Note that shutdown will proceed anyway, and will eventually stop all streams. + */ + enum ClosedState { + OPEN, CLOSING, CLOSED + } + + private ClosedState closed = ClosedState.OPEN; + private final CompletableFuture closingFuture = new CompletableFuture<>(); + public MultiplexedWebSocketServerStream(ServerTransportListener transportListener, List streamTracerFactories, int maxInboundMessageSize, Attributes attributes) { super(transportListener, streamTracerFactories, maxInboundMessageSize, attributes); } + /** + * Stops this multiplexed transport from accepting new streams. Instead, it will reply with its version of GO_AWAY, + * a stream of Integer.MAX_INTEGER to the client to signal that new requests will not be accepted, and future + * incoming streams will be closed by the server right away. In keeping with h2, until the client ACKs the close, we + * will permit incoming streams that were sent before we closed, but they likely will not have a large window to get + * their work done before they are closed the rest of the way. + */ + private CompletableFuture stopAcceptingNewStreams() { + if (closed != ClosedState.OPEN) { + return closingFuture; + } + closed = ClosedState.CLOSING; + ByteBuffer end = ByteBuffer.allocate(4); + end.putInt(0, Integer.MAX_VALUE); + // ignore the results of this future, the closingFuture will instead tell us when the client ACKs + websocketSession.getAsyncRemote().sendBinary(end); + return closingFuture; + } + + @Override + public void onOpen(Session websocketSession, EndpointConfig config) { + super.onOpen(websocketSession, config); + websocketSession.getUserProperties().put(GRACEFUL_CLOSE, (GracefulClose) this::stopAcceptingNewStreams); + } + + @Override + public void onClose(Session session, CloseReason closeReason) { + // regardless of state, indicate that we are already closed and no need to wait + closingFuture.complete(null); + } + @Override public void onMessage(String message) { for (MultiplexedWebsocketStreamImpl stream : streams.values()) { @@ -89,6 +145,23 @@ public void onMessage(ByteBuffer message) throws IOException { } else { closed = false; } + + if (closed && streamId == Integer.MAX_VALUE) { + if (this.closed != ClosedState.CLOSING) { + // error, client tried to finish a close we didn't initiate, hang up + websocketSession.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "Unexpected close ACK")); + return; + } + // Client has ack'd our close, no more new streams allowed, client will be finishing up so we can exit + this.closed = ClosedState.CLOSED; + + // Mark the future as finished + closingFuture.complete(null); + + // (note that technically there is a 5th byte to indicate close, but we're ignoring that here) + return; + } + // may be null if this is the first request for this streamId final MultiplexedWebsocketStreamImpl stream = streams.get(streamId); @@ -104,6 +177,16 @@ public void onMessage(ByteBuffer message) throws IOException { // if this is the first message on this websocket, it is the request headers if (stream == null) { + if (this.closed == ClosedState.CLOSED) { + // Not accepting new streams on existing websockets, and the client knew that when they sent this (since + // the GO_AWAY was ACK'd). We treat this as an error, since the client isn't behaving. If instead closed + // was still CLOSING, then, client sent this before they saw that, we permit them to still open streams, + // though the application likely has begun to clean up state. + websocketSession.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, + "Stream created after closing initiated")); + + return; + } processHeaders(message, streamId); return; } diff --git a/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedServer.java b/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedServer.java index 1b4c69ac82a..5dd96ed7547 100644 --- a/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedServer.java +++ b/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedServer.java @@ -48,6 +48,7 @@ public class EmbeddedServer { HealthCheckModule.class, PythonPluginsRegistration.Module.class, JettyServerModule.class, + HealthCheckModule.class, PythonConsoleSessionModule.class, GroovyConsoleSessionModule.class, SessionToExecutionStateModule.class, diff --git a/server/jetty/src/main/java/io/deephaven/server/jetty/JettyBackedGrpcServer.java b/server/jetty/src/main/java/io/deephaven/server/jetty/JettyBackedGrpcServer.java index 8ba0eb5abc0..d07e6ece433 100644 --- a/server/jetty/src/main/java/io/deephaven/server/jetty/JettyBackedGrpcServer.java +++ b/server/jetty/src/main/java/io/deephaven/server/jetty/JettyBackedGrpcServer.java @@ -33,19 +33,25 @@ import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.ErrorPageErrorHandler; import org.eclipse.jetty.servlet.FilterHolder; +import org.eclipse.jetty.util.MultiException; +import org.eclipse.jetty.util.component.Graceful; import org.eclipse.jetty.util.resource.Resource; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.webapp.WebAppContext; +import org.eclipse.jetty.websocket.jakarta.common.SessionTracker; import org.eclipse.jetty.websocket.jakarta.server.config.JakartaWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.jakarta.server.internal.JakartaWebSocketServerContainer; import javax.inject.Inject; import java.io.IOException; import java.io.UncheckedIOException; import java.net.URL; import java.util.ArrayList; +import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -56,6 +62,7 @@ public class JettyBackedGrpcServer implements GrpcServer { private final Server jetty; + private final boolean websocketsEnabled; @Inject public JettyBackedGrpcServer( @@ -109,19 +116,24 @@ public JettyBackedGrpcServer( endpoints.put(GRPC_WEBSOCKETS_MULTIPLEX_PROTOCOL, () -> filter.create(MultiplexedWebSocketServerStream::new)); } + JakartaWebSocketServerContainer jettyWebsocketContainer = (JakartaWebSocketServerContainer) container; + WebsocketFactory websocketFactory = + new WebsocketFactory(() -> new GrpcWebsocket(endpoints), jettyWebsocketContainer); + jettyWebsocketContainer.addBean(websocketFactory); container.addEndpoint(ServerEndpointConfig.Builder.create(GrpcWebsocket.class, "/{service}/{method}") .configurator(new ServerEndpointConfig.Configurator() { @Override - public T getEndpointInstance(Class endpointClass) throws InstantiationException { + public T getEndpointInstance(Class endpointClass) { // noinspection unchecked - return (T) new GrpcWebsocket(endpoints); + return (T) websocketFactory.create(); } }) .subprotocols(new ArrayList<>(endpoints.keySet())) - .build() - - ); + .build()); }); + this.websocketsEnabled = true; + } else { + this.websocketsEnabled = false; } // Note: handler order matters due to pathSpec order @@ -149,15 +161,57 @@ public void join() throws InterruptedException { jetty.join(); } + @Override + public void beginShutdown() { + // "start to stop" the jetty container, skipping over websockets, since their Graceful implementation isn't + // very nice. This is roughly the implementation of Graceful.shutdown(Component), except avoiding anything that + // would directly stop a websocket, which instead will be handled later, as part of the actual stop() call tell + // the graceful handlers that we are shutting down. + + // For websockets, since the SessionTracker will instantly stop the socket rather than allow it to finish + // nicely. Instead, when websockets were created, we registered extra graceful beans to shutdown like h2. + // See Server.doStop(), this is roughly the implementation of the first phase of that method, only asking + // Graceful instances to stop, but not stopping connectors or non-graceful components. + + // Note that this would not apply correctly if we used WebSockets for some purpose other than gRPC transport. + Collection gracefuls = jetty.getContainedBeans(Graceful.class); + gracefuls.stream().filter(g -> !(g instanceof SessionTracker)).forEach(Graceful::shutdown); + } + @Override public void stopWithTimeout(long timeout, TimeUnit unit) { - jetty.setStopTimeout(unit.toMillis(timeout)); Thread shutdownThread = new Thread(() -> { + MultiException exceptions = new MultiException(); + long millis = unit.toMillis(timeout); + + // If websockets are enabled, try to spend part of our shutdown timeout budget on waiting for websockets, as + // in beginShutdown. + if (websocketsEnabled && millis > 250) { + // shut down everything except the websockets themselves with half our timeout + millis /= 2; + + // Collect the same beans we gracefully stopped before (or, if we didn't already start a graceful + // shutdown, this is the first attempt) + Collection gracefuls = jetty.getContainedBeans(Graceful.class); + try { + CompletableFuture.allOf(gracefuls.stream().filter(g -> !(g instanceof SessionTracker)) + .map(Graceful::shutdown).toArray(CompletableFuture[]::new)) + .get(millis, TimeUnit.MILLISECONDS); + } catch (Exception e) { + exceptions.add(e); + } + } + + // regardless of failures so far, continue shutdown with remaining budget. This will end all websockets + // right away. try { + jetty.setStopTimeout(millis); jetty.stop(); + exceptions.ifExceptionThrow(); } catch (Exception exception) { - throw new IllegalStateException("Failure while stopping", exception); + exceptions.add(exception); } + exceptions.ifExceptionThrowRuntime(); }); shutdownThread.start(); } @@ -205,6 +259,10 @@ private static ServerConnector createConnector(Server server, JettyConfig config } config.host().ifPresent(serverConnector::setHost); serverConnector.setPort(config.port()); + + // Give connections extra time to shutdown, since we have an explicit server shutdown + serverConnector.setShutdownIdleTimeout(serverConnector.getIdleTimeout()); + return serverConnector; } } diff --git a/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerComponent.java b/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerComponent.java index e5dc662be55..22be0c4c950 100644 --- a/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerComponent.java +++ b/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerComponent.java @@ -27,6 +27,7 @@ HealthCheckModule.class, PythonPluginsRegistration.Module.class, JettyServerModule.class, + HealthCheckModule.class, PythonConsoleSessionModule.class, GroovyConsoleSessionModule.class, SessionToExecutionStateModule.class, diff --git a/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerModule.java b/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerModule.java index 2475fc38562..5e1403d1484 100644 --- a/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerModule.java +++ b/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerModule.java @@ -15,6 +15,11 @@ import javax.inject.Named; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static io.grpc.internal.GrpcUtil.getThreadFactory; @Module public interface JettyServerModule { @@ -34,6 +39,22 @@ static ServletAdapter provideGrpcServletAdapter( services.forEach(serverBuilder::addService); interceptors.forEach(serverBuilder::intercept); + // create a custom executor service, just like grpc would use, so that grpc doesn't shut it down ahead + // of when we are ready + // We don't use newSingleThreadScheduledExecutor because it doesn't return a + // ScheduledThreadPoolExecutor. + ScheduledThreadPoolExecutor service = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool( + 1, getThreadFactory("grpc-timer-%d", true)); + + // If there are long timeouts that are cancelled, they will not actually be removed from + // the executors queue. This forces immediate removal upon cancellation to avoid a + // memory leak. + service.setRemoveOnCancelPolicy(true); + + ScheduledExecutorService executorService = Executors.unconfigurableScheduledExecutorService(service); + + serverBuilder.scheduledExecutorService(executorService); + serverBuilder.maxInboundMessageSize(maxMessageSize); serverBuilder.directExecutor(); diff --git a/server/jetty/src/main/java/io/deephaven/server/jetty/WebsocketFactory.java b/server/jetty/src/main/java/io/deephaven/server/jetty/WebsocketFactory.java new file mode 100644 index 00000000000..bfe8b021189 --- /dev/null +++ b/server/jetty/src/main/java/io/deephaven/server/jetty/WebsocketFactory.java @@ -0,0 +1,70 @@ +package io.deephaven.server.jetty; + +import io.grpc.servlet.web.websocket.GrpcWebsocket; +import io.grpc.servlet.web.websocket.MultiplexedWebSocketServerStream; +import jakarta.websocket.Endpoint; +import org.eclipse.jetty.util.component.Graceful; +import org.eclipse.jetty.websocket.jakarta.server.internal.JakartaWebSocketServerContainer; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import static io.grpc.servlet.web.websocket.MultiplexedWebSocketServerStream.GRACEFUL_CLOSE; + +/** + * Helper class to bridge the gap between Jetty's Graceful interface and the jakarta implementation of the supported + * grpc websocket transports. + *

+ *

+ * This type is not an Endpoint, so only one instance of this can create and shutdown many endpoints. + */ +public class WebsocketFactory implements Graceful { + private final AtomicReference> shutdown = new AtomicReference<>(); + + private final Supplier factory; + private final JakartaWebSocketServerContainer jettyWebsocketContainer; + + public WebsocketFactory(Supplier factory, JakartaWebSocketServerContainer jettyWebsocketContainer) { + this.factory = factory; + this.jettyWebsocketContainer = jettyWebsocketContainer; + } + + public Endpoint create() { + return factory.get(); + } + + @Override + public CompletableFuture shutdown() { + // Modeled after AbstractHTTP2ServerConnectionFactory.HTTP2SessionContainer.shutdown() + CompletableFuture result = new CompletableFuture<>(); + // Simply by setting the shutdown, we don't allow new endpoint instances to be created + if (shutdown.compareAndSet(null, result)) { + // iterate created transports, and if we can, prevent new streams + CompletableFuture.allOf( + jettyWebsocketContainer.getOpenSessions().stream() + .map(s -> (MultiplexedWebSocketServerStream.GracefulClose) s.getUserProperties() + .get(GRACEFUL_CLOSE)) + .map(MultiplexedWebSocketServerStream.GracefulClose::get) + .filter(Objects::nonNull) + .toArray(CompletableFuture[]::new)) + .whenComplete((success, throwable) -> { + if (throwable == null) { + // When all clients have acknowledged, complete + result.complete(success); + } else { + result.completeExceptionally(throwable); + } + }); + return result; + } else { + return shutdown.get(); + } + } + + @Override + public boolean isShutdown() { + return shutdown.get() != null; + } +} diff --git a/server/netty/src/main/java/io/deephaven/server/netty/NettyServerComponent.java b/server/netty/src/main/java/io/deephaven/server/netty/NettyServerComponent.java index d50256c9519..fe3cefcf325 100644 --- a/server/netty/src/main/java/io/deephaven/server/netty/NettyServerComponent.java +++ b/server/netty/src/main/java/io/deephaven/server/netty/NettyServerComponent.java @@ -27,6 +27,7 @@ HealthCheckModule.class, PythonPluginsRegistration.Module.class, NettyServerModule.class, + HealthCheckModule.class, PythonConsoleSessionModule.class, GroovyConsoleSessionModule.class, SessionToExecutionStateModule.class, diff --git a/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java index aefb85a60cd..c767e0798b7 100644 --- a/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java @@ -276,18 +276,17 @@ public StreamObserver autoCompleteStream( log.info().append(canJedi ? "Using jedi for python autocomplete" : "No jedi dependency available in python environment; disabling autocomplete.").endl(); return canJedi ? new PythonAutoCompleteObserver(responseObserver, scriptSessionProvider, session) - : new NoopAutoCompleteObserver(responseObserver); + : new NoopAutoCompleteObserver(session, responseObserver); } return new JavaAutoCompleteObserver(session, responseObserver); }); } - private static class NoopAutoCompleteObserver implements StreamObserver { - private final StreamObserver responseObserver; - - public NoopAutoCompleteObserver(StreamObserver responseObserver) { - this.responseObserver = responseObserver; + private static class NoopAutoCompleteObserver extends SessionCloseableObserver + implements StreamObserver { + public NoopAutoCompleteObserver(SessionState session, StreamObserver responseObserver) { + super(session, responseObserver); } @Override diff --git a/server/src/main/java/io/deephaven/server/console/completer/JavaAutoCompleteObserver.java b/server/src/main/java/io/deephaven/server/console/completer/JavaAutoCompleteObserver.java index 5513a1c2692..831f583e2f9 100644 --- a/server/src/main/java/io/deephaven/server/console/completer/JavaAutoCompleteObserver.java +++ b/server/src/main/java/io/deephaven/server/console/completer/JavaAutoCompleteObserver.java @@ -14,6 +14,7 @@ import io.deephaven.lang.shared.lsp.CompletionCancelled; import io.deephaven.proto.backplane.script.grpc.*; import io.deephaven.server.console.ConsoleServiceGrpcImpl; +import io.deephaven.server.session.SessionCloseableObserver; import io.deephaven.server.session.SessionState; import io.deephaven.util.SafeCloseable; import io.grpc.stub.StreamObserver; @@ -29,12 +30,11 @@ * Autocomplete handling for JVM languages, that directly can interact with Java instances without any name mangling, * and are able to use our flexible parser. */ -public class JavaAutoCompleteObserver implements StreamObserver { +public class JavaAutoCompleteObserver extends SessionCloseableObserver + implements StreamObserver { private static final Logger log = LoggerFactory.getLogger(JavaAutoCompleteObserver.class); private final CompletionParser parser; - private final SessionState session; - private final StreamObserver responseObserver; private final Map parsers = new ConcurrentHashMap<>(); @@ -49,10 +49,8 @@ private CompletionParser ensureParserForSession(SessionState session) { }); } - public JavaAutoCompleteObserver(SessionState session, StreamObserver responseObserver) { - this.session = session; - this.responseObserver = responseObserver; + super(session, responseObserver); parser = ensureParserForSession(session); } diff --git a/server/src/main/java/io/deephaven/server/console/completer/PythonAutoCompleteObserver.java b/server/src/main/java/io/deephaven/server/console/completer/PythonAutoCompleteObserver.java index e64d1295a3c..e779418ab37 100644 --- a/server/src/main/java/io/deephaven/server/console/completer/PythonAutoCompleteObserver.java +++ b/server/src/main/java/io/deephaven/server/console/completer/PythonAutoCompleteObserver.java @@ -9,6 +9,7 @@ import io.deephaven.lang.parse.CompletionParser; import io.deephaven.proto.backplane.script.grpc.*; import io.deephaven.server.console.ConsoleServiceGrpcImpl; +import io.deephaven.server.session.SessionCloseableObserver; import io.deephaven.server.session.SessionState; import io.deephaven.util.SafeCloseable; import io.grpc.stub.StreamObserver; @@ -23,7 +24,8 @@ /** * Autocomplete handling for python that will use the jedi library, if it is installed. */ -public class PythonAutoCompleteObserver implements StreamObserver { +public class PythonAutoCompleteObserver extends SessionCloseableObserver + implements StreamObserver { private static final Logger log = LoggerFactory.getLogger(PythonAutoCompleteObserver.class); @@ -32,14 +34,11 @@ public class PythonAutoCompleteObserver implements StreamObserver scriptSession; - private final SessionState session; - private final StreamObserver responseObserver; public PythonAutoCompleteObserver(StreamObserver responseObserver, Provider scriptSession, final SessionState session) { + super(session, responseObserver); this.scriptSession = scriptSession; - this.session = session; - this.responseObserver = responseObserver; } @Override diff --git a/server/src/main/java/io/deephaven/server/healthcheck/HealthCheckModule.java b/server/src/main/java/io/deephaven/server/healthcheck/HealthCheckModule.java index 9029213d8c9..e551c4b3f4f 100644 --- a/server/src/main/java/io/deephaven/server/healthcheck/HealthCheckModule.java +++ b/server/src/main/java/io/deephaven/server/healthcheck/HealthCheckModule.java @@ -9,6 +9,7 @@ import io.deephaven.util.process.ProcessEnvironment; import io.deephaven.util.process.ShutdownManager; import io.grpc.BindableService; +import io.grpc.health.v1.HealthCheckResponse; import io.grpc.protobuf.services.HealthStatusManager; import javax.inject.Singleton; @@ -19,6 +20,12 @@ public class HealthCheckModule { @Singleton public HealthStatusManager bindHealthStatusManager() { HealthStatusManager healthStatusManager = new HealthStatusManager(); + + // As we start to shut down, first notify all watchers of the health service + ProcessEnvironment.getGlobalShutdownManager().registerTask( + ShutdownManager.OrderingCategory.FIRST, + healthStatusManager::enterTerminalState); + return healthStatusManager; } diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java index 43cdbc4d10d..d6824c38d15 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java @@ -25,8 +25,6 @@ import io.deephaven.util.annotations.VisibleForTesting; import io.deephaven.util.process.ProcessEnvironment; import io.deephaven.util.process.ShutdownManager; -import io.grpc.health.v1.HealthCheckResponse; -import io.grpc.protobuf.services.HealthStatusManager; import javax.inject.Inject; import javax.inject.Provider; @@ -53,7 +51,6 @@ public class DeephavenApiServer { private final Map authenticationHandlers; private final Provider executionContextProvider; private final ServerConfig serverConfig; - private final HealthStatusManager healthStatusManager; @Inject public DeephavenApiServer( @@ -67,8 +64,7 @@ public DeephavenApiServer( final SessionService sessionService, final Map authenticationHandlers, final Provider executionContextProvider, - final ServerConfig serverConfig, - final HealthStatusManager healthStatusManager) { + final ServerConfig serverConfig) { this.server = server; this.ugp = ugp; this.logInit = logInit; @@ -80,7 +76,6 @@ public DeephavenApiServer( this.authenticationHandlers = authenticationHandlers; this.executionContextProvider = executionContextProvider; this.serverConfig = serverConfig; - this.healthStatusManager = healthStatusManager; } @VisibleForTesting @@ -103,22 +98,19 @@ SessionService sessionService() { * @throws ClassNotFoundException thrown if a class can't be found while finding and running an application. */ public DeephavenApiServer run() throws IOException, ClassNotFoundException, TimeoutException { - // Stop accepting new gRPC requests. + + // Prevent new gRPC calls from being started ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.FIRST, - () -> { - // healthStatusManager.enterTerminalState() must be called before server.stopWithTimeout(). - // If we add multiple `OrderingCategory.FIRST` callbacks, they'll execute in the wrong order. - healthStatusManager.enterTerminalState(); - server.stopWithTimeout(10, TimeUnit.SECONDS); - }); - - // Close outstanding sessions to give any gRPCs closure. + server::beginShutdown); + + // Now that no new gRPC calls may be made, close outstanding sessions to give any clients closure ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.MIDDLE, sessionService::onShutdown); - // Finally wait for gRPC to exit now. + // Finally, wait for the http server to be finished stopping ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.LAST, () -> { try { + server.stopWithTimeout(10, TimeUnit.SECONDS); server.join(); } catch (final InterruptedException ignored) { } @@ -161,7 +153,6 @@ public DeephavenApiServer run() throws IOException, ClassNotFoundException, Time log.info().append("Starting server...").endl(); server.start(); log.info().append("Server started on port ").append(server.getPort()).endl(); - healthStatusManager.setStatus("", HealthCheckResponse.ServingStatus.SERVING); return this; } diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java index 675a2852222..69f960baeaa 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java @@ -31,7 +31,6 @@ import io.deephaven.util.thread.NamingThreadFactory; import io.grpc.BindableService; import io.grpc.ServerInterceptor; -import io.grpc.protobuf.services.HealthStatusManager; import org.jetbrains.annotations.NotNull; import javax.inject.Named; @@ -64,15 +63,14 @@ PluginsModule.class, PartitionedTableServiceModule.class, FilesystemStorageServiceModule.class, - HealthCheckModule.class, ConfigServiceModule.class, }) public class DeephavenApiServerModule { @Provides @ElementsIntoSet - static Set primeServices(HealthStatusManager healthStatusManager) { - return Collections.singleton(healthStatusManager.getHealthService()); + static Set primeServices() { + return Collections.emptySet(); } @Provides diff --git a/server/src/main/java/io/deephaven/server/runner/GrpcServer.java b/server/src/main/java/io/deephaven/server/runner/GrpcServer.java index 86eb35b0389..b8c83f36435 100644 --- a/server/src/main/java/io/deephaven/server/runner/GrpcServer.java +++ b/server/src/main/java/io/deephaven/server/runner/GrpcServer.java @@ -38,10 +38,27 @@ public interface GrpcServer { */ void join() throws InterruptedException; + /** + * Server must stop accepting new streams, but let existing streams continue for now. + *

+ *

+ * In theory the listening socket should be freed and available for another application to take it, but this is not + * rigorously tested. + *

+ *

+ * To complete shutdown, call stopWithTimeout() after any remaining calls have been completed to the server's + * satisfaction, which will terminate the remaining calls. + */ + void beginShutdown(); + + /** * Stops the server, using the specified timeout as a deadline. Returns immediately. Call {@link #join()} to block * until this is completed. - * + *

+ *

+ * If pending calls do not matter, it is unnecessary to call beginShutdown() before this method. + * * @param timeout time to allow for a graceful shutdown before giving up and halting * @param unit unit to apply to the timeout */ @@ -67,8 +84,14 @@ public void join() throws InterruptedException { server.awaitTermination(); } + @Override + public void beginShutdown() { + server.shutdown(); + } + @Override public void stopWithTimeout(long timeout, TimeUnit unit) { + // gRPC's Server.shutdown is idempotent, so we call it again in case beginShutdown was never invoked server.shutdown(); // Create and start a thread to make sure we obey the deadline diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java b/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java index 835973b5aa8..0fcb24b9e87 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java @@ -443,10 +443,10 @@ private void subscribeToTerminationNotification() { if (checkStatus((ResponseStreamWrapper.Status) fail)) { // restart the termination notification subscribeToTerminationNotification(); - return; } else { info.notifyConnectionError(Js.cast(fail)); } + return; } assert success != null; diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java index f7b64bba249..fe9439b000e 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java @@ -121,9 +121,77 @@ public void sendFallback(Transport transport) { } private static int nextStreamId = 0; - private static final Map activeSockets = new HashMap<>(); - private final WebSocket webSocket; + static class ActiveTransport { + private static final Map activeSockets = new HashMap<>(); + private final WebSocket webSocket; + private boolean closing; + private int activeCount = 0; + + /** + * Gets a websocket transport for the grpc url. + * + * @param grpcUrl The URL to access - the full path will be used (with ws/s instead of http/s) to connect to the + * service, but only the protocol+host+port will be used to share the instance. + * @return a websocket instance to use to connect, newly created if necessary + */ + public static ActiveTransport get(String grpcUrl) { + URL urlWrapper = new URL(grpcUrl); + if (urlWrapper.protocol.equals("http:")) { + urlWrapper.protocol = "ws:"; + } else { + urlWrapper.protocol = "wss:"; + } + String actualUrl = urlWrapper.toString(); + urlWrapper.pathname = "/"; + String key = urlWrapper.toString(); + return activeSockets.computeIfAbsent(key, url -> new ActiveTransport(key, actualUrl)); + } + + /** + * @param key URL to use as the key entry to reuse the transport + * @param actualUrl the url to connect to + */ + private ActiveTransport(String key, String actualUrl) { + this.webSocket = new WebSocket(actualUrl, new String[] {MULTIPLEX_PROTOCOL, SOCKET_PER_STREAM_PROTOCOL}); + + webSocket.binaryType = "arraybuffer"; + + webSocket.addEventListener("message", event -> { + MessageEvent messageEvent = Js.uncheckedCast(event); + // read the message, check if it was a GO_AWAY + int streamId = new DataView(messageEvent.data, 0, 4).getInt32(0); + if (streamId == Integer.MAX_VALUE) { + // Server sent equiv of H2 GO_AWAY, time to wrap up. + // ACK the message + new WebsocketFinishSignal().send(webSocket, Integer.MAX_VALUE); + + // We can attempt to create new transport instances, but cannot use this one any longer for new + // streams (and any new one is likely to fail unless some new server is ready for us) + activeSockets.remove(key); + + // Mark that this transport should be closed when existing streams finish + this.closing = true; + if (activeCount == 0) { + webSocket.close(); + } + } + }); + } + + private void retain() { + activeCount++; + } + + private void release() { + activeCount--; + if (activeCount == 0 && closing) { + webSocket.close(); + } + } + } + + private final ActiveTransport transport; private final int streamId = nextStreamId++; private final List sendQueue = new ArrayList<>(); private final TransportOptions options; @@ -137,24 +205,12 @@ public MultiplexedWebsocketTransport(TransportOptions options, JsRunnable avoidM this.options = options; String url = options.getUrl(); URL urlWrapper = new URL(url); - if (urlWrapper.protocol.equals("http:")) { - urlWrapper.protocol = "ws:"; - } else { - urlWrapper.protocol = "wss:"; - } // preserve the path to send as metadata, but still talk to the server with that path path = urlWrapper.pathname.substring(1); - String actualUrl = urlWrapper.toString(); - urlWrapper.pathname = "/"; - String key = urlWrapper.toString(); // note that we connect to the actual url so the server can inform us via subprotocols that it isn't supported, // but the global map removes the path as the key for each websocket - webSocket = activeSockets.computeIfAbsent(key, ignore -> { - WebSocket ws = new WebSocket(actualUrl, new String[] {MULTIPLEX_PROTOCOL, SOCKET_PER_STREAM_PROTOCOL}); - ws.binaryType = "arraybuffer"; - return ws; - }); + transport = ActiveTransport.get(url); // prepare a fallback alternativeTransport = new JsLazy<>(() -> { @@ -169,8 +225,9 @@ public void start(BrowserHeaders metadata) { alternativeTransport.get().start(metadata); return; } + this.transport.retain(); - if (webSocket.readyState == WebSocket.CONNECTING) { + if (transport.webSocket.readyState == WebSocket.CONNECTING) { // if the socket isn't open already, wait until the socket is // open, then flush the queue, otherwise everything will be // fine to send right away on the already open socket. @@ -184,18 +241,18 @@ public void start(BrowserHeaders metadata) { } private void addWebsocketEventListener(String eventName, EventListener listener) { - webSocket.addEventListener(eventName, listener); - cleanup = cleanup.andThen(() -> webSocket.removeEventListener(eventName, listener)); + transport.webSocket.addEventListener(eventName, listener); + cleanup = cleanup.andThen(() -> transport.webSocket.removeEventListener(eventName, listener)); } private void onOpen(Event event) { - Object protocol = Js.asPropertyMap(webSocket).get("protocol"); + Object protocol = Js.asPropertyMap(transport.webSocket).get("protocol"); if (protocol.equals(SOCKET_PER_STREAM_PROTOCOL)) { // delegate to plain websocket impl, try to dissuade future users of this server Transport transport = alternativeTransport.get(); // close our own websocket - webSocket.close(); + this.transport.webSocket.close(); // flush the queued items, which are now the new transport's problems - we'll forward all future work there // as well automatically @@ -210,7 +267,7 @@ private void onOpen(Event event) { return; } for (int i = 0; i < sendQueue.size(); i++) { - sendQueue.get(i).send(webSocket, streamId); + sendQueue.get(i).send(transport.webSocket, streamId); } sendQueue.clear(); } @@ -247,6 +304,8 @@ public void cancel() { private void removeHandlers() { cleanup.run(); cleanup = JsRunnable.doNothing(); + + transport.release(); } private void onClose(Event event) { @@ -284,10 +343,10 @@ private void onMessage(Event event) { } private void sendOrEnqueue(QueuedEntry e) { - if (webSocket.readyState == WebSocket.CONNECTING) { + if (transport.webSocket.readyState == WebSocket.CONNECTING) { sendQueue.add(e); } else { - e.send(webSocket, streamId); + e.send(transport.webSocket, streamId); } } } From 35efc898e2914dd6a7ed15f8b9c27f02486fb57e Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Mon, 5 Dec 2022 16:46:41 -0600 Subject: [PATCH 2/2] Correct a merge conflict from #3123 --- .../io/deephaven/server/console/ConsoleServiceGrpcImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java index c767e0798b7..d7a5e49fe22 100644 --- a/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java @@ -260,10 +260,10 @@ public void bindTableToVariable(BindTableToVariableRequest request, public StreamObserver autoCompleteStream( StreamObserver responseObserver) { return GrpcUtil.rpcWrapper(log, responseObserver, () -> { + final SessionState session = sessionService.getCurrentSession(); if (AUTOCOMPLETE_DISABLED) { - return new NoopAutoCompleteObserver(responseObserver); + return new NoopAutoCompleteObserver(session, responseObserver); } - final SessionState session = sessionService.getCurrentSession(); if (PythonDeephavenSession.SCRIPT_TYPE.equals(scriptSessionProvider.get().scriptType())) { PyObject[] settings = new PyObject[1]; safelyExecute(() -> {