Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Location removal support and update SourcePartitionedTable #4373

Merged
merged 28 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d758e8d
Add removeTableLocation to TableLocationProvider.Listener and plumb i…
abaranec Aug 21, 2023
a961f09
Add Mark/sweep capability, update merged listener and listener recor…
abaranec Aug 24, 2023
b24d9e6
Updated SourcePartitionedTable to handle deleted locations.
abaranec Aug 24, 2023
3d588a9
Merge remote-tracking branch 'origin/main' into abaranec_pr4372
abaranec Aug 24, 2023
cb8606a
Cleanu
abaranec Aug 24, 2023
9847202
Merge branch 'deephaven:main' into abaranec_pr4372
abaranec Aug 29, 2023
8c95d5f
Code review
abaranec Aug 29, 2023
f455dae
Code revire
abaranec Aug 29, 2023
cd7beaa
cleanup
abaranec Aug 29, 2023
3c32171
More fixing
abaranec Aug 29, 2023
5511c40
Added a unit test for add/remove from SourcePartitionedTable
abaranec Aug 31, 2023
3f63717
Merge branch 'deephaven:main' into abaranec_pr4372
abaranec Aug 31, 2023
689cdde
Merge branch 'abaranec_pr4372' of github.com:abaranec/deephaven-core …
abaranec Aug 31, 2023
fd37478
Spotless
abaranec Aug 31, 2023
b6b4f40
Code review intermediate point. Foot-gun prevention
abaranec Sep 6, 2023
92003d2
Finished up code review
abaranec Sep 7, 2023
107c5e4
Added a commentin an interface
abaranec Sep 7, 2023
c1cf1bd
Spotless
abaranec Sep 7, 2023
9d87586
Add Poisioning of regions on location removals / data exceptions
abaranec Sep 7, 2023
332c669
More code review
abaranec Sep 8, 2023
455e3fc
Fixed unit tests
abaranec Sep 8, 2023
1d7fec9
Unit test fix
abaranec Sep 8, 2023
1a447be
spotless
abaranec Sep 8, 2023
c1db5bd
Merge branch 'deephaven:main' into abaranec_pr4372
abaranec Sep 8, 2023
4d6d8aa
Review
abaranec Sep 8, 2023
222a7f4
spotless..
abaranec Sep 8, 2023
40e9f03
Fix test
abaranec Sep 8, 2023
165f802
Update engine/table/src/main/java/io/deephaven/engine/table/impl/sour…
abaranec Sep 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,17 @@ public void onUpdate(final TableUpdate upstream) {

/**
 * Handle an upstream failure: record the update graph's current clock step as this recorder's notification step,
 * then forward the error to the merged listener.
 *
 * @param originalException the failure being propagated
 * @param sourceEntry the entry accompanying the failure; may be {@code null}
 * @throws IllegalStateException if no merged listener has been set
 */
@Override
protected void onFailureInternal(@NotNull final Throwable originalException, @Nullable final Entry sourceEntry) {
    // Record the step through the protected setter only; a second, direct field write would be redundant and would
    // bypass any subclass that overrides setNotificationStep.
    setNotificationStep(getUpdateGraph().clock().currentStep());
    if (mergedListener == null) {
        throw new IllegalStateException("Merged listener not set");
    }
    mergedListener.notifyOnUpstreamError(originalException, sourceEntry);
}

/**
 * Record the step on which this recorder was last notified.
 *
 * @param step the step value to store in {@code notificationStep}
 */
protected void setNotificationStep(final long step) {
    notificationStep = step;
}

@Override
public boolean canExecute(final long step) {
return parent.satisfied(step);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,31 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
/*
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.engine.table.impl;

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.partitioned.PartitionedTableImpl;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.UpdateSourceCombiner;
import io.deephaven.engine.table.impl.locations.*;
import io.deephaven.engine.table.impl.locations.impl.SingleTableLocationProvider;
import io.deephaven.engine.table.impl.locations.impl.TableLocationSubscriptionBuffer;
import io.deephaven.engine.table.impl.locations.impl.TableLocationUpdateSubscriptionBuffer;
import io.deephaven.engine.table.impl.partitioned.PartitionedTableImpl;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.table.impl.sources.regioned.RegionedTableComponentFactoryImpl;
import io.deephaven.engine.updategraph.UpdateSourceCombiner;
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedNode;
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedQueue;

import java.util.*;

import org.apache.commons.lang3.mutable.MutableLong;
import org.jetbrains.annotations.NotNull;

import java.util.*;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
Expand All @@ -40,6 +37,8 @@ public class SourcePartitionedTable extends PartitionedTableImpl {
private static final String KEY_COLUMN_NAME = "TableLocationKey";
private static final String CONSTITUENT_COLUMN_NAME = "LocationTable";

private static final int CHUNK_CAPACITY = 2048;

/**
* Construct a {@link SourcePartitionedTable} from the supplied parameters.
* <p>
Expand Down Expand Up @@ -113,11 +112,11 @@ private UnderlyingTableMaintainer(
resultRows = RowSetFactory.empty().toTracking();
resultTableLocationKeys = ArrayBackedColumnSource.getMemoryColumnSource(TableLocationKey.class, null);
resultLocationTables = ArrayBackedColumnSource.getMemoryColumnSource(Table.class, null);

final Map<String, ColumnSource<?>> resultSources = new LinkedHashMap<>(2);
resultSources.put(KEY_COLUMN_NAME, resultTableLocationKeys);
resultSources.put(CONSTITUENT_COLUMN_NAME, resultLocationTables);
result = new QueryTable(resultRows, resultSources);
result.setFlat();

final boolean needToRefreshLocations = refreshLocations && tableLocationProvider.supportsSubscriptions();
if (needToRefreshLocations || refreshSizes) {
Expand Down Expand Up @@ -160,9 +159,7 @@ protected void instrumentedRefresh() {
}

if (result.isRefreshing()) {
// noinspection ConstantConditions
UpdateGraph updateGraph = result.getUpdateGraph();
updateGraph.addSource(refreshCombiner);
result.getUpdateGraph().addSource(refreshCombiner);
}
}

Expand All @@ -186,20 +183,43 @@ private RowSet sortAndAddLocations(@NotNull final Stream<TableLocation> location
result.manage(constituentTable);
});
return initialLastRowKey == lastInsertedRowKey.longValue()
? RowSetFactory.empty()
: RowSetFactory.fromRange(initialLastRowKey, lastInsertedRowKey.longValue());
? RowSetFactory.empty()
: RowSetFactory.fromRange(initialLastRowKey + 1, lastInsertedRowKey.longValue());
}

private Table makeConstituentTable(@NotNull final TableLocation tableLocation) {
return applyTablePermissions.apply(new PartitionAwareSourceTable(
final PartitionAwareSourceTable constituent = new PartitionAwareSourceTable(
constituentDefinition,
"SingleLocationSourceTable-" + tableLocation,
RegionedTableComponentFactoryImpl.INSTANCE,
new SingleTableLocationProvider(tableLocation),
refreshSizes ? refreshCombiner : null));
refreshSizes ? refreshCombiner : null);

// These can't be systemic or when they get notified on error, they will crash the worker.
constituent.setAttribute(Table.SYSTEMIC_TABLE_ATTRIBUTE, false);
return applyTablePermissions.apply(constituent);
}

/**
 * Drain the location subscription buffer and apply the resulting removals and additions to the result table.
 *
 * @param notifyListeners whether to publish the change downstream; when {@code false} the added and removed row
 *        sets are closed here instead of being handed to a table update
 */
private void processPendingLocations(final boolean notifyListeners) {
    final TableLocationSubscriptionBuffer.LocationUpdate pendingUpdate = subscriptionBuffer.processPending();

    // Removed locations are handled before added ones.
    final RowSet removedRows = processRemovals(pendingUpdate);
    final RowSet addedRows = processAdditions(pendingUpdate);

    resultRows.update(addedRows, removedRows);

    if (!notifyListeners) {
        // Nobody downstream takes the row sets, so release them here.
        addedRows.close();
        removedRows.close();
        return;
    }

    result.notifyListeners(new TableUpdateImpl(
            addedRows,
            removedRows,
            RowSetFactory.empty(),
            RowSetShiftData.EMPTY,
            ModifiedColumnSet.EMPTY));
}

private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) {
/*
* This block of code is unfortunate, because it largely duplicates the intent and effort of similar code in
* RegionedColumnSourceManager. I think that the RegionedColumnSourceManager could be changed to
Expand All @@ -209,32 +229,74 @@ private void processPendingLocations(final boolean notifyListeners) {
* population in STM ColumnSources.
*/
// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
subscriptionBuffer.processPending().stream().filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation).map(PendingLocationState::new)
.forEach(pendingLocationStates::offer);
locationUpdate.getPendingAddedLocationKeys().stream()
.filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation)
.map(PendingLocationState::new)
.forEach(pendingLocationStates::offer);
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
final PendingLocationState pendingLocationState = iter.next();
if (pendingLocationState.exists()) {
iter.remove();
readyLocationStates.offer(pendingLocationState);
}
}
final RowSet added = sortAndAddLocations(readyLocationStates.stream().map(PendingLocationState::release));
resultRows.insert(added);
if (notifyListeners) {
result.notifyListeners(new TableUpdateImpl(added,
RowSetFactory.empty(), RowSetFactory.empty(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY));
} else {
added.close();
}
final RowSet added = sortAndAddLocations(readyLocationStates.stream()
.map(PendingLocationState::release));
readyLocationStates.clearFast();
return added;
}

