Skip to content

Commit

Permalink
Move OperationInitializer to Dagger Object Graph
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Dec 22, 2023
1 parent d782ab1 commit 9670342
Show file tree
Hide file tree
Showing 16 changed files with 98 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.util.AbstractScriptSession;
import io.deephaven.engine.util.PythonEvaluator;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class PythonDeephavenSession extends AbstractScriptSession<PythonSnapshot
* Create a Python ScriptSession.
*
* @param updateGraph the default update graph to install for the repl
* @param operationInitializer the default operation initializer to install for the repl
* @param objectTypeLookup the object type lookup
* @param listener an optional listener that will be notified whenever the query scope changes
* @param runInitScripts if init scripts should be executed
Expand All @@ -78,12 +80,13 @@ public class PythonDeephavenSession extends AbstractScriptSession<PythonSnapshot
*/
public PythonDeephavenSession(
final UpdateGraph updateGraph,
final OperationInitializer operationInitializer,
final ThreadInitializationFactory threadInitializationFactory,
final ObjectTypeLookup objectTypeLookup,
@Nullable final Listener listener,
final boolean runInitScripts,
final PythonEvaluatorJpy pythonEvaluator) throws IOException {
super(updateGraph, threadInitializationFactory, objectTypeLookup, listener);
super(updateGraph, operationInitializer, objectTypeLookup, listener);

evaluator = pythonEvaluator;
scope = pythonEvaluator.getScope();
Expand Down Expand Up @@ -112,9 +115,12 @@ public PythonDeephavenSession(
* Creates a Python "{@link ScriptSession}", for use where we should only be reading from the scope, such as an
* IPython kernel session.
*/
public PythonDeephavenSession(final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory, final PythonScope<?> scope) {
super(updateGraph, threadInitializationFactory, NoOp.INSTANCE, null);
public PythonDeephavenSession(
final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory,
final OperationInitializer operationInitializer,
final PythonScope<?> scope) {
super(updateGraph, operationInitializer, NoOp.INSTANCE, null);

evaluator = null;
this.scope = (PythonScope<PyObject>) scope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public static Builder newBuilder(final String name) {
private final long defaultTargetCycleDurationMillis;
private volatile long targetCycleDurationMillis;
private final ThreadInitializationFactory threadInitializationFactory;
private final OperationInitializer operationInitializer;


/**
Expand All @@ -118,22 +119,23 @@ public PeriodicUpdateGraph(
final long targetCycleDurationMillis,
final long minimumCycleDurationToLogNanos,
final int numUpdateThreads,
final ThreadInitializationFactory threadInitializationFactory) {
final ThreadInitializationFactory threadInitializationFactory,
final OperationInitializer operationInitializer) {
super(name, allowUnitTestMode, log, minimumCycleDurationToLogNanos);
this.allowUnitTestMode = allowUnitTestMode;
this.defaultTargetCycleDurationMillis = targetCycleDurationMillis;
this.targetCycleDurationMillis = targetCycleDurationMillis;
this.threadInitializationFactory = threadInitializationFactory;
this.operationInitializer = operationInitializer;

if (numUpdateThreads <= 0) {
this.updateThreads = Runtime.getRuntime().availableProcessors();
} else {
this.updateThreads = numUpdateThreads;
}

OperationInitializer captured = ExecutionContext.getContext().getOperationInitializer();
refreshThread = new Thread(threadInitializationFactory.createInitializer(() -> {
configureRefreshThread(captured);
configureRefreshThread(operationInitializer);
while (running) {
Assert.eqFalse(this.allowUnitTestMode, "allowUnitTestMode");
refreshTablesAndFlushNotifications();
Expand Down Expand Up @@ -1097,9 +1099,8 @@ private NotificationProcessorThreadFactory(@NotNull final ThreadGroup threadGrou

@Override
public Thread newThread(@NotNull final Runnable r) {
OperationInitializer captured = ExecutionContext.getContext().getOperationInitializer();
return super.newThread(threadInitializationFactory.createInitializer(() -> {
configureRefreshThread(captured);
configureRefreshThread(operationInitializer);
r.run();
}));
}
Expand Down Expand Up @@ -1168,6 +1169,7 @@ public static final class Builder {
private String name;
private int numUpdateThreads = -1;
private ThreadInitializationFactory threadInitializationFactory = runnable -> runnable;
private OperationInitializer operationInitializer = ExecutionContext.getContext().getOperationInitializer();

public Builder(String name) {
this.name = name;
Expand Down Expand Up @@ -1221,6 +1223,17 @@ public Builder threadInitializationFactory(ThreadInitializationFactory threadIni
return this;
}

/**
* Sets the {@link OperationInitializer} to use for threads started by this UpdateGraph.
*
* @param operationInitializer the operation initializer to use
* @return this builder
*/
public Builder operationInitializer(OperationInitializer operationInitializer) {
this.operationInitializer = operationInitializer;
return this;
}

/**
* Constructs and returns a PeriodicUpdateGraph. It is an error to do so an instance already exists with the
* name provided to this builder.
Expand Down Expand Up @@ -1250,7 +1263,8 @@ private PeriodicUpdateGraph construct() {
targetCycleDurationMillis,
minimumCycleDurationToLogNanos,
numUpdateThreads,
threadInitializationFactory);
threadInitializationFactory,
operationInitializer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.context.QueryScopeParam;
import io.deephaven.engine.table.hierarchical.HierarchicalTable;
import io.deephaven.engine.table.impl.OperationInitializationThreadPool;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.plugin.type.ObjectType;
import io.deephaven.plugin.type.ObjectTypeLookup;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.thread.ThreadInitializationFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -72,7 +71,7 @@ private static void createOrClearDirectory(final File directory) {

protected AbstractScriptSession(
UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory,
final OperationInitializer operationInitializer,
ObjectTypeLookup objectTypeLookup,
@Nullable Listener changeListener) {
this.objectTypeLookup = objectTypeLookup;
Expand All @@ -93,7 +92,7 @@ protected AbstractScriptSession(
.setQueryScope(queryScope)
.setQueryCompiler(compilerContext)
.setUpdateGraph(updateGraph)
.setOperationInitializer(new OperationInitializationThreadPool(threadInitializationFactory))
.setOperationInitializer(operationInitializer)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.deephaven.engine.table.TableFactory;
import io.deephaven.engine.table.impl.lang.QueryLanguageFunctionUtils;
import io.deephaven.engine.table.impl.util.TableLoggers;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.util.GroovyDeephavenSession.GroovySnapshot;
import io.deephaven.internal.log.LoggerFactory;
Expand All @@ -41,7 +42,6 @@
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.thread.ThreadInitializationFactory;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.util.type.TypeUtils;
import io.github.classgraph.ClassGraph;
Expand Down Expand Up @@ -146,20 +146,20 @@ private String getNextScriptClassName() {

public GroovyDeephavenSession(
final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory,
final OperationInitializer operationInitializer,
final ObjectTypeLookup objectTypeLookup,
final RunScripts runScripts) throws IOException {
this(updateGraph, threadInitializationFactory, objectTypeLookup, null, runScripts);
this(updateGraph, operationInitializer, objectTypeLookup, null, runScripts);
}

public GroovyDeephavenSession(
final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory,
final OperationInitializer operationInitializer,
ObjectTypeLookup objectTypeLookup,
@Nullable final Listener changeListener,
final RunScripts runScripts)
throws IOException {
super(updateGraph, threadInitializationFactory, objectTypeLookup, changeListener);
super(updateGraph, operationInitializer, objectTypeLookup, changeListener);

addDefaultImports(consoleImports);
if (INCLUDE_DEFAULT_IMPORTS_IN_LOADED_GROOVY) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
package io.deephaven.engine.util;

import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.thread.ThreadInitializationFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/**
* ScriptSession implementation that simply allows variables to be exported. This is not intended for use in user
Expand All @@ -25,14 +24,17 @@ public class NoLanguageDeephavenSession extends AbstractScriptSession<AbstractSc
private final String scriptType;
private final Map<String, Object> variables;

public NoLanguageDeephavenSession(final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory) {
this(updateGraph, threadInitializationFactory, SCRIPT_TYPE);
public NoLanguageDeephavenSession(
final UpdateGraph updateGraph,
final OperationInitializer operationInitializer) {
this(updateGraph, operationInitializer, SCRIPT_TYPE);
}

public NoLanguageDeephavenSession(final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory, final String scriptType) {
super(updateGraph, threadInitializationFactory, null, null);
public NoLanguageDeephavenSession(
final UpdateGraph updateGraph,
final OperationInitializer operationInitializer,
final String scriptType) {
super(updateGraph, operationInitializer, null, null);

this.scriptType = scriptType;
variables = new LinkedHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private GroovyDeephavenSession getGroovySession() throws IOException {
private GroovyDeephavenSession getGroovySession(@Nullable Clock clock) throws IOException {
final GroovyDeephavenSession session = new GroovyDeephavenSession(
ExecutionContext.getContext().getUpdateGraph(),
ThreadInitializationFactory.NO_OP,
ExecutionContext.getContext().getOperationInitializer(),
NoOp.INSTANCE,
GroovyDeephavenSession.RunScripts.serviceLoader());
session.getExecutionContext().open();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ public class TestGroovyDeephavenSession {
public void setup() throws IOException {
livenessScope = new LivenessScope();
LivenessScopeStack.push(livenessScope);
final ExecutionContext context = ExecutionContext.getContext();
session = new GroovyDeephavenSession(
ExecutionContext.getContext().getUpdateGraph(), ThreadInitializationFactory.NO_OP, NoOp.INSTANCE, null,
context.getUpdateGraph(), context.getOperationInitializer(), NoOp.INSTANCE, null,
GroovyDeephavenSession.RunScripts.none());
executionContext = session.getExecutionContext().open();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import io.deephaven.util.thread.ThreadInitializationFactory;

public class TestExecutionContext {
public static final ControlledUpdateGraph UPDATE_GRAPH = new ControlledUpdateGraph();

public static final OperationInitializationThreadPool OPERATION_INITIALIZATION =
new OperationInitializationThreadPool(ThreadInitializationFactory.NO_OP);

public static final ControlledUpdateGraph UPDATE_GRAPH = new ControlledUpdateGraph(OPERATION_INITIALIZATION);

public static ExecutionContext createForUnitTests() {
return new ExecutionContext.Builder(new AuthContext.SuperUser())
.markSystemic()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package io.deephaven.engine.testutil;

import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.util.thread.ThreadInitializationFactory;

// TODO (deephaven-core#3886): Extract test functionality from PeriodicUpdateGraph
public class ControlledUpdateGraph extends PeriodicUpdateGraph {
public ControlledUpdateGraph() {
super("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP);
public ControlledUpdateGraph(final OperationInitializer operationInitializer) {
super("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP, operationInitializer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
import dagger.Provides;
import dagger.multibindings.IntoMap;
import dagger.multibindings.StringKey;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.engine.util.NoLanguageDeephavenSession;
import io.deephaven.engine.util.ScriptSession;
import io.deephaven.server.console.groovy.InitScriptsModule;
import io.deephaven.util.thread.ThreadInitializationFactory;

import javax.inject.Named;

Expand All @@ -28,7 +28,7 @@ ScriptSession bindScriptSession(NoLanguageDeephavenSession noLanguageSession) {
@Provides
NoLanguageDeephavenSession bindNoLanguageSession(
@Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph,
ThreadInitializationFactory threadInitializationFactory) {
return new NoLanguageDeephavenSession(updateGraph, threadInitializationFactory);
final OperationInitializer operationInitializer) {
return new NoLanguageDeephavenSession(updateGraph, operationInitializer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
import dagger.Provides;
import dagger.multibindings.IntoMap;
import dagger.multibindings.StringKey;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.engine.util.GroovyDeephavenSession;
import io.deephaven.engine.util.GroovyDeephavenSession.RunScripts;
import io.deephaven.engine.util.ScriptSession;
import io.deephaven.plugin.type.ObjectTypeLookup;
import io.deephaven.util.thread.ThreadInitializationFactory;

import javax.inject.Named;
import java.io.IOException;
Expand All @@ -31,12 +31,12 @@ ScriptSession bindScriptSession(final GroovyDeephavenSession groovySession) {
@Provides
GroovyDeephavenSession bindGroovySession(
@Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph,
ThreadInitializationFactory threadInitializationFactory,
final OperationInitializer operationInitializer,
final ObjectTypeLookup lookup,
final ScriptSession.Listener listener,
final RunScripts runScripts) {
try {
return new GroovyDeephavenSession(updateGraph, threadInitializationFactory, lookup, listener, runScripts);
return new GroovyDeephavenSession(updateGraph, operationInitializer, lookup, listener, runScripts);
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import dagger.Provides;
import dagger.multibindings.IntoMap;
import dagger.multibindings.StringKey;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.engine.util.PythonEvaluatorJpy;
Expand All @@ -32,12 +33,14 @@ ScriptSession bindScriptSession(PythonDeephavenSession pythonSession) {
PythonDeephavenSession bindPythonSession(
@Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory,
final OperationInitializer operationInitializer,
final ObjectTypeLookup lookup,
final ScriptSession.Listener listener,
final PythonEvaluatorJpy pythonEvaluator) {
try {
return new PythonDeephavenSession(updateGraph, threadInitializationFactory, lookup, listener, true,
pythonEvaluator);
return new PythonDeephavenSession(
updateGraph, operationInitializer, threadInitializationFactory, lookup, listener,
true, pythonEvaluator);
} catch (IOException e) {
throw new UncheckedIOException("Unable to run python startup scripts", e);
}
Expand Down
Loading

0 comments on commit 9670342

Please sign in to comment.