Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce AggSpec and AggregateAllByTable #1618

Merged
merged 21 commits into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package io.deephaven.engine.table.impl;

import io.deephaven.api.agg.key.Key;
import io.deephaven.api.agg.key.KeyAbsSum;
import io.deephaven.api.agg.key.KeyAvg;
import io.deephaven.api.agg.key.KeyCountDistinct;
import io.deephaven.api.agg.key.KeyDistinct;
import io.deephaven.api.agg.key.KeyFirst;
import io.deephaven.api.agg.key.KeyGroup;
import io.deephaven.api.agg.key.KeyLast;
import io.deephaven.api.agg.key.KeyMax;
import io.deephaven.api.agg.key.KeyMedian;
import io.deephaven.api.agg.key.KeyMin;
import io.deephaven.api.agg.key.KeyPct;
import io.deephaven.api.agg.key.KeySortedFirst;
import io.deephaven.api.agg.key.KeySortedLast;
import io.deephaven.api.agg.key.KeyStd;
import io.deephaven.api.agg.key.KeySum;
import io.deephaven.api.agg.key.KeyUnique;
import io.deephaven.api.agg.key.KeyVar;
import io.deephaven.api.agg.key.KeyWAvg;
import io.deephaven.api.agg.key.KeyWSum;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.BaseTable.CopyAttributeOperation;

import java.util.Objects;

