Skip to content

Commit

Permalink
Audited ExportObject Creation
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Nov 15, 2023
1 parent 10e455f commit 29115c5
Show file tree
Hide file tree
Showing 9 changed files with 1,030 additions and 665 deletions.
236 changes: 140 additions & 96 deletions server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
import io.deephaven.auth.AuthenticationException;
import io.deephaven.auth.AuthenticationRequestHandler;
import io.deephaven.auth.BasicAuthMarshaller;
import io.deephaven.engine.table.impl.BaseTable;
import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.table.impl.perf.QueryProcessingResults;
import io.deephaven.engine.table.impl.util.EngineMetrics;
import io.deephaven.extensions.barrage.BarrageStreamGenerator;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.internal.log.LoggerFactory;
Expand All @@ -22,6 +27,8 @@
import io.deephaven.server.session.SessionState;
import io.deephaven.server.session.TicketRouter;
import io.deephaven.auth.AuthContext;
import io.deephaven.util.SafeCloseable;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.FlightServiceGrpc;
Expand Down Expand Up @@ -170,30 +177,52 @@ public void getFlightInfo(
@NotNull final StreamObserver<Flight.FlightInfo> responseObserver) {
final SessionState session = sessionService.getOptionalSession();

final SessionState.ExportObject<Flight.FlightInfo> export =
ticketRouter.flightInfoFor(session, request, "request");

if (session != null) {
session.nonExport()
.require(export)
.onError(responseObserver)
.submit(() -> {
responseObserver.onNext(export.get());
responseObserver.onCompleted();
});
} else {
if (export.tryRetainReference()) {
try {
if (export.getState() == ExportNotification.State.EXPORTED) {
responseObserver.onNext(export.get());
responseObserver.onCompleted();
final String description =
"FlightService#getFlightInfo(session=" + (session == null ? "Anonymous" : session.getSessionId()) + ")";
final QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.newQuery(
description, QueryPerformanceNugget.DEFAULT_FACTORY);

try (final SafeCloseable ignored1 = queryPerformanceRecorder.startQuery()) {

final SessionState.ExportObject<Flight.FlightInfo> export;
try (final SafeCloseable ignored2 = QueryPerformanceRecorder.getInstance().getNugget("flightInfoFor")) {
export = ticketRouter.flightInfoFor(session, request, "request");
}

if (session != null) {
session.nonExport()
.queryPerformanceRecorder(queryPerformanceRecorder, false)
.require(export)
.onError(responseObserver)
.submit(() -> {
responseObserver.onNext(export.get());
responseObserver.onCompleted();
});
} else {
String exception = null;
if (export.tryRetainReference()) {
try {
if (export.getState() == ExportNotification.State.EXPORTED) {
GrpcUtil.safelyOnNext(responseObserver, export.get());
GrpcUtil.safelyComplete(responseObserver);
}
} finally {
export.dropReference();
}
} finally {
export.dropReference();
} else {
final StatusRuntimeException err =
Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Could not find flight info");
exception = err.getMessage();
GrpcUtil.safelyError(responseObserver, err);
}

if (queryPerformanceRecorder.endQuery() || exception != null) {
QueryProcessingResults results = new QueryProcessingResults(queryPerformanceRecorder);
if (exception != null) {
results.setException(exception);
}
EngineMetrics.getInstance().logQueryProcessingResults(results);
}
} else {
responseObserver.onError(
Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Could not find flight info"));
}
}
}
Expand All @@ -204,33 +233,55 @@ public void getSchema(
@NotNull final StreamObserver<Flight.SchemaResult> responseObserver) {
final SessionState session = sessionService.getOptionalSession();

final SessionState.ExportObject<Flight.FlightInfo> export =
ticketRouter.flightInfoFor(session, request, "request");
final String description =
"FlightService#getSchema(session=" + (session == null ? "Anonymous" : session.getSessionId()) + ")";
final QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.newQuery(
description, QueryPerformanceNugget.DEFAULT_FACTORY);

if (session != null) {
session.nonExport()
.require(export)
.onError(responseObserver)
.submit(() -> {
responseObserver.onNext(Flight.SchemaResult.newBuilder()
try (final SafeCloseable ignored1 = queryPerformanceRecorder.startQuery()) {

final SessionState.ExportObject<Flight.FlightInfo> export;
try (final SafeCloseable ignored2 = QueryPerformanceRecorder.getInstance().getNugget("flightInfoFor")) {
export = ticketRouter.flightInfoFor(session, request, "request");
}

String exception = null;
if (session != null) {
session.nonExport()
.queryPerformanceRecorder(queryPerformanceRecorder, false)
.require(export)
.onError(responseObserver)
.submit(() -> {
responseObserver.onNext(Flight.SchemaResult.newBuilder()
.setSchema(export.get().getSchema())
.build());
responseObserver.onCompleted();
});
} else if (export.tryRetainReference()) {
try {
if (export.getState() == ExportNotification.State.EXPORTED) {
GrpcUtil.safelyOnNext(responseObserver, Flight.SchemaResult.newBuilder()
.setSchema(export.get().getSchema())
.build());
responseObserver.onCompleted();
});
} else if (export.tryRetainReference()) {
try {
if (export.getState() == ExportNotification.State.EXPORTED) {
responseObserver.onNext(Flight.SchemaResult.newBuilder()
.setSchema(export.get().getSchema())
.build());
responseObserver.onCompleted();
GrpcUtil.safelyComplete(responseObserver);
}
} finally {
export.dropReference();
}
} finally {
export.dropReference();
} else {
final StatusRuntimeException err =
Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Could not find flight info");
exception = err.getMessage();
responseObserver.onError(err);
}

if (queryPerformanceRecorder.endQuery() || exception != null) {
QueryProcessingResults results = new QueryProcessingResults(queryPerformanceRecorder);
if (exception != null) {
results.setException(exception);
}
EngineMetrics.getInstance().logQueryProcessingResults(results);
}
} else {
responseObserver.onError(
Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Could not find flight info"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import io.deephaven.base.LockFreeArrayQueue;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.table.impl.util.RuntimeMemory;
import io.deephaven.engine.table.impl.util.RuntimeMemory.Sample;
import io.deephaven.engine.updategraph.DynamicNode;
Expand Down Expand Up @@ -35,6 +37,7 @@
import io.deephaven.server.session.SessionState.ExportBuilder;
import io.deephaven.server.session.TicketRouter;
import io.deephaven.server.util.Scheduler;
import io.deephaven.util.SafeCloseable;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -164,29 +167,42 @@ public void executeCommand(
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "No consoleId supplied");
}

SessionState.ExportObject<ScriptSession> exportedConsole =
ticketRouter.resolve(session, consoleId, "consoleId");
session.nonExport()
.requiresSerialQueue()
.require(exportedConsole)
.onError(responseObserver)
.submit(() -> {
ScriptSession scriptSession = exportedConsole.get();
ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode());
ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder();
FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder();
changes.created.entrySet()
.forEach(entry -> fieldChanges.addCreated(makeVariableDefinition(entry)));
changes.updated.entrySet()
.forEach(entry -> fieldChanges.addUpdated(makeVariableDefinition(entry)));
changes.removed.entrySet()
.forEach(entry -> fieldChanges.addRemoved(makeVariableDefinition(entry)));
if (changes.error != null) {
diff.setErrorMessage(Throwables.getStackTraceAsString(changes.error));
log.error().append("Error running script: ").append(changes.error).endl();
}
safelyComplete(responseObserver, diff.setChanges(fieldChanges).build());
});
final String description = "ConsoleServiceGrpcImpl#executeCommand(session=" + session.getSessionId() + ")";
final QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.newQuery(
description, QueryPerformanceNugget.DEFAULT_FACTORY);

try (final SafeCloseable ignored1 = queryPerformanceRecorder.startQuery()) {
final String ticketName = ticketRouter.getLogNameFor(consoleId, "consoleId");

final SessionState.ExportObject<ScriptSession> exportedConsole;
try (final SafeCloseable ignored2 = QueryPerformanceRecorder.getInstance().getNugget(
"resolveTicket:" + ticketName)) {
exportedConsole = ticketRouter.resolve(session, consoleId, "consoleId");
}

session.nonExport()
.queryPerformanceRecorder(queryPerformanceRecorder, false)
.requiresSerialQueue()
.require(exportedConsole)
.onError(responseObserver)
.submit(() -> {
ScriptSession scriptSession = exportedConsole.get();
ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode());
ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder();
FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder();
changes.created.entrySet()
.forEach(entry -> fieldChanges.addCreated(makeVariableDefinition(entry)));
changes.updated.entrySet()
.forEach(entry -> fieldChanges.addUpdated(makeVariableDefinition(entry)));
changes.removed.entrySet()
.forEach(entry -> fieldChanges.addRemoved(makeVariableDefinition(entry)));
if (changes.error != null) {
diff.setErrorMessage(Throwables.getStackTraceAsString(changes.error));
log.error().append("Error running script: ").append(changes.error).endl();
}
safelyComplete(responseObserver, diff.setChanges(fieldChanges).build());
});
}
}

@Override
Expand Down Expand Up @@ -240,32 +256,52 @@ public void bindTableToVariable(
if (tableId.getTicket().isEmpty()) {
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "No source tableId supplied");
}
final SessionState.ExportObject<Table> exportedTable = ticketRouter.resolve(session, tableId, "tableId");
final SessionState.ExportObject<ScriptSession> exportedConsole;

ExportBuilder<?> exportBuilder = session.nonExport()
.requiresSerialQueue()
.onError(responseObserver);
final String description = "ConsoleServiceGrpcImpl#bindTableToVariable(session=" + session.getSessionId() + ")";
final QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.newQuery(
description, QueryPerformanceNugget.DEFAULT_FACTORY);

if (request.hasConsoleId()) {
exportedConsole = ticketRouter.resolve(session, request.getConsoleId(), "consoleId");
exportBuilder.require(exportedTable, exportedConsole);
} else {
exportedConsole = null;
exportBuilder.require(exportedTable);
}
try (final SafeCloseable ignored1 = queryPerformanceRecorder.startQuery()) {
final String tableTicketName = ticketRouter.getLogNameFor(tableId, "tableId");

final SessionState.ExportObject<Table> exportedTable;
try (final SafeCloseable ignored2 = QueryPerformanceRecorder.getInstance().getNugget(
"resolveTableTicket:" + tableTicketName)) {
exportedTable = ticketRouter.resolve(session, tableId, "tableId");
}

final SessionState.ExportObject<ScriptSession> exportedConsole;

ExportBuilder<?> exportBuilder = session.nonExport()
.queryPerformanceRecorder(queryPerformanceRecorder, false)
.requiresSerialQueue()
.onError(responseObserver);

exportBuilder.submit(() -> {
ScriptSession scriptSession =
exportedConsole != null ? exportedConsole.get() : scriptSessionProvider.get();
Table table = exportedTable.get();
scriptSession.setVariable(request.getVariableName(), table);
if (DynamicNode.notDynamicOrIsRefreshing(table)) {
scriptSession.manage(table);
if (request.hasConsoleId()) {
final String consoleTicketName = ticketRouter.getLogNameFor(request.getConsoleId(), "consoleId");

try (final SafeCloseable ignored2 = QueryPerformanceRecorder.getInstance().getNugget(
"resolveConsoleTicket:" + consoleTicketName)) {
exportedConsole = ticketRouter.resolve(session, request.getConsoleId(), "consoleId");
}
exportBuilder.require(exportedTable, exportedConsole);
} else {
exportedConsole = null;
exportBuilder.require(exportedTable);
}
responseObserver.onNext(BindTableToVariableResponse.getDefaultInstance());
responseObserver.onCompleted();
});

exportBuilder.submit(() -> {
ScriptSession scriptSession =
exportedConsole != null ? exportedConsole.get() : scriptSessionProvider.get();
Table table = exportedTable.get();
scriptSession.setVariable(request.getVariableName(), table);
if (DynamicNode.notDynamicOrIsRefreshing(table)) {
scriptSession.manage(table);
}
responseObserver.onNext(BindTableToVariableResponse.getDefaultInstance());
responseObserver.onCompleted();
});
}
}

@Override
Expand Down
Loading

0 comments on commit 29115c5

Please sign in to comment.