Skip to content

Commit

Permalink
All of our review comments from yesterday.
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Aug 15, 2024
1 parent 2bfd26f commit 9744fc1
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.context.QueryCompiler;
import io.deephaven.engine.context.QueryCompilerRequest;
import io.deephaven.engine.context.QueryLibrary;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.util.MultiException;
import io.deephaven.util.SafeCloseable;
Expand All @@ -18,68 +21,121 @@
import org.jetbrains.annotations.VisibleForTesting;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public interface QueryCompilerRequestProcessor {
public abstract class QueryCompilerRequestProcessor {

/**
* @return An immediate QueryCompilerRequestProcessor
*/
static QueryCompilerRequestProcessor.ImmediateProcessor immediate() {
public static QueryCompilerRequestProcessor.ImmediateProcessor immediate() {
return new ImmediateProcessor();
}

/**
* @return A batch QueryCompilerRequestProcessor
*/
static QueryCompilerRequestProcessor.BatchProcessor batch() {
public static QueryCompilerRequestProcessor.BatchProcessor batch() {
return new BatchProcessor();
}

/**
* @return a CachingSupplier that supplies a snapshot of the current query scope variables
*/
@VisibleForTesting
static CachingSupplier<Map<String, Object>> newQueryScopeVariableSupplier() {
public static CachingSupplier<Map<String, Object>> newQueryScopeVariableSupplier() {
final QueryScope queryScope = ExecutionContext.getContext().getQueryScope();
return new CachingSupplier<>(() -> Collections.unmodifiableMap(
queryScope.toMap((name, value) -> NameValidator.isValidQueryParameterName(name))));
}

/**
* @return a CachingSupplier that supplies a snapshot of the current {@link QueryLibrary} package imports
*/
private static CachingSupplier<Collection<Package>> newPackageImportsSupplier() {
final QueryLibrary queryLibrary = ExecutionContext.getContext().getQueryLibrary();
return new CachingSupplier<>(() -> Set.copyOf(queryLibrary.getPackageImports()));
}

/**
* @return a CachingSupplier that supplies a snapshot of the current {@link QueryLibrary} class imports
*/
private static CachingSupplier<Collection<Class<?>>> newClassImportsSupplier() {
final QueryLibrary queryLibrary = ExecutionContext.getContext().getQueryLibrary();
return new CachingSupplier<>(() -> {
final Collection<Class<?>> classImports = new HashSet<>(queryLibrary.getClassImports());
// because QueryLibrary is in the context package, without visibility, we need to add these manually
classImports.add(TrackingWritableRowSet.class);
classImports.add(WritableColumnSource.class);
return Collections.unmodifiableCollection(classImports);
});
}

/**
* @return a CachingSupplier that supplies a snapshot of the current {@link QueryLibrary} static imports
*/
private static CachingSupplier<Collection<Class<?>>> newStaticImportsSupplier() {
final QueryLibrary queryLibrary = ExecutionContext.getContext().getQueryLibrary();
return new CachingSupplier<>(() -> Set.copyOf(queryLibrary.getStaticImports()));
}

private final CachingSupplier<Map<String, Object>> queryScopeVariableSupplier = newQueryScopeVariableSupplier();
private final CachingSupplier<Collection<Package>> packageImportsSupplier = newPackageImportsSupplier();
private final CachingSupplier<Collection<Class<?>>> classImportsSupplier = newClassImportsSupplier();
private final CachingSupplier<Collection<Class<?>>> staticImportsSupplier = newStaticImportsSupplier();

/**
* @return a lazily cached snapshot of the current query scope variables
*/
Map<String, Object> getQueryScopeVariables();
public final Map<String, Object> getQueryScopeVariables() {
return queryScopeVariableSupplier.get();
}

/**
* @return a lazily cached snapshot of the current {@link QueryLibrary} package imports
*/
public final Collection<Package> getPackageImports() {
return packageImportsSupplier.get();
}

/**
* @return a lazily cached snapshot of the current {@link QueryLibrary} class imports
*/
public final Collection<Class<?>> getClassImports() {
return classImportsSupplier.get();
}

/**
* @return a lazily cached snapshot of the current {@link QueryLibrary} static imports
*/
public final Collection<Class<?>> getStaticImports() {
return staticImportsSupplier.get();
}

/**
* Submit a request for compilation. The QueryCompilerRequestProcessor is not required to immediately compile this
* request.
*
* @param request the request to compile
*/
CompletionStageFuture<Class<?>> submit(@NotNull QueryCompilerRequest request);
public abstract CompletionStageFuture<Class<?>> submit(@NotNull QueryCompilerRequest request);

/**
* A QueryCompilerRequestProcessor that immediately compiles requests.
*/
class ImmediateProcessor implements QueryCompilerRequestProcessor {

private final CachingSupplier<Map<String, Object>> queryScopeVariableSupplier = newQueryScopeVariableSupplier();

public static class ImmediateProcessor extends QueryCompilerRequestProcessor {
private ImmediateProcessor() {
// force use of static factory method
}

@Override
public Map<String, Object> getQueryScopeVariables() {
return queryScopeVariableSupplier.get();
}

@Override
public CompletionStageFuture<Class<?>> submit(@NotNull final QueryCompilerRequest request) {
final String desc = "Compile: " + request.description();
Expand Down Expand Up @@ -108,20 +164,14 @@ public CompletionStageFuture<Class<?>> submit(@NotNull final QueryCompilerReques
* <p>
* The compile method must be called to actually compile the requests.
*/
class BatchProcessor implements QueryCompilerRequestProcessor {
public static class BatchProcessor extends QueryCompilerRequestProcessor {
private final List<QueryCompilerRequest> requests = new ArrayList<>();
private final List<CompletionStageFuture.Resolver<Class<?>>> resolvers = new ArrayList<>();
private final CachingSupplier<Map<String, Object>> queryScopeVariableSupplier = newQueryScopeVariableSupplier();

private BatchProcessor() {
// force use of static factory method
}

@Override
public Map<String, Object> getQueryScopeVariables() {
return queryScopeVariableSupplier.get();
}

@Override
public CompletionStageFuture<Class<?>> submit(@NotNull final QueryCompilerRequest request) {
final CompletionStageFuture.Resolver<Class<?>> resolver = CompletionStageFuture.make();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.vector.Vector;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableObject;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -1498,7 +1497,7 @@ public Table update(final Collection<? extends Selectable> newColumns) {
*/
public SelectValidationResult validateSelect(final SelectColumn... selectColumns) {
final SelectColumn[] clones = SelectColumn.copyFrom(selectColumns);
SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.create(
SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.createContext(
this, SelectAndViewAnalyzer.Mode.SELECT_STATIC, getModifiedColumnSetForUpdates(), true,
false, clones);
return new SelectValidationResult(analyzerContext.createAnalyzer(), clones);
Expand Down Expand Up @@ -1526,7 +1525,7 @@ private Table selectOrUpdate(Flavor flavor, final SelectColumn... selectColumns)
}
}
final boolean publishTheseSources = flavor == Flavor.Update;
final SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.create(
final SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.createContext(
this, mode, getModifiedColumnSetForUpdates(), publishTheseSources, true,
selectColumns);

Expand Down Expand Up @@ -1557,23 +1556,10 @@ this, mode, getModifiedColumnSetForUpdates(), publishTheseSources, true,
new SelectAndViewAnalyzer.UpdateHelper(emptyRowSet, fakeUpdate)) {

try {
final MutableBoolean errorOccurred = new MutableBoolean();

analyzer.applyUpdate(
fakeUpdate, emptyRowSet, updateHelper, jobScheduler, liveResultCapture,
() -> {
if (errorOccurred.booleanValue()) {
return;
}
waitForResult.complete(null);
},
err -> {
if (errorOccurred.booleanValue()) {
return;
}
errorOccurred.setTrue();
waitForResult.completeExceptionally(err);
});
() -> waitForResult.complete(null),
waitForResult::completeExceptionally);
} catch (Exception e) {
waitForResult.completeExceptionally(e);
}
Expand Down Expand Up @@ -1611,7 +1597,7 @@ this, mode, getModifiedColumnSetForUpdates(), publishTheseSources, true,
resultTable.setFlat();
}
propagateDataIndexes(processedColumns, resultTable);
for (final ColumnSource<?> columnSource : analyzerContext.getSelectedColumnSources()
for (final ColumnSource<?> columnSource : analyzerContext.getNewColumnSources()
.values()) {
if (columnSource instanceof PossiblyImmutableColumnSource) {
((PossiblyImmutableColumnSource) columnSource).setImmutable();
Expand Down Expand Up @@ -1778,7 +1764,7 @@ updateDescription, sizeForInstrumentation(), () -> {
initializeWithSnapshot(humanReadablePrefix, sc, (usePrev, beforeClockValue) -> {
final boolean publishTheseSources = flavor == Flavor.UpdateView;
final SelectAndViewAnalyzer.AnalyzerContext analyzerContext =
SelectAndViewAnalyzer.create(
SelectAndViewAnalyzer.createContext(
this, SelectAndViewAnalyzer.Mode.VIEW_EAGER,
getModifiedColumnSetForUpdates(), publishTheseSources, true,
viewColumns);
Expand Down Expand Up @@ -1869,9 +1855,10 @@ public Table lazyUpdate(final Collection<? extends Selectable> newColumns) {
sizeForInstrumentation(), () -> {
checkInitiateOperation();

final SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.create(
this, SelectAndViewAnalyzer.Mode.VIEW_LAZY,
getModifiedColumnSetForUpdates(), true, true, selectColumns);
final SelectAndViewAnalyzer.AnalyzerContext analyzerContext =
SelectAndViewAnalyzer.createContext(
this, SelectAndViewAnalyzer.Mode.VIEW_LAZY,
getModifiedColumnSetForUpdates(), true, true, selectColumns);
final SelectColumn[] processedColumns = analyzerContext.getProcessedColumns()
.toArray(SelectColumn[]::new);
final QueryTable result = new QueryTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ public final class QueryLanguageParser extends GenericVisitorAdapter<Class<?>, Q
* Create a QueryLanguageParser and parse the given {@code expression}. After construction, the
* {@link QueryLanguageParser.Result result} of parsing the {@code expression} is available with the
* {@link #getResult()}} method.
* <p>
* Note that the provided Collections and Maps must not be mutated concurrently with or after construction.
*
* @param expression The query language expression to parse
* @param packageImports Wildcard package imports
Expand All @@ -190,9 +192,10 @@ public final class QueryLanguageParser extends GenericVisitorAdapter<Class<?>, Q
* imported.
* @param variables A map of the names of scope variables to their types
* @param variableTypeArguments A map of the names of scope variables to their type arguments
* @param unboxArguments If true it will unbox the query scope arguments
* @param queryScopeVariables A mutable map of the names of query scope variables to their values
* @param columnVariables A set of column variable names
* @param unboxArguments If true it will unbox the query scope arguments
* @param timeConversionResult The result of converting time literals in the expression
* @throws QueryLanguageParseException If any exception or error is encountered
*/
public QueryLanguageParser(
Expand Down Expand Up @@ -225,6 +228,8 @@ public QueryLanguageParser(
* Create a QueryLanguageParser and parse the given {@code expression}. After construction, the
* {@link QueryLanguageParser.Result result} of parsing the {@code expression} is available with the
* {@link #getResult()}} method.
* <p>
* Note that the provided Collections and Maps must not be mutated concurrently with or after construction.
*
* @param expression The query language expression to parse
* @param packageImports Wildcard package imports
Expand All @@ -247,6 +252,28 @@ public QueryLanguageParser(
variableTypeArguments, null, null, true, null);
}

/**
* Create a QueryLanguageParser and parse the given {@code expression}. After construction, the
* {@link QueryLanguageParser.Result result} of parsing the {@code expression} is available with the
* {@link #getResult()}} method.
* <p>
* Note that the provided Collections and Maps must not be mutated concurrently with or after construction.
*
* @param expression The query language expression to parse
* @param packageImports Wildcard package imports
* @param classImports Individual class imports
* @param staticImports Wildcard static imports. All static variables and methods for the given classes are
* imported.
* @param variables A map of the names of scope variables to their types
* @param variableTypeArguments A map of the names of scope variables to their type arguments
* @param queryScopeVariables A mutable map of the names of query scope variables to their values
* @param columnVariables A set of column variable names
* @param unboxArguments If true it will unbox the query scope arguments
* @param verifyIdempotence If true, the parser will verify that the result expression will not mutate when parsed
* @param pyCallableWrapperImplName The name of the PyCallableWrapper implementation to use
* @param timeConversionResult The result of converting time literals in the expression
* @throws QueryLanguageParseException If any exception or error is encountered
*/
@VisibleForTesting
QueryLanguageParser(
String expression,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ public synchronized void init(
try {
final QueryLanguageParser.Result result = FormulaAnalyzer.parseFormula(
formula, tableDefinition.getColumnNameMap(), outerToInnerNames,
compilationProcessor.getQueryScopeVariables(), unboxArguments);
compilationProcessor.getQueryScopeVariables(),
compilationProcessor.getPackageImports(),
compilationProcessor.getClassImports(),
compilationProcessor.getStaticImports(),
unboxArguments);

formulaShiftColPair = result.getFormulaShiftColPair();
if (formulaShiftColPair != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,12 @@ public List<String> initInputs(
return usedColumns;
}

private HashMap<String, ColumnSource<?>> filterColumnSources(
private Map<String, ColumnSource<?>> filterColumnSources(
final Map<String, ? extends ColumnSource<?>> columnsOfInterest) {
if (usedColumns.isEmpty() && usedColumnArrays.isEmpty()) {
return Map.of();
}

final HashMap<String, ColumnSource<?>> sources = new HashMap<>();
for (String columnName : usedColumns) {
sources.put(columnName, columnsOfInterest.get(columnName));
Expand Down Expand Up @@ -131,7 +135,7 @@ public void validateSafeForRefresh(BaseTable<?> sourceTable) {
}

protected void applyUsedVariables(
@NotNull final Map<String, ColumnDefinition<?>> columnDefinitionMap,
@NotNull final Map<String, ColumnDefinition<?>> parentColumnDefinitions,
@NotNull final Set<String> variablesUsed,
@NotNull final Map<String, Object> possibleParams) {
// the column definition map passed in is being mutated by the caller, so we need to make a copy
Expand All @@ -141,7 +145,7 @@ protected void applyUsedVariables(
usedColumns = new ArrayList<>();
usedColumnArrays = new ArrayList<>();
for (String variable : variablesUsed) {
ColumnDefinition<?> columnDefinition = columnDefinitionMap.get(variable);
ColumnDefinition<?> columnDefinition = parentColumnDefinitions.get(variable);
if (variable.equals("i")) {
usesI = true;
} else if (variable.equals("ii")) {
Expand All @@ -154,7 +158,7 @@ protected void applyUsedVariables(
} else {
String strippedColumnName =
variable.substring(0, Math.max(0, variable.length() - COLUMN_SUFFIX.length()));
columnDefinition = columnDefinitionMap.get(strippedColumnName);
columnDefinition = parentColumnDefinitions.get(strippedColumnName);
if (variable.endsWith(COLUMN_SUFFIX) && columnDefinition != null) {
columnDefinitions.put(strippedColumnName, columnDefinition);
usedColumnArrays.add(strippedColumnName);
Expand Down
Loading

0 comments on commit 9744fc1

Please sign in to comment.