Skip to content

Commit

Permalink
SelectOrUpdateListener to defer notification delivery until all depen…
Browse files Browse the repository at this point in the history
…dency column values added/removed/modified on this cycle are satisfied
  • Loading branch information
rcaudy committed Dec 2, 2022
1 parent e491527 commit 47ad711
Showing 1 changed file with 136 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,34 @@
*/
package io.deephaven.engine.table.impl;

import io.deephaven.engine.exceptions.UncheckedTableException;
import io.deephaven.base.log.LogOutput;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.rowset.TrackingRowSet;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.perf.BasePerformanceEntry;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer;
import io.deephaven.engine.table.impl.util.AsyncClientErrorNotifier;
import io.deephaven.engine.table.iterators.ObjectColumnIterator;
import io.deephaven.engine.updategraph.AbstractNotification;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.TerminalNotification;
import io.deephaven.engine.updategraph.UpdateGraphProcessor;
import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Map;
import java.util.function.LongPredicate;

/**
* A Shift-Aware listener for Select or Update. It uses the SelectAndViewAnalyzer to calculate how columns affect other
* columns, then creates a column set transformer which will be used by onUpdate to transform updates.
*/
class SelectOrUpdateListener extends BaseTable.ListenerImpl {

private final QueryTable dependent;
private final TrackingRowSet resultRowSet;
private final ModifiedColumnSet.Transformer transformer;
Expand All @@ -34,6 +41,10 @@ class SelectOrUpdateListener extends BaseTable.ListenerImpl {
private final BitSet allNewColumns = new BitSet();
private final boolean enableParallelUpdate;

private final String[] dependencyColumnNames;
private final ColumnSource<? extends NotificationQueue.Dependency>[] dependencyColumnSources;
private final ModifiedColumnSet[] dependencyModifiedColumnSets;

/**
* @param description Description of this listener
* @param parent The parent table
Expand Down Expand Up @@ -63,6 +74,18 @@ class SelectOrUpdateListener extends BaseTable.ListenerImpl {
&& UpdateGraphProcessor.DEFAULT.getUpdateThreads() > 1))
&& analyzer.allowCrossColumnParallelization();
analyzer.setAllNewColumns(allNewColumns);

dependencyColumnNames = dependent.getDefinition().getColumnStream()
.filter(cd -> NotificationQueue.Dependency.class.isAssignableFrom(cd.getDataType()))
.map(ColumnDefinition::getName)
.toArray(String[]::new);
// noinspection unchecked
dependencyColumnSources = Arrays.stream(dependencyColumnNames)
.map(cn -> dependent.getColumnSource(cn, NotificationQueue.Dependency.class))
.toArray(ColumnSource[]::new);
dependencyModifiedColumnSets = Arrays.stream(dependencyColumnNames)
.map(dependent::newModifiedColumnSet)
.toArray(ModifiedColumnSet[]::new);
}

@Override
Expand Down Expand Up @@ -111,7 +134,7 @@ private void completionRoutine(TableUpdate upstream, SelectAndViewAnalyzer.JobSc
final TableUpdateImpl downstream = new TableUpdateImpl(upstream.added().copy(), upstream.removed().copy(),
upstream.modified().copy(), upstream.shifted(), dependent.getModifiedColumnSetForUpdates());
transformer.clearAndTransform(upstream.modifiedColumnSet(), downstream.modifiedColumnSet);
dependent.notifyListeners(downstream);
maybeDeferDownstreamUntilDependencyColumnsSatisfied(downstream);
upstream.release();
toClear.close();
updateHelper.close();
Expand All @@ -135,4 +158,112 @@ public void run() {
public boolean satisfied(long step) {
return super.satisfied(step) && !updateInProgress;
}

private void maybeDeferDownstreamUntilDependencyColumnsSatisfied(@NotNull final TableUpdate downstream) {
final boolean dependencySourcesAffected;
if (dependencyColumnNames.length == 0) {
dependencySourcesAffected = false;
} else if (downstream.added().isNonempty() || downstream.removed().isNonempty()) {
dependencySourcesAffected = true;
} else if (downstream.modified().isNonempty()) {
dependencySourcesAffected = Arrays.stream(dependencyModifiedColumnSets)
.anyMatch(downstream.modifiedColumnSet()::containsAny);
} else {
// Only shifts
dependencySourcesAffected = false;
}
if (dependencySourcesAffected) {
UpdateGraphProcessor.DEFAULT.addNotification(new DeferredNotification(dependent, downstream,
(final long step) -> dependencyColumnsSatisfied(step, downstream)));
} else {
dependent.notifyListeners(downstream);
}
}

private boolean dependencyColumnsSatisfied(final long step, @NotNull final TableUpdate downstream) {
for (int dci = 0; dci < dependencyColumnSources.length; ++dci) {
final ColumnSource<? extends NotificationQueue.Dependency> columnSource = dependencyColumnSources[dci];
final ModifiedColumnSet modifiedColumnSet = dependencyModifiedColumnSets[dci];
if (downstream.added().isNonempty()) {
try (final ObjectColumnIterator<? extends NotificationQueue.Dependency> addedDependencies =
new ObjectColumnIterator<>(columnSource, downstream.added())) {
if (addedDependencies.stream().anyMatch(addedDep -> !addedDep.satisfied(step))) {
return false;
}
}
}
if (downstream.removed().isNonempty()) {
try (final ObjectColumnIterator<? extends NotificationQueue.Dependency> removedDependencies =
new ObjectColumnIterator<>(columnSource.getPrevSource(), downstream.removed())) {
if (removedDependencies.stream().anyMatch(removedDep -> !removedDep.satisfied(step))) {
return false;
}
}
}
if (downstream.modified().isNonempty() && downstream.modifiedColumnSet().containsAny(modifiedColumnSet)) {
// @formatter:off
try (final ObjectColumnIterator<? extends NotificationQueue.Dependency> postModifiedDependencies =
new ObjectColumnIterator<>(columnSource, downstream.modified());
final ObjectColumnIterator<? extends NotificationQueue.Dependency> preModifiedDependencies =
new ObjectColumnIterator<>(columnSource.getPrevSource(), downstream.getModifiedPreShift())) {
// @formatter:off
while (postModifiedDependencies.hasNext()) {
final NotificationQueue.Dependency current = postModifiedDependencies.next();
final NotificationQueue.Dependency previous = preModifiedDependencies.next();
// This is somewhat conservative. We could instead only check satisfaction when
// current != previous, and that would be enough for careful listeners that avoid unnecessary
// work, but we don't know that all listeners are careful.
if (!current.satisfied(step)) {
return false;
}
if (current != previous && !previous.satisfied(step)) {
return false;
}
}
}
}
}
return true;
}

/**
* Deferred notification that cannot execute until a predicate becomes true.
*/
private static final class DeferredNotification extends AbstractNotification {

private final BaseTable notifier;
private final TableUpdate downstream;
private final LongPredicate trigger;

private DeferredNotification(
@NotNull final BaseTable notifier,
@NotNull final TableUpdate downstream,
@NotNull final LongPredicate trigger) {
super(false);
this.notifier = notifier;
this.downstream = downstream;
this.trigger = trigger;
}

@Override
public LogOutput append(@NotNull final LogOutput output) {
return output.append("Notification{").append(System.identityHashCode(this))
.append(" for deferred downstream table update from ").append(notifier).append('}');
}

@Override
public boolean canExecute(final long step) {
return trigger.test(step);
}

@Override
public ExecutionContext getExecutionContext() {
return null;
}

@Override
public void run() {
notifier.notifyListeners(downstream);
}
}
}

0 comments on commit 47ad711

Please sign in to comment.