Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Location removal support and update SourcePartitionedTable #4373

Merged
merged 28 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d758e8d
Add removeTableLocation to TableLocationProvider.Listener and plumb i…
abaranec Aug 21, 2023
a961f09
Add Mark/sweep capability, update merged listener and listener recor…
abaranec Aug 24, 2023
b24d9e6
Updated SourcePartitionedTable to handle deleted locations.
abaranec Aug 24, 2023
3d588a9
Merge remote-tracking branch 'origin/main' into abaranec_pr4372
abaranec Aug 24, 2023
cb8606a
Cleanu
abaranec Aug 24, 2023
9847202
Merge branch 'deephaven:main' into abaranec_pr4372
abaranec Aug 29, 2023
8c95d5f
Code review
abaranec Aug 29, 2023
f455dae
Code revire
abaranec Aug 29, 2023
cd7beaa
cleanup
abaranec Aug 29, 2023
3c32171
More fixing
abaranec Aug 29, 2023
5511c40
Added a unit test for add/remove from SourcePartitionedTable
abaranec Aug 31, 2023
3f63717
Merge branch 'deephaven:main' into abaranec_pr4372
abaranec Aug 31, 2023
689cdde
Merge branch 'abaranec_pr4372' of github.com:abaranec/deephaven-core …
abaranec Aug 31, 2023
fd37478
Spotless
abaranec Aug 31, 2023
b6b4f40
Code review intermediate point. Foot-gun prevention
abaranec Sep 6, 2023
92003d2
Finished up code review
abaranec Sep 7, 2023
107c5e4
Added a commentin an interface
abaranec Sep 7, 2023
c1cf1bd
Spotless
abaranec Sep 7, 2023
9d87586
Add Poisioning of regions on location removals / data exceptions
abaranec Sep 7, 2023
332c669
More code review
abaranec Sep 8, 2023
455e3fc
Fixed unit tests
abaranec Sep 8, 2023
1d7fec9
Unit test fix
abaranec Sep 8, 2023
1a447be
spotless
abaranec Sep 8, 2023
c1db5bd
Merge branch 'deephaven:main' into abaranec_pr4372
abaranec Sep 8, 2023
4d6d8aa
Review
abaranec Sep 8, 2023
222a7f4
spotless..
abaranec Sep 8, 2023
40e9f03
Fix test
abaranec Sep 8, 2023
165f802
Update engine/table/src/main/java/io/deephaven/engine/table/impl/sour…
abaranec Sep 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.engine.table.impl;

