Skip to content

Commit

Permalink
Update Performance Tracker gets one Driver per Update Graph
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Jul 15, 2023
1 parent dca4fc5 commit c93c479
Show file tree
Hide file tree
Showing 23 changed files with 271 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.liveness.LivenessNode;
import org.jetbrains.annotations.Nullable;

/**
* Listener implementation for {@link Table} changes.
Expand All @@ -18,7 +19,7 @@ public interface TableListener extends LivenessNode {
* @param originalException exception
* @param sourceEntry performance tracking
*/
void onFailure(Throwable originalException, Entry sourceEntry);
void onFailure(Throwable originalException, @Nullable Entry sourceEntry);

/**
* Creates a notification for the exception.
Expand All @@ -27,7 +28,7 @@ public interface TableListener extends LivenessNode {
* @param sourceEntry performance tracking
* @return exception notification
*/
NotificationQueue.ErrorNotification getErrorNotification(Throwable originalException, Entry sourceEntry);
NotificationQueue.ErrorNotification getErrorNotification(Throwable originalException, @Nullable Entry sourceEntry);

/**
* Interface for instrumentation entries used by update graph nodes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public UpdateGraph getUpdateGraph() {
return this;
}

@Override
public String getName() {
return "PoisonedUpdateGraph";
}

@Override
public void addNotification(@NotNull Notification notification) {
fail();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.table.impl.util.StepUpdater;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.exceptions.UncheckedTableException;
import io.deephaven.engine.table.TableListener;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.perf.PerformanceEntry;
import io.deephaven.engine.updategraph.*;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.io.log.LogEntry;
import io.deephaven.io.log.impl.LogOutputStringImpl;
Expand All @@ -23,7 +25,6 @@
import io.deephaven.engine.liveness.LivenessArtifact;
import io.deephaven.engine.table.impl.util.AsyncClientErrorNotifier;
import io.deephaven.engine.table.impl.util.AsyncErrorLogger;
import io.deephaven.engine.table.impl.perf.UpdatePerformanceTracker;
import io.deephaven.util.Utils;
import io.deephaven.internal.log.LoggerFactory;
import org.jetbrains.annotations.NotNull;
Expand All @@ -43,6 +44,8 @@ public abstract class InstrumentedTableListenerBase extends LivenessArtifact
private static final Logger log = LoggerFactory.getLogger(InstrumentedTableListenerBase.class);

private final UpdateGraph updateGraph;
private final String description;
@Nullable
private final PerformanceEntry entry;
private final boolean terminalListener;

Expand All @@ -58,7 +61,10 @@ public abstract class InstrumentedTableListenerBase extends LivenessArtifact

InstrumentedTableListenerBase(@Nullable String description, boolean terminalListener) {
this.updateGraph = ExecutionContext.getContext().getUpdateGraph();
this.entry = UpdatePerformanceTracker.getInstance().getEntry(description);
this.description = (description == null || description.length() == 0)
? QueryPerformanceRecorder.UNINSTRUMENTED_CODE_DESCRIPTION
: description;
this.entry = PeriodicUpdateGraph.createUpdatePerformanceEntry(updateGraph, description);
this.terminalListener = terminalListener;
}

Expand All @@ -69,7 +75,7 @@ public UpdateGraph getUpdateGraph() {

@Override
public String toString() {
return Utils.getSimpleNameFor(this) + '-' + entry.getDescription();
return Utils.getSimpleNameFor(this) + '-' + description;
}

public static boolean setVerboseLogging(boolean enableVerboseLogging) {
Expand All @@ -78,6 +84,7 @@ public static boolean setVerboseLogging(boolean enableVerboseLogging) {
return original;
}

@Nullable
public PerformanceEntry getEntry() {
return entry;
}
Expand Down Expand Up @@ -159,12 +166,12 @@ public boolean satisfied(final long step) {
}

@Override
public void onFailure(Throwable originalException, Entry sourceEntry) {
public void onFailure(Throwable originalException, @Nullable Entry sourceEntry) {
forceReferenceCountToZero();
onFailureInternal(originalException, sourceEntry == null ? entry : sourceEntry);
}

protected abstract void onFailureInternal(Throwable originalException, Entry sourceEntry);
protected abstract void onFailureInternal(Throwable originalException, @Nullable Entry sourceEntry);

protected final void onFailureInternalWithDependent(
final BaseTable<?> dependent,
Expand Down Expand Up @@ -316,7 +323,9 @@ private void doRunInternal(final Runnable invokeOnUpdate) {
return;
}

entry.onUpdateStart(update.added(), update.removed(), update.modified(), update.shifted());
if (entry != null) {
entry.onUpdateStart(update.added(), update.removed(), update.modified(), update.shifted());
}

final long currentStep = getUpdateGraph().clock().currentStep();
try {
Expand All @@ -329,7 +338,7 @@ private void doRunInternal(final Runnable invokeOnUpdate) {
if (useVerboseLogging) {
en.append(entry);
} else {
en.append(entry.getDescription());
en.append(description);
}

en.append(", added.size()=").append(update.added().size())
Expand All @@ -354,7 +363,9 @@ private void doRunInternal(final Runnable invokeOnUpdate) {
onFailure(e, entry);
} finally {
afterRunNotification(currentStep);
entry.onUpdateEnd();
if (entry != null) {
entry.onUpdateEnd();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,31 @@
package io.deephaven.engine.table.impl;

import io.deephaven.engine.table.impl.perf.PerformanceEntry;
import io.deephaven.engine.table.impl.perf.UpdatePerformanceTracker;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;

import javax.annotation.Nullable;

public abstract class InstrumentedUpdateSource implements Runnable {

@Nullable
protected final PerformanceEntry entry;

public InstrumentedUpdateSource(String description) {
this.entry = UpdatePerformanceTracker.getInstance().getEntry(description);
public InstrumentedUpdateSource(final UpdateGraph updateGraph, final String description) {
this.entry = PeriodicUpdateGraph.createUpdatePerformanceEntry(updateGraph, description);
}

@Override
public final void run() {
entry.onUpdateStart();
if (entry != null) {
entry.onUpdateStart();
}
try {
instrumentedRefresh();
} finally {
entry.onUpdateEnd();
if (entry != null) {
entry.onUpdateEnd();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.deephaven.engine.updategraph.AbstractNotification;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
Expand Down Expand Up @@ -52,6 +53,7 @@ public abstract class MergedListener extends LivenessArtifact implements Notific
private final Iterable<NotificationQueue.Dependency> dependencies;
private final String listenerDescription;
protected final QueryTable result;
@Nullable
private final PerformanceEntry entry;
private final String logPrefix;

Expand All @@ -71,13 +73,13 @@ protected MergedListener(
Iterable<NotificationQueue.Dependency> dependencies,
String listenerDescription,
QueryTable result) {
this.updateGraph = ExecutionContext.getContext().getUpdateGraph();
this.updateGraph = result.getUpdateGraph();
this.recorders = recorders;
recorders.forEach(this::manage);
this.dependencies = dependencies;
this.listenerDescription = listenerDescription;
this.result = result;
this.entry = UpdatePerformanceTracker.getInstance().getEntry(listenerDescription);
this.entry = PeriodicUpdateGraph.createUpdatePerformanceEntry(updateGraph, listenerDescription);
this.logPrefix = System.identityHashCode(this) + " " + listenerDescription + " Merged Listener: ";
}

Expand Down Expand Up @@ -282,7 +284,9 @@ protected void handleUncaughtException(Exception updateException) {
}

protected void accumulatePeformanceEntry(BasePerformanceEntry subEntry) {
entry.accumulate(subEntry);
if (entry != null) {
entry.accumulate(subEntry);
}
}

private class MergedNotification extends AbstractNotification {
Expand Down Expand Up @@ -321,7 +325,9 @@ public void run() {
}
}

entry.onUpdateStart(added, removed, modified, shifted);
if (entry != null) {
entry.onUpdateStart(added, removed, modified, shifted);
}
try {
synchronized (MergedListener.this) {
if (notificationStep == lastEnqueuedStep) {
Expand All @@ -337,7 +343,9 @@ public void run() {
.append("MergedListener has completed execution ")
.append(this).endl();
} finally {
entry.onUpdateEnd();
if (entry != null) {
entry.onUpdateEnd();
}
}
} catch (Exception updateException) {
handleUncaughtException(updateException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.perf.BasePerformanceEntry;
import io.deephaven.engine.table.impl.perf.PerformanceEntry;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer;
import io.deephaven.engine.updategraph.TerminalNotification;
import io.deephaven.engine.table.impl.util.ImmediateJobScheduler;
Expand Down Expand Up @@ -134,7 +135,10 @@ private void completionRoutine(TableUpdate upstream, JobScheduler jobScheduler,
@Override
public void run() {
synchronized (accumulated) {
getEntry().accumulate(accumulated);
final PerformanceEntry entry = getEntry();
if (entry != null) {
entry.accumulate(accumulated);
}
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ private UnderlyingTableMaintainer(
readyLocationStates = new IntrusiveDoublyLinkedQueue<>(
IntrusiveDoublyLinkedNode.Adapter.<PendingLocationState>getInstance());
processNewLocationsUpdateRoot = new InstrumentedUpdateSource(
result.getUpdateGraph(),
SourcePartitionedTable.class.getSimpleName() + '[' + tableLocationProvider + ']'
+ "-processPendingLocations") {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ private class LocationChangePoller extends InstrumentedUpdateSource {
private final TableLocationSubscriptionBuffer locationBuffer;

private LocationChangePoller(@NotNull final TableLocationSubscriptionBuffer locationBuffer) {
super(description + ".rowSetUpdateSource");
super(updateGraph, description + ".rowSetUpdateSource");
this.locationBuffer = locationBuffer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.deephaven.engine.table.impl.perf.UpdatePerformanceTracker;
import io.deephaven.engine.table.impl.sources.FillUnordered;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.engine.util.TableTools;
import io.deephaven.function.Numeric;
import io.deephaven.util.QueryConstants;
Expand Down Expand Up @@ -111,6 +112,7 @@ public static Builder newBuilder() {
private long lastIndex = -1;
private final SyntheticInstantSource columnSource;
private final Clock clock;
@Nullable
private final PerformanceEntry entry;
private final boolean isBlinkTable;
private final UpdateSourceRegistrar registrar;
Expand All @@ -125,7 +127,8 @@ public TimeTable(
this.registrar = registrar;
this.isBlinkTable = isBlinkTable;
final String name = isBlinkTable ? "TimeTableBlink" : "TimeTable";
this.entry = UpdatePerformanceTracker.getInstance().getEntry(name + "(" + startTime + "," + period + ")");
this.entry = PeriodicUpdateGraph.createUpdatePerformanceEntry(
updateGraph, name + "(" + startTime + "," + period + ")");
columnSource = (SyntheticInstantSource) getColumnSourceMap().get(TIMESTAMP);
this.clock = clock;
if (isBlinkTable) {
Expand Down Expand Up @@ -154,7 +157,9 @@ public void run() {
}

private void refresh(final boolean notifyListeners) {
entry.onUpdateStart();
if (entry != null) {
entry.onUpdateStart();
}
try {
final Instant now = clock.instantNanos();
long rangeStart = lastIndex + 1;
Expand Down Expand Up @@ -187,7 +192,9 @@ private void refresh(final boolean notifyListeners) {
}
}
} finally {
entry.onUpdateEnd();
if (entry != null) {
entry.onUpdateEnd();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class PerformanceEntry extends BasePerformanceEntry implements TableListe
private final String callerLine;

private final AuthContext authContext;
private final String updateGraphName;

private long intervalInvocationCount;

Expand All @@ -42,13 +43,14 @@ public class PerformanceEntry extends BasePerformanceEntry implements TableListe
private final RuntimeMemory.Sample endSample;

PerformanceEntry(final int id, final int evaluationNumber, final int operationNumber,
final String description, final String callerLine) {
final String description, final String callerLine, final String updateGraphName) {
this.id = id;
this.evaluationNumber = evaluationNumber;
this.operationNumber = operationNumber;
this.description = description;
this.callerLine = callerLine;
authContext = id == QueryConstants.NULL_INT ? null : ExecutionContext.getContext().getAuthContext();
this.updateGraphName = updateGraphName;
startSample = new RuntimeMemory.Sample();
endSample = new RuntimeMemory.Sample();
maxTotalMemory = 0;
Expand Down Expand Up @@ -165,6 +167,13 @@ public AuthContext getAuthContext() {
return authContext;
}

/**
* @return The name of the update graph that this PerformanceEntry is associated with
*/
public String getUpdateGraphName() {
return updateGraphName;
}

