Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Star Tree] [Search] Resolving Date histogram with metric aggregation using star-tree #17163

Merged
merged 1 commit into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added new Setting property UnmodifiableOnRestore to prevent updating settings on restore snapshot ([#16957](https://github.com/opensearch-project/OpenSearch/pull/16957))
- Introduce Template query ([#16818](https://github.com/opensearch-project/OpenSearch/pull/16818))
- Propagate the sourceIncludes and excludes fields from fetchSourceContext to FieldsVisitor. ([#17080](https://github.com/opensearch-project/OpenSearch/pull/17080))
- [Star Tree] [Search] Resolving Date histogram with metric aggregation using star-tree ([#16674](https://github.com/opensearch-project/OpenSearch/pull/16674))

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
Expand Down
9 changes: 9 additions & 0 deletions server/src/main/java/org/opensearch/common/Rounding.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,10 @@ public void writeTo(StreamOutput out) throws IOException {

public abstract byte id();

public DateTimeUnit unit() {
return null;
}

/**
* A strategy for rounding milliseconds since epoch.
*
Expand Down Expand Up @@ -525,6 +529,11 @@ public byte id() {
return ID;
}

@Override
public DateTimeUnit unit() {
return unit;
}

private LocalDateTime truncateLocalDateTime(LocalDateTime localDateTime) {
switch (unit) {
case SECOND_OF_MINUTE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,24 @@ public int compare(DateTimeUnitRounding unit1, DateTimeUnitRounding unit2) {
}
}

/**
* Returns the closest valid calendar interval to be used for the search interval
*/
public DateTimeUnitRounding findClosestValidInterval(DateTimeUnitRounding searchInterval) {
DateTimeUnitComparator comparator = new DateTimeUnitComparator();
DateTimeUnitRounding closestValidInterval = null;

// Find the largest interval that is less than or equal to search interval
for (DateTimeUnitRounding interval : sortedCalendarIntervals) {
if (comparator.compare(interval, searchInterval) <= 0) {
closestValidInterval = interval;
} else {
break;
}
}
return closestValidInterval;
}

/**
* Returns a sorted list of dateTimeUnits based on the DateTimeUnitComparator
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@
import org.opensearch.common.lucene.Lucene;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.codec.composite.CompositeIndexReader;
import org.opensearch.index.compositeindex.datacube.DateDimension;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.utils.date.DateTimeUnitAdapter;
import org.opensearch.index.compositeindex.datacube.startree.utils.date.DateTimeUnitRounding;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
import org.opensearch.index.mapper.CompositeDataCubeFieldType;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.aggregations.AggregatorFactory;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.StarTreeBucketCollector;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
import org.opensearch.search.aggregations.metrics.MetricAggregatorFactory;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.builder.SearchSourceBuilder;
Expand All @@ -37,9 +41,10 @@
import org.opensearch.search.startree.StarTreeQueryContext;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -74,10 +79,16 @@ public static StarTreeQueryContext getStarTreeQueryContext(SearchContext context
);

for (AggregatorFactory aggregatorFactory : context.aggregations().factories().getFactories()) {
MetricStat metricStat = validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory);
if (metricStat == null) {
return null;
// first check for aggregation is a metric aggregation
if (validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory)) {
continue;
}

// if not a metric aggregation, check for applicable date histogram shape
if (validateDateHistogramSupport(compositeMappedFieldType, aggregatorFactory)) {
continue;
}
return null;
}

// need to cache star tree values only for multiple aggregations
Expand All @@ -100,63 +111,86 @@ private static StarTreeQueryContext tryCreateStarTreeQueryContext(
if (queryBuilder == null || queryBuilder instanceof MatchAllQueryBuilder) {
queryMap = null;
} else if (queryBuilder instanceof TermQueryBuilder) {
TermQueryBuilder termQueryBuilder = (TermQueryBuilder) queryBuilder;
// TODO: Add support for keyword fields
if (compositeFieldType.getDimensions().stream().anyMatch(d -> d.getDocValuesType() != DocValuesType.SORTED_NUMERIC)) {
// return null for non-numeric fields
return null;
}

List<String> supportedDimensions = compositeFieldType.getDimensions()
Dimension matchedDimension = compositeFieldType.getDimensions()
.stream()
.map(Dimension::getField)
.collect(Collectors.toList());
queryMap = getStarTreePredicates(queryBuilder, supportedDimensions);
if (queryMap == null) {
.filter(d -> (d.getField().equals(termQueryBuilder.fieldName()) && d.getDocValuesType() == DocValuesType.SORTED_NUMERIC))
.findFirst()
.orElse(null);
if (matchedDimension == null) {
return null;
}
queryMap = Map.of(termQueryBuilder.fieldName(), Long.parseLong(termQueryBuilder.value().toString()));
} else {
return null;
}
return new StarTreeQueryContext(compositeIndexFieldInfo, queryMap, cacheStarTreeValuesSize);
}

/**
* Parse query body to star-tree predicates
* @param queryBuilder to match star-tree supported query shape
* @return predicates to match
*/
private static Map<String, Long> getStarTreePredicates(QueryBuilder queryBuilder, List<String> supportedDimensions) {
TermQueryBuilder tq = (TermQueryBuilder) queryBuilder;
String field = tq.fieldName();
if (!supportedDimensions.contains(field)) {
return null;
}
long inputQueryVal = Long.parseLong(tq.value().toString());

// Create a map with the field and the value
Map<String, Long> predicateMap = new HashMap<>();
predicateMap.put(field, inputQueryVal);
return predicateMap;
}

private static MetricStat validateStarTreeMetricSupport(
private static boolean validateStarTreeMetricSupport(
CompositeDataCubeFieldType compositeIndexFieldInfo,
AggregatorFactory aggregatorFactory
) {
if (aggregatorFactory instanceof MetricAggregatorFactory && aggregatorFactory.getSubFactories().getFactories().length == 0) {
MetricAggregatorFactory metricAggregatorFactory = (MetricAggregatorFactory) aggregatorFactory;
String field;
Map<String, List<MetricStat>> supportedMetrics = compositeIndexFieldInfo.getMetrics()
.stream()
.collect(Collectors.toMap(Metric::getField, Metric::getMetrics));

MetricStat metricStat = ((MetricAggregatorFactory) aggregatorFactory).getMetricStat();
field = ((MetricAggregatorFactory) aggregatorFactory).getField();
MetricStat metricStat = metricAggregatorFactory.getMetricStat();
field = metricAggregatorFactory.getField();

return supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(metricStat);
}
return false;
}

private static boolean validateDateHistogramSupport(
CompositeDataCubeFieldType compositeIndexFieldInfo,
AggregatorFactory aggregatorFactory
) {
if (!(aggregatorFactory instanceof DateHistogramAggregatorFactory)
|| aggregatorFactory.getSubFactories().getFactories().length < 1) {
return false;
}
DateHistogramAggregatorFactory dateHistogramAggregatorFactory = (DateHistogramAggregatorFactory) aggregatorFactory;

// Find the DateDimension in the dimensions list
DateDimension starTreeDateDimension = null;
for (Dimension dimension : compositeIndexFieldInfo.getDimensions()) {
if (dimension instanceof DateDimension) {
starTreeDateDimension = (DateDimension) dimension;
break;
}
}

// If no DateDimension is found, validation fails
if (starTreeDateDimension == null) {
return false;
}

// Ensure the rounding is not null
if (dateHistogramAggregatorFactory.getRounding() == null) {
return false;
}

// Find the closest valid interval in the DateTimeUnitRounding class associated with star tree
DateTimeUnitRounding rounding = starTreeDateDimension.findClosestValidInterval(
new DateTimeUnitAdapter(dateHistogramAggregatorFactory.getRounding())
);
if (rounding == null) {
return false;
}

if (field != null && supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(metricStat)) {
return metricStat;
// Validate all sub-factories
for (AggregatorFactory subFactory : aggregatorFactory.getSubFactories().getFactories()) {
if (!validateStarTreeMetricSupport(compositeIndexFieldInfo, subFactory)) {
return false;
}
}
return null;
return true;
}

public static CompositeIndexFieldInfo getSupportedStarTree(SearchContext context) {
Expand Down Expand Up @@ -222,11 +256,37 @@ public static LeafBucketCollector getStarTreeLeafCollector(
// Call the final consumer after processing all entries
finalConsumer.run();

// Return a LeafBucketCollector that terminates collection
return new LeafBucketCollectorBase(sub, valuesSource.doubleValues(ctx)) {
// Terminate after pre-computing aggregation
throw new CollectionTerminatedException();
}

public static StarTreeBucketCollector getStarTreeBucketMetricCollector(
CompositeIndexFieldInfo starTree,
String metric,
ValuesSource.Numeric valuesSource,
StarTreeBucketCollector parentCollector,
Consumer<Long> growArrays,
BiConsumer<Long, Long> updateBucket
) throws IOException {
assert parentCollector != null;
return new StarTreeBucketCollector(parentCollector) {
String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(
starTree.getField(),
((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(),
metric
);
SortedNumericStarTreeValuesIterator metricValuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
.getMetricValuesIterator(metricName);

@Override
public void collect(int doc, long bucket) {
throw new CollectionTerminatedException();
public void collectStarTreeEntry(int starTreeEntryBit, long bucket) throws IOException {
growArrays.accept(bucket);
// Advance the valuesIterator to the current bit
if (!metricValuesIterator.advanceExact(starTreeEntryBit)) {
return; // Skip if no entries for this document
}
long metricValue = metricValuesIterator.nextValue();
updateBucket.accept(bucket, metricValue);
}
};
}
Expand All @@ -240,7 +300,7 @@ public static FixedBitSet getStarTreeFilteredValues(SearchContext context, LeafR
throws IOException {
FixedBitSet result = context.getStarTreeQueryContext().getStarTreeValues(ctx);
if (result == null) {
result = StarTreeFilter.getStarTreeResult(starTreeValues, context.getStarTreeQueryContext().getQueryMap());
result = StarTreeFilter.getStarTreeResult(starTreeValues, context.getStarTreeQueryContext().getQueryMap(), Set.of());
context.getStarTreeQueryContext().setStarTreeValues(ctx, result);
}
return result;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations;

import org.apache.lucene.util.FixedBitSet;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* Collector for star tree aggregation
* This abstract class exposes utilities to help avoid traversing star-tree multiple times and
* collect relevant metrics across nested aggregations in a single traversal
* @opensearch.internal
*/
@ExperimentalApi
public abstract class StarTreeBucketCollector {

protected final StarTreeValues starTreeValues;
protected final FixedBitSet matchingDocsBitSet;
protected final List<StarTreeBucketCollector> subCollectors = new ArrayList<>();

public StarTreeBucketCollector(StarTreeValues starTreeValues, FixedBitSet matchingDocsBitSet) throws IOException {
this.starTreeValues = starTreeValues;
this.matchingDocsBitSet = matchingDocsBitSet;
this.setSubCollectors();
}

public StarTreeBucketCollector(StarTreeBucketCollector parent) throws IOException {
this.starTreeValues = parent.getStarTreeValues();
this.matchingDocsBitSet = parent.getMatchingDocsBitSet();
this.setSubCollectors();
}

/**
* Sets the sub-collectors to track nested aggregators
*/
public void setSubCollectors() throws IOException {};

/**
* Returns a list of sub-collectors to track nested aggregators
*/
public List<StarTreeBucketCollector> getSubCollectors() {
return subCollectors;
}

/**
* Returns the tree values to iterate
*/
public StarTreeValues getStarTreeValues() {
return starTreeValues;
}

/**
* Returns the matching docs bitset to iterate upon the star-tree values based on search query
*/
public FixedBitSet getMatchingDocsBitSet() {
return matchingDocsBitSet;
}

/**
* Collects the star tree entry and bucket ordinal to update
* The method implementation should identify the metrics to collect from that star-tree entry to the specified bucket
*/
public abstract void collectStarTreeEntry(int starTreeEntry, long bucket) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations;

import org.apache.lucene.index.LeafReaderContext;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;

import java.io.IOException;

/**
* This interface is used to pre-compute the star tree bucket collector for each segment/leaf.
* It is utilized by parent aggregation to retrieve a StarTreeBucketCollector which can be used to
* pre-compute the associated aggregation along with its parent pre-computation using star-tree
*
* @opensearch.internal
*/
public interface StarTreePreComputeCollector {
/**
* Get the star tree bucket collector for the specified segment/leaf
*/
StarTreeBucketCollector getStarTreeBucketCollector(
LeafReaderContext ctx,
CompositeIndexFieldInfo starTree,
StarTreeBucketCollector parentCollector
) throws IOException;
}
Loading
Loading