Skip to content

Commit

Permalink
Create a GrpcApiApplication That Exposes a Blink Table of Export Stat…
Browse files Browse the repository at this point in the history
…e Changes
  • Loading branch information
nbauernfeind committed Sep 4, 2023
1 parent d3b2147 commit 666f9e4
Show file tree
Hide file tree
Showing 23 changed files with 337 additions and 55 deletions.
1 change: 1 addition & 0 deletions Util/Util.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ configurations {

dependencies {
api project(':engine-query-constants')
api project(':Base')
implementation project(':log-factory')

testRuntimeOnly project(path: ':configs')
Expand Down
1 change: 1 addition & 0 deletions application-mode/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ plugins {

dependencies {
implementation project(':Integrations')
implementation project(':engine-updategraph')

Classpaths.inheritImmutables(project)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
import java.util.Objects;
import java.util.ServiceLoader;

public class ApplicationState {
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.liveness.LivenessScope;

public class ApplicationState extends LivenessScope {

public interface Factory {

Expand Down Expand Up @@ -75,12 +78,17 @@ public synchronized <T> void setField(String name, T value, String description)
}

public synchronized void setField(Field<?> field) {
Field<?> oldField = fields.remove(field.name());
if (oldField != null) {
listener.onRemoveField(this, field);
// manage the new value before release the old value
final Object newValue = field.value();
if (newValue instanceof LivenessReferent) {
manage((LivenessReferent) newValue);
}
listener.onNewField(this, field);

// remove and release the old value
removeField(field.name());

fields.put(field.name(), field);
listener.onNewField(this, field);
}

public synchronized void setFields(Field<?>... fields) {
Expand All @@ -97,6 +105,10 @@ public synchronized void removeField(String name) {
Field<?> field = fields.remove(name);
if (field != null) {
listener.onRemoveField(this, field);
Object oldValue = field.value();
if (oldValue instanceof LivenessReferent) {
unmanage((LivenessReferent) oldValue);
}
}
}

Expand Down
1 change: 1 addition & 0 deletions engine/updategraph/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ plugins {
description 'Engine Update Graph: Core utilities for maintaining a DAG for update processing'

dependencies {
api project(':Util')
api project(':qst')

implementation project(':engine-chunk')
Expand Down
32 changes: 11 additions & 21 deletions plugin/gc-app/src/main/java/io/deephaven/app/GcApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@
import io.deephaven.appmode.ApplicationState;
import io.deephaven.appmode.ApplicationState.Listener;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessScope;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.sources.ring.RingTableTools;
import io.deephaven.stream.StreamToBlinkTableAdapter;
import io.deephaven.util.SafeCloseable;

import javax.management.ListenerNotFoundException;
import javax.management.Notification;
Expand Down Expand Up @@ -46,14 +43,12 @@ public final class GcApplication implements ApplicationState.Factory, Notificati
private static final String POOLS = "pools";
private static final String POOLS_STATS = "pools_stats";

private static final String ENABLED = "io.deephaven.app.GcApplication.enabled";
private static final String NOTIFICATION_INFO_ENABLED = "io.deephaven.app.GcApplication.notification_info.enabled";
private static final String NOTIFICATION_INFO_STATS_ENABLED =
"io.deephaven.app.GcApplication.notification_info_stats.enabled";
private static final String NOTIFICATION_INFO_RING_ENABLED =
"io.deephaven.app.GcApplication.notification_info_ring.enabled";
private static final String POOLS_ENABLED = "io.deephaven.app.GcApplication.pools.enabled";
private static final String POOLS_STATS_ENABLED = "io.deephaven.app.GcApplication.pools_stats.enabled";
private static final String ENABLED = APP_ID + ".enabled";
private static final String NOTIFICATION_INFO_ENABLED = APP_ID + ".notification_info.enabled";
private static final String NOTIFICATION_INFO_STATS_ENABLED = APP_ID + ".notification_info_stats.enabled";
private static final String NOTIFICATION_INFO_RING_ENABLED = APP_ID + ".notification_info_ring.enabled";
private static final String POOLS_ENABLED = APP_ID + ".pools.enabled";
private static final String POOLS_STATS_ENABLED = APP_ID + ".pools_stats.enabled";

/**
* Looks up the system property {@value ENABLED}, defaults to {@code false}.
Expand Down Expand Up @@ -113,8 +108,6 @@ public static boolean poolStatsEnabled() {

private GcNotificationPublisher notificationInfoPublisher;
private GcPoolsPublisher poolsPublisher;
@SuppressWarnings("FieldCanBeLocal")
private LivenessScope scope;

@Override
public void handleNotification(Notification notification, Object handback) {
Expand Down Expand Up @@ -146,14 +139,11 @@ public ApplicationState create(Listener listener) {
if (!notificationInfoEnabled() && !poolsEnabled()) {
return state;
}
scope = new LivenessScope();
try (final SafeCloseable ignored = LivenessScopeStack.open(scope, false)) {
if (notificationInfoEnabled) {
setNotificationInfo(state);
}
if (poolsEnabled) {
setPools(state);
}
if (notificationInfoEnabled) {
setNotificationInfo(state);
}
if (poolsEnabled) {
setPools(state);
}
install();
return state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private void loadApplication(final Path applicationDir, final ApplicationConfig
log.info().append("Starting application '").append(config.toString()).append('\'').endl();

final ApplicationState app;
try (final SafeCloseable ignored = LivenessScopeStack.open();
try (final SafeCloseable ignored1 = LivenessScopeStack.open();
final SafeCloseable ignored2 = scriptSessionProvider.get().getExecutionContext().open()) {
app = ApplicationFactory.create(applicationDir, config, scriptSessionProvider.get(), applicationListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private <T> SessionState.ExportObject<T> resolve(final AppFieldId id, final Stri
}
Object value = authorization.transform(field.value());
// noinspection unchecked
return SessionState.wrapAsExport((T) value);
return SessionState.wrapAsExport((T) value, "ApplicationTicketResolver#resolve");
}

@Override
Expand Down Expand Up @@ -113,7 +113,7 @@ public SessionState.ExportObject<Flight.FlightInfo> flightInfoFor(
}
}

return SessionState.wrapAsExport(info);
return SessionState.wrapAsExport(info, "ApplicationTicketResolver#flightInfoFor");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.*;

import static io.deephaven.extensions.barrage.util.BarrageUtil.DEFAULT_SNAPSHOT_DESER_OPTIONS;
Expand All @@ -80,6 +78,7 @@ public static void DoGetCustom(

final long queueStartTm = System.nanoTime();
session.nonExport()
.description("FlightService#DoGet")
.require(export)
.onError(observer)
.submit(() -> {
Expand Down Expand Up @@ -154,6 +153,7 @@ public void onNext(final InputStream request) {
flightDescriptor = mi.descriptor;
resultExportBuilder = ticketRouter
.<Table>publish(session, mi.descriptor, "Flight.Descriptor", null)
.description("ArrowFlight#DoPut")
.onError(observer);
}
}
Expand Down Expand Up @@ -463,6 +463,7 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) {

final long queueStartTm = System.nanoTime();
session.nonExport()
.description("FlightService#DoExchange(snapshot)")
.require(parent)
.onError(listener)
.submit(() -> {
Expand Down Expand Up @@ -563,6 +564,7 @@ public void handleMessage(@NotNull final MessageInfo message) {

synchronized (this) {
onExportResolvedContinuation = session.nonExport()
.description("FlightService#DoExchange(subscription)")
.require(parent)
.onErrorHandler(DoExchangeMarshaller.this::onError)
.submit(() -> onExportResolved(parent));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public void getFlightInfo(

if (session != null) {
session.nonExport()
.description("FlightService#getFlightInfo")
.require(export)
.onError(responseObserver)
.submit(() -> {
Expand Down Expand Up @@ -209,6 +210,7 @@ public void getSchema(

if (session != null) {
session.nonExport()
.description("FlightService#getSchema")
.require(export)
.onError(responseObserver)
.submit(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public void startConsole(
}

session.newExport(request.getResultId(), "resultId")
.description("ConsoleService#startConsole")
.onError(responseObserver)
.submit(() -> {
final ScriptSession scriptSession = new DelegatingScriptSession(scriptSessionProvider.get());
Expand Down Expand Up @@ -167,6 +168,7 @@ public void executeCommand(
SessionState.ExportObject<ScriptSession> exportedConsole =
ticketRouter.resolve(session, consoleId, "consoleId");
session.nonExport()
.description("ConsoleService#executeCommand")
.requiresSerialQueue()
.require(exportedConsole)
.onError(responseObserver)
Expand Down Expand Up @@ -244,6 +246,7 @@ public void bindTableToVariable(
final SessionState.ExportObject<ScriptSession> exportedConsole;

ExportBuilder<?> exportBuilder = session.nonExport()
.description("ConsoleService#bindTableToVariable")
.requiresSerialQueue()
.onError(responseObserver);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.deephaven.base.string.EncodingInfo;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.liveness.LivenessStateException;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.updategraph.DynamicNode;
import io.deephaven.engine.util.ScriptSession;
Expand Down Expand Up @@ -75,7 +76,7 @@ public SessionState.ExportObject<Flight.FlightInfo> flightInfoFor(
"Could not resolve '" + logId + "': no variable exists with name '" + scopeName + "'");
});

return SessionState.wrapAsExport(flightInfo);
return SessionState.wrapAsExport(flightInfo, "ScopeTicketResolver#flightInfoFor");
}

@Override
Expand Down Expand Up @@ -106,6 +107,7 @@ private <T> SessionState.ExportObject<T> resolve(
// fetch the variable from the scope right now
T export = gss.getExecutionContext().getUpdateGraph().sharedLock().computeLocked(() -> {
T scopeVar = null;
// note we do not need to synchronize on gss here because it cannot change while holding the ugp lock
try {
// noinspection unchecked
scopeVar = (T) gss.unwrapObject(gss.getVariable(scopeName));
Expand All @@ -119,9 +121,20 @@ private <T> SessionState.ExportObject<T> resolve(
if (export == null) {
return SessionState.wrapAsFailedExport(Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': no variable exists with name '" + scopeName + "'"));
} else if (export instanceof SessionState.ExportObject) {
// noinspection unchecked
final SessionState.ExportObject<T> asExportObject = (SessionState.ExportObject<T>) export;
// ensure the result is live until the caller uses it
try {
asExportObject.manageWithCurrentScope();
return asExportObject;
} catch (LivenessStateException ignored) {
return SessionState.wrapAsFailedExport(Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': no variable exists with name '" + scopeName + "'"));
}
}

return SessionState.wrapAsExport(export);
return SessionState.wrapAsExport(export, "ScopeTicketResolver#resolve");
}

@Override
Expand Down Expand Up @@ -153,11 +166,16 @@ private <T> SessionState.ExportBuilder<T> publish(
final SessionState.ExportObject<T> resultExport = resultBuilder.getExport();
final SessionState.ExportBuilder<T> publishTask = session.nonExport();

// if we receive requests to read from this variable before the client has finished publishing, we will
// give the user the result export object to wait on as a dependency.
final ScriptSession gss = scriptSessionProvider.get();
gss.setVariable(varName, resultExport);

publishTask
.description("ScopeTicketResolver#publish(" + varName + ")")
.requiresSerialQueue()
.require(resultExport)
.submit(() -> {
final ScriptSession gss = scriptSessionProvider.get();
T value = resultExport.get();
if (value instanceof LivenessReferent && DynamicNode.notDynamicOrIsRefreshing(value)) {
gss.manage((LivenessReferent) value);
Expand All @@ -168,6 +186,8 @@ private <T> SessionState.ExportBuilder<T> publish(
}
});

publishTask.setStateToPublishing();
resultBuilder.setStateToPublishing();
return resultBuilder;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.deephaven.server.grpc_api_app;

import com.google.auto.service.AutoService;
import io.deephaven.appmode.ApplicationState;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.sources.ring.RingTableTools;
import io.deephaven.engine.util.TableTools;
import io.deephaven.proto.backplane.grpc.ExportNotification;
import io.deephaven.server.session.SessionState;
import io.deephaven.stream.StreamToBlinkTableAdapter;
import io.grpc.stub.StreamObserver;

/**
* The {@value APP_NAME}, application id {@value APP_ID}, produces stream {@link io.deephaven.engine.table.Table tables}
* {@value NOTIFICATION_INFO}; and derived table {@value NOTIFICATION_INFO_RING}. This data is modeled after the
* {@link ExportNotification} event information from {@link SessionState#addExportListener(StreamObserver)}.
*
* @see #ENABLED
* @see #RING_SIZE
*/
@AutoService(ApplicationState.Factory.class)
public final class GrpcApiApplication implements ApplicationState.Factory {
private static final String APP_ID = "GRPC_API_APP";
private static final String APP_NAME = "GRPC API Application";
private static final String NOTIFICATION_INFO = "session_export_notification_info";
private static final String NOTIFICATION_INFO_RING = "session_export_notification_info_ring";

private static final String ENABLED = "enabled";
private static final String RING_SIZE = "ringSize";

private static Table updateStreamTable = TableTools.emptyTable(0);

public static Table getExportObjectUpdateStreamTable() {
return updateStreamTable;
}

/**
* Looks up the system property {@value ENABLED}, defaults to {@code false}.
*
* @return if the gRPC API application is enabled
*/
private static boolean enabled() {
return "true".equals(System.getProperty(GrpcApiApplication.class + "." + ENABLED, "false"));
}

/**
* Looks up the system property {@value RING_SIZE}, defaults to {@code 1024}. The
* {@value NOTIFICATION_INFO_RING} table is disabled when {@code 0} or less.
*
* @return the {@value NOTIFICATION_INFO_RING} table size
*/
private static int ringSize() {
return Integer.getInteger(GrpcApiApplication.class + "." + RING_SIZE, 1024);
}

@Override
public ApplicationState create(ApplicationState.Listener listener) {
final ApplicationState state = new ApplicationState(listener, APP_ID, APP_NAME);
if (!enabled()) {
return state;
}

final SessionStateExportObjectUpdateStream updateStream = new SessionStateExportObjectUpdateStream();
SessionState.LISTENER = updateStream::onExportObjectStateUpdate;

// noinspection resource
final StreamToBlinkTableAdapter adapter = new StreamToBlinkTableAdapter(
SessionStateExportObjectUpdateStream.definition(), updateStream,
ExecutionContext.getContext().getUpdateGraph(), NOTIFICATION_INFO);
updateStreamTable = adapter.table();
state.setField(NOTIFICATION_INFO, updateStreamTable);

final int ringSize = ringSize();
if (ringSize > 0) {
state.setField(NOTIFICATION_INFO_RING, RingTableTools.of(updateStreamTable, ringSize));
}
return state;
}
}
Loading

0 comments on commit 666f9e4

Please sign in to comment.