Skip to content

Commit

Permalink
Create a GrpcApiApplication That Exposes a Blink Table of Export Stat…
Browse files Browse the repository at this point in the history
…e Changes
  • Loading branch information
nbauernfeind committed Nov 4, 2023
1 parent e4506cc commit a0d8439
Show file tree
Hide file tree
Showing 25 changed files with 383 additions and 77 deletions.
2 changes: 2 additions & 0 deletions application-mode/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ plugins {
}

dependencies {
implementation project(':Base')
implementation project(':Integrations')
implementation project(':engine-updategraph')

Classpaths.inheritImmutables(project)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
import java.util.Objects;
import java.util.ServiceLoader;

public class ApplicationState {
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.liveness.LivenessScope;
import io.deephaven.engine.updategraph.DynamicNode;

public class ApplicationState extends LivenessScope {

public interface Factory {

Expand Down Expand Up @@ -75,12 +79,17 @@ public synchronized <T> void setField(String name, T value, String description)
}

public synchronized void setField(Field<?> field) {
Field<?> oldField = fields.remove(field.name());
if (oldField != null) {
listener.onRemoveField(this, field);
// manage the new value before release the old value
final Object newValue = field.value();
if ((newValue instanceof LivenessReferent) && DynamicNode.notDynamicOrIsRefreshing(newValue)) {
manage((LivenessReferent) newValue);
}
listener.onNewField(this, field);

// remove and release the old value
removeField(field.name());

fields.put(field.name(), field);
listener.onNewField(this, field);
}

public synchronized void setFields(Field<?>... fields) {
Expand All @@ -97,6 +106,10 @@ public synchronized void removeField(String name) {
Field<?> field = fields.remove(name);
if (field != null) {
listener.onRemoveField(this, field);
Object oldValue = field.value();
if ((oldValue instanceof LivenessReferent) && DynamicNode.notDynamicOrIsRefreshing(oldValue)) {
unmanage((LivenessReferent) oldValue);
}
}
}

Expand Down
1 change: 1 addition & 0 deletions engine/updategraph/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ plugins {
description 'Engine Update Graph: Core utilities for maintaining a DAG for update processing'

dependencies {
api project(':Util')
api project(':qst')

implementation project(':engine-chunk')
Expand Down
32 changes: 11 additions & 21 deletions plugin/gc-app/src/main/java/io/deephaven/app/GcApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@
import io.deephaven.appmode.ApplicationState;
import io.deephaven.appmode.ApplicationState.Listener;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessScope;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.sources.ring.RingTableTools;
import io.deephaven.stream.StreamToBlinkTableAdapter;
import io.deephaven.util.SafeCloseable;

import javax.management.ListenerNotFoundException;
import javax.management.Notification;
Expand Down Expand Up @@ -46,14 +43,12 @@ public final class GcApplication implements ApplicationState.Factory, Notificati
private static final String POOLS = "pools";
private static final String POOLS_STATS = "pools_stats";

private static final String ENABLED = "io.deephaven.app.GcApplication.enabled";
private static final String NOTIFICATION_INFO_ENABLED = "io.deephaven.app.GcApplication.notification_info.enabled";
private static final String NOTIFICATION_INFO_STATS_ENABLED =
"io.deephaven.app.GcApplication.notification_info_stats.enabled";
private static final String NOTIFICATION_INFO_RING_ENABLED =
"io.deephaven.app.GcApplication.notification_info_ring.enabled";
private static final String POOLS_ENABLED = "io.deephaven.app.GcApplication.pools.enabled";
private static final String POOLS_STATS_ENABLED = "io.deephaven.app.GcApplication.pools_stats.enabled";
private static final String ENABLED = APP_ID + ".enabled";
private static final String NOTIFICATION_INFO_ENABLED = APP_ID + ".notification_info.enabled";
private static final String NOTIFICATION_INFO_STATS_ENABLED = APP_ID + ".notification_info_stats.enabled";
private static final String NOTIFICATION_INFO_RING_ENABLED = APP_ID + ".notification_info_ring.enabled";
private static final String POOLS_ENABLED = APP_ID + ".pools.enabled";
private static final String POOLS_STATS_ENABLED = APP_ID + ".pools_stats.enabled";

/**
* Looks up the system property {@value ENABLED}, defaults to {@code false}.
Expand Down Expand Up @@ -113,8 +108,6 @@ public static boolean poolStatsEnabled() {

private GcNotificationPublisher notificationInfoPublisher;
private GcPoolsPublisher poolsPublisher;
@SuppressWarnings("FieldCanBeLocal")
private LivenessScope scope;

@Override
public void handleNotification(Notification notification, Object handback) {
Expand Down Expand Up @@ -146,14 +139,11 @@ public ApplicationState create(Listener listener) {
if (!notificationInfoEnabled() && !poolsEnabled()) {
return state;
}
scope = new LivenessScope();
try (final SafeCloseable ignored = LivenessScopeStack.open(scope, false)) {
if (notificationInfoEnabled) {
setNotificationInfo(state);
}
if (poolsEnabled) {
setPools(state);
}
if (notificationInfoEnabled) {
setNotificationInfo(state);
}
if (poolsEnabled) {
setPools(state);
}
install();
return state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private void loadApplication(final Path applicationDir, final ApplicationConfig
log.info().append("Starting application '").append(config.toString()).append('\'').endl();

final ApplicationState app;
try (final SafeCloseable ignored = LivenessScopeStack.open();
try (final SafeCloseable ignored1 = LivenessScopeStack.open();
final SafeCloseable ignored2 = scriptSessionProvider.get().getExecutionContext().open()) {
app = ApplicationFactory.create(applicationDir, config, scriptSessionProvider.get(), applicationListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private <T> SessionState.ExportObject<T> resolve(final AppFieldId id, final Stri
}
Object value = authorization.transform(field.value());
// noinspection unchecked
return SessionState.wrapAsExport((T) value);
return SessionState.wrapAsExport((T) value, "ApplicationTicketResolver#resolve");
}

@Override
Expand Down Expand Up @@ -113,7 +113,7 @@ public SessionState.ExportObject<Flight.FlightInfo> flightInfoFor(
}
}

return SessionState.wrapAsExport(info);
return SessionState.wrapAsExport(info, "ApplicationTicketResolver#flightInfoFor");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import dagger.multibindings.IntoSet;
import io.deephaven.app.GcApplication;
import io.deephaven.appmode.ApplicationState;
import io.deephaven.server.grpc_api_app.GrpcApiApplication;

@Module
public interface ApplicationsModule {
Expand All @@ -17,4 +18,10 @@ public interface ApplicationsModule {
static ApplicationState.Factory providesGcApplication() {
return new GcApplication();
}

@Provides
@IntoSet
static ApplicationState.Factory providesGrpcApiApplication() {
return new GrpcApiApplication();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public static void DoGetCustom(

final long queueStartTm = System.nanoTime();
session.nonExport()
.description("FlightService#DoGet")
.require(export)
.onError(observer)
.submit(() -> {
Expand Down Expand Up @@ -153,6 +154,7 @@ public void onNext(final InputStream request) {
flightDescriptor = mi.descriptor;
resultExportBuilder = ticketRouter
.<Table>publish(session, mi.descriptor, "Flight.Descriptor", null)
.description("ArrowFlight#DoPut")
.onError(observer);
}
}
Expand Down Expand Up @@ -486,6 +488,7 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) {

final long queueStartTm = System.nanoTime();
session.nonExport()
.description("FlightService#DoExchange(snapshot)")
.require(parent)
.onError(listener)
.submit(() -> {
Expand Down Expand Up @@ -628,6 +631,7 @@ public void handleMessage(@NotNull final MessageInfo message) {

synchronized (this) {
onExportResolvedContinuation = session.nonExport()
.description("FlightService#DoExchange(subscription)")
.require(parent)
.onErrorHandler(DoExchangeMarshaller.this::onError)
.submit(() -> onExportResolved(parent));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public void getFlightInfo(

if (session != null) {
session.nonExport()
.description("FlightService#getFlightInfo")
.require(export)
.onError(responseObserver)
.submit(() -> {
Expand Down Expand Up @@ -209,6 +210,7 @@ public void getSchema(

if (session != null) {
session.nonExport()
.description("FlightService#getSchema")
.require(export)
.onError(responseObserver)
.submit(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public void startConsole(
}

session.newExport(request.getResultId(), "resultId")
.description("ConsoleService#startConsole")
.onError(responseObserver)
.submit(() -> {
final ScriptSession scriptSession = new DelegatingScriptSession(scriptSessionProvider.get());
Expand Down Expand Up @@ -167,25 +168,33 @@ public void executeCommand(
SessionState.ExportObject<ScriptSession> exportedConsole =
ticketRouter.resolve(session, consoleId, "consoleId");
session.nonExport()
.description("ConsoleService#executeCommand")
.requiresSerialQueue()
.require(exportedConsole)
.onError(responseObserver)
.submit(() -> {
ScriptSession scriptSession = exportedConsole.get();
ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode());
ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder();
FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder();
changes.created.entrySet()
.forEach(entry -> fieldChanges.addCreated(makeVariableDefinition(entry)));
changes.updated.entrySet()
.forEach(entry -> fieldChanges.addUpdated(makeVariableDefinition(entry)));
changes.removed.entrySet()
.forEach(entry -> fieldChanges.addRemoved(makeVariableDefinition(entry)));
if (changes.error != null) {
diff.setErrorMessage(Throwables.getStackTraceAsString(changes.error));
log.error().append("Error running script: ").append(changes.error).endl();
// since we are on the serial queue, we can safely stash our session id someplace accessible
SessionService.CURRENT_SESSION_ID = session.getSessionId();

try {
ScriptSession scriptSession = exportedConsole.get();
ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode());
ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder();
FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder();
changes.created.entrySet()
.forEach(entry -> fieldChanges.addCreated(makeVariableDefinition(entry)));
changes.updated.entrySet()
.forEach(entry -> fieldChanges.addUpdated(makeVariableDefinition(entry)));
changes.removed.entrySet()
.forEach(entry -> fieldChanges.addRemoved(makeVariableDefinition(entry)));
if (changes.error != null) {
diff.setErrorMessage(Throwables.getStackTraceAsString(changes.error));
log.error().append("Error running script: ").append(changes.error).endl();
}
safelyComplete(responseObserver, diff.setChanges(fieldChanges).build());
} finally {
SessionService.CURRENT_SESSION_ID = "NO SESSION IN USE";
}
safelyComplete(responseObserver, diff.setChanges(fieldChanges).build());
});
}

Expand Down Expand Up @@ -244,6 +253,7 @@ public void bindTableToVariable(
final SessionState.ExportObject<ScriptSession> exportedConsole;

ExportBuilder<?> exportBuilder = session.nonExport()
.description("ConsoleService#bindTableToVariable")
.requiresSerialQueue()
.onError(responseObserver);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.deephaven.base.string.EncodingInfo;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.liveness.LivenessStateException;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.updategraph.DynamicNode;
import io.deephaven.engine.util.ScriptSession;
Expand Down Expand Up @@ -75,7 +76,7 @@ public SessionState.ExportObject<Flight.FlightInfo> flightInfoFor(
"Could not resolve '" + logId + "': no variable exists with name '" + scopeName + "'");
});

return SessionState.wrapAsExport(flightInfo);
return SessionState.wrapAsExport(flightInfo, "ScopeTicketResolver#flightInfoFor");
}

@Override
Expand Down Expand Up @@ -119,9 +120,20 @@ private <T> SessionState.ExportObject<T> resolve(
if (export == null) {
return SessionState.wrapAsFailedExport(Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': no variable exists with name '" + scopeName + "'"));
} else if (export instanceof SessionState.ExportObject) {
// noinspection unchecked
final SessionState.ExportObject<T> asExportObject = (SessionState.ExportObject<T>) export;
// ensure the result is live until the caller uses it
try {
asExportObject.manageWithCurrentScope();
return asExportObject;
} catch (LivenessStateException ignored) {
return SessionState.wrapAsFailedExport(Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': no variable exists with name '" + scopeName + "'"));
}
}

return SessionState.wrapAsExport(export);
return SessionState.wrapAsExport(export, "ScopeTicketResolver#resolve");
}

@Override
Expand Down Expand Up @@ -153,11 +165,16 @@ private <T> SessionState.ExportBuilder<T> publish(
final SessionState.ExportObject<T> resultExport = resultBuilder.getExport();
final SessionState.ExportBuilder<T> publishTask = session.nonExport();

// if we receive requests to read from this variable before the client has finished publishing, we will
// give the user the result export object to wait on as a dependency.
final ScriptSession gss = scriptSessionProvider.get();
gss.setVariable(varName, resultExport);

publishTask
.description("ScopeTicketResolver#publish(" + varName + ")")
.requiresSerialQueue()
.require(resultExport)
.submit(() -> {
final ScriptSession gss = scriptSessionProvider.get();
T value = resultExport.get();
if (value instanceof LivenessReferent && DynamicNode.notDynamicOrIsRefreshing(value)) {
gss.manage((LivenessReferent) value);
Expand All @@ -168,6 +185,8 @@ private <T> SessionState.ExportBuilder<T> publish(
}
});

publishTask.setStateToPublishing();
resultBuilder.setStateToPublishing();
return resultBuilder;
}

Expand Down
Loading

0 comments on commit a0d8439

Please sign in to comment.