Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Location removal support and update SourcePartitionedTable #4373

Merged
merged 28 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d758e8d
Add removeTableLocation to TableLocationProvider.Listener and plumb i…
abaranec Aug 21, 2023
a961f09
Add Mark/sweep capability, update merged listener and listener recor…
abaranec Aug 24, 2023
b24d9e6
Updated SourcePartitionedTable to handle deleted locations.
abaranec Aug 24, 2023
3d588a9
Merge remote-tracking branch 'origin/main' into abaranec_pr4372
abaranec Aug 24, 2023
cb8606a
Cleanu
abaranec Aug 24, 2023
9847202
Merge branch 'deephaven:main' into abaranec_pr4372
abaranec Aug 29, 2023
8c95d5f
Code review
abaranec Aug 29, 2023
f455dae
Code revire
abaranec Aug 29, 2023
cd7beaa
cleanup
abaranec Aug 29, 2023
3c32171
More fixing
abaranec Aug 29, 2023
5511c40
Added a unit test for add/remove from SourcePartitionedTable
abaranec Aug 31, 2023
3f63717
Merge branch 'deephaven:main' into abaranec_pr4372
abaranec Aug 31, 2023
689cdde
Merge branch 'abaranec_pr4372' of github.com:abaranec/deephaven-core …
abaranec Aug 31, 2023
fd37478
Spotless
abaranec Aug 31, 2023
b6b4f40
Code review intermediate point. Foot-gun prevention
abaranec Sep 6, 2023
92003d2
Finished up code review
abaranec Sep 7, 2023
107c5e4
Added a commentin an interface
abaranec Sep 7, 2023
c1cf1bd
Spotless
abaranec Sep 7, 2023
9d87586
Add Poisioning of regions on location removals / data exceptions
abaranec Sep 7, 2023
332c669
More code review
abaranec Sep 8, 2023
455e3fc
Fixed unit tests
abaranec Sep 8, 2023
1d7fec9
Unit test fix
abaranec Sep 8, 2023
1a447be
spotless
abaranec Sep 8, 2023
c1db5bd
Merge branch 'deephaven:main' into abaranec_pr4372
abaranec Sep 8, 2023
4d6d8aa
Review
abaranec Sep 8, 2023
222a7f4
spotless..
abaranec Sep 8, 2023
40e9f03
Fix test
abaranec Sep 8, 2023
165f802
Update engine/table/src/main/java/io/deephaven/engine/table/impl/sour…
abaranec Sep 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Util/src/main/java/io/deephaven/util/FindExceptionCause.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.jetbrains.annotations.NotNull;

import java.util.Arrays;
import java.util.Optional;

