Skip to content

Commit

Permalink
Simplify server startup and dagger usage (#1601)
Browse files Browse the repository at this point in the history
This is a first step towards making the http server for grpc
replacable with Jetty.

Partial #1270
  • Loading branch information
niloc132 authored Nov 24, 2021
1 parent 96218b7 commit 2e457fc
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import io.deephaven.db.v2.utils.ProcessMemoryTracker;
import io.deephaven.db.v2.utils.UpdatePerformanceTracker;
import io.deephaven.grpc_api.appmode.ApplicationInjector;
import io.deephaven.grpc_api.appmode.ApplicationServiceGrpcImpl;
import io.deephaven.grpc_api.console.ConsoleServiceGrpcImpl;
import io.deephaven.grpc_api.log.LogInit;
import io.deephaven.grpc_api.session.SessionService;
Expand All @@ -15,6 +14,7 @@
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.grpc_api.uri.UriResolversInstance;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.process.ProcessEnvironment;
import io.deephaven.util.process.ShutdownManager;
import io.grpc.Server;
Expand All @@ -23,44 +23,20 @@
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* Entrypoint for the Deephaven gRPC server, starting the various engine and script components, running any specified
* application, and enabling the gRPC endpoints to be accessed by consumers.
*/
public class DeephavenApiServer {

private static final Logger log = LoggerFactory.getLogger(DeephavenApiServer.class);

public static void start(DeephavenApiServer server, SessionService sessionService)
throws IOException, ClassNotFoundException, InterruptedException {
// Stop accepting new gRPC requests.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.FIRST,
server.server::shutdown);

// Close outstanding sessions to give any gRPCs closure.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.MIDDLE,
sessionService::onShutdown);

// Finally wait for gRPC to exit now.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.LAST, () -> {
try {
if (!server.server.awaitTermination(10, TimeUnit.SECONDS)) {
log.error().append(
"The gRPC server did not terminate in a reasonable amount of time. Invoking shutdownNow().")
.endl();
server.server.shutdownNow();
}
} catch (final InterruptedException ignored) {
}
});

server.start();
server.blockUntilShutdown();
}

private final Server server;
private final LiveTableMonitor ltm;
private final LogInit logInit;
private final ConsoleServiceGrpcImpl consoleService;
private final ApplicationInjector applicationInjector;
private final ApplicationServiceGrpcImpl applicationService;
private final UriResolvers uriResolvers;
private final SessionService sessionService;

@Inject
public DeephavenApiServer(
Expand All @@ -69,22 +45,54 @@ public DeephavenApiServer(
final LogInit logInit,
final ConsoleServiceGrpcImpl consoleService,
final ApplicationInjector applicationInjector,
final ApplicationServiceGrpcImpl applicationService,
final UriResolvers uriResolvers) {
final UriResolvers uriResolvers,
final SessionService sessionService) {
this.server = server;
this.ltm = ltm;
this.logInit = logInit;
this.consoleService = consoleService;
this.applicationInjector = applicationInjector;
this.applicationService = applicationService;
this.uriResolvers = uriResolvers;
this.sessionService = sessionService;
}

@VisibleForTesting
public Server server() {
return server;
}

public void start() throws IOException, ClassNotFoundException {
/**
* Starts the various server components, and blocks until the gRPC server has shut down. That shutdown is mediated
* by the ShutdownManager, and will call the gRPC server to shut it down when the process is itself shutting down.
* Only once that is complete will this method return.
*
* @throws IOException thrown in event of an error with logging, finding and running an application, and starting
* the gRPC service.
* @throws ClassNotFoundException thrown if a class can't be found while finding and running an application.
* @throws InterruptedException thrown if this thread is interrupted while blocking for the server to halt.
*/
public void run() throws IOException, ClassNotFoundException, InterruptedException {
// Stop accepting new gRPC requests.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.FIRST,
server::shutdown);

// Close outstanding sessions to give any gRPCs closure.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.MIDDLE,
sessionService::onShutdown);

// Finally wait for gRPC to exit now.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.LAST, () -> {
try {
if (!server.awaitTermination(10, TimeUnit.SECONDS)) {
log.error().append(
"The gRPC server did not terminate in a reasonable amount of time. Invoking shutdownNow().")
.endl();
server.shutdownNow();
}
} catch (final InterruptedException ignored) {
}
});

log.info().append("Configuring logging...").endl();
logInit.run();

Expand All @@ -103,26 +111,22 @@ public void start() throws IOException, ClassNotFoundException {
UpdatePerformanceTracker.start();
ProcessMemoryTracker.start();

for (UriResolver resolver : uriResolvers.resolvers()) {
log.debug().append("Found table resolver ").append(resolver.getClass().toString()).endl();
}
UriResolversInstance.init(uriResolvers);

// inject applications before we start the gRPC server
applicationInjector.run();

{
for (UriResolver resolver : uriResolvers.resolvers()) {
log.info().append("Found table resolver ").append(resolver.getClass().toString()).endl();
}
UriResolversInstance.init(uriResolvers);
}

log.info().append("Starting server...").endl();
server.start();
server.awaitTermination();
}

void startForUnitTests() throws IOException {
log.info().append("Starting server...").endl();
server.start();
}

