diff --git a/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/MultiplexedWebSocketServerStream.java b/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/MultiplexedWebSocketServerStream.java index 9e75d9560ab..489937eca88 100644 --- a/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/MultiplexedWebSocketServerStream.java +++ b/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/MultiplexedWebSocketServerStream.java @@ -12,6 +12,7 @@ import io.grpc.internal.ServerTransportListener; import io.grpc.internal.StatsTraceContext; import jakarta.websocket.CloseReason; +import jakarta.websocket.EndpointConfig; import jakarta.websocket.Session; import java.io.IOException; @@ -20,6 +21,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -41,6 +44,16 @@ * runOnTransportThread while in onMessage, as we're already in the transport thread. */ public class MultiplexedWebSocketServerStream extends AbstractWebSocketServerStream { + public static final String GRACEFUL_CLOSE = MultiplexedWebSocketServerStream.class.getName() + ".graceful_close"; + + /** + * Callback to initiate a graceful shutdown of this websocket instance, as an alternative to just closing the + * websocket. Since this websocket behaves like gRPC transport, we give the client a chance to finish up and close + * itself before the server does it. + */ + public interface GracefulClose extends Supplier> { + } + private static final Logger logger = Logger.getLogger(MultiplexedWebSocketServerStream.class.getName()); /** Custom metadata to hold the path requested by the incoming stream */ public static final Metadata.Key PATH = @@ -54,12 +67,55 @@ public class MultiplexedWebSocketServerStream extends AbstractWebSocketServerStr private final Map streams = new HashMap<>(); private final boolean isTextRequest = false;// not supported yet + /** + * Enum to describe the process of closing a transport. After the server has begun to close but the client hasn't + * yet acknowledged, it is permitted for the client to start a new stream, but after the server acknowledges, no new + * streams can be started. Note that shutdown will proceed anyway, and will eventually stop all streams. + */ + enum ClosedState { + OPEN, CLOSING, CLOSED + } + + private ClosedState closed = ClosedState.OPEN; + private final CompletableFuture closingFuture = new CompletableFuture<>(); + public MultiplexedWebSocketServerStream(ServerTransportListener transportListener, List streamTracerFactories, int maxInboundMessageSize, Attributes attributes) { super(transportListener, streamTracerFactories, maxInboundMessageSize, attributes); } + /** + * Stops this multiplexed transport from accepting new streams. Instead, it will reply with its version of GO_AWAY, + * a stream of Integer.MAX_INTEGER to the client to signal that new requests will not be accepted, and future + * incoming streams will be closed by the server right away. In keeping with h2, until the client ACKs the close, we + * will permit incoming streams that were sent before we closed, but they likely will not have a large window to get + * their work done before they are closed the rest of the way. + */ + private CompletableFuture stopAcceptingNewStreams() { + if (closed != ClosedState.OPEN) { + return closingFuture; + } + closed = ClosedState.CLOSING; + ByteBuffer end = ByteBuffer.allocate(4); + end.putInt(0, Integer.MAX_VALUE); + // ignore the results of this future, the closingFuture will instead tell us when the client ACKs + websocketSession.getAsyncRemote().sendBinary(end); + return closingFuture; + } + + @Override + public void onOpen(Session websocketSession, EndpointConfig config) { + super.onOpen(websocketSession, config); + websocketSession.getUserProperties().put(GRACEFUL_CLOSE, (GracefulClose) this::stopAcceptingNewStreams); + } + + @Override + public void onClose(Session session, CloseReason closeReason) { + // regardless of state, indicate that we are already closed and no need to wait + closingFuture.complete(null); + } + @Override public void onMessage(String message) { for (MultiplexedWebsocketStreamImpl stream : streams.values()) { @@ -89,6 +145,23 @@ public void onMessage(ByteBuffer message) throws IOException { } else { closed = false; } + + if (closed && streamId == Integer.MAX_VALUE) { + if (this.closed != ClosedState.CLOSING) { + // error, client tried to finish a close we didn't initiate, hang up + websocketSession.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "Unexpected close ACK")); + return; + } + // Client has ack'd our close, no more new streams allowed, client will be finishing up so we can exit + this.closed = ClosedState.CLOSED; + + // Mark the future as finished + closingFuture.complete(null); + + // (note that technically there is a 5th byte to indicate close, but we're ignoring that here) + return; + } + // may be null if this is the first request for this streamId final MultiplexedWebsocketStreamImpl stream = streams.get(streamId); @@ -104,6 +177,16 @@ public void onMessage(ByteBuffer message) throws IOException { // if this is the first message on this websocket, it is the request headers if (stream == null) { + if (this.closed == ClosedState.CLOSED) { + // Not accepting new streams on existing websockets, and the client knew that when they sent this (since + // the GO_AWAY was ACK'd). We treat this as an error, since the client isn't behaving. If instead closed + // was still CLOSING, then, client sent this before they saw that, we permit them to still open streams, + // though the application likely has begun to clean up state. + websocketSession.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, + "Stream created after closing initiated")); + + return; + } processHeaders(message, streamId); return; } diff --git a/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedServer.java b/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedServer.java index 1b4c69ac82a..5dd96ed7547 100644 --- a/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedServer.java +++ b/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedServer.java @@ -48,6 +48,7 @@ public class EmbeddedServer { HealthCheckModule.class, PythonPluginsRegistration.Module.class, JettyServerModule.class, + HealthCheckModule.class, PythonConsoleSessionModule.class, GroovyConsoleSessionModule.class, SessionToExecutionStateModule.class, diff --git a/server/jetty/src/main/java/io/deephaven/server/jetty/JettyBackedGrpcServer.java b/server/jetty/src/main/java/io/deephaven/server/jetty/JettyBackedGrpcServer.java index 8ba0eb5abc0..d07e6ece433 100644 --- a/server/jetty/src/main/java/io/deephaven/server/jetty/JettyBackedGrpcServer.java +++ b/server/jetty/src/main/java/io/deephaven/server/jetty/JettyBackedGrpcServer.java @@ -33,19 +33,25 @@ import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.ErrorPageErrorHandler; import org.eclipse.jetty.servlet.FilterHolder; +import org.eclipse.jetty.util.MultiException; +import org.eclipse.jetty.util.component.Graceful; import org.eclipse.jetty.util.resource.Resource; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.webapp.WebAppContext; +import org.eclipse.jetty.websocket.jakarta.common.SessionTracker; import org.eclipse.jetty.websocket.jakarta.server.config.JakartaWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.jakarta.server.internal.JakartaWebSocketServerContainer; import javax.inject.Inject; import java.io.IOException; import java.io.UncheckedIOException; import java.net.URL; import java.util.ArrayList; +import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -56,6 +62,7 @@ public class JettyBackedGrpcServer implements GrpcServer { private final Server jetty; + private final boolean websocketsEnabled; @Inject public JettyBackedGrpcServer( @@ -109,19 +116,24 @@ public JettyBackedGrpcServer( endpoints.put(GRPC_WEBSOCKETS_MULTIPLEX_PROTOCOL, () -> filter.create(MultiplexedWebSocketServerStream::new)); } + JakartaWebSocketServerContainer jettyWebsocketContainer = (JakartaWebSocketServerContainer) container; + WebsocketFactory websocketFactory = + new WebsocketFactory(() -> new GrpcWebsocket(endpoints), jettyWebsocketContainer); + jettyWebsocketContainer.addBean(websocketFactory); container.addEndpoint(ServerEndpointConfig.Builder.create(GrpcWebsocket.class, "/{service}/{method}") .configurator(new ServerEndpointConfig.Configurator() { @Override - public T getEndpointInstance(Class endpointClass) throws InstantiationException { + public T getEndpointInstance(Class endpointClass) { // noinspection unchecked - return (T) new GrpcWebsocket(endpoints); + return (T) websocketFactory.create(); } }) .subprotocols(new ArrayList<>(endpoints.keySet())) - .build() - - ); + .build()); }); + this.websocketsEnabled = true; + } else { + this.websocketsEnabled = false; } // Note: handler order matters due to pathSpec order @@ -149,15 +161,57 @@ public void join() throws InterruptedException { jetty.join(); } + @Override + public void beginShutdown() { + // "start to stop" the jetty container, skipping over websockets, since their Graceful implementation isn't + // very nice. This is roughly the implementation of Graceful.shutdown(Component), except avoiding anything that + // would directly stop a websocket, which instead will be handled later, as part of the actual stop() call tell + // the graceful handlers that we are shutting down. + + // For websockets, since the SessionTracker will instantly stop the socket rather than allow it to finish + // nicely. Instead, when websockets were created, we registered extra graceful beans to shutdown like h2. + // See Server.doStop(), this is roughly the implementation of the first phase of that method, only asking + // Graceful instances to stop, but not stopping connectors or non-graceful components. + + // Note that this would not apply correctly if we used WebSockets for some purpose other than gRPC transport. + Collection gracefuls = jetty.getContainedBeans(Graceful.class); + gracefuls.stream().filter(g -> !(g instanceof SessionTracker)).forEach(Graceful::shutdown); + } + @Override public void stopWithTimeout(long timeout, TimeUnit unit) { - jetty.setStopTimeout(unit.toMillis(timeout)); Thread shutdownThread = new Thread(() -> { + MultiException exceptions = new MultiException(); + long millis = unit.toMillis(timeout); + + // If websockets are enabled, try to spend part of our shutdown timeout budget on waiting for websockets, as + // in beginShutdown. + if (websocketsEnabled && millis > 250) { + // shut down everything except the websockets themselves with half our timeout + millis /= 2; + + // Collect the same beans we gracefully stopped before (or, if we didn't already start a graceful + // shutdown, this is the first attempt) + Collection gracefuls = jetty.getContainedBeans(Graceful.class); + try { + CompletableFuture.allOf(gracefuls.stream().filter(g -> !(g instanceof SessionTracker)) + .map(Graceful::shutdown).toArray(CompletableFuture[]::new)) + .get(millis, TimeUnit.MILLISECONDS); + } catch (Exception e) { + exceptions.add(e); + } + } + + // regardless of failures so far, continue shutdown with remaining budget. This will end all websockets + // right away. try { + jetty.setStopTimeout(millis); jetty.stop(); + exceptions.ifExceptionThrow(); } catch (Exception exception) { - throw new IllegalStateException("Failure while stopping", exception); + exceptions.add(exception); } + exceptions.ifExceptionThrowRuntime(); }); shutdownThread.start(); } @@ -205,6 +259,10 @@ private static ServerConnector createConnector(Server server, JettyConfig config } config.host().ifPresent(serverConnector::setHost); serverConnector.setPort(config.port()); + + // Give connections extra time to shutdown, since we have an explicit server shutdown + serverConnector.setShutdownIdleTimeout(serverConnector.getIdleTimeout()); + return serverConnector; } } diff --git a/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerComponent.java b/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerComponent.java index e5dc662be55..22be0c4c950 100644 --- a/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerComponent.java +++ b/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerComponent.java @@ -27,6 +27,7 @@ HealthCheckModule.class, PythonPluginsRegistration.Module.class, JettyServerModule.class, + HealthCheckModule.class, PythonConsoleSessionModule.class, GroovyConsoleSessionModule.class, SessionToExecutionStateModule.class, diff --git a/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerModule.java b/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerModule.java index 2475fc38562..5e1403d1484 100644 --- a/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerModule.java +++ b/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerModule.java @@ -15,6 +15,11 @@ import javax.inject.Named; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static io.grpc.internal.GrpcUtil.getThreadFactory; @Module public interface JettyServerModule { @@ -34,6 +39,22 @@ static ServletAdapter provideGrpcServletAdapter( services.forEach(serverBuilder::addService); interceptors.forEach(serverBuilder::intercept); + // create a custom executor service, just like grpc would use, so that grpc doesn't shut it down ahead + // of when we are ready + // We don't use newSingleThreadScheduledExecutor because it doesn't return a + // ScheduledThreadPoolExecutor. + ScheduledThreadPoolExecutor service = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool( + 1, getThreadFactory("grpc-timer-%d", true)); + + // If there are long timeouts that are cancelled, they will not actually be removed from + // the executors queue. This forces immediate removal upon cancellation to avoid a + // memory leak. + service.setRemoveOnCancelPolicy(true); + + ScheduledExecutorService executorService = Executors.unconfigurableScheduledExecutorService(service); + + serverBuilder.scheduledExecutorService(executorService); + serverBuilder.maxInboundMessageSize(maxMessageSize); serverBuilder.directExecutor(); diff --git a/server/jetty/src/main/java/io/deephaven/server/jetty/WebsocketFactory.java b/server/jetty/src/main/java/io/deephaven/server/jetty/WebsocketFactory.java new file mode 100644 index 00000000000..bfe8b021189 --- /dev/null +++ b/server/jetty/src/main/java/io/deephaven/server/jetty/WebsocketFactory.java @@ -0,0 +1,70 @@ +package io.deephaven.server.jetty; + +import io.grpc.servlet.web.websocket.GrpcWebsocket; +import io.grpc.servlet.web.websocket.MultiplexedWebSocketServerStream; +import jakarta.websocket.Endpoint; +import org.eclipse.jetty.util.component.Graceful; +import org.eclipse.jetty.websocket.jakarta.server.internal.JakartaWebSocketServerContainer; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import static io.grpc.servlet.web.websocket.MultiplexedWebSocketServerStream.GRACEFUL_CLOSE; + +/** + * Helper class to bridge the gap between Jetty's Graceful interface and the jakarta implementation of the supported + * grpc websocket transports. + *

+ *

+ * This type is not an Endpoint, so only one instance of this can create and shutdown many endpoints. + */ +public class WebsocketFactory implements Graceful { + private final AtomicReference> shutdown = new AtomicReference<>(); + + private final Supplier factory; + private final JakartaWebSocketServerContainer jettyWebsocketContainer; + + public WebsocketFactory(Supplier factory, JakartaWebSocketServerContainer jettyWebsocketContainer) { + this.factory = factory; + this.jettyWebsocketContainer = jettyWebsocketContainer; + } + + public Endpoint create() { + return factory.get(); + } + + @Override + public CompletableFuture shutdown() { + // Modeled after AbstractHTTP2ServerConnectionFactory.HTTP2SessionContainer.shutdown() + CompletableFuture result = new CompletableFuture<>(); + // Simply by setting the shutdown, we don't allow new endpoint instances to be created + if (shutdown.compareAndSet(null, result)) { + // iterate created transports, and if we can, prevent new streams + CompletableFuture.allOf( + jettyWebsocketContainer.getOpenSessions().stream() + .map(s -> (MultiplexedWebSocketServerStream.GracefulClose) s.getUserProperties() + .get(GRACEFUL_CLOSE)) + .map(MultiplexedWebSocketServerStream.GracefulClose::get) + .filter(Objects::nonNull) + .toArray(CompletableFuture[]::new)) + .whenComplete((success, throwable) -> { + if (throwable == null) { + // When all clients have acknowledged, complete + result.complete(success); + } else { + result.completeExceptionally(throwable); + } + }); + return result; + } else { + return shutdown.get(); + } + } + + @Override + public boolean isShutdown() { + return shutdown.get() != null; + } +} diff --git a/server/netty/src/main/java/io/deephaven/server/netty/NettyServerComponent.java b/server/netty/src/main/java/io/deephaven/server/netty/NettyServerComponent.java index d50256c9519..fe3cefcf325 100644 --- a/server/netty/src/main/java/io/deephaven/server/netty/NettyServerComponent.java +++ b/server/netty/src/main/java/io/deephaven/server/netty/NettyServerComponent.java @@ -27,6 +27,7 @@ HealthCheckModule.class, PythonPluginsRegistration.Module.class, NettyServerModule.class, + HealthCheckModule.class, PythonConsoleSessionModule.class, GroovyConsoleSessionModule.class, SessionToExecutionStateModule.class, diff --git a/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java index aefb85a60cd..d7a5e49fe22 100644 --- a/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java @@ -260,10 +260,10 @@ public void bindTableToVariable(BindTableToVariableRequest request, public StreamObserver autoCompleteStream( StreamObserver responseObserver) { return GrpcUtil.rpcWrapper(log, responseObserver, () -> { + final SessionState session = sessionService.getCurrentSession(); if (AUTOCOMPLETE_DISABLED) { - return new NoopAutoCompleteObserver(responseObserver); + return new NoopAutoCompleteObserver(session, responseObserver); } - final SessionState session = sessionService.getCurrentSession(); if (PythonDeephavenSession.SCRIPT_TYPE.equals(scriptSessionProvider.get().scriptType())) { PyObject[] settings = new PyObject[1]; safelyExecute(() -> { @@ -276,18 +276,17 @@ public StreamObserver autoCompleteStream( log.info().append(canJedi ? "Using jedi for python autocomplete" : "No jedi dependency available in python environment; disabling autocomplete.").endl(); return canJedi ? new PythonAutoCompleteObserver(responseObserver, scriptSessionProvider, session) - : new NoopAutoCompleteObserver(responseObserver); + : new NoopAutoCompleteObserver(session, responseObserver); } return new JavaAutoCompleteObserver(session, responseObserver); }); } - private static class NoopAutoCompleteObserver implements StreamObserver { - private final StreamObserver responseObserver; - - public NoopAutoCompleteObserver(StreamObserver responseObserver) { - this.responseObserver = responseObserver; + private static class NoopAutoCompleteObserver extends SessionCloseableObserver + implements StreamObserver { + public NoopAutoCompleteObserver(SessionState session, StreamObserver responseObserver) { + super(session, responseObserver); } @Override diff --git a/server/src/main/java/io/deephaven/server/console/completer/JavaAutoCompleteObserver.java b/server/src/main/java/io/deephaven/server/console/completer/JavaAutoCompleteObserver.java index 5513a1c2692..831f583e2f9 100644 --- a/server/src/main/java/io/deephaven/server/console/completer/JavaAutoCompleteObserver.java +++ b/server/src/main/java/io/deephaven/server/console/completer/JavaAutoCompleteObserver.java @@ -14,6 +14,7 @@ import io.deephaven.lang.shared.lsp.CompletionCancelled; import io.deephaven.proto.backplane.script.grpc.*; import io.deephaven.server.console.ConsoleServiceGrpcImpl; +import io.deephaven.server.session.SessionCloseableObserver; import io.deephaven.server.session.SessionState; import io.deephaven.util.SafeCloseable; import io.grpc.stub.StreamObserver; @@ -29,12 +30,11 @@ * Autocomplete handling for JVM languages, that directly can interact with Java instances without any name mangling, * and are able to use our flexible parser. */ -public class JavaAutoCompleteObserver implements StreamObserver { +public class JavaAutoCompleteObserver extends SessionCloseableObserver + implements StreamObserver { private static final Logger log = LoggerFactory.getLogger(JavaAutoCompleteObserver.class); private final CompletionParser parser; - private final SessionState session; - private final StreamObserver responseObserver; private final Map parsers = new ConcurrentHashMap<>(); @@ -49,10 +49,8 @@ private CompletionParser ensureParserForSession(SessionState session) { }); } - public JavaAutoCompleteObserver(SessionState session, StreamObserver responseObserver) { - this.session = session; - this.responseObserver = responseObserver; + super(session, responseObserver); parser = ensureParserForSession(session); } diff --git a/server/src/main/java/io/deephaven/server/console/completer/PythonAutoCompleteObserver.java b/server/src/main/java/io/deephaven/server/console/completer/PythonAutoCompleteObserver.java index e64d1295a3c..e779418ab37 100644 --- a/server/src/main/java/io/deephaven/server/console/completer/PythonAutoCompleteObserver.java +++ b/server/src/main/java/io/deephaven/server/console/completer/PythonAutoCompleteObserver.java @@ -9,6 +9,7 @@ import io.deephaven.lang.parse.CompletionParser; import io.deephaven.proto.backplane.script.grpc.*; import io.deephaven.server.console.ConsoleServiceGrpcImpl; +import io.deephaven.server.session.SessionCloseableObserver; import io.deephaven.server.session.SessionState; import io.deephaven.util.SafeCloseable; import io.grpc.stub.StreamObserver; @@ -23,7 +24,8 @@ /** * Autocomplete handling for python that will use the jedi library, if it is installed. */ -public class PythonAutoCompleteObserver implements StreamObserver { +public class PythonAutoCompleteObserver extends SessionCloseableObserver + implements StreamObserver { private static final Logger log = LoggerFactory.getLogger(PythonAutoCompleteObserver.class); @@ -32,14 +34,11 @@ public class PythonAutoCompleteObserver implements StreamObserver scriptSession; - private final SessionState session; - private final StreamObserver responseObserver; public PythonAutoCompleteObserver(StreamObserver responseObserver, Provider scriptSession, final SessionState session) { + super(session, responseObserver); this.scriptSession = scriptSession; - this.session = session; - this.responseObserver = responseObserver; } @Override diff --git a/server/src/main/java/io/deephaven/server/healthcheck/HealthCheckModule.java b/server/src/main/java/io/deephaven/server/healthcheck/HealthCheckModule.java index 9029213d8c9..e551c4b3f4f 100644 --- a/server/src/main/java/io/deephaven/server/healthcheck/HealthCheckModule.java +++ b/server/src/main/java/io/deephaven/server/healthcheck/HealthCheckModule.java @@ -9,6 +9,7 @@ import io.deephaven.util.process.ProcessEnvironment; import io.deephaven.util.process.ShutdownManager; import io.grpc.BindableService; +import io.grpc.health.v1.HealthCheckResponse; import io.grpc.protobuf.services.HealthStatusManager; import javax.inject.Singleton; @@ -19,6 +20,12 @@ public class HealthCheckModule { @Singleton public HealthStatusManager bindHealthStatusManager() { HealthStatusManager healthStatusManager = new HealthStatusManager(); + + // As we start to shut down, first notify all watchers of the health service + ProcessEnvironment.getGlobalShutdownManager().registerTask( + ShutdownManager.OrderingCategory.FIRST, + healthStatusManager::enterTerminalState); + return healthStatusManager; } diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java index 43cdbc4d10d..d6824c38d15 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java @@ -25,8 +25,6 @@ import io.deephaven.util.annotations.VisibleForTesting; import io.deephaven.util.process.ProcessEnvironment; import io.deephaven.util.process.ShutdownManager; -import io.grpc.health.v1.HealthCheckResponse; -import io.grpc.protobuf.services.HealthStatusManager; import javax.inject.Inject; import javax.inject.Provider; @@ -53,7 +51,6 @@ public class DeephavenApiServer { private final Map authenticationHandlers; private final Provider executionContextProvider; private final ServerConfig serverConfig; - private final HealthStatusManager healthStatusManager; @Inject public DeephavenApiServer( @@ -67,8 +64,7 @@ public DeephavenApiServer( final SessionService sessionService, final Map authenticationHandlers, final Provider executionContextProvider, - final ServerConfig serverConfig, - final HealthStatusManager healthStatusManager) { + final ServerConfig serverConfig) { this.server = server; this.ugp = ugp; this.logInit = logInit; @@ -80,7 +76,6 @@ public DeephavenApiServer( this.authenticationHandlers = authenticationHandlers; this.executionContextProvider = executionContextProvider; this.serverConfig = serverConfig; - this.healthStatusManager = healthStatusManager; } @VisibleForTesting @@ -103,22 +98,19 @@ SessionService sessionService() { * @throws ClassNotFoundException thrown if a class can't be found while finding and running an application. */ public DeephavenApiServer run() throws IOException, ClassNotFoundException, TimeoutException { - // Stop accepting new gRPC requests. + + // Prevent new gRPC calls from being started ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.FIRST, - () -> { - // healthStatusManager.enterTerminalState() must be called before server.stopWithTimeout(). - // If we add multiple `OrderingCategory.FIRST` callbacks, they'll execute in the wrong order. - healthStatusManager.enterTerminalState(); - server.stopWithTimeout(10, TimeUnit.SECONDS); - }); - - // Close outstanding sessions to give any gRPCs closure. + server::beginShutdown); + + // Now that no new gRPC calls may be made, close outstanding sessions to give any clients closure ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.MIDDLE, sessionService::onShutdown); - // Finally wait for gRPC to exit now. + // Finally, wait for the http server to be finished stopping ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.LAST, () -> { try { + server.stopWithTimeout(10, TimeUnit.SECONDS); server.join(); } catch (final InterruptedException ignored) { } @@ -161,7 +153,6 @@ public DeephavenApiServer run() throws IOException, ClassNotFoundException, Time log.info().append("Starting server...").endl(); server.start(); log.info().append("Server started on port ").append(server.getPort()).endl(); - healthStatusManager.setStatus("", HealthCheckResponse.ServingStatus.SERVING); return this; } diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java index 675a2852222..69f960baeaa 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java @@ -31,7 +31,6 @@ import io.deephaven.util.thread.NamingThreadFactory; import io.grpc.BindableService; import io.grpc.ServerInterceptor; -import io.grpc.protobuf.services.HealthStatusManager; import org.jetbrains.annotations.NotNull; import javax.inject.Named; @@ -64,15 +63,14 @@ PluginsModule.class, PartitionedTableServiceModule.class, FilesystemStorageServiceModule.class, - HealthCheckModule.class, ConfigServiceModule.class, }) public class DeephavenApiServerModule { @Provides @ElementsIntoSet - static Set primeServices(HealthStatusManager healthStatusManager) { - return Collections.singleton(healthStatusManager.getHealthService()); + static Set primeServices() { + return Collections.emptySet(); } @Provides diff --git a/server/src/main/java/io/deephaven/server/runner/GrpcServer.java b/server/src/main/java/io/deephaven/server/runner/GrpcServer.java index 86eb35b0389..b8c83f36435 100644 --- a/server/src/main/java/io/deephaven/server/runner/GrpcServer.java +++ b/server/src/main/java/io/deephaven/server/runner/GrpcServer.java @@ -38,10 +38,27 @@ public interface GrpcServer { */ void join() throws InterruptedException; + /** + * Server must stop accepting new streams, but let existing streams continue for now. + *

+ *

+ * In theory the listening socket should be freed and available for another application to take it, but this is not + * rigorously tested. + *

+ *

+ * To complete shutdown, call stopWithTimeout() after any remaining calls have been completed to the server's + * satisfaction, which will terminate the remaining calls. + */ + void beginShutdown(); + + /** * Stops the server, using the specified timeout as a deadline. Returns immediately. Call {@link #join()} to block * until this is completed. - * + *

+ *

+ * If pending calls do not matter, it is unnecessary to call beginShutdown() before this method. + * * @param timeout time to allow for a graceful shutdown before giving up and halting * @param unit unit to apply to the timeout */ @@ -67,8 +84,14 @@ public void join() throws InterruptedException { server.awaitTermination(); } + @Override + public void beginShutdown() { + server.shutdown(); + } + @Override public void stopWithTimeout(long timeout, TimeUnit unit) { + // gRPC's Server.shutdown is idempotent, so we call it again in case beginShutdown was never invoked server.shutdown(); // Create and start a thread to make sure we obey the deadline diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java b/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java index 835973b5aa8..0fcb24b9e87 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java @@ -443,10 +443,10 @@ private void subscribeToTerminationNotification() { if (checkStatus((ResponseStreamWrapper.Status) fail)) { // restart the termination notification subscribeToTerminationNotification(); - return; } else { info.notifyConnectionError(Js.cast(fail)); } + return; } assert success != null; diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java index f7b64bba249..fe9439b000e 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java @@ -121,9 +121,77 @@ public void sendFallback(Transport transport) { } private static int nextStreamId = 0; - private static final Map activeSockets = new HashMap<>(); - private final WebSocket webSocket; + static class ActiveTransport { + private static final Map activeSockets = new HashMap<>(); + private final WebSocket webSocket; + private boolean closing; + private int activeCount = 0; + + /** + * Gets a websocket transport for the grpc url. + * + * @param grpcUrl The URL to access - the full path will be used (with ws/s instead of http/s) to connect to the + * service, but only the protocol+host+port will be used to share the instance. + * @return a websocket instance to use to connect, newly created if necessary + */ + public static ActiveTransport get(String grpcUrl) { + URL urlWrapper = new URL(grpcUrl); + if (urlWrapper.protocol.equals("http:")) { + urlWrapper.protocol = "ws:"; + } else { + urlWrapper.protocol = "wss:"; + } + String actualUrl = urlWrapper.toString(); + urlWrapper.pathname = "/"; + String key = urlWrapper.toString(); + return activeSockets.computeIfAbsent(key, url -> new ActiveTransport(key, actualUrl)); + } + + /** + * @param key URL to use as the key entry to reuse the transport + * @param actualUrl the url to connect to + */ + private ActiveTransport(String key, String actualUrl) { + this.webSocket = new WebSocket(actualUrl, new String[] {MULTIPLEX_PROTOCOL, SOCKET_PER_STREAM_PROTOCOL}); + + webSocket.binaryType = "arraybuffer"; + + webSocket.addEventListener("message", event -> { + MessageEvent messageEvent = Js.uncheckedCast(event); + // read the message, check if it was a GO_AWAY + int streamId = new DataView(messageEvent.data, 0, 4).getInt32(0); + if (streamId == Integer.MAX_VALUE) { + // Server sent equiv of H2 GO_AWAY, time to wrap up. + // ACK the message + new WebsocketFinishSignal().send(webSocket, Integer.MAX_VALUE); + + // We can attempt to create new transport instances, but cannot use this one any longer for new + // streams (and any new one is likely to fail unless some new server is ready for us) + activeSockets.remove(key); + + // Mark that this transport should be closed when existing streams finish + this.closing = true; + if (activeCount == 0) { + webSocket.close(); + } + } + }); + } + + private void retain() { + activeCount++; + } + + private void release() { + activeCount--; + if (activeCount == 0 && closing) { + webSocket.close(); + } + } + } + + private final ActiveTransport transport; private final int streamId = nextStreamId++; private final List sendQueue = new ArrayList<>(); private final TransportOptions options; @@ -137,24 +205,12 @@ public MultiplexedWebsocketTransport(TransportOptions options, JsRunnable avoidM this.options = options; String url = options.getUrl(); URL urlWrapper = new URL(url); - if (urlWrapper.protocol.equals("http:")) { - urlWrapper.protocol = "ws:"; - } else { - urlWrapper.protocol = "wss:"; - } // preserve the path to send as metadata, but still talk to the server with that path path = urlWrapper.pathname.substring(1); - String actualUrl = urlWrapper.toString(); - urlWrapper.pathname = "/"; - String key = urlWrapper.toString(); // note that we connect to the actual url so the server can inform us via subprotocols that it isn't supported, // but the global map removes the path as the key for each websocket - webSocket = activeSockets.computeIfAbsent(key, ignore -> { - WebSocket ws = new WebSocket(actualUrl, new String[] {MULTIPLEX_PROTOCOL, SOCKET_PER_STREAM_PROTOCOL}); - ws.binaryType = "arraybuffer"; - return ws; - }); + transport = ActiveTransport.get(url); // prepare a fallback alternativeTransport = new JsLazy<>(() -> { @@ -169,8 +225,9 @@ public void start(BrowserHeaders metadata) { alternativeTransport.get().start(metadata); return; } + this.transport.retain(); - if (webSocket.readyState == WebSocket.CONNECTING) { + if (transport.webSocket.readyState == WebSocket.CONNECTING) { // if the socket isn't open already, wait until the socket is // open, then flush the queue, otherwise everything will be // fine to send right away on the already open socket. @@ -184,18 +241,18 @@ public void start(BrowserHeaders metadata) { } private void addWebsocketEventListener(String eventName, EventListener listener) { - webSocket.addEventListener(eventName, listener); - cleanup = cleanup.andThen(() -> webSocket.removeEventListener(eventName, listener)); + transport.webSocket.addEventListener(eventName, listener); + cleanup = cleanup.andThen(() -> transport.webSocket.removeEventListener(eventName, listener)); } private void onOpen(Event event) { - Object protocol = Js.asPropertyMap(webSocket).get("protocol"); + Object protocol = Js.asPropertyMap(transport.webSocket).get("protocol"); if (protocol.equals(SOCKET_PER_STREAM_PROTOCOL)) { // delegate to plain websocket impl, try to dissuade future users of this server Transport transport = alternativeTransport.get(); // close our own websocket - webSocket.close(); + this.transport.webSocket.close(); // flush the queued items, which are now the new transport's problems - we'll forward all future work there // as well automatically @@ -210,7 +267,7 @@ private void onOpen(Event event) { return; } for (int i = 0; i < sendQueue.size(); i++) { - sendQueue.get(i).send(webSocket, streamId); + sendQueue.get(i).send(transport.webSocket, streamId); } sendQueue.clear(); } @@ -247,6 +304,8 @@ public void cancel() { private void removeHandlers() { cleanup.run(); cleanup = JsRunnable.doNothing(); + + transport.release(); } private void onClose(Event event) { @@ -284,10 +343,10 @@ private void onMessage(Event event) { } private void sendOrEnqueue(QueuedEntry e) { - if (webSocket.readyState == WebSocket.CONNECTING) { + if (transport.webSocket.readyState == WebSocket.CONNECTING) { sendQueue.add(e); } else { - e.send(webSocket, streamId); + e.send(transport.webSocket, streamId); } } }