Skip to content

Commit

Permalink
Merge branch '3123-merge-conflict' into 3134-snappy-parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
niloc132 committed Dec 5, 2022
2 parents 3aac4be + 35efc89 commit a8f7458
Show file tree
Hide file tree
Showing 16 changed files with 381 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.StatsTraceContext;
import jakarta.websocket.CloseReason;
import jakarta.websocket.EndpointConfig;
import jakarta.websocket.Session;

import java.io.IOException;
Expand All @@ -20,6 +21,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -41,6 +44,16 @@
* runOnTransportThread while in onMessage, as we're already in the transport thread.
*/
public class MultiplexedWebSocketServerStream extends AbstractWebSocketServerStream {
public static final String GRACEFUL_CLOSE = MultiplexedWebSocketServerStream.class.getName() + ".graceful_close";

/**
* Callback to initiate a graceful shutdown of this websocket instance, as an alternative to just closing the
* websocket. Since this websocket behaves like gRPC transport, we give the client a chance to finish up and close
* itself before the server does it.
*/
public interface GracefulClose extends Supplier<CompletableFuture<Void>> {
}

private static final Logger logger = Logger.getLogger(MultiplexedWebSocketServerStream.class.getName());
/** Custom metadata to hold the path requested by the incoming stream */
public static final Metadata.Key<String> PATH =
Expand All @@ -54,12 +67,55 @@ public class MultiplexedWebSocketServerStream extends AbstractWebSocketServerStr
private final Map<Integer, MultiplexedWebsocketStreamImpl> streams = new HashMap<>();
private final boolean isTextRequest = false;// not supported yet

/**
* Enum to describe the process of closing a transport. After the server has begun to close but the client hasn't
* yet acknowledged, it is permitted for the client to start a new stream, but after the server acknowledges, no new
* streams can be started. Note that shutdown will proceed anyway, and will eventually stop all streams.
*/
enum ClosedState {
OPEN, CLOSING, CLOSED
}

private ClosedState closed = ClosedState.OPEN;
private final CompletableFuture<Void> closingFuture = new CompletableFuture<>();

public MultiplexedWebSocketServerStream(ServerTransportListener transportListener,
List<? extends ServerStreamTracer.Factory> streamTracerFactories, int maxInboundMessageSize,
Attributes attributes) {
super(transportListener, streamTracerFactories, maxInboundMessageSize, attributes);
}

/**
* Stops this multiplexed transport from accepting new streams. Instead, it will reply with its version of GO_AWAY,
* a stream of Integer.MAX_INTEGER to the client to signal that new requests will not be accepted, and future
* incoming streams will be closed by the server right away. In keeping with h2, until the client ACKs the close, we
* will permit incoming streams that were sent before we closed, but they likely will not have a large window to get
* their work done before they are closed the rest of the way.
*/
private CompletableFuture<Void> stopAcceptingNewStreams() {
if (closed != ClosedState.OPEN) {
return closingFuture;
}
closed = ClosedState.CLOSING;
ByteBuffer end = ByteBuffer.allocate(4);
end.putInt(0, Integer.MAX_VALUE);
// ignore the results of this future, the closingFuture will instead tell us when the client ACKs
websocketSession.getAsyncRemote().sendBinary(end);
return closingFuture;
}

@Override
public void onOpen(Session websocketSession, EndpointConfig config) {
super.onOpen(websocketSession, config);
websocketSession.getUserProperties().put(GRACEFUL_CLOSE, (GracefulClose) this::stopAcceptingNewStreams);
}

@Override
public void onClose(Session session, CloseReason closeReason) {
// regardless of state, indicate that we are already closed and no need to wait
closingFuture.complete(null);
}

@Override
public void onMessage(String message) {
for (MultiplexedWebsocketStreamImpl stream : streams.values()) {
Expand Down Expand Up @@ -89,6 +145,23 @@ public void onMessage(ByteBuffer message) throws IOException {
} else {
closed = false;
}

if (closed && streamId == Integer.MAX_VALUE) {
if (this.closed != ClosedState.CLOSING) {
// error, client tried to finish a close we didn't initiate, hang up
websocketSession.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "Unexpected close ACK"));
return;
}
// Client has ack'd our close, no more new streams allowed, client will be finishing up so we can exit
this.closed = ClosedState.CLOSED;

// Mark the future as finished
closingFuture.complete(null);

// (note that technically there is a 5th byte to indicate close, but we're ignoring that here)
return;
}

// may be null if this is the first request for this streamId
final MultiplexedWebsocketStreamImpl stream = streams.get(streamId);

Expand All @@ -104,6 +177,16 @@ public void onMessage(ByteBuffer message) throws IOException {

// if this is the first message on this websocket, it is the request headers
if (stream == null) {
if (this.closed == ClosedState.CLOSED) {
// Not accepting new streams on existing websockets, and the client knew that when they sent this (since
// the GO_AWAY was ACK'd). We treat this as an error, since the client isn't behaving. If instead closed
// was still CLOSING, then, client sent this before they saw that, we permit them to still open streams,
// though the application likely has begun to clean up state.
websocketSession.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR,
"Stream created after closing initiated"));

return;
}
processHeaders(message, streamId);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class EmbeddedServer {
HealthCheckModule.class,
PythonPluginsRegistration.Module.class,
JettyServerModule.class,
HealthCheckModule.class,
PythonConsoleSessionModule.class,
GroovyConsoleSessionModule.class,
SessionToExecutionStateModule.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,25 @@
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ErrorPageErrorHandler;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.util.MultiException;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.websocket.jakarta.common.SessionTracker;
import org.eclipse.jetty.websocket.jakarta.server.config.JakartaWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.jakarta.server.internal.JakartaWebSocketServerContainer;

import javax.inject.Inject;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

Expand All @@ -56,6 +62,7 @@
public class JettyBackedGrpcServer implements GrpcServer {

private final Server jetty;
private final boolean websocketsEnabled;

@Inject
public JettyBackedGrpcServer(
Expand Down Expand Up @@ -109,19 +116,24 @@ public JettyBackedGrpcServer(
endpoints.put(GRPC_WEBSOCKETS_MULTIPLEX_PROTOCOL,
() -> filter.create(MultiplexedWebSocketServerStream::new));
}
JakartaWebSocketServerContainer jettyWebsocketContainer = (JakartaWebSocketServerContainer) container;
WebsocketFactory websocketFactory =
new WebsocketFactory(() -> new GrpcWebsocket(endpoints), jettyWebsocketContainer);
jettyWebsocketContainer.addBean(websocketFactory);
container.addEndpoint(ServerEndpointConfig.Builder.create(GrpcWebsocket.class, "/{service}/{method}")
.configurator(new ServerEndpointConfig.Configurator() {
@Override
public <T> T getEndpointInstance(Class<T> endpointClass) throws InstantiationException {
public <T> T getEndpointInstance(Class<T> endpointClass) {
// noinspection unchecked
return (T) new GrpcWebsocket(endpoints);
return (T) websocketFactory.create();
}
})
.subprotocols(new ArrayList<>(endpoints.keySet()))
.build()

);
.build());
});
this.websocketsEnabled = true;
} else {
this.websocketsEnabled = false;
}

// Note: handler order matters due to pathSpec order
Expand Down Expand Up @@ -149,15 +161,57 @@ public void join() throws InterruptedException {
jetty.join();
}

@Override
public void beginShutdown() {
// "start to stop" the jetty container, skipping over websockets, since their Graceful implementation isn't
// very nice. This is roughly the implementation of Graceful.shutdown(Component), except avoiding anything that
// would directly stop a websocket, which instead will be handled later, as part of the actual stop() call tell
// the graceful handlers that we are shutting down.

// For websockets, since the SessionTracker will instantly stop the socket rather than allow it to finish
// nicely. Instead, when websockets were created, we registered extra graceful beans to shutdown like h2.
// See Server.doStop(), this is roughly the implementation of the first phase of that method, only asking
// Graceful instances to stop, but not stopping connectors or non-graceful components.

// Note that this would not apply correctly if we used WebSockets for some purpose other than gRPC transport.
Collection<Graceful> gracefuls = jetty.getContainedBeans(Graceful.class);
gracefuls.stream().filter(g -> !(g instanceof SessionTracker)).forEach(Graceful::shutdown);
}

@Override
public void stopWithTimeout(long timeout, TimeUnit unit) {
jetty.setStopTimeout(unit.toMillis(timeout));
Thread shutdownThread = new Thread(() -> {
MultiException exceptions = new MultiException();
long millis = unit.toMillis(timeout);

// If websockets are enabled, try to spend part of our shutdown timeout budget on waiting for websockets, as
// in beginShutdown.
if (websocketsEnabled && millis > 250) {
// shut down everything except the websockets themselves with half our timeout
millis /= 2;

// Collect the same beans we gracefully stopped before (or, if we didn't already start a graceful
// shutdown, this is the first attempt)
Collection<Graceful> gracefuls = jetty.getContainedBeans(Graceful.class);
try {
CompletableFuture.allOf(gracefuls.stream().filter(g -> !(g instanceof SessionTracker))
.map(Graceful::shutdown).toArray(CompletableFuture[]::new))
.get(millis, TimeUnit.MILLISECONDS);
} catch (Exception e) {
exceptions.add(e);
}
}

// regardless of failures so far, continue shutdown with remaining budget. This will end all websockets
// right away.
try {
jetty.setStopTimeout(millis);
jetty.stop();
exceptions.ifExceptionThrow();
} catch (Exception exception) {
throw new IllegalStateException("Failure while stopping", exception);
exceptions.add(exception);
}
exceptions.ifExceptionThrowRuntime();
});
shutdownThread.start();
}
Expand Down Expand Up @@ -205,6 +259,10 @@ private static ServerConnector createConnector(Server server, JettyConfig config
}
config.host().ifPresent(serverConnector::setHost);
serverConnector.setPort(config.port());

// Give connections extra time to shutdown, since we have an explicit server shutdown
serverConnector.setShutdownIdleTimeout(serverConnector.getIdleTimeout());

return serverConnector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
HealthCheckModule.class,
PythonPluginsRegistration.Module.class,
JettyServerModule.class,
HealthCheckModule.class,
PythonConsoleSessionModule.class,
GroovyConsoleSessionModule.class,
SessionToExecutionStateModule.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@

import javax.inject.Named;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import static io.grpc.internal.GrpcUtil.getThreadFactory;

@Module
public interface JettyServerModule {
Expand All @@ -34,6 +39,22 @@ static ServletAdapter provideGrpcServletAdapter(
services.forEach(serverBuilder::addService);
interceptors.forEach(serverBuilder::intercept);

// create a custom executor service, just like grpc would use, so that grpc doesn't shut it down ahead
// of when we are ready
// We don't use newSingleThreadScheduledExecutor because it doesn't return a
// ScheduledThreadPoolExecutor.
ScheduledThreadPoolExecutor service = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(
1, getThreadFactory("grpc-timer-%d", true));

// If there are long timeouts that are cancelled, they will not actually be removed from
// the executors queue. This forces immediate removal upon cancellation to avoid a
// memory leak.
service.setRemoveOnCancelPolicy(true);

ScheduledExecutorService executorService = Executors.unconfigurableScheduledExecutorService(service);

serverBuilder.scheduledExecutorService(executorService);

serverBuilder.maxInboundMessageSize(maxMessageSize);

serverBuilder.directExecutor();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package io.deephaven.server.jetty;

import io.grpc.servlet.web.websocket.GrpcWebsocket;
import io.grpc.servlet.web.websocket.MultiplexedWebSocketServerStream;
import jakarta.websocket.Endpoint;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.websocket.jakarta.server.internal.JakartaWebSocketServerContainer;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static io.grpc.servlet.web.websocket.MultiplexedWebSocketServerStream.GRACEFUL_CLOSE;

/**
* Helper class to bridge the gap between Jetty's Graceful interface and the jakarta implementation of the supported
* grpc websocket transports.
* <p>
* </p>
* This type is not an Endpoint, so only one instance of this can create and shutdown many endpoints.
*/
public class WebsocketFactory implements Graceful {
private final AtomicReference<CompletableFuture<Void>> shutdown = new AtomicReference<>();

private final Supplier<GrpcWebsocket> factory;
private final JakartaWebSocketServerContainer jettyWebsocketContainer;

public WebsocketFactory(Supplier<GrpcWebsocket> factory, JakartaWebSocketServerContainer jettyWebsocketContainer) {
this.factory = factory;
this.jettyWebsocketContainer = jettyWebsocketContainer;
}

public Endpoint create() {
return factory.get();
}

@Override
public CompletableFuture<Void> shutdown() {
// Modeled after AbstractHTTP2ServerConnectionFactory.HTTP2SessionContainer.shutdown()
CompletableFuture<Void> result = new CompletableFuture<>();
// Simply by setting the shutdown, we don't allow new endpoint instances to be created
if (shutdown.compareAndSet(null, result)) {
// iterate created transports, and if we can, prevent new streams
CompletableFuture.allOf(
jettyWebsocketContainer.getOpenSessions().stream()
.map(s -> (MultiplexedWebSocketServerStream.GracefulClose) s.getUserProperties()
.get(GRACEFUL_CLOSE))
.map(MultiplexedWebSocketServerStream.GracefulClose::get)
.filter(Objects::nonNull)
.toArray(CompletableFuture[]::new))
.whenComplete((success, throwable) -> {
if (throwable == null) {
// When all clients have acknowledged, complete
result.complete(success);
} else {
result.completeExceptionally(throwable);
}
});
return result;
} else {
return shutdown.get();
}
}

@Override
public boolean isShutdown() {
return shutdown.get() != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
HealthCheckModule.class,
PythonPluginsRegistration.Module.class,
NettyServerModule.class,
HealthCheckModule.class,
PythonConsoleSessionModule.class,
GroovyConsoleSessionModule.class,
SessionToExecutionStateModule.class,
Expand Down
Loading

0 comments on commit a8f7458

Please sign in to comment.