class AggAllByCopyAttributes implements Key.Visitor {

private final BaseTable parent;
private final Table result;

public AggAllByCopyAttributes(BaseTable parent, Table result) {
this.parent = Objects.requireNonNull(parent);
this.result = Objects.requireNonNull(result);
}

@Override
public void visit(KeyAbsSum absSum) {}

@Override
public void visit(KeyCountDistinct countDistinct) {}

@Override
public void visit(KeyDistinct distinct) {}

@Override
public void visit(KeyGroup group) {}

@Override
public void visit(KeyAvg avg) {}

@Override
public void visit(KeyFirst first) {
parent.copyAttributes(result, CopyAttributeOperation.FirstBy);
}

@Override
public void visit(KeyLast last) {
parent.copyAttributes(result, CopyAttributeOperation.LastBy);
}

@Override
public void visit(KeyMax max) {}

@Override
public void visit(KeyMedian median) {}

@Override
public void visit(KeyMin min) {}

@Override
public void visit(KeyPct pct) {}

@Override
public void visit(KeySortedFirst sortedFirst) {}

@Override
public void visit(KeySortedLast sortedLast) {}

@Override
public void visit(KeyStd std) {}

@Override
public void visit(KeySum sum) {}

@Override
public void visit(KeyUnique unique) {}

@Override
public void visit(KeyWAvg wAvg) {}

@Override
public void visit(KeyWSum wSum) {}

@Override
public void visit(KeyVar var) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,7 @@
import io.deephaven.api.agg.Aggregation;
import io.deephaven.api.agg.AggregationOutputs;
import io.deephaven.api.agg.key.Key;
import io.deephaven.api.agg.key.KeyAbsSum;
import io.deephaven.api.agg.key.KeyAvg;
import io.deephaven.api.agg.key.KeyColumnReferences;
import io.deephaven.api.agg.key.KeyCountDistinct;
import io.deephaven.api.agg.key.KeyDistinct;
import io.deephaven.api.agg.key.KeyFirst;
import io.deephaven.api.agg.key.KeyGroup;
import io.deephaven.api.agg.key.KeyLast;
import io.deephaven.api.agg.key.KeyMax;
import io.deephaven.api.agg.key.KeyMedian;
import io.deephaven.api.agg.key.KeyMin;
import io.deephaven.api.agg.key.KeyPct;
import io.deephaven.api.agg.key.KeySortedFirst;
import io.deephaven.api.agg.key.KeySortedLast;
import io.deephaven.api.agg.key.KeyStd;
import io.deephaven.api.agg.key.KeySum;
import io.deephaven.api.agg.key.KeyUnique;
import io.deephaven.api.agg.key.KeyVar;
import io.deephaven.api.agg.key.KeyWAvg;
import io.deephaven.api.agg.key.KeyWSum;
import io.deephaven.api.agg.key.KeyColumns;
import io.deephaven.api.filter.Filter;
import io.deephaven.base.StringUtils;
import io.deephaven.base.verify.Assert;
Expand All @@ -47,6 +28,8 @@
import io.deephaven.engine.updategraph.DynamicNode;
import io.deephaven.engine.util.ColumnFormattingValues;
import io.deephaven.engine.util.systemicmarking.SystemicObject;
import io.deephaven.qst.table.AggregateAllByTable;
import io.deephaven.qst.table.AggregationTable;
import io.deephaven.vector.Vector;
import io.deephaven.engine.updategraph.UpdateGraphProcessor;
import io.deephaven.engine.updategraph.NotificationQueue;
Expand Down Expand Up @@ -458,7 +441,7 @@ public LocalTableMap partitionBy(final boolean dropKeys, final String... keyColu
public Table rollup(Collection<? extends Aggregation> aggregations, boolean includeConstituents,
Selectable... groupByColumns) {
final List<AggregationFactory.AggregationElement> converted =
AggregationFactory.AggregationElement.convert(aggregations, this);
AggregationFactory.AggregationElement.convert(aggregations);
return rollup(new AggregationFactory(converted), includeConstituents, SelectColumn.from(groupByColumns));
}

Expand Down Expand Up @@ -608,42 +591,29 @@ public Table exactJoin(Table table, MatchPair[] columnsToMatch, MatchPair[] colu

@Override
public Table aggAllBy(Key key, Selectable... groupByColumns) {
final List<ColumnName> aggColumns = aggregateColumns(key, groupByColumns);
final Table result = aggColumns.isEmpty() ? aggBy(List.of(), List.of(groupByColumns))
: aggBy(key.aggregation(aggColumns), List.of(groupByColumns));
key.walk(new CopyAttributes(this, result));
return result;
}

private List<ColumnName> aggregateColumns(Key key, Selectable[] groupByColumns) {
final Set<String> doNotAgg = doNotAggregateColumns(key, groupByColumns);
final List<ColumnName> remainingColumns = new ArrayList<>(columns.size() - doNotAgg.size());
for (String columnName : columns.keySet()) {
if (doNotAgg.contains(columnName)) {
continue;
for (ColumnName name : KeyColumns.of(key)) {
if (!columns.containsKey(name.name())) {
throw new IllegalArgumentException("Key references column that does not exist: " + name.name());
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
}
remainingColumns.add(ColumnName.of(columnName));
}
return remainingColumns;
}

private Set<String> doNotAggregateColumns(Key key, Selectable[] groupByColumns) {
final Set<ColumnName> keyRefs = KeyColumnReferences.of(key);
final Set<String> doNotAgg = new HashSet<>(groupByColumns.length + keyRefs.size());
for (Selectable groupByColumn : groupByColumns) {
doNotAgg.add(groupByColumn.newColumn().name());
}
for (ColumnName columnName : keyRefs) {
doNotAgg.add(columnName.name());
}
return doNotAgg;
final List<Selectable> groupByList = Arrays.asList(groupByColumns);
final List<ColumnName> tableColumns =
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
columns.keySet().stream().map(ColumnName::of).collect(Collectors.toList());
final Optional<Aggregation> agg = AggregateAllByTable.singleAggregation(key, groupByList, tableColumns);
final Table result = aggBy(agg.stream().collect(Collectors.toList()), groupByList);
key.walk(new AggAllByCopyAttributes(this, result));
return result;
}

@Override
public Table aggBy(final Collection<? extends Aggregation> aggregations,
final Collection<? extends Selectable> groupByColumns) {
if (aggregations.isEmpty()) {
return selectDistinct(groupByColumns);
}

final List<AggregationFactory.AggregationElement> optimized =
AggregationFactory.AggregationElement.optimizeAndConvert(aggregations, this);
AggregationFactory.AggregationElement.optimizeAndConvert(aggregations);

final List<ColumnName> optimizedOrder = optimized.stream()
.map(AggregationFactory.AggregationElement::getResultPairs)
Expand Down Expand Up @@ -3389,75 +3359,4 @@ public Table wouldMatch(WouldMatchPair... matchers) {
return getResult(new WouldMatchOperation(this, matchers));
}

private static class CopyAttributes implements Key.Visitor {

private final BaseTable parent;
private final Table result;

public CopyAttributes(BaseTable parent, Table result) {
this.parent = Objects.requireNonNull(parent);
this.result = Objects.requireNonNull(result);
}

@Override
public void visit(KeyAbsSum absSum) {}

@Override
public void visit(KeyCountDistinct countDistinct) {}

@Override
public void visit(KeyDistinct distinct) {}

@Override
public void visit(KeyGroup group) {}

@Override
public void visit(KeyAvg avg) {}

@Override
public void visit(KeyFirst first) {
parent.copyAttributes(result, CopyAttributeOperation.FirstBy);
}

@Override
public void visit(KeyLast last) {
parent.copyAttributes(result, CopyAttributeOperation.LastBy);
}

@Override
public void visit(KeyMax max) {}

@Override
public void visit(KeyMedian median) {}

@Override
public void visit(KeyMin min) {}

@Override
public void visit(KeyPct pct) {}

@Override
public void visit(KeySortedFirst sortedFirst) {}

@Override
public void visit(KeySortedLast sortedLast) {}

@Override
public void visit(KeyStd std) {}

@Override
public void visit(KeySum sum) {}

@Override
public void visit(KeyUnique unique) {}

@Override
public void visit(KeyWAvg wAvg) {}

@Override
public void visit(KeyWSum wSum) {}

@Override
public void visit(KeyVar var) {}
}
}
Loading