Skip to content

Commit

Permalink
Select Refactoring to Remove Recursion During Update Processing
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Aug 10, 2024
1 parent 9156dd8 commit 1cf0152
Show file tree
Hide file tree
Showing 15 changed files with 702 additions and 601 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,7 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc

// Init all the rows by cooking up a fake Update
final TableUpdate fakeUpdate = new TableUpdateImpl(
analyzer.alreadyFlattenedSources() ? RowSetFactory.flat(rowSet.size()) : rowSet.copy(),
analyzer.flatResult() ? RowSetFactory.flat(rowSet.size()) : rowSet.copy(),
RowSetFactory.empty(), RowSetFactory.empty(),
RowSetShiftData.EMPTY, ModifiedColumnSet.ALL);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void onUpdate(final TableUpdate upstream) {
}

analyzer.applyUpdate(acquiredUpdate, toClear, updateHelper, jobScheduler, this,
new SelectAndViewAnalyzer.SelectLayerCompletionHandler(allNewColumns, completedColumns) {
new SelectAndViewAnalyzer.Layer.CompletionHandler(allNewColumns, completedColumns) {
@Override
public void onAllRequiredColumnsCompleted() {
completionRoutine(acquiredUpdate, jobScheduler, toClear, updateHelper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,8 @@ public QueryLanguageParser(
this.packageImports = packageImports == null ? Collections.emptySet() : Set.copyOf(packageImports);
this.classImports = classImports == null ? Collections.emptySet() : Set.copyOf(classImports);
this.staticImports = staticImports == null ? Collections.emptySet() : Set.copyOf(staticImports);
this.variables = variables == null ? Collections.emptyMap() : Map.copyOf(variables);
this.variableTypeArguments =
variableTypeArguments == null ? Collections.emptyMap() : Map.copyOf(variableTypeArguments);
this.variables = variables == null ? Collections.emptyMap() : variables;
this.variableTypeArguments = variableTypeArguments == null ? Collections.emptyMap() : variableTypeArguments;
this.queryScopeVariables = queryScopeVariables == null ? new HashMap<>() : queryScopeVariables;
this.columnVariables = columnVariables == null ? Collections.emptySet() : columnVariables;
this.unboxArguments = unboxArguments;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public abstract class AbstractFormulaColumn implements FormulaColumn {
private Formula formula;
protected QueryScopeParam<?>[] params;
protected Map<String, ? extends ColumnSource<?>> columnSources;
protected Map<String, ? extends ColumnDefinition<?>> columnDefinitions;
protected Map<String, ColumnDefinition<?>> columnDefinitions;
private TrackingRowSet rowSet;
protected Class<?> returnedType;
public static final String COLUMN_SUFFIX = "_";
Expand Down Expand Up @@ -90,12 +90,24 @@ public List<String> initInputs(
@NotNull final Map<String, ? extends ColumnSource<?>> columnsOfInterest) {
this.rowSet = rowSet;

this.columnSources = columnsOfInterest;
if (usedColumns != null) {
return usedColumns;
if (usedColumns == null) {
initDef(extractDefinitions(columnsOfInterest), QueryCompilerRequestProcessor.immediate());
}
this.columnSources = filterColumnSources(columnsOfInterest);

return initDef(extractDefinitions(columnsOfInterest), QueryCompilerRequestProcessor.immediate());
return usedColumns;
}

private HashMap<String, ColumnSource<?>> filterColumnSources(
final Map<String, ? extends ColumnSource<?>> columnsOfInterest) {
final HashMap<String, ColumnSource<?>> sources = new HashMap<>();
for (String columnName : usedColumns) {
sources.put(columnName, columnsOfInterest.get(columnName));
}
for (String columnName : usedColumnArrays) {
sources.put(columnName, columnsOfInterest.get(columnName));
}
return sources;
}

@Override
Expand Down Expand Up @@ -123,24 +135,28 @@ protected void applyUsedVariables(
@NotNull final Set<String> variablesUsed,
@NotNull final Map<String, Object> possibleParams) {
// the column definition map passed in is being mutated by the caller, so we need to make a copy
columnDefinitions = Map.copyOf(columnDefinitionMap);
columnDefinitions = new HashMap<>();

final List<QueryScopeParam<?>> paramsList = new ArrayList<>();
usedColumns = new ArrayList<>();
usedColumnArrays = new ArrayList<>();
for (String variable : variablesUsed) {
ColumnDefinition<?> columnDefinition = columnDefinitionMap.get(variable);
if (variable.equals("i")) {
usesI = true;
} else if (variable.equals("ii")) {
usesII = true;
} else if (variable.equals("k")) {
usesK = true;
} else if (columnDefinitions.get(variable) != null) {
} else if (columnDefinition != null) {
columnDefinitions.put(variable, columnDefinition);
usedColumns.add(variable);
} else {
String strippedColumnName =
variable.substring(0, Math.max(0, variable.length() - COLUMN_SUFFIX.length()));
if (variable.endsWith(COLUMN_SUFFIX) && columnDefinitions.get(strippedColumnName) != null) {
columnDefinition = columnDefinitionMap.get(strippedColumnName);
if (variable.endsWith(COLUMN_SUFFIX) && columnDefinition != null) {
columnDefinitions.put(strippedColumnName, columnDefinition);
usedColumnArrays.add(strippedColumnName);
} else if (possibleParams.containsKey(variable)) {
paramsList.add(new QueryScopeParam<>(variable, possibleParams.get(variable)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.deephaven.util.CompletionStageFuture;
import io.deephaven.util.type.TypeUtils;
import io.deephaven.vector.ObjectVector;
import io.deephaven.vector.Vector;
import org.jetbrains.annotations.NotNull;
import org.jpy.PyObject;

Expand Down Expand Up @@ -167,13 +166,23 @@ public static Class<?> getVectorType(Class<?> declaredType) {
} else {
final String declaredTypeSimpleName =
io.deephaven.util.type.TypeUtils.getUnboxedType(declaredType).getSimpleName();
try {
return Class.forName(Vector.class.getPackage().getName() + '.'
+ Character.toUpperCase(declaredTypeSimpleName.charAt(0))
+ declaredTypeSimpleName.substring(1)
+ "Vector");
} catch (ClassNotFoundException e) {
throw new RuntimeException("Unexpected exception for type " + declaredType, e);
switch (declaredTypeSimpleName) {
case "byte":
return io.deephaven.vector.ByteVector.class;
case "short":
return io.deephaven.vector.ShortVector.class;
case "char":
return io.deephaven.vector.CharVector.class;
case "int":
return io.deephaven.vector.IntVector.class;
case "long":
return io.deephaven.vector.LongVector.class;
case "float":
return io.deephaven.vector.FloatVector.class;
case "double":
return io.deephaven.vector.DoubleVector.class;
default:
throw new RuntimeException("Unexpected type " + declaredType);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import java.util.*;

public class BaseLayer extends SelectAndViewAnalyzer {
public class BaseLayer extends SelectAndViewAnalyzer.Layer {
private final Map<String, ColumnSource<?>> sources;
private final boolean publishTheseSources;

Expand All @@ -25,73 +25,62 @@ public class BaseLayer extends SelectAndViewAnalyzer {
}

@Override
int getLayerIndexFor(String column) {
if (sources.containsKey(column)) {
return BASE_LAYER_INDEX;
}
throw new IllegalArgumentException("Unknown column: " + column);
}

@Override
void setBaseBits(BitSet bitset) {
bitset.set(BASE_LAYER_INDEX);
Set<String> getLayerColumnNames() {
return sources.keySet();
}

@Override
public void setAllNewColumns(BitSet bitset) {
bitset.set(BASE_LAYER_INDEX);
}

@Override
void populateModifiedColumnSetRecurse(ModifiedColumnSet mcsBuilder, Set<String> remainingDepsToSatisfy) {
void populateModifiedColumnSetInReverse(
final ModifiedColumnSet mcsBuilder,
final Set<String> remainingDepsToSatisfy) {
mcsBuilder.setAll(remainingDepsToSatisfy.toArray(String[]::new));
}

@Override
final Map<String, ColumnSource<?>> getColumnSourcesRecurse(GetMode mode) {
void populateColumnSources(
final Map<String, ColumnSource<?>> result,
final GetMode mode) {
// We specifically return a LinkedHashMap so the columns get populated in order
final Map<String, ColumnSource<?>> result = new LinkedHashMap<>();
if (mode == GetMode.All || (mode == GetMode.Published && publishTheseSources)) {
result.putAll(sources);
}
return result;
}

@Override
public void applyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helper, JobScheduler jobScheduler,
@Nullable LivenessNode liveResultOwner, SelectLayerCompletionHandler onCompletion) {
// nothing to do at the base layer
onCompletion.onLayerCompleted(BASE_LAYER_INDEX);
public CompletionHandler createUpdateHandler(
final TableUpdate upstream,
final RowSet toClear,
final SelectAndViewAnalyzer.UpdateHelper helper,
final JobScheduler jobScheduler,
@Nullable final LivenessNode liveResultOwner,
final CompletionHandler onCompletion) {
return new CompletionHandler(new BitSet(), onCompletion) {
@Override
protected void onAllRequiredColumnsCompleted() {
// nothing to do at the base layer
onCompletion.onLayerCompleted(getLayerIndex());
}
};
}

@Override
final Map<String, Set<String>> calcDependsOnRecurse(boolean forcePublishAllSources) {
final Map<String, Set<String>> result = new HashMap<>();
final void calcDependsOn(
final Map<String, Set<String>> result,
boolean forcePublishAllSources) {
if (publishTheseSources || forcePublishAllSources) {
for (final String col : sources.keySet()) {
result.computeIfAbsent(col, dummy -> new HashSet<>()).add(col);
}
}
return result;
}

@Override
public SelectAndViewAnalyzer getInner() {
return null;
}

@Override
public void startTrackingPrev() {
// nothing to do
boolean allowCrossColumnParallelization() {
return true;
}

@Override
public LogOutput append(LogOutput logOutput) {
return logOutput.append("{BaseLayer").append(", layerIndex=").append(getLayerIndex()).append("}");
}

@Override
public boolean allowCrossColumnParallelization() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,17 @@

public class ConstantColumnLayer extends SelectOrViewColumnLayer {
private final BitSet dependencyBitSet;
private final boolean flattenedResult;
private final boolean alreadyFlattenedSources;

ConstantColumnLayer(
SelectAndViewAnalyzer inner,
SelectAndViewAnalyzer analyzer,
String name,
SelectColumn sc,
WritableColumnSource<?> ws,
String[] deps,
ModifiedColumnSet mcsBuilder,
boolean flattenedResult,
boolean alreadyFlattenedSources) {
super(inner, name, sc, ws, null, deps, mcsBuilder);
ModifiedColumnSet mcsBuilder) {
super(analyzer, name, sc, ws, null, deps, mcsBuilder);
this.dependencyBitSet = new BitSet();
this.flattenedResult = flattenedResult;
this.alreadyFlattenedSources = alreadyFlattenedSources;
Arrays.stream(deps).mapToInt(inner::getLayerIndexFor).forEach(dependencyBitSet::set);
Arrays.stream(deps).mapToInt(analyzer::getLayerIndexFor).forEach(dependencyBitSet::set);
initialize(ws);
}

Expand All @@ -60,38 +54,31 @@ private void initialize(final WritableColumnSource<?> writableSource) {
}

@Override
public void applyUpdate(final TableUpdate upstream, final RowSet toClear, final UpdateHelper helper,
final JobScheduler jobScheduler, @Nullable final LivenessNode liveResultOwner,
final SelectLayerCompletionHandler onCompletion) {
public CompletionHandler createUpdateHandler(
final TableUpdate upstream,
final RowSet toClear,
final SelectAndViewAnalyzer.UpdateHelper helper,
final JobScheduler jobScheduler,
@Nullable final LivenessNode liveResultOwner,
final CompletionHandler onCompletion) {
// Nothing to do at this level, but need to recurse because my inner layers might need to be called (e.g.
// because they are SelectColumnLayers)
inner.applyUpdate(upstream, toClear, helper, jobScheduler, liveResultOwner,
new SelectLayerCompletionHandler(dependencyBitSet, onCompletion) {
@Override
public void onAllRequiredColumnsCompleted() {
// we don't need to do anything specific here; our result value is constant
onCompletion.onLayerCompleted(getLayerIndex());
}
});
return new CompletionHandler(dependencyBitSet, onCompletion) {
@Override
public void onAllRequiredColumnsCompleted() {
// we don't need to do anything specific here; our result value is constant
onCompletion.onLayerCompleted(getLayerIndex());
}
};
}

@Override
public LogOutput append(LogOutput logOutput) {
return logOutput.append("{ConstantColumnLayer: ").append(selectColumn.toString()).append("}");
}

@Override
public boolean flattenedResult() {
return flattenedResult;
boolean allowCrossColumnParallelization() {
return true;
}

@Override
public boolean alreadyFlattenedSources() {
return alreadyFlattenedSources;
}

@Override
public boolean allowCrossColumnParallelization() {
return inner.allowCrossColumnParallelization();
public LogOutput append(LogOutput logOutput) {
return logOutput.append("{ConstantColumnLayer: ").append(selectColumn.toString()).append("}");
}
}
Loading

0 comments on commit 1cf0152

Please sign in to comment.