Skip to content

Commit

Permalink
Stateful Selectables and SelectColumns
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Feb 24, 2024
1 parent e29e7c6 commit 861cd24
Show file tree
Hide file tree
Showing 37 changed files with 508 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ default <TYPE> ColumnSource<TYPE> cast(Class<? extends TYPE> clazz, @Nullable Cl
/**
* Most column sources will return the same value for a given row without respect to the order that the rows are
* read. Those column sources are considered "stateless" and should return true.
*
* <p>
* Some column sources, however, may be dependent on evaluation order. For example, a formula that updates a Map must
* be evaluated from the first row to the last row. A column source that has the potential to depend on the order of
* evaluation must return false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ boolean shouldParallelizeFilter(WhereFilter filter, long numberOfRows) {
return permitParallelization()
&& numberOfRows != 0
&& (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT)
&& filter.permitParallelization();
&& filter.isStateless();
}

/**
Expand All @@ -338,6 +338,6 @@ static boolean permitParallelization(WhereFilter[] filters) {
if (QueryTable.DISABLE_PARALLEL_WHERE) {
return false;
}
return Arrays.stream(filters).anyMatch(WhereFilter::permitParallelization);
return Arrays.stream(filters).anyMatch(WhereFilter::isStateless);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ public Table selectDistinctInternal(Collection<? extends Selectable> columns) {
} catch (Exception e) {
return null;
}
if (!((PartitionAwareSourceTable) table).isValidAgainstColumnPartitionTable(selectColumn.getColumns(),
selectColumn.getColumnArrays())) {
if (!selectColumn.isStateless() ||
!((PartitionAwareSourceTable) table).isValidAgainstColumnPartitionTable(
selectColumn.getColumns(), selectColumn.getColumnArrays())) {
return null;
}
}
Expand Down Expand Up @@ -325,7 +326,8 @@ public final Table selectDistinct(Collection<? extends Selectable> columns) {
final List<SelectColumn> selectColumns = Arrays.asList(SelectColumn.from(columns));
for (SelectColumn selectColumn : selectColumns) {
selectColumn.initDef(definition.getColumnNameMap());
if (!isValidAgainstColumnPartitionTable(selectColumn.getColumns(), selectColumn.getColumnArrays())) {
if (!selectColumn.isStateless() ||
!isValidAgainstColumnPartitionTable(selectColumn.getColumns(), selectColumn.getColumnArrays())) {
// Be sure to invoke the super-class version of this method, rather than the array-based one that
// delegates to this method.
return super.selectDistinct(selectColumns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ public final boolean isRetain() {
return false;
}

@Override
public final boolean isStateless() {
return true;
}

static ColumnSource<Table> getAndValidateInputColumnSource(
@NotNull final String inputColumnName,
@NotNull final Map<String, ? extends ColumnSource<?>> columnsOfInterest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,6 @@ public final boolean isRetain() {
return false;
}

@Override
public final boolean isStateless() {
return true;
}

private static final class OutputFormulaFillContext implements Formula.FillContext {

private static final Formula.FillContext INSTANCE = new OutputFormulaFillContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ protected void destroy() {
}

@Override
public boolean permitParallelization() {
public boolean isStateless() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public int hashCode() {
}

@Override
public boolean permitParallelization() {
return Arrays.stream(componentFilters).allMatch(WhereFilter::permitParallelization);
public boolean isStateless() {
return Arrays.stream(componentFilters).allMatch(WhereFilter::isStateless);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -614,10 +614,4 @@ public ConditionFilter copy() {
public ConditionFilter renameFilter(Map<String, String> renames) {
return new ConditionFilter(formula, renames);
}

@Override
public boolean permitParallelization() {
// TODO (https://github.com/deephaven/deephaven-core/issues/4896): Assume statelessness by default.
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -834,38 +834,4 @@ String makeGetExpression(boolean usePrev) {
return String.format("%s.%s(k)", name, getGetterName(columnDefinition.getDataType(), usePrev));
}
}

/**
* Is this parameter immutable, and thus would contribute no state to the formula?
* <p>
* If any query scope parameter is not a primitive, String, or known immutable class; then it may be a mutable
* object that results in undefined results when the column is not evaluated strictly in order.
*
* @return true if this query scope parameter is immutable
*/
private static boolean isImmutableType(QueryScopeParam<?> param) {
final Object value = param.getValue();
if (value == null) {
return true;
}
final Class<?> type = value.getClass();
if (type == String.class || type == BigInteger.class || type == BigDecimal.class
|| type == Instant.class || type == ZonedDateTime.class || Table.class.isAssignableFrom(type)) {
return true;
}
// if it is a boxed type, then it is immutable; otherwise we don't know what to do with it
return TypeUtils.isBoxedType(type);
}

private boolean isUsedColumnStateless(String columnName) {
return columnSources.get(columnName).isStateless();
}

@Override
public boolean isStateless() {
return Arrays.stream(params).allMatch(DhFormulaColumn::isImmutableType)
&& usedColumns.stream().allMatch(this::isUsedColumnStateless)
&& usedColumnArrays.stream().allMatch(this::isUsedColumnStateless);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public DownsampledWhereFilter copy() {
}

@Override
public boolean permitParallelization() {
public boolean isStateless() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,7 @@
import io.deephaven.api.RawString;
import io.deephaven.api.expression.Function;
import io.deephaven.api.expression.Method;
import io.deephaven.api.filter.Filter;
import io.deephaven.api.filter.FilterAnd;
import io.deephaven.api.filter.FilterComparison;
import io.deephaven.api.filter.FilterIn;
import io.deephaven.api.filter.FilterIsNull;
import io.deephaven.api.filter.FilterNot;
import io.deephaven.api.filter.FilterOr;
import io.deephaven.api.filter.FilterPattern;
import io.deephaven.api.filter.*;

import java.util.List;

Expand Down Expand Up @@ -79,4 +72,9 @@ public List<Filter> visit(Method method) {
public List<Filter> visit(RawString rawString) {
return List.of(rawString);
}

@Override
public List<Filter> visit(StatefulFilter filter) {
return List.of(filter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,6 @@ public boolean isRetain() {
return false;
}

@Override
public boolean isStateless() {
return false;
}

@Override
public FunctionalColumn<S, D> copy() {
return new FunctionalColumn<>(sourceName, sourceDataType, destName, destDataType, function);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,6 @@ public boolean isRetain() {
return false;
}

@Override
public boolean isStateless() {
return false;
}

@Override
public MultiSourceFunctionalColumn<D> copy() {
return new MultiSourceFunctionalColumn<>(sourceNames, destName, destDataType, function);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,6 @@ public boolean isRetain() {
return false;
}

@Override
public boolean isStateless() {
return true;
}

@Override
public SelectColumn copy() {
// noinspection unchecked,rawtypes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ protected void destroy() {
}

@Override
public boolean permitParallelization() {
public boolean isStateless() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@
*/
package io.deephaven.engine.table.impl.select;

import io.deephaven.api.ColumnName;
import io.deephaven.api.RawString;
import io.deephaven.api.Selectable;
import io.deephaven.api.Strings;
import io.deephaven.api.*;
import io.deephaven.api.expression.Expression;
import io.deephaven.api.expression.Function;
import io.deephaven.api.expression.Method;
import io.deephaven.api.expression.StatefulExpression;
import io.deephaven.api.filter.Filter;
import io.deephaven.api.literal.Literal;
import io.deephaven.engine.context.QueryCompiler;
Expand Down Expand Up @@ -173,10 +171,12 @@ default void validateSafeForRefresh(final BaseTable<?> sourceTable) {
}

/**
* Returns true if this column is stateless (i.e. one row does not depend on the order of evaluation for another
* row).
* @return true if this column is stateless (i.e. one row does not depend on the order of evaluation for another
* row).
*/
boolean isStateless();
default boolean isStateless() {
return true;
}

/**
* Create a copy of this SelectColumn.
Expand Down Expand Up @@ -222,6 +222,11 @@ public SelectColumn visit(RawString rhs) {
return makeSelectColumn(Strings.of(rhs));
}

@Override
public SelectColumn visit(StatefulExpression statefulExpression) {
return StatefulSelectColumn.of(statefulExpression.innerExpression().walk(this));
}

private SelectColumn makeSelectColumn(String rhs) {
// TODO(deephaven-core#3740): Remove engine crutch on io.deephaven.api.Strings
return SelectColumnFactory.getExpression(String.format("%s=%s", lhs.name(), rhs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected WritableRowSet updateAndGetAddedIndex() {
}

@Override
public boolean permitParallelization() {
public boolean isStateless() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.engine.table.impl.select;

import io.deephaven.annotations.SimpleStyle;
import io.deephaven.engine.rowset.TrackingRowSet;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.impl.MatchPair;
import org.immutables.value.Value;
import org.jetbrains.annotations.NotNull;

import java.util.List;
import java.util.Map;

/**
 * Decorator around another {@link SelectColumn} that marks it as stateful.
 * <p>
 * Every operation is forwarded unchanged to {@link #innerSelectColumn()}; the only behavioral difference is that
 * {@link #isStateless()} always answers {@code false}. Use it for columns whose side effects prevent
 * parallelization (e.g. a column that depends on processing rows in order, or would suffer from lock contention if
 * parallelized).
 */
@Value.Immutable
@SimpleStyle
public abstract class StatefulSelectColumn implements SelectColumn {

    /**
     * Wrap {@code innerSelectColumn} so that it reports itself as stateful.
     *
     * @param innerSelectColumn the column to wrap
     * @return a stateful wrapper delegating to {@code innerSelectColumn}
     */
    public static StatefulSelectColumn of(@NotNull final SelectColumn innerSelectColumn) {
        return ImmutableStatefulSelectColumn.of(innerSelectColumn);
    }

    /**
     * @return the wrapped column that all calls are forwarded to
     */
    @Value.Parameter
    public abstract SelectColumn innerSelectColumn();

    // ------------------------------------------------------------------------------------------------------------
    // The one thing this wrapper changes
    // ------------------------------------------------------------------------------------------------------------

    /**
     * @return always {@code false}; this wrapper exists to flag the inner column as stateful
     */
    @Override
    public boolean isStateless() {
        return false;
    }

    /**
     * @return a new stateful wrapper around a copy of the inner column
     */
    @Override
    public SelectColumn copy() {
        final SelectColumn innerCopy = innerSelectColumn().copy();
        return of(innerCopy);
    }

    // ------------------------------------------------------------------------------------------------------------
    // Identity and metadata (pure delegation)
    // ------------------------------------------------------------------------------------------------------------

    @Override
    public String getName() {
        return innerSelectColumn().getName();
    }

    @Override
    public MatchPair getMatchPair() {
        return innerSelectColumn().getMatchPair();
    }

    @Override
    public Class<?> getReturnedType() {
        return innerSelectColumn().getReturnedType();
    }

    @Override
    public boolean isRetain() {
        return innerSelectColumn().isRetain();
    }

    // ------------------------------------------------------------------------------------------------------------
    // Initialization and input discovery (pure delegation)
    // ------------------------------------------------------------------------------------------------------------

    @Override
    public List<String> initDef(
            @NotNull final Map<String, ColumnDefinition<?>> columnDefinitionMap) {
        return innerSelectColumn().initDef(columnDefinitionMap);
    }

    @Override
    public List<String> initInputs(
            @NotNull final TrackingRowSet rowSet,
            @NotNull final Map<String, ? extends ColumnSource<?>> columnsOfInterest) {
        return innerSelectColumn().initInputs(rowSet, columnsOfInterest);
    }

    @Override
    public List<String> getColumns() {
        return innerSelectColumn().getColumns();
    }

    @Override
    public List<String> getColumnArrays() {
        return innerSelectColumn().getColumnArrays();
    }

    // ------------------------------------------------------------------------------------------------------------
    // Result views and destination sources (pure delegation)
    // ------------------------------------------------------------------------------------------------------------

    @Override
    @NotNull
    public ColumnSource<?> getDataView() {
        return innerSelectColumn().getDataView();
    }

    @Override
    @NotNull
    public ColumnSource<?> getLazyView() {
        return innerSelectColumn().getLazyView();
    }

    @Override
    public WritableColumnSource<?> newDestInstance(long size) {
        return innerSelectColumn().newDestInstance(size);
    }

    @Override
    public WritableColumnSource<?> newFlatDestInstance(long size) {
        return innerSelectColumn().newFlatDestInstance(size);
    }
}
Loading

0 comments on commit 861cd24

Please sign in to comment.