public long getIntervalAdded() {
return intervalAdded;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class UpdatePerformanceStreamPublisher implements StreamPublisher {
ColumnDefinition.ofLong("CollectionTimeNanos"),
ColumnDefinition.ofLong("EntryIntervalAllocatedBytes"),
ColumnDefinition.ofLong("EntryIntervalPoolAllocatedBytes"),
ColumnDefinition.ofString("AuthContext"));
ColumnDefinition.ofString("AuthContext"),
ColumnDefinition.ofString("UpdateGraph"));

public static TableDefinition definition() {
return DEFINITION;
Expand Down Expand Up @@ -93,6 +94,7 @@ public synchronized void add(IntervalLevelDetails intervalLevelDetails, Performa
chunks[21].asWritableLongChunk().add(performanceEntry.getIntervalAllocatedBytes());
chunks[22].asWritableLongChunk().add(performanceEntry.getIntervalPoolAllocatedBytes());
chunks[23].<String>asWritableObjectChunk().add(Objects.toString(performanceEntry.getAuthContext()));
chunks[24].<String>asWritableObjectChunk().add(Objects.toString(performanceEntry.getUpdateGraphName()));
if (chunks[0].size() == CHUNK_SIZE) {
flushInternal();
}
Expand Down
Loading

0 comments on commit c93c479

Please sign in to comment.