Skip to content

Commit

Permalink
keyword, numeric terms aggregation
Browse files Browse the repository at this point in the history
Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
  • Loading branch information
sandeshkr419 committed Jan 28, 2025
1 parent 32a88eb commit b54d3bd
Show file tree
Hide file tree
Showing 11 changed files with 926 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introduce Template query ([#16818](https://github.com/opensearch-project/OpenSearch/pull/16818))
- Propagate the sourceIncludes and excludes fields from fetchSourceContext to FieldsVisitor. ([#17080](https://github.com/opensearch-project/OpenSearch/pull/17080))
- [Star Tree] [Search] Resolving Date histogram with metric aggregation using star-tree ([#16674](https://github.com/opensearch-project/OpenSearch/pull/16674))
- [Star Tree] [Search] Resolving keyword & numeric bucket aggregation with metric aggregation using star-tree ([#17165](https://github.com/opensearch-project/OpenSearch/pull/17165))

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.StarTreeBucketCollector;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregatorFactory;
import org.opensearch.search.aggregations.metrics.MetricAggregatorFactory;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.builder.SearchSourceBuilder;
Expand Down Expand Up @@ -79,15 +80,25 @@ public static StarTreeQueryContext getStarTreeQueryContext(SearchContext context
);

for (AggregatorFactory aggregatorFactory : context.aggregations().factories().getFactories()) {
// first check whether the aggregation is a metric aggregation
// validation for metric aggregation
if (validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory)) {
continue;
}
if (validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory)) {
continue;
}

// if not a metric aggregation, check for applicable date histogram shape
// validation for date histogram aggregation
if (validateDateHistogramSupport(compositeMappedFieldType, aggregatorFactory)) {
continue;
}

// validation for terms aggregation
if (validateKeywordTermsAggregationSupport(compositeMappedFieldType, aggregatorFactory)) {
continue;
}

// invalid query shape
return null;
}

Expand Down Expand Up @@ -146,6 +157,34 @@ private static boolean validateStarTreeMetricSupport(
return false;
}

private static boolean validateKeywordTermsAggregationSupport(
CompositeDataCubeFieldType compositeIndexFieldInfo,
AggregatorFactory aggregatorFactory
) {
if (!(aggregatorFactory instanceof TermsAggregatorFactory termsAggregatorFactory)
|| aggregatorFactory.getSubFactories().getFactories().length < 1) {
return false;
}

String requestField = termsAggregatorFactory.getField();
Set<String> supportedDimensions = compositeIndexFieldInfo.getDimensions()
.stream()
.map(Dimension::getField)
.collect(Collectors.toSet());

if (!supportedDimensions.contains(requestField)) {
return false;
}

// Validate all sub-factories
for (AggregatorFactory subFactory : aggregatorFactory.getSubFactories().getFactories()) {
if (!validateStarTreeMetricSupport(compositeIndexFieldInfo, subFactory)) {
return false;
}
}
return true;
}

private static boolean validateDateHistogramSupport(
CompositeDataCubeFieldType compositeIndexFieldInfo,
AggregatorFactory aggregatorFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,8 @@ public TermsEnum termsEnum() throws IOException {
public TermsEnum intersect(CompiledAutomaton automaton) throws IOException {
return ((SortedSetDocValues) docIdSetIterator).intersect(automaton);
}

public boolean advanceExact(int target) throws IOException {
return ((SortedSetDocValues) docIdSetIterator).advanceExact(target);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.PriorityQueue;
import org.opensearch.common.SetOnce;
import org.opensearch.common.lease.Releasable;
Expand All @@ -52,6 +54,13 @@
import org.opensearch.common.util.LongHash;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedSetStarTreeValuesIterator;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.AggregationExecutionException;
Expand All @@ -64,15 +73,19 @@
import org.opensearch.search.aggregations.InternalOrder;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.StarTreeBucketCollector;
import org.opensearch.search.aggregations.StarTreePreComputeCollector;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes;
import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.startree.StarTreeFilter;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongPredicate;
Expand All @@ -86,7 +99,7 @@
*
* @opensearch.internal
*/
public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator {
public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator implements StarTreePreComputeCollector {
protected final ResultStrategy<?, ?, ?> resultStrategy;
protected final ValuesSource.Bytes.WithOrdinals valuesSource;

Expand All @@ -98,6 +111,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
private final SetOnce<SortedSetDocValues> dvs = new SetOnce<>();
protected int segmentsWithSingleValuedOrds = 0;
protected int segmentsWithMultiValuedOrds = 0;
LongUnaryOperator globalOperator;

/**
* Lookup global ordinals
Expand Down Expand Up @@ -229,6 +243,14 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
SortedSetDocValues globalOrds = valuesSource.globalOrdinalsValues(ctx);
collectionStrategy.globalOrdsReady(globalOrds);
globalOperator = valuesSource.globalOrdinalsMapping(ctx);

CompositeIndexFieldInfo supportedStarTree = StarTreeQueryHelper.getSupportedStarTree(this.context);
if (supportedStarTree != null) {
if (preComputeWithStarTree(ctx, supportedStarTree) == true) {
throw new CollectionTerminatedException();
}
}

if (collectionStrategy instanceof DenseGlobalOrds
&& this.resultStrategy instanceof StandardTermsResults
Expand Down Expand Up @@ -314,6 +336,79 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
});
}

public StarTreeBucketCollector getStarTreeBucketCollector(
LeafReaderContext ctx,
CompositeIndexFieldInfo starTree,
StarTreeBucketCollector parent
) throws IOException {
assert parent == null;
StarTreeValues starTreeValues = StarTreeQueryHelper.getStarTreeValues(ctx, starTree);
return new StarTreeBucketCollector(
starTreeValues,
StarTreeFilter.getStarTreeResult(starTreeValues, context.getStarTreeQueryContext().getQueryMap(), Set.of(fieldName))
) {
@Override
public void setSubCollectors() throws IOException {
for (Aggregator aggregator : subAggregators) {
this.subCollectors.add(((StarTreePreComputeCollector) aggregator).getStarTreeBucketCollector(ctx, starTree, this));
}
}

SortedSetStarTreeValuesIterator valuesIterator = (SortedSetStarTreeValuesIterator) starTreeValues.getDimensionValuesIterator(
fieldName
);

String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(
starTree.getField(),
"_doc_count",
MetricStat.DOC_COUNT.getTypeName()
);
SortedNumericStarTreeValuesIterator docCountsIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
.getMetricValuesIterator(metricName);

@Override
public void collectStarTreeEntry(int starTreeEntry, long owningBucketOrd) throws IOException {

if (valuesIterator.advanceExact(starTreeEntry) == false) {
return;
}

for (int i = 0, count = valuesIterator.docValueCount(); i < count; i++) {
long dimensionValue = valuesIterator.nextOrd();
long ord = globalOperator.applyAsLong(dimensionValue);

if (docCountsIterator.advanceExact(starTreeEntry)) {
long metricValue = docCountsIterator.nextValue();

long bucketOrd = collectionStrategy.globalOrdToBucketOrd(0, ord);
if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
} else {
grow(bucketOrd + 1);
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
}
}
}
}
};
}

private boolean preComputeWithStarTree(LeafReaderContext ctx, CompositeIndexFieldInfo supportedStarTree) throws IOException {
StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollector(ctx, supportedStarTree, null);
FixedBitSet matchingDocsBitSet = starTreeBucketCollector.getMatchingDocsBitSet();

int numBits = matchingDocsBitSet.length();
if (numBits > 0) {
for (int bit = matchingDocsBitSet.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits)
? matchingDocsBitSet.nextSetBit(bit + 1)
: DocIdSetIterator.NO_MORE_DOCS) {
starTreeBucketCollector.collectStarTreeEntry(bit, 0);
}
}
return true;
}

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return resultStrategy.buildAggregations(owningBucketOrds);
Expand Down
Loading

0 comments on commit b54d3bd

Please sign in to comment.