import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey;
import io.deephaven.engine.table.impl.locations.TableLocation;
import io.deephaven.engine.table.impl.sources.DeferredGroupingColumnSource;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -66,4 +67,11 @@ public interface ColumnSourceManager {
* @return True if there are no included locations
*/
boolean isEmpty();

/**
* Remove a table location key from the sources.
*
* @param tableLocationKey the location key being removed
*/
void removeLocationKey(@NotNull ImmutableTableLocationKey tableLocationKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void onUpdate(final TableUpdate upstream) {
this.update = upstream.acquire();
final long currentStep = getUpdateGraph().clock().currentStep();
Assert.lt(this.notificationStep, "this.notificationStep", currentStep, "currentStep");
this.notificationStep = currentStep;
setNotificationStep(currentStep);

// notify the downstream listener merger
if (mergedListener == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public abstract class MergedListener extends LivenessArtifact implements Notific
private final String listenerDescription;
protected final QueryTable result;
@Nullable
private final PerformanceEntry entry;
protected final PerformanceEntry entry;
private final String logPrefix;

@SuppressWarnings("FieldMayBeFinal")
Expand Down Expand Up @@ -135,7 +135,7 @@ private void notifyInternal(@Nullable final Throwable upstreamError,
getUpdateGraph().addNotification(new MergedNotification());
}

private void propagateError(
protected void propagateError(
final boolean fromProcess, @NotNull final Throwable error, @Nullable final TableListener.Entry entry) {
forceReferenceCountToZero();
recorders.forEach(ListenerRecorder::forceReferenceCountToZero);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
*/
package io.deephaven.engine.table.impl;

import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.primitive.iterator.CloseableIterator;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.locations.*;
import io.deephaven.engine.table.impl.locations.impl.SingleTableLocationProvider;
Expand All @@ -16,6 +13,8 @@
import io.deephaven.engine.table.impl.partitioned.PartitionedTableImpl;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.table.impl.sources.regioned.RegionedTableComponentFactoryImpl;
import io.deephaven.engine.table.iterators.ChunkedObjectColumnIterator;
import io.deephaven.engine.updategraph.UpdateCommitter;
import io.deephaven.engine.updategraph.UpdateSourceCombiner;
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedNode;
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedQueue;
Expand All @@ -37,8 +36,6 @@ public class SourcePartitionedTable extends PartitionedTableImpl {
private static final String KEY_COLUMN_NAME = "TableLocationKey";
private static final String CONSTITUENT_COLUMN_NAME = "LocationTable";

private static final int CHUNK_CAPACITY = 2048;

/**
* Construct a {@link SourcePartitionedTable} from the supplied parameters.
* <p>
Expand Down Expand Up @@ -95,6 +92,9 @@ private static final class UnderlyingTableMaintainer {
@SuppressWarnings("FieldCanBeLocal") // We need to hold onto this reference for reachability purposes.
private final Runnable processNewLocationsUpdateRoot;

private UpdateCommitter<UnderlyingTableMaintainer> removedLocationsComitter = null;
private List<Table> removedConstituents = null;

private UnderlyingTableMaintainer(
@NotNull final TableDefinition constituentDefinition,
@NotNull final UnaryOperator<Table> applyTablePermissions,
Expand Down Expand Up @@ -158,7 +158,7 @@ protected void instrumentedRefresh() {
}
}

if (result.isRefreshing()) {
if (refreshCombiner != null) {
result.getUpdateGraph().addSource(refreshCombiner);
}
}
Expand Down Expand Up @@ -201,14 +201,14 @@ private Table makeConstituentTable(@NotNull final TableLocation tableLocation) {
}

private void processPendingLocations(final boolean notifyListeners) {
// First let's deal with removed locations
final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = subscriptionBuffer.processPending();
final RowSet removed = processRemovals(locationUpdate);
final RowSet added = processAdditions(locationUpdate);

resultRows.update(added, removed);
if (notifyListeners) {
result.notifyListeners(new TableUpdateImpl(added,
result.notifyListeners(new TableUpdateImpl(
added,
removed,
RowSetFactory.empty(),
RowSetShiftData.EMPTY,
Expand Down Expand Up @@ -248,45 +248,46 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
}

private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) {
final Set<ImmutableTableLocationKey> relevantRemovedLocations = locationUpdate.getPendingRemovedLocations()
.stream()
.map(TableLocation::getKey)
.filter(locationKeyMatcher)
.collect(Collectors.toSet());
final Set<ImmutableTableLocationKey> relevantRemovedLocations =
locationUpdate.getPendingRemovedLocationKeys()
.stream()
.filter(locationKeyMatcher)
.collect(Collectors.toSet());

if (relevantRemovedLocations.isEmpty()) {
return RowSetFactory.empty();
}

// At the end of the cycle we need to make sure we unmanage any removed constituents.
this.removedConstituents = new ArrayList<>(relevantRemovedLocations.size());
this.removedLocationsComitter = new UpdateCommitter<>(
this,
result.getUpdateGraph(),
ignored -> {
removedConstituents.forEach(result::unmanage);
removedConstituents = null;
});
this.removedLocationsComitter.maybeActivate();

final RowSetBuilderSequential deleteBuilder = RowSetFactory.builderSequential();

// We don't have a map of location to row key, so we have to iterate them. If we decide this is too slow, we
// could add a TObjectIntMap
// as we process pending added locations and then we can just make an index of rows to remove by looking up
// in that map.
try (final ChunkSource.GetContext locContext = resultTableLocationKeys.makeGetContext(CHUNK_CAPACITY);
final ChunkSource.GetContext tableContext = resultLocationTables.makeGetContext(CHUNK_CAPACITY);
final RowSequence.Iterator it = resultRows.getRowSequenceIterator()) {

while (it.hasMore()) {
final RowSequence subSeq = it.getNextRowSequenceWithLength(CHUNK_CAPACITY);
final LongChunk<OrderedRowKeys> keyChunk = subSeq.asRowKeyChunk();
final ObjectChunk<Table, ? extends Values> tableChunk =
resultLocationTables.getChunk(tableContext, subSeq)
.asObjectChunk();
final ObjectChunk<ImmutableTableLocationKey, ? extends Values> removedKeys = resultTableLocationKeys
.getChunk(locContext,
subSeq)
.asObjectChunk();

for (int chunkPos = 0; chunkPos < keyChunk.size(); chunkPos++) {
if (relevantRemovedLocations.contains(removedKeys.get(chunkPos))) {
deleteBuilder.appendKey(keyChunk.get(chunkPos));

final Table deletedTable = tableChunk.get(chunkPos);
result.unmanage(deletedTable);
deletedTable.close();
}
// We don't have a map of location key to row key, so we have to iterate them. If we decide this is too
// slow, we could add a TObjectIntMap as we process pending added locations and then we can just make an
// RowSet of rows to remove by looking up in that map.
// @formatter:off
try (final CloseableIterator<ImmutableTableLocationKey> keysIterator =
ChunkedObjectColumnIterator.make(resultTableLocationKeys, resultRows);
final CloseableIterator<Table> constituentsIterator =
ChunkedObjectColumnIterator.make(resultLocationTables, resultRows);
final RowSet.Iterator rowsIterator = resultRows.iterator()) {
// @formatter:on
while (keysIterator.hasNext()) {
final TableLocationKey key = keysIterator.next();
final Table constituent = constituentsIterator.next();
final long rowKey = rowsIterator.nextLong();
if (relevantRemovedLocations.contains(key)) {
deleteBuilder.appendKey(rowKey);
removedConstituents.add(constituent);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.locations.TableLocationKey;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey;
Expand Down Expand Up @@ -139,8 +140,7 @@ protected final void initializeAvailableLocations() {
new TableLocationSubscriptionBuffer(locationProvider);
final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
locationBuffer.processPending();
throwIfLocationsRemoved(locationUpdate);

maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
updateSourceRegistrar.addSource(locationChangePoller = new LocationChangePoller(locationBuffer));
} else {
Expand All @@ -160,6 +160,14 @@ private void maybeAddLocations(@NotNull final Collection<ImmutableTableLocationK
.forEach(lk -> columnSourceManager.addLocation(locationProvider.getTableLocation(lk)));
}

private void maybeRemoveLocations(@NotNull final Collection<ImmutableTableLocationKey> removedKeys) {
if (removedKeys.isEmpty()) {
return;
}

filterLocationKeys(removedKeys).forEach(columnSourceManager::removeLocationKey);
}

private void initializeLocationSizes() {
Assert.assertion(locationsInitialized, "locationInitialized");
if (locationSizesInitialized) {
Expand Down Expand Up @@ -195,7 +203,6 @@ private WritableRowSet refreshLocationSizes() {
}

private class LocationChangePoller extends InstrumentedUpdateSource {

private final TableLocationSubscriptionBuffer locationBuffer;

private LocationChangePoller(@NotNull final TableLocationSubscriptionBuffer locationBuffer) {
Expand All @@ -207,7 +214,7 @@ private LocationChangePoller(@NotNull final TableLocationSubscriptionBuffer loca
protected void instrumentedRefresh() {
try {
final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = locationBuffer.processPending();
throwIfLocationsRemoved(locationUpdate);
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
// NB: This class previously had functionality to notify "location listeners", but it was never used.
// Resurrect from git history if needed.
Expand All @@ -222,17 +229,13 @@ protected void instrumentedRefresh() {
rowSet.insert(added);
notifyListeners(added, RowSetFactory.empty(), RowSetFactory.empty());
} catch (Exception e) {
getUpdateGraph().removeSource(this);

// Notify listeners to the SourceTable when we had an issue refreshing available locations.
notifyListenersOnError(e, null);
getUpdateGraph().removeSource(this);
}
}
}

private void throwIfLocationsRemoved(TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) {
if (!locationUpdate.getPendingRemovedLocations().isEmpty()) {
throw new TableDataException(getClass().getSimpleName() + " does not support removing locations");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
*/
package io.deephaven.engine.table.impl.locations;

import io.deephaven.util.annotations.InternalUseOnly;
import io.deephaven.util.type.NamedImplementation;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -38,13 +36,13 @@ interface Listener extends BasicTableDataListener {
void handleTableLocationKey(@NotNull ImmutableTableLocationKey tableLocationKey);

/**
* Notify the listener of a table location that has been removed encountered while initiating or maintaining the
* location subscription. This should occur at most once per location, but the order of delivery is <i>not</i>
* guaranteed.
* Notify the listener of a {@link TableLocationKey} that has been removed encountered while initiating or
* maintaining the location subscription. This should occur at most once per location, but the order of delivery
* is <i>not</i> guaranteed.
*
* @param tableLocation The table location
* @param tableLocationKey The table location
*/
void handleTableLocationRemoved(@NotNull TableLocation tableLocation);
void handleTableLocationKeyRemoved(@NotNull ImmutableTableLocationKey tableLocationKey);
}

/**
Expand Down Expand Up @@ -112,18 +110,6 @@ interface Listener extends BasicTableDataListener {
*/
boolean hasTableLocationKey(@NotNull final TableLocationKey tableLocationKey);

/**
* Remove the given table location.
*
* @apiNote Use with caution: When this is called all subscribers to the specified location will be notified,
* causing them to stop updating. Tables backed by those notifications will end up 'failed'. This location
* provider will continue to update other locations and will no longer provide or request information about
* the deleted location.
*
* @param locationKey the TableLocation to remove
*/
void removeTableLocationKey(@NotNull final TableLocationKey locationKey);

/**
* @param tableLocationKey A {@link TableLocationKey} specifying the location to get
* @return The {@link TableLocation} matching the given key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import org.jetbrains.annotations.NotNull;

/**
* This exception is thrown when a {@link TableLocation} has been removed from a provider.
*/
public class TableLocationRemovedException extends TableDataException {
private final TableKey tableKey;
private final TableLocationKey locationKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ protected final void deliverInitialSnapshot(@NotNull final TableLocationProvider
protected final void handleTableLocationKey(@NotNull final TableLocationKey locationKey) {
if (!supportsSubscriptions()) {
tableLocations.putIfAbsent(locationKey, TableLocationKey::makeImmutable);
visitLocationKey(locationKey);
visitLocationKey(toKeyImmutable(locationKey));
return;
}

Expand All @@ -121,7 +121,7 @@ protected final void handleTableLocationKey(@NotNull final TableLocationKey loca
* before notifications have been delivered to any subscriptions, if applicable. The default implementation does
* nothing, and may be overridden to implement additional features.
*
* @param locationKey the {@link TableLocationKey} that was visited.
* @param locationKey The {@link TableLocationKey} that was visited.
*/
protected void visitLocationKey(@NotNull final TableLocationKey locationKey) {}

Expand Down Expand Up @@ -211,30 +211,37 @@ public TableLocation getTableLocationIfPresent(@NotNull final TableLocationKey t
}
}

@Override
/**
* Remove the given table location.
*
* @apiNote Use with caution: When this is called all subscribers to the specified location will be notified,
* causing them to stop updating. Tables backed by those notifications will end up 'failed'. This location
* provider will continue to update other locations and will no longer provide or request information about
* the deleted location.
*
* @param locationKey the TableLocation to remove
*/
public void removeTableLocationKey(@NotNull final TableLocationKey locationKey) {
final Object removedLocation = tableLocations.remove(locationKey);

handleTableLocationKeyRemoved(locationKey.makeImmutable());
// need to notify subscribers of removed location, and of "removed" size
if (removedLocation instanceof TableLocation) {
// remove the location from the TableLocationProvider (and notify subscribers)
handleTableLocationRemoved((TableLocation) removedLocation);
// notify subscribers of this location that the data is gone
((AbstractTableLocation) removedLocation).handleUpdate(null, System.currentTimeMillis());
((AbstractTableLocation) removedLocation).clearColumnLocations();
if (removedLocation instanceof AbstractTableLocation) {
final AbstractTableLocation abstractLocation = (AbstractTableLocation) removedLocation;
abstractLocation.handleUpdate(null, System.currentTimeMillis());
abstractLocation.clearColumnLocations();
}
}

/**
* Remove the location, and notify subscribers that it is gone
*
* @param location the TableLocation to be removed
* Notify subscribers that {@code location} was removed.
*
* @param locationKey the TableLocation that was removed
*/
protected void handleTableLocationRemoved(@NotNull final TableLocation location) {
// Note: the location has already been removed from tableLocations
protected void handleTableLocationKeyRemoved(@NotNull final ImmutableTableLocationKey locationKey) {
if (supportsSubscriptions()) {
synchronized (subscriptions) {
if (subscriptions.deliverNotification(Listener::handleTableLocationRemoved, location, true)) {
if (subscriptions.deliverNotification(Listener::handleTableLocationKeyRemoved, locationKey, true)) {
onEmpty();
}
}
Expand Down
Loading