/**
 * Remove the result rows whose location key belongs to a pending removed location that passes
 * {@code locationKeyMatcher}.
 * <p>
 * Each matching constituent table is unmanaged from the result and closed, and the key and constituent column
 * sources are nulled out for the deleted rows.
 *
 * @param locationUpdate the pending location update to take removed locations from
 * @return the row keys that were deleted; empty when no relevant location was removed
 */
private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) {
    final Set<ImmutableTableLocationKey> keysToDrop = locationUpdate.getPendingRemovedLocationKeys()
            .stream()
            .map(TableLocation::getKey)
            .filter(locationKeyMatcher)
            .collect(Collectors.toSet());
    if (keysToDrop.isEmpty()) {
        return RowSetFactory.empty();
    }

    final RowSetBuilderSequential droppedRowsBuilder = RowSetFactory.builderSequential();

    // There is no map from location key to row key, so every current row is scanned in chunks. If this proves too
    // slow, a TObjectIntMap could be maintained as pending added locations are processed, and the rows to remove
    // could then be found by lookup in that map.
    try (final ChunkSource.GetContext keyContext = resultTableLocationKeys.makeGetContext(CHUNK_CAPACITY);
            final ChunkSource.GetContext constituentContext = resultLocationTables.makeGetContext(CHUNK_CAPACITY);
            final RowSequence.Iterator rowsIterator = resultRows.getRowSequenceIterator()) {

        while (rowsIterator.hasMore()) {
            final RowSequence slice = rowsIterator.getNextRowSequenceWithLength(CHUNK_CAPACITY);
            final LongChunk<OrderedRowKeys> rowKeys = slice.asRowKeyChunk();
            final ObjectChunk<Table, ? extends Values> constituents =
                    resultLocationTables.getChunk(constituentContext, slice).asObjectChunk();
            final ObjectChunk<ImmutableTableLocationKey, ? extends Values> locationKeys =
                    resultTableLocationKeys.getChunk(keyContext, slice).asObjectChunk();

            final int sliceSize = rowKeys.size();
            for (int ii = 0; ii < sliceSize; ++ii) {
                if (!keysToDrop.contains(locationKeys.get(ii))) {
                    continue;
                }
                droppedRowsBuilder.appendKey(rowKeys.get(ii));

                // The result no longer holds this constituent; release it.
                final Table constituent = constituents.get(ii);
                result.unmanage(constituent);
                constituent.close();
            }
        }
    }

    final WritableRowSet droppedRows = droppedRowsBuilder.build();
    resultTableLocationKeys.setNull(droppedRows);
    resultLocationTables.setNull(droppedRows);
    return droppedRows;
}
}