public class FindExceptionCause {
/**
Expand Down Expand Up @@ -42,6 +43,28 @@ public static Throwable findCause(Throwable original, Class<? extends Throwable>
return original;
}

/**
* Given a {@link Throwable}, and an expected type, return an optional that is populated if the original was an
* instance of the expected type or was caused by the expected type.
*
* @param original The original throwable
* @param expectedType The expected type to find
* @return A completed {@link Optional} containing the found cause, or an empty {@link Optional}
*/
public static <E extends Throwable> Optional<E> isOrCausedBy(
@NotNull final Throwable original,
@NotNull final Class<E> expectedType) {
Throwable cause = original;
while (cause != null) {
if (expectedType.isAssignableFrom(cause.getClass())) {
// noinspection unchecked
return Optional.of((E) cause);
}
cause = cause.getCause();
}
return Optional.empty();
}

/**
* Given an exception, provide a short description of the causes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.engine.table.impl;

import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey;
import io.deephaven.engine.table.impl.locations.TableLocation;
import io.deephaven.engine.table.impl.sources.DeferredGroupingColumnSource;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -66,4 +67,12 @@ public interface ColumnSourceManager {
* @return True if there are no included locations
*/
boolean isEmpty();

/**
* Remove a table location key from the sources.
*
* @return true if the location key was actually removed
* @param tableLocationKey the location key being removed
*/
boolean removeLocationKey(@NotNull ImmutableTableLocationKey tableLocationKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void onUpdate(final TableUpdate upstream) {
this.update = upstream.acquire();
final long currentStep = getUpdateGraph().clock().currentStep();
Assert.lt(this.notificationStep, "this.notificationStep", currentStep, "currentStep");
this.notificationStep = currentStep;
setNotificationStep(currentStep);

// notify the downstream listener merger
if (mergedListener == null) {
Expand All @@ -74,13 +74,17 @@ public void onUpdate(final TableUpdate upstream) {

@Override
protected void onFailureInternal(@NotNull final Throwable originalException, @Nullable final Entry sourceEntry) {
this.notificationStep = getUpdateGraph().clock().currentStep();
setNotificationStep(getUpdateGraph().clock().currentStep());
if (mergedListener == null) {
throw new IllegalStateException("Merged listener not set");
}
mergedListener.notifyOnUpstreamError(originalException, sourceEntry);
}

protected void setNotificationStep(final long step) {
this.notificationStep = step;
}

@Override
public boolean canExecute(final long step) {
return parent.satisfied(step);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public abstract class MergedListener extends LivenessArtifact implements Notific
private final String listenerDescription;
protected final QueryTable result;
@Nullable
private final PerformanceEntry entry;
protected final PerformanceEntry entry;
private final String logPrefix;

@SuppressWarnings("FieldMayBeFinal")
Expand Down Expand Up @@ -135,11 +135,20 @@ private void notifyInternal(@Nullable final Throwable upstreamError,
getUpdateGraph().addNotification(new MergedNotification());
}

private void propagateError(
final boolean fromProcess, @NotNull final Throwable error, @Nullable final TableListener.Entry entry) {
/**
* Propagate an error to downstream listeners.
*
* @param uncaughtExceptionFromProcess true if the exception was thrown from {@link #process()}, false otherwise
* @param error the error to propagate
* @param entry the {@link io.deephaven.engine.table.TableListener.Entry} that threw the error.
*/
protected void propagateError(
final boolean uncaughtExceptionFromProcess,
@NotNull final Throwable error,
@Nullable final TableListener.Entry entry) {
forceReferenceCountToZero();
recorders.forEach(ListenerRecorder::forceReferenceCountToZero);
propagateErrorDownstream(fromProcess, error, entry);
propagateErrorDownstream(uncaughtExceptionFromProcess, error, entry);
try {
if (systemicResult()) {
AsyncClientErrorNotifier.reportError(error);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,31 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
/*
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.engine.table.impl;

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.primitive.iterator.CloseableIterator;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.partitioned.PartitionedTableImpl;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.UpdateSourceCombiner;
import io.deephaven.engine.table.impl.locations.*;
import io.deephaven.engine.table.impl.locations.impl.SingleTableLocationProvider;
import io.deephaven.engine.table.impl.locations.impl.TableLocationSubscriptionBuffer;
import io.deephaven.engine.table.impl.locations.impl.TableLocationUpdateSubscriptionBuffer;
import io.deephaven.engine.table.impl.partitioned.PartitionedTableImpl;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.table.impl.sources.regioned.RegionedTableComponentFactoryImpl;
import io.deephaven.engine.table.iterators.ChunkedObjectColumnIterator;
import io.deephaven.engine.updategraph.UpdateCommitter;
import io.deephaven.engine.updategraph.UpdateSourceCombiner;
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedNode;
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedQueue;

import java.util.*;

import org.apache.commons.lang3.mutable.MutableLong;
import org.jetbrains.annotations.NotNull;

import java.util.*;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
Expand Down Expand Up @@ -96,6 +93,9 @@ private static final class UnderlyingTableMaintainer {
@SuppressWarnings("FieldCanBeLocal") // We need to hold onto this reference for reachability purposes.
private final Runnable processNewLocationsUpdateRoot;

private final UpdateCommitter<UnderlyingTableMaintainer> removedLocationsComitter;
private List<Table> removedConstituents = null;

private UnderlyingTableMaintainer(
@NotNull final TableDefinition constituentDefinition,
@NotNull final UnaryOperator<Table> applyTablePermissions,
Expand All @@ -113,11 +113,11 @@ private UnderlyingTableMaintainer(
resultRows = RowSetFactory.empty().toTracking();
resultTableLocationKeys = ArrayBackedColumnSource.getMemoryColumnSource(TableLocationKey.class, null);
resultLocationTables = ArrayBackedColumnSource.getMemoryColumnSource(Table.class, null);

final Map<String, ColumnSource<?>> resultSources = new LinkedHashMap<>(2);
resultSources.put(KEY_COLUMN_NAME, resultTableLocationKeys);
resultSources.put(CONSTITUENT_COLUMN_NAME, resultLocationTables);
result = new QueryTable(resultRows, resultSources);
result.setFlat();

final boolean needToRefreshLocations = refreshLocations && tableLocationProvider.supportsSubscriptions();
if (needToRefreshLocations || refreshSizes) {
Expand Down Expand Up @@ -145,12 +145,23 @@ protected void instrumentedRefresh() {
};
result.addParentReference(processNewLocationsUpdateRoot);
refreshCombiner.addSource(processNewLocationsUpdateRoot);

this.removedLocationsComitter = new UpdateCommitter<>(
this,
result.getUpdateGraph(),
ignored -> {
Assert.neqNull(removedConstituents, "removedConstituents");
removedConstituents.forEach(result::unmanage);
removedConstituents = null;
});

processPendingLocations(false);
} else {
subscriptionBuffer = null;
pendingLocationStates = null;
readyLocationStates = null;
processNewLocationsUpdateRoot = null;
removedLocationsComitter = null;
tableLocationProvider.refresh();
try (final RowSet added = sortAndAddLocations(tableLocationProvider.getTableLocationKeys().stream()
.filter(locationKeyMatcher)
Expand All @@ -159,10 +170,8 @@ protected void instrumentedRefresh() {
}
}

if (result.isRefreshing()) {
// noinspection ConstantConditions
UpdateGraph updateGraph = result.getUpdateGraph();
updateGraph.addSource(refreshCombiner);
if (refreshCombiner != null) {
result.getUpdateGraph().addSource(refreshCombiner);
}
}

Expand All @@ -187,19 +196,42 @@ private RowSet sortAndAddLocations(@NotNull final Stream<TableLocation> location
});
return initialLastRowKey == lastInsertedRowKey.longValue()
? RowSetFactory.empty()
: RowSetFactory.fromRange(initialLastRowKey, lastInsertedRowKey.longValue());
: RowSetFactory.fromRange(initialLastRowKey + 1, lastInsertedRowKey.longValue());
}

private Table makeConstituentTable(@NotNull final TableLocation tableLocation) {
return applyTablePermissions.apply(new PartitionAwareSourceTable(
final PartitionAwareSourceTable constituent = new PartitionAwareSourceTable(
constituentDefinition,
"SingleLocationSourceTable-" + tableLocation,
RegionedTableComponentFactoryImpl.INSTANCE,
new SingleTableLocationProvider(tableLocation),
refreshSizes ? refreshCombiner : null));
refreshSizes ? refreshCombiner : null);

// Be careful to propagate the systemic attribute properly to child tables
constituent.setAttribute(Table.SYSTEMIC_TABLE_ATTRIBUTE, result.isSystemicObject());
return applyTablePermissions.apply(constituent);
}

private void processPendingLocations(final boolean notifyListeners) {
final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = subscriptionBuffer.processPending();
final RowSet removed = processRemovals(locationUpdate);
final RowSet added = processAdditions(locationUpdate);

resultRows.update(added, removed);
if (notifyListeners) {
result.notifyListeners(new TableUpdateImpl(
added,
removed,
RowSetFactory.empty(),
RowSetShiftData.EMPTY,
ModifiedColumnSet.EMPTY));
} else {
added.close();
removed.close();
}
}

private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) {
/*
* This block of code is unfortunate, because it largely duplicates the intent and effort of similar code in
* RegionedColumnSourceManager. I think that the RegionedColumnSourceManager could be changed to
Expand All @@ -209,8 +241,10 @@ private void processPendingLocations(final boolean notifyListeners) {
* population in STM ColumnSources.
*/
// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
subscriptionBuffer.processPending().stream().filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation).map(PendingLocationState::new)
locationUpdate.getPendingAddedLocationKeys().stream()
.filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation)
.map(PendingLocationState::new)
.forEach(pendingLocationStates::offer);
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
final PendingLocationState pendingLocationState = iter.next();
Expand All @@ -219,22 +253,64 @@ private void processPendingLocations(final boolean notifyListeners) {
readyLocationStates.offer(pendingLocationState);
}
}
final RowSet added = sortAndAddLocations(readyLocationStates.stream().map(PendingLocationState::release));
resultRows.insert(added);
if (notifyListeners) {
result.notifyListeners(new TableUpdateImpl(added,
RowSetFactory.empty(), RowSetFactory.empty(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY));
} else {
added.close();
}
final RowSet added = sortAndAddLocations(readyLocationStates.stream()
.map(PendingLocationState::release));
readyLocationStates.clearFast();
return added;
}

private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) {
final Set<ImmutableTableLocationKey> relevantRemovedLocations =
locationUpdate.getPendingRemovedLocationKeys()
.stream()
.filter(locationKeyMatcher)
.collect(Collectors.toSet());

if (relevantRemovedLocations.isEmpty()) {
return RowSetFactory.empty();
}

// At the end of the cycle we need to make sure we unmanage any removed constituents.
this.removedConstituents = new ArrayList<>(relevantRemovedLocations.size());
final RowSetBuilderSequential deleteBuilder = RowSetFactory.builderSequential();

// We don't have a map of location key to row key, so we have to iterate them. If we decide this is too
// slow, we could add a TObjectIntMap as we process pending added locations and then we can just make an
// RowSet of rows to remove by looking up in that map.
// @formatter:off
try (final CloseableIterator<ImmutableTableLocationKey> keysIterator =
ChunkedObjectColumnIterator.make(resultTableLocationKeys, resultRows);
final CloseableIterator<Table> constituentsIterator =
ChunkedObjectColumnIterator.make(resultLocationTables, resultRows);
final RowSet.Iterator rowsIterator = resultRows.iterator()) {
// @formatter:on
while (keysIterator.hasNext()) {
final TableLocationKey key = keysIterator.next();
final Table constituent = constituentsIterator.next();
final long rowKey = rowsIterator.nextLong();
if (relevantRemovedLocations.contains(key)) {
deleteBuilder.appendKey(rowKey);
removedConstituents.add(constituent);
}
}
}

if (removedConstituents.isEmpty()) {
removedConstituents = null;
return RowSetFactory.empty();
}
this.removedLocationsComitter.maybeActivate();

final WritableRowSet deletedRows = deleteBuilder.build();
resultTableLocationKeys.setNull(deletedRows);
resultLocationTables.setNull(deletedRows);
return deletedRows;
}
}

private static class PendingLocationState extends IntrusiveDoublyLinkedNode.Impl<PendingLocationState> {
private static final class PendingLocationState extends IntrusiveDoublyLinkedNode.Impl<PendingLocationState> {

private final TableLocation location;

private final TableLocationUpdateSubscriptionBuffer subscriptionBuffer;

private PendingLocationState(@NotNull final TableLocation location) {
Expand Down
Loading