Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClockFilter: Correct Missing Dependency and Improve Performance #4572

Merged
merged 4 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -474,26 +474,31 @@ public boolean satisfied(final long step) {
StepUpdater.checkForOlderStep(step, lastSatisfiedStep);
StepUpdater.checkForOlderStep(step, lastNotificationStep);

final Collection<Object> localParents = parents;
// If we have no parents whatsoever then we are a source, and have no dependency chain other than the UGP
// itself
if (localParents.isEmpty()) {
if (updateGraph.satisfied(step)) {
updateGraph.logDependencies().append("Root node satisfied ").append(this)
.endl();
StepUpdater.tryUpdateRecordedStep(LAST_SATISFIED_STEP_UPDATER, this, step);
return true;
final Collection<Object> localParents = parents;

if (!updateGraph.satisfied(step)) {
if (localParents.isEmpty()) {
updateGraph.logDependencies().append("Root node not satisfied ").append(this).endl();
} else {
updateGraph.logDependencies().append("Update graph not satisfied for ").append(this).endl();
}
return false;
}

// noinspection SynchronizationOnLocalVariableOrMethodParameter
if (localParents.isEmpty()) {
updateGraph.logDependencies().append("Root node satisfied ").append(this).endl();
StepUpdater.tryUpdateRecordedStep(LAST_SATISFIED_STEP_UPDATER, this, step);
return true;
}

synchronized (localParents) {
for (Object parent : localParents) {
if (parent instanceof NotificationQueue.Dependency) {
if (!((NotificationQueue.Dependency) parent).satisfied(step)) {
updateGraph.logDependencies()
.append("Parents dependencies not satisfied for ").append(this)
.append("Parent dependencies not satisfied for ").append(this)
.append(", parent=").append((NotificationQueue.Dependency) parent)
.endl();
return false;
Expand All @@ -503,7 +508,7 @@ public boolean satisfied(final long step) {
}

updateGraph.logDependencies()
.append("All parents dependencies satisfied for ").append(this)
.append("All parent dependencies satisfied for ").append(this)
.endl();

StepUpdater.tryUpdateRecordedStep(LAST_SATISFIED_STEP_UPDATER, this, step);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -976,22 +976,13 @@ public Table moveColumns(int index, boolean moveToEnd, String... columnsToMove)

public static class FilteredTable extends QueryTable implements WhereFilter.RecomputeListener {
private final QueryTable source;
@ReferentialIntegrity
private final WhereFilter[] filters;
private boolean refilterMatchedRequested = false;
private boolean refilterUnmatchedRequested = false;
private MergedListener whereListener;

public FilteredTable(final TrackingRowSet currentMapping, final QueryTable source,
final WhereFilter[] filters) {
public FilteredTable(final TrackingRowSet currentMapping, final QueryTable source) {
super(source.getDefinition(), currentMapping, source.columns, null, null);
this.source = source;
this.filters = filters;
for (final WhereFilter f : filters) {
if (f instanceof LivenessReferent && f.isRefreshing()) {
manage((LivenessReferent) f);
}
}
}

@Override
Expand Down Expand Up @@ -1288,8 +1279,7 @@ void handleUncaughtException(Exception throwable) {
}
currentMapping.initializePreviousValue();

final FilteredTable filteredTable =
new FilteredTable(currentMapping, this, filters);
final FilteredTable filteredTable = new FilteredTable(currentMapping, this);

for (final WhereFilter filter : filters) {
filter.setRecomputeListener(filteredTable);
Expand All @@ -1307,29 +1297,19 @@ void handleUncaughtException(Exception throwable) {
}
}

final List<NotificationQueue.Dependency> dependencies = Stream.concat(
Stream.of(filters)
.filter(f -> f instanceof NotificationQueue.Dependency)
.map(f -> (NotificationQueue.Dependency) f),
Stream.of(filters)
.filter(f -> f instanceof DependencyStreamProvider)
.flatMap(f -> ((DependencyStreamProvider) f)
.getDependencyStream()))
.collect(Collectors.toList());
if (swapListener != null) {
final ListenerRecorder recorder = new ListenerRecorder(
"where(" + Arrays.toString(filters) + ")", QueryTable.this,
filteredTable);
final WhereListener whereListener = new WhereListener(
log, this, recorder, dependencies, filteredTable, filters);
log, this, recorder, filteredTable, filters);
filteredTable.setWhereListener(whereListener);
recorder.setMergedListener(whereListener);
swapListener.setListenerAndResult(recorder, filteredTable);
filteredTable.addParentReference(swapListener);
filteredTable.addParentReference(whereListener);
} else if (refreshingFilters) {
final WhereListener whereListener = new WhereListener(
log, this, null, dependencies, filteredTable, filters);
log, this, null, filteredTable, filters);
filteredTable.setWhereListener(whereListener);
filteredTable.addParentReference(whereListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.deephaven.base.verify.Assert;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.WritableRowSet;
Expand All @@ -13,8 +13,11 @@
import io.deephaven.engine.table.impl.select.WhereFilter;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.io.logger.Logger;
import org.jetbrains.annotations.NotNull;

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* The WhereListener is a MergedListener for computing updated filters
Expand Down Expand Up @@ -47,10 +50,10 @@ class WhereListener extends MergedListener {
final Logger log,
final QueryTable sourceTable,
final ListenerRecorder recorder,
final Collection<NotificationQueue.Dependency> dependencies,
final QueryTable.FilteredTable result,
final WhereFilter[] filters) {
super(recorder == null ? Collections.emptyList() : Collections.singleton(recorder), dependencies,
super(recorder == null ? Collections.emptyList() : Collections.singleton(recorder),
extractDependencies(filters),
"where(" + Arrays.toString(filters) + ")", result);
this.sourceTable = sourceTable;
this.recorder = recorder;
Expand All @@ -63,6 +66,9 @@ class WhereListener extends MergedListener {
for (final WhereFilter filter : this.filters) {
hasColumnArray |= !filter.getColumnArrays().isEmpty();
filterColumnNames.addAll(filter.getColumns());
if (filter instanceof LivenessReferent && filter.isRefreshing()) {
manage((LivenessReferent) filter);
}
}
permitParallelization = AbstractFilterExecution.permitParallelization(filters);
this.filterColumns = hasColumnArray ? null
Expand All @@ -81,6 +87,19 @@ class WhereListener extends MergedListener {
}
}

@NotNull
private static List<NotificationQueue.Dependency> extractDependencies(@NotNull final WhereFilter[] filters) {
return Stream.concat(
Stream.of(filters)
.filter(f -> f instanceof NotificationQueue.Dependency)
.map(f -> (NotificationQueue.Dependency) f),
Stream.of(filters)
.filter(f -> f instanceof DependencyStreamProvider)
.flatMap(f -> ((DependencyStreamProvider) f)
.getDependencyStream()))
.collect(Collectors.toList());
}

@Override
public void process() {
initialNotificationStep = getUpdateGraph().clock().currentStep();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.annotations.ScriptApi;
import org.jetbrains.annotations.NotNull;
Expand All @@ -21,7 +23,10 @@
* The use case is for benchmarks that want to replay a table in order to better understand incremental processing
* capacity.
*/
public abstract class BaseIncrementalReleaseFilter extends WhereFilterLivenessArtifactImpl implements Runnable {
public abstract class BaseIncrementalReleaseFilter
extends WhereFilterLivenessArtifactImpl
implements Runnable, NotificationQueue.Dependency {

private final long initialSize;
private long releasedSize;
private long expectedSize;
Expand Down Expand Up @@ -187,6 +192,16 @@ public void start() {
}
}

@Override
public boolean satisfied(long step) {
return updateGraph.satisfied(step);
}

@Override
public UpdateGraph getUpdateGraph() {
return updateGraph;
}

public long getInitialSize() {
return initialSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import io.deephaven.engine.updategraph.DynamicNode;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -26,7 +28,8 @@
/**
* Boilerplate super-class for various clock-oriented filters.
*/
public abstract class ClockFilter extends WhereFilterLivenessArtifactImpl implements ReindexingFilter, Runnable {
public abstract class ClockFilter extends WhereFilterLivenessArtifactImpl
implements ReindexingFilter, Runnable, NotificationQueue.Dependency {

protected final String columnName;
protected final Clock clock;
Expand Down Expand Up @@ -91,6 +94,16 @@ public boolean isRefreshing() {
return refreshing;
}

@Override
public boolean satisfied(long step) {
return updateGraph.satisfied(step);
}

@Override
public UpdateGraph getUpdateGraph() {
return updateGraph;
}

@Override
public final void setRecomputeListener(@NotNull final RecomputeListener listener) {
if (!refreshing) {
Expand All @@ -112,8 +125,7 @@ public final void run() {
final RowSet added = updateAndGetAddedIndex();
if (added != null && !added.isEmpty()) {
resultTable.getRowSet().writableCast().insert(added);
resultTable.notifyListeners(added, RowSetFactory.empty(),
RowSetFactory.empty());
resultTable.notifyListeners(added, RowSetFactory.empty(), RowSetFactory.empty());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import org.jetbrains.annotations.NotNull;

import java.util.Collections;
Expand All @@ -17,7 +19,9 @@
/**
* This will filter a table starting off with the first N rows, and then adding new rows to the table on each run.
*/
public class RollingReleaseFilter extends WhereFilterLivenessArtifactImpl implements Runnable {
public class RollingReleaseFilter
extends WhereFilterLivenessArtifactImpl
implements Runnable, NotificationQueue.Dependency {
private final long workingSize;
private final long rollingSize;
private long offset = 0;
Expand Down Expand Up @@ -87,6 +91,16 @@ public void setRecomputeListener(RecomputeListener listener) {
updateGraph.addSource(this);
}

@Override
public boolean satisfied(long step) {
return updateGraph.satisfied(step);
}

@Override
public UpdateGraph getUpdateGraph() {
return updateGraph;
}

@Override
public RollingReleaseFilter copy() {
return new RollingReleaseFilter(workingSize, rollingSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ private enum State {
private final Condition ugpCondition = updateGraph.exclusiveLock().newCondition();

private Instant now;
private boolean maxSpeed;

/**
* Create a simulation clock for the specified time range and step.
*
*
* @param startTime The initial time that will be returned by this clock, before it is started
* @param endTime The final time that will be returned by this clock, when the simulation has completed
* @param stepSize The time to "elapse" in each run loop
Expand Down Expand Up @@ -109,13 +110,14 @@ public void start() {
* @param maxSpeed run the simulation clock at the max possible speed.
*/
public void start(final boolean maxSpeed) {
if (maxSpeed) {
updateGraph.setTargetCycleDurationMillis(0);
}
if (!state.compareAndSet(State.NOT_STARTED, State.STARTED)) {
throw new IllegalStateException(this + " already started");
}
this.maxSpeed = maxSpeed;
updateGraph.addSource(refreshTask);
if (maxSpeed) {
updateGraph.requestRefresh();
}
}

/**
Expand All @@ -133,11 +135,14 @@ public void advance() {
}
final Instant incremented = DateTimeUtils.plus(now, stepNanos);
now = DateTimeUtils.isAfter(incremented, endTime) ? endTime : incremented;
if (maxSpeed) {
updateGraph.requestRefresh();
}
}

/**
* Is the simulation done?
*
*
* @return True if the simulation is done
*/
public boolean done() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.engine.table.ColumnSource;
import org.jetbrains.annotations.NotNull;
Expand All @@ -24,7 +26,9 @@
/**
* This will filter a table for the most recent N nanoseconds (must be on a date time column).
*/
public class TimeSeriesFilter extends WhereFilterLivenessArtifactImpl implements Runnable {
public class TimeSeriesFilter
extends WhereFilterLivenessArtifactImpl
implements Runnable, NotificationQueue.Dependency {
protected final String columnName;
protected final long nanos;
private RecomputeListener listener;
Expand Down Expand Up @@ -102,6 +106,16 @@ public void setRecomputeListener(RecomputeListener listener) {
updateGraph.addSource(this);
}

@Override
public boolean satisfied(long step) {
return updateGraph.satisfied(step);
}

@Override
public UpdateGraph getUpdateGraph() {
return updateGraph;
}

@Override
public TimeSeriesFilter copy() {
return new TimeSeriesFilter(columnName, nanos);
Expand Down
Loading