private static class PendingLocationState extends IntrusiveDoublyLinkedNode.Impl<PendingLocationState> {
private static final class PendingLocationState extends IntrusiveDoublyLinkedNode.Impl<PendingLocationState> {

private final TableLocation location;

private final TableLocationUpdateSubscriptionBuffer subscriptionBuffer;

private PendingLocationState(@NotNull final TableLocation location) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,10 @@ protected final void initializeAvailableLocations() {
if (isRefreshing()) {
final TableLocationSubscriptionBuffer locationBuffer =
new TableLocationSubscriptionBuffer(locationProvider);
maybeAddLocations(locationBuffer.processPending());
final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = locationBuffer.processPending();
assertNoLocationsRemoved(locationUpdate);

maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
updateSourceRegistrar.addSource(locationChangePoller = new LocationChangePoller(locationBuffer));
} else {
locationProvider.refresh();
Expand Down Expand Up @@ -202,7 +205,9 @@ private LocationChangePoller(@NotNull final TableLocationSubscriptionBuffer loca
@Override
protected void instrumentedRefresh() {
try {
maybeAddLocations(locationBuffer.processPending());
final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = locationBuffer.processPending();
assertNoLocationsRemoved(locationUpdate);
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
// NB: This class previously had functionality to notify "location listeners", but it was never used.
// Resurrect from git history if needed.
if (!locationSizesInitialized) {
Expand All @@ -218,10 +223,17 @@ protected void instrumentedRefresh() {
} catch (Exception e) {
// Notify listeners to the SourceTable when we had an issue refreshing available locations.
notifyListenersOnError(e, null);
getUpdateGraph().removeSource(this);
}
}
}

/**
 * Reject any location update that carries removed locations.
 *
 * @param locationUpdate the update to check
 * @throws TableDataException if the update has at least one pending removed location
 */
private static void assertNoLocationsRemoved(TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) {
    if (locationUpdate.getPendingRemovedLocationKeys().isEmpty()) {
        return;
    }
    throw new TableDataException(SourceTable.class.getSimpleName() + " does not support removing locations");
}

/**
* Hook to allow found location keys to be filtered (e.g. according to a where-clause on the partitioning columns of
* a {@link PartitionAwareSourceTable}. The default implementation returns its input unmolested.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
*/
package io.deephaven.engine.table.impl.locations;

import io.deephaven.util.annotations.InternalUseOnly;
import io.deephaven.util.type.NamedImplementation;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -34,6 +36,15 @@ interface Listener extends BasicTableDataListener {
* @param tableLocationKey The new table location key
*/
void handleTableLocationKey(@NotNull ImmutableTableLocationKey tableLocationKey);

/**
 * Notify the listener of a table location that has been removed, as encountered while initiating or maintaining
 * the location subscription. This should occur at most once per location, but the order of delivery is
 * <i>not</i> guaranteed.
 *
 * @param tableLocation The table location that was removed
 */
void handleTableLocationRemoved(@NotNull TableLocation tableLocation);
}

/**
Expand Down Expand Up @@ -101,6 +112,16 @@ interface Listener extends BasicTableDataListener {
*/
boolean hasTableLocationKey(@NotNull final TableLocationKey tableLocationKey);

/**
 * Remove the table location identified by the given key.
 *
 * @apiNote Use with caution: the intent is that when a {@link TableLocationProvider} is told that a location is
 *          gone, we should no longer provide it in the list of locations, and stop requesting size from
 *          downstream providers.
 *
 * @param locationKey the {@link TableLocationKey} of the location to remove
 */
void removeTableLocationKey(@NotNull final TableLocationKey locationKey);

/**
* @param tableLocationKey A {@link TableLocationKey} specifying the location to get
* @return The {@link TableLocation} matching the given key
Expand Down
Loading