diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryCompilerRequestProcessor.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryCompilerRequestProcessor.java index 4aa259851a8..49e6e2e6b14 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryCompilerRequestProcessor.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryCompilerRequestProcessor.java @@ -4,15 +4,11 @@ package io.deephaven.engine.table.impl; import io.deephaven.UncheckedDeephavenException; -import io.deephaven.api.util.NameValidator; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.context.QueryCompiler; import io.deephaven.engine.context.QueryCompilerRequest; -import io.deephaven.engine.context.QueryLibrary; -import io.deephaven.engine.context.QueryScope; -import io.deephaven.engine.rowset.TrackingWritableRowSet; -import io.deephaven.engine.table.WritableColumnSource; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; +import io.deephaven.engine.table.impl.select.codegen.FormulaAnalyzer; import io.deephaven.util.MultiException; import io.deephaven.util.SafeCloseable; import io.deephaven.util.CompletionStageFuture; @@ -21,12 +17,7 @@ import org.jetbrains.annotations.VisibleForTesting; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -48,76 +39,20 @@ public static QueryCompilerRequestProcessor.BatchProcessor batch() { } /** - * @return a CachingSupplier that supplies a snapshot of the current query scope variables + * @return a CachingSupplier that supplies a snapshot of current query scope variables and query library imports */ @VisibleForTesting - public static CachingSupplier> newQueryScopeVariableSupplier() { - final QueryScope queryScope = ExecutionContext.getContext().getQueryScope(); - return new CachingSupplier<>(() -> Collections.unmodifiableMap( - queryScope.toMap((name, value) -> NameValidator.isValidQueryParameterName(name)))); + public static CachingSupplier newFormulaImportsSupplier() { + return new CachingSupplier<>(FormulaAnalyzer.Imports::new); } - /** - * @return a CachingSupplier that supplies a snapshot of the current {@link QueryLibrary} package imports - */ - private static CachingSupplier> newPackageImportsSupplier() { - final QueryLibrary queryLibrary = ExecutionContext.getContext().getQueryLibrary(); - return new CachingSupplier<>(() -> Set.copyOf(queryLibrary.getPackageImports())); - } - - /** - * @return a CachingSupplier that supplies a snapshot of the current {@link QueryLibrary} class imports - */ - private static CachingSupplier>> newClassImportsSupplier() { - final QueryLibrary queryLibrary = ExecutionContext.getContext().getQueryLibrary(); - return new CachingSupplier<>(() -> { - final Collection> classImports = new HashSet<>(queryLibrary.getClassImports()); - // because QueryLibrary is in the context package, without visibility, we need to add these manually - classImports.add(TrackingWritableRowSet.class); - classImports.add(WritableColumnSource.class); - return Collections.unmodifiableCollection(classImports); - }); - } - - /** - * @return a CachingSupplier that supplies a snapshot of the current {@link QueryLibrary} static imports - */ - private static CachingSupplier>> newStaticImportsSupplier() { - final QueryLibrary queryLibrary = ExecutionContext.getContext().getQueryLibrary(); - return new CachingSupplier<>(() -> Set.copyOf(queryLibrary.getStaticImports())); - } - - private final CachingSupplier> queryScopeVariableSupplier = newQueryScopeVariableSupplier(); - private final CachingSupplier> packageImportsSupplier = newPackageImportsSupplier(); - private final CachingSupplier>> classImportsSupplier = newClassImportsSupplier(); - private final CachingSupplier>> staticImportsSupplier = newStaticImportsSupplier(); - - /** - * @return a lazily cached snapshot of the current query scope variables - */ - public final Map getQueryScopeVariables() { - return queryScopeVariableSupplier.get(); - } - - /** - * @return a lazily cached snapshot of the current {@link QueryLibrary} package imports - */ - public final Collection getPackageImports() { - return packageImportsSupplier.get(); - } - - /** - * @return a lazily cached snapshot of the current {@link QueryLibrary} class imports - */ - public final Collection> getClassImports() { - return classImportsSupplier.get(); - } + private final CachingSupplier formulaImportsSupplier = newFormulaImportsSupplier(); /** - * @return a lazily cached snapshot of the current {@link QueryLibrary} static imports + * @return a lazily cached snapshot of current query scope variables and query library imports */ - public final Collection> getStaticImports() { - return staticImportsSupplier.get(); + public final FormulaAnalyzer.Imports getFormulaImports() { + return formulaImportsSupplier.get(); } /** diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index 4bb5e19dad4..8e18674cb32 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1498,7 +1498,7 @@ public Table update(final Collection newColumns) { public SelectValidationResult validateSelect(final SelectColumn... selectColumns) { final SelectColumn[] clones = SelectColumn.copyFrom(selectColumns); SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.createContext( - this, SelectAndViewAnalyzer.Mode.SELECT_STATIC, getModifiedColumnSetForUpdates(), true, + this, SelectAndViewAnalyzer.Mode.SELECT_STATIC, true, false, clones); return new SelectValidationResult(analyzerContext.createAnalyzer(), clones); } @@ -1526,8 +1526,7 @@ private Table selectOrUpdate(Flavor flavor, final SelectColumn... selectColumns) } final boolean publishTheseSources = flavor == Flavor.Update; final SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.createContext( - this, mode, getModifiedColumnSetForUpdates(), publishTheseSources, true, - selectColumns); + this, mode, publishTheseSources, true, selectColumns); final SelectAndViewAnalyzer analyzer = analyzerContext.createAnalyzer(); final SelectColumn[] processedColumns = analyzerContext.getProcessedColumns() @@ -1597,12 +1596,6 @@ this, mode, getModifiedColumnSetForUpdates(), publishTheseSources, true, resultTable.setFlat(); } propagateDataIndexes(processedColumns, resultTable); - for (final ColumnSource columnSource : analyzerContext.getNewColumnSources() - .values()) { - if (columnSource instanceof PossiblyImmutableColumnSource) { - ((PossiblyImmutableColumnSource) columnSource).setImmutable(); - } - } } } propagateFlatness(resultTable); @@ -1766,8 +1759,7 @@ updateDescription, sizeForInstrumentation(), () -> { final SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.createContext( this, SelectAndViewAnalyzer.Mode.VIEW_EAGER, - getModifiedColumnSetForUpdates(), publishTheseSources, true, - viewColumns); + publishTheseSources, true, viewColumns); final SelectColumn[] processedViewColumns = analyzerContext.getProcessedColumns() .toArray(SelectColumn[]::new); QueryTable queryTable = new QueryTable( @@ -1857,8 +1849,7 @@ public Table lazyUpdate(final Collection newColumns) { final SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.createContext( - this, SelectAndViewAnalyzer.Mode.VIEW_LAZY, - getModifiedColumnSetForUpdates(), true, true, selectColumns); + this, SelectAndViewAnalyzer.Mode.VIEW_LAZY, true, true, selectColumns); final SelectColumn[] processedColumns = analyzerContext.getProcessedColumns() .toArray(SelectColumn[]::new); final QueryTable result = new QueryTable( diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java index f5a0bddc7db..d7236ce6010 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java @@ -16,6 +16,7 @@ import io.deephaven.engine.table.impl.util.UpdateGraphJobScheduler; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** * A Shift-Aware listener for Select or Update. It uses the SelectAndViewAnalyzer to calculate how columns affect other @@ -86,9 +87,19 @@ public void onUpdate(final TableUpdate upstream) { jobScheduler = new ImmediateJobScheduler(); } + // do not allow a double-notify + final AtomicBoolean hasNotified = new AtomicBoolean(); analyzer.applyUpdate(acquiredUpdate, toClear, updateHelper, jobScheduler, this, - () -> completionRoutine(acquiredUpdate, jobScheduler, toClear, updateHelper), - this::handleException); + () -> { + if (!hasNotified.getAndSet(true)) { + completionRoutine(acquiredUpdate, jobScheduler, toClear, updateHelper); + } + }, + error -> { + if (!hasNotified.getAndSet(true)) { + handleException(error); + } + }); } private void handleException(Exception e) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/lang/QueryLanguageParser.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/lang/QueryLanguageParser.java index b84f5a0230f..d457a4ab0b3 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/lang/QueryLanguageParser.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/lang/QueryLanguageParser.java @@ -192,7 +192,7 @@ public final class QueryLanguageParser extends GenericVisitorAdapter, Q * imported. * @param variables A map of the names of scope variables to their types * @param variableTypeArguments A map of the names of scope variables to their type arguments - * @param queryScopeVariables A mutable map of the names of query scope variables to their values + * @param queryScopeVariables A map of the names of query scope variables to their values * @param columnVariables A set of column variable names * @param unboxArguments If true it will unbox the query scope arguments * @param timeConversionResult The result of converting time literals in the expression @@ -266,7 +266,7 @@ public QueryLanguageParser( * imported. * @param variables A map of the names of scope variables to their types * @param variableTypeArguments A map of the names of scope variables to their type arguments - * @param queryScopeVariables A mutable map of the names of query scope variables to their values + * @param queryScopeVariables A map of the names of query scope variables to their values * @param columnVariables A set of column variable names * @param unboxArguments If true it will unbox the query scope arguments * @param verifyIdempotence If true, the parser will verify that the result expression will not mutate when parsed diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/AbstractConditionFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/AbstractConditionFilter.java index 6c7de789aef..2007eee6526 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/AbstractConditionFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/AbstractConditionFilter.java @@ -89,11 +89,7 @@ public synchronized void init( try { final QueryLanguageParser.Result result = FormulaAnalyzer.parseFormula( formula, tableDefinition.getColumnNameMap(), outerToInnerNames, - compilationProcessor.getQueryScopeVariables(), - compilationProcessor.getPackageImports(), - compilationProcessor.getClassImports(), - compilationProcessor.getStaticImports(), - unboxArguments); + compilationProcessor.getFormulaImports(), unboxArguments); formulaShiftColPair = result.getFormulaShiftColPair(); if (formulaShiftColPair != null) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DhFormulaColumn.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DhFormulaColumn.java index c7e1c32ffdd..7ed70800dfa 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DhFormulaColumn.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DhFormulaColumn.java @@ -34,7 +34,6 @@ import io.deephaven.io.logger.Logger; import io.deephaven.util.CompletionStageFuture; import io.deephaven.util.type.TypeUtils; -import io.deephaven.vector.ObjectVector; import io.deephaven.vector.VectorFactory; import org.jetbrains.annotations.NotNull; import org.jpy.PyObject; @@ -181,10 +180,7 @@ public List initDef( try { final QueryLanguageParser.Result result = FormulaAnalyzer.parseFormula( formulaString, columnDefinitionMap, Collections.emptyMap(), - compilationRequestProcessor.getQueryScopeVariables(), - compilationRequestProcessor.getPackageImports(), - compilationRequestProcessor.getClassImports(), - compilationRequestProcessor.getStaticImports()); + compilationRequestProcessor.getFormulaImports()); analyzedFormula = FormulaAnalyzer.analyze(formulaString, columnDefinitionMap, result); hasConstantValue = result.isConstantValueExpression(); formulaShiftColPair = result.getFormulaShiftColPair(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MatchFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MatchFilter.java index d4026f087aa..4a7921e39b1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MatchFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MatchFilter.java @@ -223,7 +223,8 @@ public synchronized void init( return; } final List valueList = new ArrayList<>(); - final Map queryScopeVariables = compilationProcessor.getQueryScopeVariables(); + final Map queryScopeVariables = + compilationProcessor.getFormulaImports().getQueryScopeVariables(); final ColumnTypeConvertor convertor = ColumnTypeConvertorFactory.getConvertor(column.getDataType()); for (String strValue : strValues) { convertor.convertValue(column, tableDefinition, strValue, queryScopeVariables, valueList::add); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/RangeFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/RangeFilter.java index 65d65b75e03..5236862c6d5 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/RangeFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/RangeFilter.java @@ -183,7 +183,7 @@ public void init( try { boolean wasAnArrayType = convertor.convertValue( - def, tableDefinition, value, compilationProcessor.getQueryScopeVariables(), + def, tableDefinition, value, compilationProcessor.getFormulaImports().getQueryScopeVariables(), realValue::setValue); if (wasAnArrayType) { conversionError = diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/DependencyLayerBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/DependencyLayerBase.java index 44e081451f6..7d00107d17e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/DependencyLayerBase.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/DependencyLayerBase.java @@ -29,7 +29,7 @@ public abstract class DependencyLayerBase extends SelectAndViewAnalyzer.Layer { this.selectColumn = selectColumn; selectColumnHoldsVector = Vector.class.isAssignableFrom(selectColumn.getReturnedType()); this.columnSource = columnSource; - context.populateModifiedColumnSet(mcsBuilder, dependencies); + context.populateParentDependenciesMCS(mcsBuilder, dependencies); this.myModifiedColumnSet = mcsBuilder; this.myLayerDependencySet = new BitSet(); context.populateLayerDependencySet(myLayerDependencySet, dependencies); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java index 6e34e9b39e0..91b85a4c77c 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java @@ -3,9 +3,12 @@ // package io.deephaven.engine.table.impl.select.analyzers; +import gnu.trove.map.TObjectIntMap; +import gnu.trove.map.hash.TObjectIntHashMap; import io.deephaven.base.Pair; import io.deephaven.base.log.LogOutput; import io.deephaven.base.log.LogOutputAppendable; +import io.deephaven.base.verify.Assert; import io.deephaven.engine.liveness.LivenessNode; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; @@ -22,6 +25,7 @@ import io.deephaven.engine.table.impl.select.SourceColumn; import io.deephaven.engine.table.impl.select.SwitchColumn; import io.deephaven.engine.table.impl.sources.InMemoryColumnSource; +import io.deephaven.engine.table.impl.sources.PossiblyImmutableColumnSource; import io.deephaven.engine.table.impl.sources.RedirectedColumnSource; import io.deephaven.engine.table.impl.sources.SingleValueColumnSource; import io.deephaven.engine.table.impl.sources.WritableRedirectedColumnSource; @@ -39,7 +43,6 @@ import org.jetbrains.annotations.Nullable; import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.stream.Stream; @@ -79,7 +82,6 @@ public static void initializeSelectColumns( public static AnalyzerContext createContext( final QueryTable parentTable, final Mode mode, - final ModifiedColumnSet parentMcs, final boolean publishParentSources, boolean useShiftedColumns, final SelectColumn... selectColumns) { @@ -161,7 +163,7 @@ public static AnalyzerContext createContext( // note: if flatResult is true then we are not preserving any parent columns final boolean useResultKeySpace = context.flatResult && Stream.concat(sc.getColumns().stream(), sc.getColumnArrays().stream()) - .anyMatch(context.newSources::containsKey); + .anyMatch(columnName -> context.getLayerIndexFor(columnName) != Layer.PARENT_TABLE_INDEX); sc.initInputs(rowSet, useResultKeySpace ? context.allSourcesInResultKeySpace : context.allSources); @@ -172,7 +174,7 @@ public static AnalyzerContext createContext( final Stream allDependencies = Stream.concat(sc.getColumns().stream(), sc.getColumnArrays().stream()); final String[] distinctDeps = allDependencies.distinct().toArray(String[]::new); - final ModifiedColumnSet mcsBuilder = new ModifiedColumnSet(parentMcs); + final ModifiedColumnSet mcsBuilder = new ModifiedColumnSet(parentTable.getModifiedColumnSetForUpdates()); if (useShiftedColumns && hasConstantArrayAccess(sc)) { // we use the first shifted column to split between processed columns and remaining columns @@ -231,6 +233,7 @@ public static AnalyzerContext createContext( final WritableColumnSource scs = parentIsFlat || context.flatResult ? sc.newFlatDestInstance(targetDestinationCapacity) : sc.newDestInstance(targetDestinationCapacity); + maybeSetStaticColumnSourceImmutable(scs); maybeCreateAlias.accept(scs); context.addLayer(new SelectColumnLayer( updateGraph, rowSet, context, sc, scs, null, distinctDeps, mcsBuilder, false, @@ -241,6 +244,7 @@ public static AnalyzerContext createContext( final WritableColumnSource underlyingSource = sc.newDestInstance(rowSet.size()); final WritableColumnSource scs = WritableRedirectedColumnSource.maybeRedirect( rowRedirection, underlyingSource, rowSet.size()); + maybeSetStaticColumnSourceImmutable(scs); maybeCreateAlias.accept(scs); context.addLayer(new SelectColumnLayer( updateGraph, rowSet, context, sc, scs, underlyingSource, distinctDeps, mcsBuilder, true, @@ -272,6 +276,12 @@ public static AnalyzerContext createContext( return context; } + private static void maybeSetStaticColumnSourceImmutable(final ColumnSource columnSource) { + if (columnSource instanceof PossiblyImmutableColumnSource) { + ((PossiblyImmutableColumnSource) columnSource).setImmutable(); + } + } + private static @Nullable SourceColumn tryToGetSourceColumn(final SelectColumn sc) { final SourceColumn realColumn; if (sc instanceof SourceColumn) { @@ -357,16 +367,17 @@ public final static class AnalyzerContext { /** The analyzer that we are building. */ private final List layers = new ArrayList<>(); - /** The sources that are available to the analyzer, including parent columns. */ + /** + * The sources that are available to the analyzer, including parent columns. Parent columns are in parent key + * space, others are in result key space. + */ private final Map> allSources = new LinkedHashMap<>(); /** The sources that are available to the analyzer, including parent columns, in result key space. */ private final Map> allSourcesInResultKeySpace; - /** The sources that are explicitly defined to the analyzer, including preserved parent columns. */ - private final Map> newSources = new HashMap<>(); /** The sources that are published to the child table. */ private final Map> publishedSources = new LinkedHashMap<>(); /** A mapping from result column name to the layer index that created it. */ - private final Map columnToLayerIndex = new HashMap<>(); + private final TObjectIntMap columnToLayerIndex; /** The select columns that have been processed so far. */ private final List processedCols = new ArrayList<>(); @@ -379,23 +390,24 @@ public final static class AnalyzerContext { /** Whether the result should be flat. */ private boolean flatResult; /** The layer that will be used to process redirection, if we have one. */ - private int redirectionLayer = -1; + private int redirectionLayer = Layer.UNSET_INDEX; AnalyzerContext( final QueryTable parentTable, - final boolean publishTheseSources, + final boolean publishParentSources, final boolean flatResult) { - final Map> sources = parentTable.getColumnSourceMap(); + final Map> parentSources = parentTable.getColumnSourceMap(); + columnToLayerIndex = new TObjectIntHashMap<>(parentSources.size(), 0.5f, Layer.UNSET_INDEX); this.flatResult = flatResult; - allSources.putAll(sources); + allSources.putAll(parentSources); for (final String columnName : allSources.keySet()) { - columnToLayerIndex.put(columnName, -1); + columnToLayerIndex.put(columnName, Layer.PARENT_TABLE_INDEX); } - if (publishTheseSources) { - publishedSources.putAll(sources); + if (publishParentSources) { + publishedSources.putAll(parentSources); } if (!flatResult) { @@ -417,7 +429,7 @@ public final static class AnalyzerContext { */ void addLayer(final Layer layer) { if (layer instanceof RedirectionLayer) { - if (redirectionLayer != -1) { + if (redirectionLayer != Layer.UNSET_INDEX) { throw new IllegalStateException("Cannot have more than one redirection layer"); } redirectionLayer = layers.size(); @@ -427,7 +439,6 @@ void addLayer(final Layer layer) { if (flatResult) { layer.populateColumnSources(allSourcesInResultKeySpace); } - layer.populateColumnSources(newSources); layer.populateColumnSources(publishedSources); layers.add(layer); @@ -452,8 +463,8 @@ int getNextLayerIndex() { * @return the layerIndex */ int getLayerIndexFor(String column) { - final Integer layerIndex = columnToLayerIndex.get(column); - if (layerIndex == null) { + final int layerIndex = columnToLayerIndex.get(column); + if (layerIndex == Layer.UNSET_INDEX) { throw new IllegalStateException("Column " + column + " not found in any layer of the analyzer"); } return layerIndex; @@ -465,19 +476,19 @@ int getLayerIndexFor(String column) { * @param mcsBuilder the result ModifiedColumnSet to populate * @param dependencies the immediate dependencies */ - void populateModifiedColumnSet( + void populateParentDependenciesMCS( final ModifiedColumnSet mcsBuilder, final String[] dependencies) { for (final String dep : dependencies) { final int layerIndex = getLayerIndexFor(dep); - if (layerIndex != -1) { + if (layerIndex == Layer.PARENT_TABLE_INDEX) { + // this is a preserved parent column + mcsBuilder.setAll(dep); + } else if (layerIndex != Layer.UNSET_INDEX) { // Forward-looking mcsBuilder.setAll(layers.get(layerIndex).getModifiedColumnSet()); - } else if (!allSources.containsKey(dep)) { + } else { // we should have blown up during initDef if this is the case throw new IllegalStateException("Column " + dep + " not found in any layer of the analyzer"); - } else if (!newSources.containsKey(dep)) { - // this is a preserved parent column - mcsBuilder.setAll(dep); } } } @@ -493,13 +504,13 @@ void populateLayerDependencySet( final String[] dependencies) { for (final String dep : dependencies) { final int layerIndex = getLayerIndexFor(dep); - if (layerIndex != -1) { - layerDependencySet.or(layers.get(layerIndex).getLayerDependencySet()); - } else if (!allSources.containsKey(dep)) { + if (layerIndex == Layer.UNSET_INDEX) { // we should have blown up during initDef if this is the case throw new IllegalStateException("Column " + dep + " not found in any layer of the analyzer"); + } else if (layerIndex != Layer.PARENT_TABLE_INDEX) { + // note that implicitly preserved columns do not belong to a layer. + layerDependencySet.or(layers.get(layerIndex).getLayerDependencySet()); } - // Note that preserved columns do not belong to a layer. } } @@ -509,23 +520,16 @@ void populateLayerDependencySet( * @param layerDependencies the result bitset to populate */ void setRedirectionLayer(final BitSet layerDependencies) { - if (redirectionLayer != -1) { + if (redirectionLayer != Layer.UNSET_INDEX) { layerDependencies.set(redirectionLayer); } } - /** - * @return the column sources explicitly created by the analyzer - */ - public Map> getNewColumnSources() { - return newSources; - } - /** * @return the column sources that are published through the child table */ public Map> getPublishedColumnSources() { - // Note that if we have a shift column that we forcefully publish all columns. + // Note that if we have a shift column that we forcibly publish all columns. return shiftColumn == null ? publishedSources : allSources; } @@ -565,7 +569,7 @@ public Map calcEffects() { for (final String columnName : resultMap.keySet()) { final int layerIndex = getLayerIndexFor(columnName); final String[] dependencies; - if (layerIndex == -1) { + if (layerIndex == Layer.PARENT_TABLE_INDEX) { dependencies = new String[] {columnName}; } else { dependencies = layers.get(layerIndex).getModifiedColumnSet().dirtyColumnNames(); @@ -601,7 +605,7 @@ public Map calcEffects() { * @return the final result */ public QueryTable applyShiftsAndRemainingColumns( - final @NotNull QueryTable parentTable, + @NotNull final QueryTable parentTable, @NotNull QueryTable resultSoFar, final UpdateFlavor updateFlavor) { if (shiftColumn != null) { @@ -670,6 +674,10 @@ public QueryTable applyShiftsAndRemainingColumns( } static abstract class Layer implements LogOutputAppendable { + private static final BitSet EMPTY_BITSET = new BitSet(); + + public static final int UNSET_INDEX = -1; + public static final int PARENT_TABLE_INDEX = -2; /** * The layerIndex is used to identify each layer uniquely within the bitsets for completion. @@ -705,7 +713,7 @@ ModifiedColumnSet getModifiedColumnSet() { * @return the layer dependency set indicating which layers this layer depends on */ BitSet getLayerDependencySet() { - return new BitSet(); + return EMPTY_BITSET; } @Override @@ -777,6 +785,12 @@ public UpdateHelper(RowSet parentRowSet, TableUpdate upstream) { this.upstream = upstream; } + /** + * Flatten the upstream update from the parent key space to the destination key space. We are guaranteed to be + * in STATIC_SELECT mode. + * + * @return the flattened update + */ TableUpdate resultKeySpaceUpdate() { if (upstreamInResultSpace == null) { upstreamInResultSpace = new TableUpdateImpl( @@ -863,6 +877,7 @@ public void applyUpdate( final Runnable onSuccess, final Consumer onError) { + Assert.assertion(remainingLayers.isEmpty(), "remainingLayers.isEmpty()"); remainingLayers.or(requiredLayers); final Runnable[] runners = new Runnable[layers.length]; @@ -871,6 +886,8 @@ public void applyUpdate( for (int ii = 0; ii < layers.length; ++ii) { final Layer layer = layers[ii]; if (layer != null) { + // TODO (deephaven-core#4896): this error handling allows concurrent layers to fail without ensuring + // that other tasks are finished. runners[ii] = layer.createUpdateHandler( upstream, toClear, helper, jobScheduler, liveResultOwner, () -> scheduler.onLayerComplete(layer.getLayerIndex()), onError); @@ -882,13 +899,14 @@ public void applyUpdate( private class UpdateScheduler { private final ReentrantLock runLock = new ReentrantLock(); - private final AtomicBoolean needsRun = new AtomicBoolean(); private final Runnable[] runners; private final Runnable onSuccess; private final Consumer onError; - private volatile boolean updateComplete; + private volatile boolean needsRun; + /** whether we have already invoked onSuccess */ + private boolean updateComplete; public UpdateScheduler( final Runnable[] runners, @@ -908,7 +926,7 @@ public void onLayerComplete(final int layerIndex) { } private void tryToKickOffWork() { - needsRun.set(true); + needsRun = true; while (true) { if (runLock.isHeldByCurrentThread() || !runLock.tryLock()) { // do not permit re-entry or waiting on another thread doing exactly this work @@ -916,7 +934,8 @@ private void tryToKickOffWork() { } try { - if (needsRun.compareAndSet(true, false)) { + if (needsRun) { + needsRun = false; doKickOffWork(); } } catch (final Exception exception) { @@ -928,7 +947,7 @@ private void tryToKickOffWork() { runLock.unlock(); } - if (!needsRun.get()) { + if (!needsRun) { return; } } @@ -947,7 +966,7 @@ private void doKickOffWork() { Runnable runner = null; synchronized (remainingLayers) { complete = remainingLayers.isEmpty(); - nextLayer = remainingLayers.nextSetBit(nextLayer); + nextLayer = complete ? -1 : remainingLayers.nextSetBit(nextLayer); if (nextLayer != -1) { if ((runner = runners[nextLayer]) != null) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/codegen/FormulaAnalyzer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/codegen/FormulaAnalyzer.java index b617bb0283f..c38ec276b6b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/codegen/FormulaAnalyzer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/codegen/FormulaAnalyzer.java @@ -3,6 +3,9 @@ // package io.deephaven.engine.table.impl.select.codegen; +import io.deephaven.api.util.NameValidator; +import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.context.QueryLibrary; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.impl.lang.QueryLanguageParser; import io.deephaven.engine.table.impl.select.QueryScopeParamTypeUtil; @@ -24,6 +27,42 @@ public class FormulaAnalyzer { private static final Logger log = LoggerFactory.getLogger(FormulaAnalyzer.class); + /** + * A container to hold a single copy of imports required to compile formulas for one operation. + */ + public static final class Imports { + private final Map queryScopeVariables; + private final Collection packageImports; + private final Collection> classImports; + private final Collection> staticImports; + + public Imports() { + final ExecutionContext context = ExecutionContext.getContext(); + queryScopeVariables = Collections.unmodifiableMap( + context.getQueryScope().toMap((name, value) -> NameValidator.isValidQueryParameterName(name))); + final QueryLibrary queryLibrary = context.getQueryLibrary(); + packageImports = Set.copyOf(queryLibrary.getPackageImports()); + classImports = Set.copyOf(queryLibrary.getClassImports()); + staticImports = Set.copyOf(queryLibrary.getStaticImports()); + } + + public Map getQueryScopeVariables() { + return queryScopeVariables; + } + + public Collection getPackageImports() { + return packageImports; + } + + public Collection> getClassImports() { + return classImports; + } + + public Collection> getStaticImports() { + return staticImports; + } + } + public static Result analyze(final String rawFormulaString, final Map> columnDefinitionMap, final QueryLanguageParser.Result queryLanguageResult) throws Exception { @@ -71,7 +110,7 @@ public static Result analyze(final String rawFormulaString, * @param formulaString The raw formula string * @param availableColumns The columns available for use in the formula * @param columnRenames Outer to inner column name mapping - * @param queryScopeVariables The query scope variables + * @param imports The query scope variables, package, class, and static imports * @return The parsed formula {@link QueryLanguageParser.Result result} * @throws Exception If the formula cannot be parsed */ @@ -79,12 +118,8 @@ public static QueryLanguageParser.Result parseFormula( @NotNull final String formulaString, @NotNull final Map> availableColumns, @NotNull final Map columnRenames, - @NotNull final Map queryScopeVariables, - @NotNull final Collection packageImports, - @NotNull final Collection> classImports, - @NotNull final Collection> staticImports) throws Exception { - return parseFormula(formulaString, availableColumns, columnRenames, queryScopeVariables, packageImports, - classImports, staticImports, true); + @NotNull final Imports imports) throws Exception { + return parseFormula(formulaString, availableColumns, columnRenames, imports, true); } /** @@ -93,10 +128,7 @@ public static QueryLanguageParser.Result parseFormula( * @param formulaString The raw formula string * @param availableColumns The columns available for use in the formula * @param columnRenames Outer to inner column name mapping - * @param queryScopeVariables The query scope variables - * @param packageImports The package imports - * @param classImports The class imports - * @param staticImports The static imports + * @param imports The query scope variables, package, class, and static imports * @param unboxArguments If true it will unbox the query scope arguments * @return The parsed formula {@link QueryLanguageParser.Result result} * @throws Exception If the formula cannot be parsed @@ -105,10 +137,7 @@ public static QueryLanguageParser.Result parseFormula( @NotNull final String formulaString, @NotNull final Map> availableColumns, @NotNull final Map columnRenames, - @NotNull final Map queryScopeVariables, - @NotNull final Collection packageImports, - @NotNull final Collection> classImports, - @NotNull final Collection> staticImports, + @NotNull final Imports imports, final boolean unboxArguments) throws Exception { final TimeLiteralReplacedExpression timeConversionResult = @@ -183,7 +212,7 @@ public static QueryLanguageParser.Result parseFormula( } // Parameters come last. - for (Map.Entry param : queryScopeVariables.entrySet()) { + for (Map.Entry param : imports.queryScopeVariables.entrySet()) { if (possibleVariables.containsKey(param.getKey())) { // Columns and column arrays take precedence over parameters. continue; @@ -206,9 +235,10 @@ public static QueryLanguageParser.Result parseFormula( possibleVariables.putAll(timeConversionResult.getNewVariables()); - return new QueryLanguageParser(timeConversionResult.getConvertedFormula(), packageImports, - classImports, staticImports, possibleVariables, possibleVariableParameterizedTypes, - queryScopeVariables, columnVariables, unboxArguments, timeConversionResult).getResult(); + return new QueryLanguageParser(timeConversionResult.getConvertedFormula(), imports.getPackageImports(), + imports.getClassImports(), imports.getStaticImports(), possibleVariables, + possibleVariableParameterizedTypes, imports.getQueryScopeVariables(), columnVariables, unboxArguments, + timeConversionResult).getResult(); } public static class Result { diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/lang/TestQueryLanguageParser.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/lang/TestQueryLanguageParser.java index 32523cbf766..4f1558f0c1d 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/lang/TestQueryLanguageParser.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/lang/TestQueryLanguageParser.java @@ -3178,7 +3178,7 @@ private void check(String expression, String resultExpression, Class resultTy final Map possibleParams; final QueryScope queryScope = ExecutionContext.getContext().getQueryScope(); if (!(queryScope instanceof PoisonedQueryScope)) { - possibleParams = QueryCompilerRequestProcessor.newQueryScopeVariableSupplier().get(); + possibleParams = QueryCompilerRequestProcessor.newFormulaImportsSupplier().get().getQueryScopeVariables(); } else { possibleParams = null; }