Skip to content

Commit

Permalink
Pushdown Predicates
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Dec 4, 2023
1 parent 414be28 commit 7088710
Show file tree
Hide file tree
Showing 133 changed files with 3,608 additions and 589 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.deephaven.engine.exceptions;

import io.deephaven.UncheckedDeephavenException;

/**
 * Runtime exception representing an incompatibility between table definitions.
 */
@SuppressWarnings({"WeakerAccess", "unused"})
public class IncompatibleTableDefinitionException extends UncheckedDeephavenException {

    // Exceptions are Serializable; declare an explicit serialVersionUID rather than relying on a
    // compiler-generated default. The nested class this was extracted from
    // (TableDefinition.IncompatibleTableDefinitionException) declared this same value.
    private static final long serialVersionUID = 7668080323885707687L;

    /**
     * Construct an exception with no detail message or cause.
     */
    public IncompatibleTableDefinitionException() {
        super();
    }

    /**
     * Construct an exception with the supplied detail message.
     *
     * @param message the detail message
     */
    public IncompatibleTableDefinitionException(String message) {
        super(message);
    }

    /**
     * Construct an exception with the supplied detail message and cause.
     *
     * @param message the detail message
     * @param cause the underlying cause of this exception
     */
    public IncompatibleTableDefinitionException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Construct an exception with the supplied cause.
     *
     * @param cause the underlying cause of this exception
     */
    public IncompatibleTableDefinitionException(Throwable cause) {
        super(cause);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ WritableRowSet match(
* values until this method is called. This is an option, not an obligation: some simple ColumnSource
* implementations (like TSingleValueSource for various T) always track previous values; other implementations (like
* PrevColumnSource) never do; some (like TArrayColumnSource) only start tracking once this method is called.
*
* <p>
* An immutable column source can not have distinct prev values; therefore it is implemented as a no-op.
*/
default void startTrackingPrevValues() {
Expand Down Expand Up @@ -202,7 +202,7 @@ default <TYPE> ColumnSource<TYPE> cast(Class<? extends TYPE> clazz, @Nullable Cl
/**
* Most column sources will return the same value for a given row without respect to the order that the rows are
* read. Those columns sources are considered "stateless" and should return true.
*
* <p>
* Some column sources, however may be dependent on evaluation order. For example, a formula that updates a Map must
* be evaluated from the first row to the last row. A column source that has the potential to depend on the order of
* evaluation must return false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
*/
package io.deephaven.engine.table;

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.api.ColumnName;
import io.deephaven.base.cache.OpenAddressedCanonicalizationCache;
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.table.impl.NoSuchColumnException;
import io.deephaven.engine.exceptions.IncompatibleTableDefinitionException;
import io.deephaven.io.log.impl.LogOutputStringImpl;
import io.deephaven.qst.column.header.ColumnHeader;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -654,29 +654,4 @@ private List<ColumnDefinition<?>> getWritableColumns(final boolean partitioningT
}
return columns;
}

/**
* Runtime exception representing an incompatibility between table definitions.
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public static class IncompatibleTableDefinitionException extends UncheckedDeephavenException {

private static final long serialVersionUID = 7668080323885707687L;

public IncompatibleTableDefinitionException() {
super();
}

public IncompatibleTableDefinitionException(String message) {
super(message);
}

public IncompatibleTableDefinitionException(String message, Throwable cause) {
super(message, cause);
}

public IncompatibleTableDefinitionException(Throwable cause) {
super(cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ default Chunk<ATTR> getPrevChunkByFilling(@NotNull final GetContext context,
default ChunkSource<ATTR> getPrevSource() {
final ChunkSource.WithPrev<ATTR> chunkSource = this;

return new ChunkSource<ATTR>() {
return new ChunkSource<>() {
@Override
public ChunkType getChunkType() {
return chunkSource.getChunkType();
Expand Down
1 change: 1 addition & 0 deletions engine/table/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ dependencies {

implementation project(':plugin')
implementation depCommonsLang3
implementation 'org.testng:testng:7.1.0'
Classpaths.inheritCommonsText(project, 'implementation')

Classpaths.inheritGroovy(project, 'groovy', 'implementation')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public WritableRowSet match(
final boolean caseInsensitive,
@NotNull final RowSet mapper,
final Object... keys) {
// TODO (deephaven-core#3851): port this to new grouping API
final Map<T, RowSet> groupToRange = (isImmutable() || !usePrev) ? getGroupToRange(mapper) : null;
if (groupToRange != null) {
RowSetBuilderRandom allInMatchingGroups = RowSetFactory.builderRandom();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private Table makeConstituentTable(@NotNull final TableLocation tableLocation) {
final PartitionAwareSourceTable constituent = new PartitionAwareSourceTable(
constituentDefinition,
"SingleLocationSourceTable-" + tableLocation,
RegionedTableComponentFactoryImpl.INSTANCE,
RegionedTableComponentFactoryImpl.make(),
new SingleTableLocationProvider(tableLocation),
refreshSizes ? refreshCombiner : null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.chunk.*;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
import io.deephaven.chunk.attributes.Values;
import org.jetbrains.annotations.NotNull;

public interface ChunkFilter {
/**
Expand Down Expand Up @@ -138,22 +140,25 @@ default void filter(Chunk<? extends Values> values, LongChunk<OrderedRowKeys> ke
* filter.
*
* @param selection the RowSet to filter
* @param columnSource the column source to filter
* @param chunkSource the chunk source to filter
* @param usePrev should we use previous values from the column source?
* @param chunkFilter the chunk filter to apply
*
* @return A new WritableRowSet representing the filtered values, owned by the caller
*/
static WritableRowSet applyChunkFilter(RowSet selection, ColumnSource<?> columnSource, boolean usePrev,
ChunkFilter chunkFilter) {
static WritableRowSet applyChunkFilter(
@NotNull final RowSet selection,
@NotNull final ChunkSource<? extends Values> chunkSource,
final boolean usePrev,
@NotNull final ChunkFilter chunkFilter) {
final RowSetBuilderSequential builder = RowSetFactory.builderSequential();

final int contextSize = (int) Math.min(FILTER_CHUNK_SIZE, selection.size());
long chunksBetweenChecks = INITIAL_INTERRUPTION_SIZE / FILTER_CHUNK_SIZE;
long filteredChunks = 0;
long lastInterruptCheck = System.currentTimeMillis();

try (final ColumnSource.GetContext getContext = columnSource.makeGetContext(contextSize);
try (final ColumnSource.GetContext getContext = chunkSource.makeGetContext(contextSize);
final WritableLongChunk<OrderedRowKeys> longChunk = WritableLongChunk.makeWritableChunk(contextSize);
final RowSequence.Iterator rsIt = selection.getRowSequenceIterator()) {
while (rsIt.hasMore()) {
Expand All @@ -176,9 +181,11 @@ static WritableRowSet applyChunkFilter(RowSet selection, ColumnSource<?> columnS

final Chunk<? extends Values> dataChunk;
if (usePrev) {
dataChunk = columnSource.getPrevChunk(getContext, okChunk);
// noinspection unchecked
dataChunk = ((ChunkSource.WithPrev<? extends Values>) chunkSource)
.getPrevChunk(getContext, okChunk);
} else {
dataChunk = columnSource.getChunk(getContext, okChunk);
dataChunk = chunkSource.getChunk(getContext, okChunk);
}
chunkFilter.filter(dataChunk, keyChunk, longChunk);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ public interface ColumnLocation extends StringUtils.StringKeyedObject, NamedImpl
*/
boolean exists();

    /**
     * Get this column location cast to the specified type.
     * <p>
     * This is an unchecked cast: the target type {@code CL} is inferred at the call site and is not validated at
     * runtime, so a mismatch surfaces later as a {@link ClassCastException} where the result is used. Callers are
     * responsible for supplying a type this location actually implements.
     *
     * @param <CL> the desired {@link ColumnLocation} subtype
     * @return {@code this}, with the appropriate cast applied
     */
    default <CL extends ColumnLocation> CL cast() {
        // Deliberately unchecked; correctness is the caller's responsibility (see javadoc).
        // noinspection unchecked
        return (CL) this;
    }

/**
* <p>
* Get the metadata object stored with this column, or null if no such data exists.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package io.deephaven.engine.table.impl.locations;

import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import org.jetbrains.annotations.NotNull;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
*/
package io.deephaven.engine.table.impl.locations;

import io.deephaven.api.SortColumn;
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.engine.table.Table;
import io.deephaven.io.log.impl.LogOutputStringImpl;
import io.deephaven.util.annotations.FinalDefault;
import io.deephaven.util.type.NamedImplementation;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.List;

/**
* Building block for Deephaven "source" tables, with helper methods for discovering locations and their sizes. A
Expand Down Expand Up @@ -36,6 +41,12 @@ interface Listener extends BasicTableDataListener {
@NotNull
ImmutableTableKey getTableKey();

// TODO: NATE NOCOMMIT IF UNUSED ELSE MAKE CAST
default <T extends TableLocation> T as(Class<T> otherType) {
// noinspection unchecked
return (T) this;
}

/**
* @return An {@link ImmutableTableLocationKey} instance for this location
*/
Expand Down Expand Up @@ -77,6 +88,31 @@ interface Listener extends BasicTableDataListener {
*/
void refresh();

    /**
     * Get an ordered list of columns this location is sorted by.
     *
     * @return a non-null ordered list of {@link SortColumn}s
     */
    @NotNull
    List<SortColumn> getSortedColumns();

    /**
     * Check if this location has a data index for the specified columns.
     *
     * @param columns the set of columns to check for
     * @return {@code true} if the table has a data index for the specified columns
     */
    boolean hasDataIndexFor(@NotNull String... columns);

    /**
     * Get the data index table for the specified set of columns. Note that the order of columns does not matter here.
     *
     * @param columns the key columns for the index
     * @return the index table, or {@code null} if one does not exist
     */
    @Nullable
    Table getDataIndex(@NotNull String... columns);

/**
* @param name The column name
* @return The ColumnLocation for the defined column under this table location
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,28 @@
package io.deephaven.engine.table.impl.locations.impl;

import io.deephaven.base.verify.Require;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.string.StringUtils;
import io.deephaven.engine.table.impl.locations.*;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.hash.KeyedObjectHashMap;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.lang.ref.SoftReference;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* Partial TableLocation implementation for use by TableDataService implementations.
*/
public abstract class AbstractTableLocation
extends SubscriptionAggregator<TableLocation.Listener>
implements TableLocation {
protected static final SoftReference<Table> NO_GROUPING_SENTINEL = new SoftReference<>(null);

private final ImmutableTableKey tableKey;
private final ImmutableTableLocationKey tableLocationKey;
Expand All @@ -24,6 +34,11 @@ public abstract class AbstractTableLocation
private final KeyedObjectHashMap<CharSequence, ColumnLocation> columnLocations =
new KeyedObjectHashMap<>(StringUtils.charSequenceKey());

    /**
     * A map of grouping (or data index) key column lists to the materialized index tables, held via
     * {@link SoftReference} so that they may be reclaimed by the GC under memory pressure.
     */
protected volatile Map<List<String>, SoftReference<Table>> cachedGroupings;

/**
* @param tableKey Table key for the table this location belongs to
* @param tableLocationKey Table location key that identifies this location
Expand Down Expand Up @@ -138,4 +153,63 @@ public final ColumnLocation getColumnLocation(@NotNull final CharSequence name)
protected final void clearColumnLocations() {
columnLocations.clear();
}

@Nullable
@Override
public final Table getDataIndex(@NotNull final String... columns) {
final List<String> colNames = Arrays.asList(columns);
Table grouping = null;
if (cachedGroupings != null) {
final SoftReference<Table> cachedGrouping = cachedGroupings.get(colNames);
if (cachedGrouping == NO_GROUPING_SENTINEL) {
return null;
}

if (cachedGrouping != null) {
grouping = cachedGrouping.get();
if (grouping != null) {
// System.out.println("HAD CACHE");
return grouping;
}
}
}

synchronized (this) {
if (cachedGroupings == null) {
cachedGroupings = new HashMap<>();
}

final SoftReference<Table> cachedGrouping = cachedGroupings.get(colNames);
if (cachedGrouping == NO_GROUPING_SENTINEL) {
return null;
}

if (cachedGrouping != null) {
grouping = cachedGrouping.get();
}

if (grouping == null) {
grouping = getDataIndexImpl(columns);

if (grouping == null || grouping.isEmpty()) {
cachedGroupings.put(colNames, NO_GROUPING_SENTINEL);
} else {
// System.out.println("NO CACHE");
cachedGroupings.put(colNames, new SoftReference<>(grouping));
}
}

return grouping;
}
}

    /**
     * Load the data index from the location implementation. Implementations of this method should not perform any
     * result caching; caching (including negative-result caching) is handled by {@link #getDataIndex(String...)}.
     *
     * @param columns the columns to load an index for
     * @return the data index table, or an empty table or {@code null} if none existed
     */
    @Nullable
    protected abstract Table getDataIndexImpl(@NotNull final String... columns);
}
Loading

0 comments on commit 7088710

Please sign in to comment.