Skip to content

Commit

Permalink
finished feedback from rnd 1
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Jul 28, 2023
1 parent d5d1a08 commit fbc81b1
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 195 deletions.
8 changes: 0 additions & 8 deletions Integrations/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,3 @@ services:
service: redpanda
expose:
- 29092
- 8081

postgres:
extends:
file: ../postgres/docker-compose.yml
service: postgres
expose:
- 5432

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class TableLoggers {
*/
@ScriptApi
public static QueryTable updatePerformanceLog() {
return UpdatePerformanceTracker.getInstance().getQueryTable();
return UpdatePerformanceTracker.getQueryTable();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ public static PerformanceEntry createUpdatePerformanceEntry(
final String description) {
if (updateGraph instanceof PeriodicUpdateGraph) {
final PeriodicUpdateGraph pug = (PeriodicUpdateGraph) updateGraph;
// if the pug has not been started, then we can't get the performance tracker
if (pug.updatePerformanceTracker != null) {
return pug.updatePerformanceTracker.getEntry(description);
}
throw new IllegalStateException("Cannot create a performance entry for a PeriodicUpdateGraph that has "
+ "not been completely constructed.");
}
return null;
}
Expand Down Expand Up @@ -160,6 +161,9 @@ public static PerformanceEntry createUpdatePerformanceEntry(
private volatile long targetCycleDurationMillis;
private final long minimumCycleDurationToLogNanos;

/** when to next flush the performance tracker; initializes to zero to force a flush on start */
private long nextUpdatePerformanceTrackerFlushTime = 0;

/**
* How many cycles we have not logged, but were non-zero.
*/
Expand Down Expand Up @@ -295,7 +299,7 @@ public synchronized void take(final AccumulatedCycleStats out) {

private final String name;

private final UpdatePerformanceTracker.Driver updatePerformanceTracker;
private final UpdatePerformanceTracker updatePerformanceTracker;

public PeriodicUpdateGraph(
final String name,
Expand Down Expand Up @@ -328,7 +332,7 @@ public PeriodicUpdateGraph(
}), "PeriodicUpdateGraph." + name + ".refreshThread");
refreshThread.setDaemon(true);

updatePerformanceTracker = UpdatePerformanceTracker.createDriverFor(this);
updatePerformanceTracker = new UpdatePerformanceTracker(this);
}

public String getName() {
Expand Down Expand Up @@ -608,7 +612,6 @@ public void start() {
Assert.eqFalse(unitTestMode, "unitTestMode");
Assert.eqFalse(allowUnitTestMode, "allowUnitTestMode");
synchronized (refreshThread) {
updatePerformanceTracker.start();
if (notificationProcessor instanceof PoisonedNotificationProcessor) {
notificationProcessor = makeNotificationProcessor();
}
Expand Down Expand Up @@ -1754,9 +1757,14 @@ private void logSuppressedCycles() {
* @param timeSource The source of time that startTime was based on
*/
private void waitForNextCycle(final long startTime, final Scheduler timeSource) {
final long now = timeSource.currentTimeMillis();
if (now >= nextUpdatePerformanceTrackerFlushTime) {
nextUpdatePerformanceTrackerFlushTime = now + UpdatePerformanceTracker.REPORT_INTERVAL_MILLIS;
updatePerformanceTracker.flush();
}
long expectedEndTime = startTime + targetCycleDurationMillis;
if (minimumInterCycleSleep > 0) {
expectedEndTime = Math.max(expectedEndTime, timeSource.currentTimeMillis() + minimumInterCycleSleep);
expectedEndTime = Math.max(expectedEndTime, now + minimumInterCycleSleep);
}
waitForEndTime(expectedEndTime, timeSource);
}
Expand Down Expand Up @@ -1972,6 +1980,12 @@ public void takeAccumulatedCycleStats(AccumulatedCycleStats updateGraphAccumCycl
accumulatedCycleStats.take(updateGraphAccumCycleStats);
}

public static PeriodicUpdateGraph getInstance(final String name) {
synchronized (INSTANCES) {
return INSTANCES.get(name);
}
}

public static final class Builder {
private final boolean allowUnitTestMode =
Configuration.getInstance().getBooleanWithDefault(ALLOW_UNIT_TEST_MODE_PROP, false);
Expand Down
7 changes: 1 addition & 6 deletions py/server/test_helper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,10 @@ def start_jvm_for_tests(jvm_props: Dict[str, str] = None):
py_scope_jpy = jpy.get_type("io.deephaven.engine.util.PythonScopeJpyImpl").ofMainGlobals()
global py_dh_session
_JPeriodicUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph")
_j_test_update_graph = _JPeriodicUpdateGraph.newBuilder("PYTHON_TEST").existingOrBuild()
_j_test_update_graph = _JPeriodicUpdateGraph.newBuilder(_JPeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME).existingOrBuild()
_JPythonScriptSession = jpy.get_type("io.deephaven.integrations.python.PythonDeephavenSession")
py_dh_session = _JPythonScriptSession(_j_test_update_graph, py_scope_jpy)

_JUpdatePerformanceTracker = jpy.get_type("io.deephaven.engine.table.impl.perf.UpdatePerformanceTracker")
exec_ctx = py_dh_session.getExecutionContext().open()
_JUpdatePerformanceTracker.start(_j_test_update_graph)
exec_ctx.close()


def _expand_wildcards_in_list(elements):
"""
Expand Down
1 change: 1 addition & 0 deletions py/server/tests/testbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

_JTableTools = jpy.get_type("io.deephaven.engine.util.TableTools")


def table_equals(table_a: Table, table_b: Table) -> bool:
try:
return False if _JTableTools.diff(table_a.j_table, table_b.j_table, 1) else True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,15 @@ public DeephavenApiServer run() throws IOException, ClassNotFoundException, Time

EngineMetrics.maybeStartStatsCollection();

log.info().append("Starting Update Graph...").endl();
getUpdateGraph().<PeriodicUpdateGraph>cast().start();

log.info().append("Starting Performance Trackers...").endl();
QueryPerformanceRecorder.installPoolAllocationRecorder();
QueryPerformanceRecorder.installUpdateGraphLockInstrumentation();
UpdatePerformanceTracker.start(getUpdateGraph());
ServerStateTracker.start();
AsyncErrorLogger.init();

log.info().append("Starting Update Graph...").endl();
getUpdateGraph().<PeriodicUpdateGraph>cast().start();

for (UriResolver resolver : uriResolvers.resolvers()) {
log.debug().append("Found table resolver ").append(resolver.getClass().toString()).endl();
}
Expand Down

0 comments on commit fbc81b1

Please sign in to comment.