Skip to content

Commit

Permalink
feat: Added support for parallel snapshot (#5558)
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam authored Aug 2, 2024
1 parent f2b70b8 commit beecac5
Show file tree
Hide file tree
Showing 10 changed files with 428 additions and 180 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.engine.exceptions;

import io.deephaven.UncheckedDeephavenException;
import org.jetbrains.annotations.NotNull;

/**
* This exception is thrown when {@link io.deephaven.engine.table.impl.remote.ConstructSnapshot} fails to successfully
* collect snapshot data for a column in parallel.
*/
public class ColumnSnapshotUnsuccessfulException extends UncheckedDeephavenException {
public ColumnSnapshotUnsuccessfulException(@NotNull final String message, @NotNull final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,18 @@ public interface MemoizableOperation<T extends DynamicNode & NotificationStepRec
static boolean FORCE_PARALLEL_SELECT_AND_UPDATE =
Configuration.getInstance().getBooleanWithDefault("QueryTable.forceParallelSelectAndUpdate", false);

/**
* You can choose to enable or disable the parallel snapshot.
*/
public static boolean ENABLE_PARALLEL_SNAPSHOT =
Configuration.getInstance().getBooleanWithDefault("QueryTable.enableParallelSnapshot", true);

/**
* Minimum snapshot "chunk" size for parallel reading of columns, defaults to 1 million.
*/
public static long MINIMUM_PARALLEL_SNAPSHOT_ROWS =
Configuration.getInstance().getLongWithDefault("QueryTable.minimumParallelSnapshotRows", 1L << 20);

// Whether we should track the entire RowSet of firstBy and lastBy operations
@VisibleForTesting
public static boolean TRACKED_LAST_BY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
//
package io.deephaven.engine.table.impl.hierarchical;

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.api.ColumnName;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.engine.exceptions.SnapshotUnsuccessfulException;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.hierarchical.HierarchicalTable;
Expand All @@ -18,7 +18,7 @@
* {@link HierarchicalTable#snapshot(HierarchicalTable.SnapshotState, Table, ColumnName, BitSet, RowSequence, WritableChunk[])
* snapshot}.
*/
public class HierarchicalTableSnapshotException extends UncheckedDeephavenException {
public class HierarchicalTableSnapshotException extends SnapshotUnsuccessfulException {

public HierarchicalTableSnapshotException(@NotNull final String message, @NotNull final Throwable cause) {
super(message, cause);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void onAllRequiredColumnsCompleted() {

if (canParallelizeThisColumn && jobScheduler.threadCount() > 1 && !hasShifts &&
((resultTypeIsTableOrRowSet && totalSize > 0)
|| totalSize > QueryTable.MINIMUM_PARALLEL_SELECT_ROWS)) {
|| totalSize >= QueryTable.MINIMUM_PARALLEL_SELECT_ROWS)) {
final long divisionSize = resultTypeIsTableOrRowSet ? 1
: Math.max(QueryTable.MINIMUM_PARALLEL_SELECT_ROWS,
(totalSize + jobScheduler.threadCount() - 1) / jobScheduler.threadCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ public RowRedirection getRowRedirection() {
return rowRedirection;
}

public final ColumnSource<T> getInnerSource() {
return innerSource;
}

@Override
public void startTrackingPrevValues() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.util.SafeCloseable;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
Expand All @@ -31,14 +32,14 @@ public static class ModColumnData {
public RowSet rowsModified;
public Class<?> type;
public Class<?> componentType;
public ArrayList<Chunk<Values>> data;
public List<Chunk<Values>> data;
public ChunkType chunkType;
}

public static class AddColumnData {
public Class<?> type;
public Class<?> componentType;
public ArrayList<Chunk<Values>> data;
public List<Chunk<Values>> data;
public ChunkType chunkType;
}

Expand Down Expand Up @@ -94,34 +95,30 @@ public void close() {
rowsRemoved.close();
}
if (addColumnData != null) {
for (final BarrageMessage.AddColumnData acd : addColumnData) {
if (acd == null) {
continue;
for (final AddColumnData acd : addColumnData) {
if (acd != null) {
closeChunkData(acd.data);
}

for (Chunk<Values> chunk : acd.data) {
if (chunk instanceof PoolableChunk) {
((PoolableChunk) chunk).close();
}
}

acd.data.clear();
}
}
if (modColumnData != null) {
for (final ModColumnData mcd : modColumnData) {
if (mcd == null) {
continue;
}

for (Chunk<Values> chunk : mcd.data) {
if (chunk instanceof PoolableChunk) {
((PoolableChunk) chunk).close();
}
if (mcd != null) {
closeChunkData(mcd.data);
}
}
}
}

mcd.data.clear();
private static void closeChunkData(final Collection<Chunk<Values>> data) {
if (data.isEmpty()) {
return;
}
for (final Chunk<Values> chunk : data) {
if (chunk instanceof PoolableChunk) {
((PoolableChunk) chunk).close();
}
}
data.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import io.deephaven.base.Pair;
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.AssertionFailure;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.exceptions.UpdateGraphConflictException;
Expand Down Expand Up @@ -75,7 +77,7 @@
/**
* Test of QueryTable functionality.
* <p>
* This test used to be a catch all, but at over 7,000 lines became unwieldy. It is still somewhat of a catch-all, but
* This test used to be a catch-all, but at over 7,000 lines became unwieldy. It is still somewhat of a catch-all, but
* some specific classes of tests have been broken out.
* <p>
* See also {@link QueryTableAggregationTest}, {@link QueryTableJoinTest}, {@link QueryTableSelectUpdateTest},
Expand Down Expand Up @@ -2703,18 +2705,114 @@ public void testUngroupingAgnostic() {
ColumnVectors.ofObject(t1, "Y", String.class).toArray());
}

public void testEmptyTableSnapshot() {
final Table emptyTableNoColumns = emptyTable(0);
final Table emptyTableWithSingleColumn = emptyTable(0).update("X = i");
final Table emptyTableWithMultipleColumns = emptyTable(0).update("X = i", "Y = 2*i", "Z = 3*i");
try (final BarrageMessage snap =
ConstructSnapshot.constructBackplaneSnapshot(this, (BaseTable<?>) emptyTableNoColumns)) {
assertTrue(snap.rowsIncluded.isEmpty());
assertTrue(snap.addColumnData.length == 0);
assertTrue(snap.modColumnData.length == 0);
}

try (final BarrageMessage snap =
ConstructSnapshot.constructBackplaneSnapshot(this, (BaseTable<?>) emptyTableWithSingleColumn)) {
assertTrue(snap.rowsIncluded.isEmpty());
assertTrue(snap.addColumnData.length == 1);
assertTrue(snap.modColumnData.length == 1);
assertTrue(snap.addColumnData[0].data.isEmpty());
assertTrue(snap.modColumnData[0].data.isEmpty());
}

try (final BarrageMessage snap =
ConstructSnapshot.constructBackplaneSnapshot(this, (BaseTable<?>) emptyTableWithMultipleColumns)) {
assertTrue(snap.rowsIncluded.isEmpty());
assertTrue(snap.addColumnData.length == 3);
assertTrue(snap.modColumnData.length == 3);
assertTrue(snap.addColumnData[0].data.isEmpty());
assertTrue(snap.addColumnData[1].data.isEmpty());
assertTrue(snap.addColumnData[2].data.isEmpty());
assertTrue(snap.modColumnData[0].data.isEmpty());
assertTrue(snap.modColumnData[1].data.isEmpty());
assertTrue(snap.modColumnData[2].data.isEmpty());
}
}

public void testUngroupConstructSnapshotOfBoxedNull() {
final Table t =
testRefreshingTable(i(0).toTracking())
.update("X = new Integer[]{null, 2, 3}", "Z = new Integer[]{4, 5, null}");
final Table ungrouped = t.ungroup();
try (final BarrageMessage snap =
ConstructSnapshot.constructBackplaneSnapshot(this, (BaseTable<?>) ungrouped)) {
testUngroupConstructSnapshotBoxedNullAllColumnHelper(snap);
}

// Snapshot the second column for last two rows
final BitSet columnsToSnapshot = new BitSet(2);
columnsToSnapshot.set(1);
final RowSequence rowsToSnapshot = RowSequenceFactory.forRange(1, 2);
try (final BarrageMessage snap =
ConstructSnapshot.constructBackplaneSnapshotInPositionSpace(this, (BaseTable<?>) ungrouped,
columnsToSnapshot, rowsToSnapshot, null)) {
testUngroupConstructSnapshotBoxedNullFewColumnsHelper(snap);
}

final Table selected = ungrouped.select(); // Will convert column sources to in memory
try (final BarrageMessage snap =
ConstructSnapshot.constructBackplaneSnapshot(this, (BaseTable<?>) selected)) {
testUngroupConstructSnapshotBoxedNullAllColumnHelper(snap);
}

try (final BarrageMessage snap =
ConstructSnapshot.constructBackplaneSnapshotInPositionSpace(this, (BaseTable<?>) selected,
columnsToSnapshot, RowSequenceFactory.forRange(1, 2), null)) {
testUngroupConstructSnapshotBoxedNullFewColumnsHelper(snap);
}
}

private static void testUngroupConstructSnapshotBoxedNullAllColumnHelper(@NotNull final BarrageMessage snap) {
assertEquals(snap.rowsAdded, i(0, 1, 2));
final List<Chunk<Values>> firstColChunk = snap.addColumnData[0].data;
final int[] firstColExpected = new int[] {QueryConstants.NULL_INT, 2, 3};
final List<Chunk<Values>> secondColChunk = snap.addColumnData[1].data;
final int[] secondColExpected = new int[] {4, 5, QueryConstants.NULL_INT};
for (int i = 0; i < 3; i++) {
assertEquals(firstColChunk.get(0).asIntChunk().get(i), firstColExpected[i]);
assertEquals(secondColChunk.get(0).asIntChunk().get(i), secondColExpected[i]);
}
}

private static void testUngroupConstructSnapshotBoxedNullFewColumnsHelper(@NotNull final BarrageMessage snap) {
assertEquals(snap.rowsIncluded, i(1, 2));
assertEquals(snap.addColumnData[1].data.get(0).asIntChunk().get(0), 5);
assertEquals(snap.addColumnData[1].data.get(0).asIntChunk().get(1), QueryConstants.NULL_INT);
}


public void testUngroupConstructSnapshotSingleColumnTable() {
final Table t =
testRefreshingTable(i(0).toTracking())
.update("X = new Integer[]{null, 2, 3}");
final Table ungrouped = t.ungroup();
try (final BarrageMessage snap =
ConstructSnapshot.constructBackplaneSnapshot(this, (BaseTable<?>) ungrouped)) {
testUngroupConstructSnapshotSingleColumnHelper(snap);
}

final Table selected = ungrouped.select(); // Will convert column sources to in memory
try (final BarrageMessage snap = ConstructSnapshot.constructBackplaneSnapshot(this, (BaseTable<?>) selected)) {
testUngroupConstructSnapshotSingleColumnHelper(snap);
}
}

try (final BarrageMessage snap = ConstructSnapshot.constructBackplaneSnapshot(this, (BaseTable<?>) ungrouped)) {
assertEquals(snap.rowsAdded, i(0, 1, 2));
assertEquals(snap.addColumnData[0].data.get(0).asIntChunk().get(0),
io.deephaven.util.QueryConstants.NULL_INT);
assertEquals(snap.addColumnData[1].data.get(0).asIntChunk().get(2),
io.deephaven.util.QueryConstants.NULL_INT);
private static void testUngroupConstructSnapshotSingleColumnHelper(@NotNull final BarrageMessage snap) {
assertEquals(snap.rowsAdded, i(0, 1, 2));
final List<Chunk<Values>> firstColChunk = snap.addColumnData[0].data;
final int[] firstColExpected = new int[] {QueryConstants.NULL_INT, 2, 3};
for (int i = 0; i < 3; i++) {
assertEquals(firstColChunk.get(0).asIntChunk().get(i), firstColExpected[i]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ private ParquetTools() {}

/**
* Reads in a table from a single parquet file, metadata file, or directory with recognized layout. The source
* provided can be a local file path or a URI to be resolved via the provided
* {@link SeekableChannelsProviderPlugin}.
*
* provided can be a local file path or a URI to be resolved.
* <p>
* This method attempts to "do the right thing." It examines the source to determine if it's a single parquet file,
* a metadata file, or a directory. If it's a directory, it additionally tries to guess the layout to use. Unless a
Expand All @@ -106,8 +104,7 @@ public static Table readTable(@NotNull final String source) {

/**
* Reads in a table from a single parquet file, metadata file, or directory with recognized layout. The source
* provided can be a local file path or a URI to be resolved via the provided
* {@link SeekableChannelsProviderPlugin}.
* provided can be a local file path or a URI to be resolved.
*
* <p>
* If the {@link ParquetFileLayout} is not provided in the {@link ParquetInstructions instructions}, this method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.deephaven.engine.table.impl.util.ServerStateTracker;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.engine.util.AbstractScriptSession;
import io.deephaven.engine.util.ScriptSession;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
Expand Down

0 comments on commit beecac5

Please sign in to comment.