private void blockUntilShutdown() throws InterruptedException {
server.awaitTermination();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,11 @@

import dagger.BindsInstance;
import dagger.Component;
import io.deephaven.grpc_api.appmode.AppMode;
import io.deephaven.grpc_api.healthcheck.HealthCheckModule;
import io.deephaven.grpc_api.session.SessionService;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.process.ProcessEnvironment;
import io.deephaven.util.process.ShutdownManager;

import javax.inject.Named;
import javax.inject.Singleton;
import java.io.IOException;
import java.io.PrintStream;
import java.util.concurrent.TimeUnit;

@Singleton
@Component(modules = {
Expand All @@ -24,16 +16,12 @@
})
public interface DeephavenApiServerComponent {

@Singleton
DeephavenApiServer getServer();

@Singleton
SessionService getSessionService();

@Component.Builder
interface Builder {
@BindsInstance
Builder withPort(@Named("grpc.port") int port);
Builder withPort(@Named("http.port") int port);

@BindsInstance
Builder withSchedulerPoolSize(@Named("scheduler.poolSize") int numThreads);
Expand All @@ -47,25 +35,6 @@ interface Builder {
@BindsInstance
Builder withErr(@Named("err") PrintStream err);

@BindsInstance
Builder withAppMode(AppMode appMode);

DeephavenApiServerComponent build();
}

static void startMain(PrintStream out, PrintStream err)
throws IOException, InterruptedException, ClassNotFoundException {
final DeephavenApiServerComponent injector = DaggerDeephavenApiServerComponent
.builder()
.withPort(8080)
.withSchedulerPoolSize(4)
.withSessionTokenExpireTmMs(300000) // defaults to 5 min
.withOut(out)
.withErr(err)
.withAppMode(AppMode.currentMode())
.build();
final DeephavenApiServer server = injector.getServer();
final SessionService sessionService = injector.getSessionService();
DeephavenApiServer.start(server, sessionService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,11 @@

import dagger.BindsInstance;
import dagger.Component;
import io.deephaven.grpc_api.appmode.AppMode;
import io.deephaven.grpc_api.session.SessionService;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.process.ProcessEnvironment;
import io.deephaven.util.process.ShutdownManager;
import io.grpc.ManagedChannelBuilder;

import javax.inject.Named;
import javax.inject.Singleton;
import java.io.IOException;
import java.io.PrintStream;
import java.util.concurrent.TimeUnit;

@Singleton
@Component(modules = {
Expand All @@ -23,12 +15,8 @@
})
public interface DeephavenApiServerInProcessComponent {

@Singleton
DeephavenApiServer getServer();

@Singleton
SessionService getSessionService();

ManagedChannelBuilder<?> channelBuilder();

@Component.Builder
Expand All @@ -46,9 +34,6 @@ interface Builder {
@BindsInstance
Builder withErr(@Named("err") PrintStream err);

@BindsInstance
Builder withAppMode(AppMode appMode);

DeephavenApiServerInProcessComponent build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import dagger.multibindings.ElementsIntoSet;
import io.deephaven.db.tables.live.LiveTableMonitor;
import io.deephaven.db.v2.sources.chunk.util.pools.MultiChunkPool;
import io.deephaven.grpc_api.appmode.AppMode;
import io.deephaven.grpc_api.appmode.AppModeModule;
import io.deephaven.grpc_api.arrow.ArrowModule;
import io.deephaven.grpc_api.auth.AuthContextModule;
Expand Down Expand Up @@ -83,6 +84,12 @@ static Set<ServerInterceptor> primeInterceptors() {
return Collections.emptySet();
}

@Provides
@Singleton
public static AppMode provideAppMode() {
return AppMode.currentMode();
}

@Provides
@Singleton
public static Scheduler provideScheduler(final @Named("scheduler.poolSize") int poolSize) {
Expand Down
16 changes: 15 additions & 1 deletion grpc-api/src/main/java/io/deephaven/grpc_api/runner/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ public static void main(String[] args) throws IOException, InterruptedException,
ProcessEnvironment.basicInteractiveProcessInitialization(config, Main.class.getName(), log);
Thread.setDefaultUncaughtExceptionHandler(processEnvironment.getFatalErrorReporter());

DeephavenApiServerComponent.startMain(PrintStreamGlobals.getOut(), PrintStreamGlobals.getErr());
// defaults to 5 minutes
int httpSessionExpireMs = config.getIntegerWithDefault("http.session.durationMs", 300000);
int httpPort = config.getIntegerWithDefault("http.port", 8080);
int schedulerPoolSize = config.getIntegerWithDefault("scheduler.poolSize", 4);

DaggerDeephavenApiServerComponent
.builder()
.withPort(httpPort)
.withSchedulerPoolSize(schedulerPoolSize)
.withSessionTokenExpireTmMs(httpSessionExpireMs)
.withOut(PrintStreamGlobals.getOut())
.withErr(PrintStreamGlobals.getErr())
.build()
.getServer()
.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
public class ServerBuilderModule {

@Provides
static ServerBuilder<?> serverBuilder(final @Named("grpc.port") int port) {
static ServerBuilder<?> serverBuilder(final @Named("http.port") int port) {
return ServerBuilder.forPort(port);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public void setUp() throws Exception {
.withSessionTokenExpireTmMs(sessionTokenExpireTmMs())
.withOut(System.out)
.withErr(System.err)
.withAppMode(AppMode.API_ONLY)
.build();

server = serverComponent.getServer();
Expand Down

0 comments on commit 2e457fc

Please sign in to comment.