Skip to content

Commit

Permalink
Improve performance for ConstituentDependency, tighter double-notific…
Browse files Browse the repository at this point in the history
…ation testing, MergeListener paranoia, SwapListener swapping (#3142)

* Tighten expectations for notification delivery in BaseTable
* Unit test to reproduce theorized dependency error (if we disable the code that guards against it)
* Optimizations to ConstituentDependency: avoid concurrent checks, record lowest unsatisfied constituent position to resume from
* MergedListener (including the multiplexing one for SyncTableFilter) changed to be a little more cautious about double-notifies, and support next-cycle error notification when needed
* Get rid of ShiftObliviousSwapListener
* Switch from SwapListener as a permanent link in the DAG to a temporary one which is always eliminated by the end of the instantiation cycle
* Fix some missing destroy implementations and parent manage calls for "exotic" listeners
  • Loading branch information
rcaudy authored Dec 6, 2022
1 parent 451150a commit 40e9134
Show file tree
Hide file tree
Showing 34 changed files with 563 additions and 431 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.base.reference;

import org.jetbrains.annotations.NotNull;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
* {@link SimpleReference} implementation that delegates to an internal {@link SimpleReference} which can be replaced
* using the {@link #swapDelegate(SimpleReference, SimpleReference)} method.
*/
public class SwappableDelegatingReference<T> implements SimpleReference<T> {

@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<SwappableDelegatingReference, SimpleReference> DELEGATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(
SwappableDelegatingReference.class, SimpleReference.class, "delegate");

private volatile SimpleReference<T> delegate;

public SwappableDelegatingReference(@NotNull final SimpleReference<T> delegate) {
this.delegate = Objects.requireNonNull(delegate);
}

/**
* Swap the delegate assigned to this SwappableDelegatingReference.
*
* @param oldDelegate The delegate to swap out
* @param newDelegate The delegate to swap in
* @throws IllegalArgumentException if {@code oldDelegate} is not the current delegate value
*/
public void swapDelegate(
@NotNull final SimpleReference<T> oldDelegate,
@NotNull final SimpleReference<T> newDelegate) {
if (!DELEGATE_UPDATER.compareAndSet(this, oldDelegate, newDelegate)) {
throw new IllegalArgumentException(
"Previous delegate mismatch: found " + delegate + ", expected " + oldDelegate);
}
}

@Override
public T get() {
return delegate.get();
}

@Override
public void clear() {
delegate.clear();
delegate = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ private static Table makeDownsampledQueryTable(final QueryTable wholeQueryTable,

if (swapListener != null) {
swapListener.setListenerAndResult(downsampleListener, downsampleListener.resultTable);
downsampleListener.resultTable.addParentReference(swapListener);
downsampleListener.resultTable.addParentReference(downsampleListener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,11 +677,15 @@ public void addUpdateListener(final TableUpdateListener listener) {
private SimpleReferenceManager<TableUpdateListener, ? extends SimpleReference<TableUpdateListener>> ensureChildListenerReferences() {
// noinspection unchecked
return ensureField(CHILD_LISTENER_REFERENCES_UPDATER, EMPTY_CHILD_LISTENER_REFERENCES,
() -> new SimpleReferenceManager<>((
final TableUpdateListener tableUpdateListener) -> tableUpdateListener instanceof LegacyListenerAdapter
? (LegacyListenerAdapter) tableUpdateListener
: new WeakSimpleReference<>(tableUpdateListener),
true));
() -> new SimpleReferenceManager<>((final TableUpdateListener tableUpdateListener) -> {
if (tableUpdateListener instanceof LegacyListenerAdapter) {
return (LegacyListenerAdapter) tableUpdateListener;
} else if (tableUpdateListener instanceof SwapListener) {
return ((SwapListener) tableUpdateListener).getReferenceForSource();
} else {
return new WeakSimpleReference<>(tableUpdateListener);
}
}, true));
}

@Override
Expand Down Expand Up @@ -739,6 +743,11 @@ public final void notifyListeners(RowSet added, RowSet removed, RowSet modified)
* callers should pass a {@code copy} for updates they intend to further use.
*/
public final void notifyListeners(final TableUpdate update) {
Assert.eqFalse(isFailed, "isFailed");
final long currentStep = LogicalClock.DEFAULT.currentStep();
// tables may only be updated once per cycle
Assert.lt(lastNotificationStep, "lastNotificationStep", currentStep, "LogicalClock.DEFAULT.currentStep()");

Assert.eqTrue(update.valid(), "update.valid()");
if (update.empty()) {
update.release();
Expand All @@ -749,8 +758,6 @@ public final void notifyListeners(final TableUpdate update) {

final boolean hasNoListeners = !hasListeners();
if (hasNoListeners) {
final long currentStep = LogicalClock.DEFAULT.currentStep();
Assert.lt(lastNotificationStep, "lastNotificationStep", currentStep, "LogicalClock.DEFAULT.currentStep()");
lastNotificationStep = currentStep;
update.release();
return;
Expand Down Expand Up @@ -784,10 +791,6 @@ public final void notifyListeners(final TableUpdate update) {
validateUpdateOverlaps(update);
}

// tables may only be updated once per cycle
final long currentStep = LogicalClock.DEFAULT.currentStep();
Assert.lt(lastNotificationStep, "lastNotificationStep", currentStep, "LogicalClock.DEFAULT.currentStep()");

lastNotificationStep = currentStep;

// notify children
Expand Down Expand Up @@ -892,11 +895,14 @@ private void validateUpdateOverlaps(final TableUpdate update) {
* @param e error
* @param sourceEntry performance tracking
*/
public final void notifyListenersOnError(final Throwable e,
@Nullable final TableListener.Entry sourceEntry) {
public final void notifyListenersOnError(final Throwable e, @Nullable final TableListener.Entry sourceEntry) {
Assert.eqFalse(isFailed, "isFailed");
final long currentStep = LogicalClock.DEFAULT.currentStep();
Assert.lt(lastNotificationStep, "lastNotificationStep", currentStep, "LogicalClock.DEFAULT.currentStep()");

isFailed = true;
maybeSignal();
lastNotificationStep = LogicalClock.DEFAULT.currentStep();
lastNotificationStep = currentStep;

final NotificationQueue notificationQueue = getNotificationQueue();
childListenerReferences.forEach((listenerRef, listener) -> notificationQueue
Expand Down Expand Up @@ -1390,7 +1396,6 @@ public Table copy() {
if (swapListener != null) {
final ListenerImpl listener = new ListenerImpl("copy()", this, resultTable);
swapListener.setListenerAndResult(listener, resultTable);
resultTable.addParentReference(swapListener);
}

result.setValue(resultTable);
Expand Down Expand Up @@ -1422,16 +1427,16 @@ public Table setColumnRenderers(String directive) {
return result;
}

public static <SL extends SwapListenerBase<?>> void initializeWithSnapshot(
String logPrefix, SL swapListener, ConstructSnapshot.SnapshotFunction snapshotFunction) {
public static void initializeWithSnapshot(
String logPrefix, SwapListener swapListener, ConstructSnapshot.SnapshotFunction snapshotFunction) {
if (swapListener == null) {
snapshotFunction.call(false, LogicalClock.DEFAULT.currentValue());
return;
}
ConstructSnapshot.callDataSnapshotFunction(logPrefix, swapListener.makeSnapshotControl(), snapshotFunction);
}

public interface SwapListenerFactory<T extends SwapListenerBase<?>> {
public interface SwapListenerFactory<T extends SwapListener> {
T newListener(BaseTable sourceTable);
}

Expand All @@ -1443,7 +1448,7 @@ public interface SwapListenerFactory<T extends SwapListenerBase<?>> {
* @return a swap listener for this table (or null)
*/
@Nullable
public <T extends SwapListenerBase<?>> T createSwapListenerIfRefreshing(final SwapListenerFactory<T> factory) {
public <T extends SwapListener> T createSwapListenerIfRefreshing(final SwapListenerFactory<T> factory) {
if (!isRefreshing()) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.engine.table.impl;

import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
Expand Down Expand Up @@ -58,6 +59,7 @@ public static void install(
private final ColumnSource<? extends Dependency>[] dependencyColumns;

private volatile long lastSatisfiedStep;
private long firstUnsatisfiedRowPosition = 0;

private ConstituentDependency(
@NotNull final Dependency resultUpdatedDependency,
Expand Down Expand Up @@ -90,41 +92,55 @@ public boolean satisfied(final long step) {
lastSatisfiedStep = step;
return true;
}
final int chunkSize = Math.toIntExact(Math.min(ColumnIterator.DEFAULT_CHUNK_SIZE, resultRows.size()));
final int numColumns = dependencyColumns.length;
final ChunkSource.GetContext[] contexts = new ChunkSource.GetContext[numColumns];
try (final SharedContext sharedContext = numColumns > 1 ? SharedContext.makeSharedContext() : null;
final SafeCloseable ignored = new SafeCloseableArray<>(contexts);
final RowSequence.Iterator rows = resultRows.getRowSequenceIterator()) {
for (int ci = 0; ci < numColumns; ++ci) {
contexts[ci] = dependencyColumns[ci].makeGetContext(chunkSize, sharedContext);
synchronized (this) {
if (lastSatisfiedStep == step) {
return true;
}
while (rows.hasMore()) {
final RowSequence sliceRows = rows.getNextRowSequenceWithLength(chunkSize);
final int numConstituents = sliceRows.intSize();
final int chunkSize = Math.toIntExact(Math.min(ColumnIterator.DEFAULT_CHUNK_SIZE, resultRows.size()));
final int numColumns = dependencyColumns.length;
final ChunkSource.GetContext[] contexts = new ChunkSource.GetContext[numColumns];
try (final SharedContext sharedContext = numColumns > 1 ? SharedContext.makeSharedContext() : null;
final SafeCloseable ignored = new SafeCloseableArray<>(contexts);
final RowSequence.Iterator rows = resultRows.getRowSequenceIterator()) {
if (firstUnsatisfiedRowPosition > 0) {
rows.advance(resultRows.get(firstUnsatisfiedRowPosition));
}
for (int ci = 0; ci < numColumns; ++ci) {
final ObjectChunk<? extends Dependency, ? extends Values> dependencies =
dependencyColumns[ci].getChunk(contexts[ci], sliceRows).asObjectChunk();
for (int di = 0; di < numConstituents; ++di) {
final Dependency constituent = dependencies.get(di);
if (constituent != null && !constituent.satisfied(step)) {
UpdateGraphProcessor.DEFAULT.logDependencies()
.append("Constituent dependencies not satisfied for ")
.append(this).append(", constituent=").append(constituent)
.endl();
return false;
contexts[ci] = dependencyColumns[ci].makeGetContext(chunkSize, sharedContext);
}
while (rows.hasMore()) {
final RowSequence sliceRows = rows.getNextRowSequenceWithLength(chunkSize);
final int numConstituents = sliceRows.intSize();
for (int ci = 0; ci < numColumns; ++ci) {
final ObjectChunk<? extends Dependency, ? extends Values> dependencies =
dependencyColumns[ci].getChunk(contexts[ci], sliceRows).asObjectChunk();
for (int di = 0; di < numConstituents; ++di) {
final Dependency constituent = dependencies.get(di);
if (constituent != null && !constituent.satisfied(step)) {
UpdateGraphProcessor.DEFAULT.logDependencies()
.append("Constituent dependencies not satisfied for ")
.append(this).append(", constituent=").append(constituent)
.endl();
firstUnsatisfiedRowPosition += di;
return false;
}
}
}
}
if (sharedContext != null) {
sharedContext.reset();
firstUnsatisfiedRowPosition += numConstituents;
if (sharedContext != null) {
sharedContext.reset();
}
}
}
Assert.eq(firstUnsatisfiedRowPosition, "firstUnsatisfiedRowPosition", resultRows.size(),
"resultRows.size()");
UpdateGraphProcessor.DEFAULT.logDependencies()
.append("All constituent dependencies satisfied for ").append(this)
.endl();
lastSatisfiedStep = step;
firstUnsatisfiedRowPosition = 0; // Re-initialize for next cycle

return true;
}
UpdateGraphProcessor.DEFAULT.logDependencies()
.append("All constituent dependencies satisfied for ").append(this)
.endl();
lastSatisfiedStep = step;
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,6 @@ private <T> T throwUnsupported(String opName) {
final ListenerImpl listener =
new ListenerImpl("hierarchicalTable()", rootTable, table);
swapListener.setListenerAndResult(listener, table);
table.addParentReference(swapListener);
}

resultHolder.setValue(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public InstrumentedTableUpdateListenerAdapter(@Nullable final String description
if (this.retain = retain) {
RETENTION_CACHE.retain(this);
if (Liveness.DEBUG_MODE_ENABLED) {
Liveness.log.info().append("LivenessDebug: ShiftObliviousInstrumentedListenerAdapter ")
Liveness.log.info().append("LivenessDebug: InstrumentedTableUpdateListenerAdapter ")
.append(Utils.REFERENT_FORMATTER, this)
.append(" created with retention enabled").endl();
}
Expand Down
Loading

0 comments on commit 40e9134

Please sign in to comment.