Skip to content

Commit

Permalink
make transform ready for multi value aggregations and add support for…
Browse files Browse the repository at this point in the history
… percentile
  • Loading branch information
Hendrik Muhs committed Feb 3, 2020
1 parent ff00052 commit 481b219
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,70 @@ public void testContinuousDateNanos() throws Exception {
deleteIndex(indexName);
}

/**
 * End-to-end test for the new percentiles support in pivot transforms: creates a
 * transform that groups reviews by {@code user_id} and aggregates an average rating
 * plus the 5th/50th/90th/99.9th percentiles of {@code stars}, runs it, and checks
 * the percentile values written to the destination index.
 */
public void testPivotWithPercentile() throws Exception {
String transformId = "percentile_pivot";
String transformIndex = "percentile_pivot_reviews";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
// pivot config: group by user_id, aggregate avg(stars) and percentiles(stars)
String config = "{"
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},"
+ " \"frequency\": \"1s\","
+ " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } },"
+ " \"p\": {"
+ " \"percentiles\" : {"
+ " \"field\": \"stars\", "
+ " \"percents\": [5, 50, 90, 99.9]"
+ " }"
+ " } } }"
+ "}";
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));

startAndWaitForTransform(transformId, transformIndex);
assertTrue(indexExists(transformIndex));
// get and check some users
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_11", 3.846153846);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918);

// each percentile becomes a sub-field of "p"; 99.9 is flattened to "99_9"
// (dots in the percent are replaced by underscores, see OutputFieldNameConverter)
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.5", searchResult)).get(0);
assertEquals(1, actual.longValue());
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.50", searchResult)).get(0);
assertEquals(5, actual.longValue());
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.99_9", searchResult)).get(0);
assertEquals(5, actual.longValue());

// a second user with a different ratings distribution
searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_11");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.5", searchResult)).get(0);
assertEquals(1, actual.longValue());
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.50", searchResult)).get(0);
assertEquals(4, actual.longValue());
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.99_9", searchResult)).get(0);
assertEquals(5, actual.longValue());
}

private void createDateNanoIndex(String indexName, int numDocs) throws IOException {
// create mapping
try (XContentBuilder builder = jsonBuilder()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import org.elasticsearch.search.aggregations.metrics.GeoBounds;
import org.elasticsearch.search.aggregations.metrics.GeoCentroid;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
import org.elasticsearch.search.aggregations.metrics.Percentile;
import org.elasticsearch.search.aggregations.metrics.Percentiles;
import org.elasticsearch.search.aggregations.metrics.ScriptedMetric;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
import org.elasticsearch.xpack.transform.transforms.IDGenerator;
import org.elasticsearch.xpack.transform.utils.OutputFieldNameConverter;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -46,6 +49,7 @@ public final class AggregationResultUtils {
tempMap.put(ScriptedMetric.class.getName(), new ScriptedMetricAggExtractor());
tempMap.put(GeoCentroid.class.getName(), new GeoCentroidAggExtractor());
tempMap.put(GeoBounds.class.getName(), new GeoBoundsAggExtractor());
tempMap.put(Percentiles.class.getName(), new PercentilesAggExtractor());
TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
}

Expand Down Expand Up @@ -111,6 +115,8 @@ static AggValueExtractor getExtractor(Aggregation aggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoCentroid.class.getName());
} else if (aggregation instanceof GeoBounds) {
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoBounds.class.getName());
} else if (aggregation instanceof Percentiles) {
return TYPE_VALUE_EXTRACTOR_MAP.get(Percentiles.class.getName());
} else {
// Execution should never reach this point!
// Creating transforms with unsupported aggregations shall not be possible
Expand Down Expand Up @@ -190,6 +196,21 @@ public Object value(Aggregation agg, String fieldType) {
}
}

static class PercentilesAggExtractor implements AggValueExtractor {
    /**
     * Flattens a {@link Percentiles} aggregation result into a map of
     * output-field-name to value, one entry per configured percent.
     * The percent (e.g. 99.9) is converted to a dot-free key (e.g. "99_9").
     */
    @Override
    public Object value(Aggregation agg, String fieldType) {
        Percentiles percentilesAgg = (Percentiles) agg;
        HashMap<String, Double> extracted = new HashMap<>();

        percentilesAgg.forEach(
            percentile -> extracted.put(OutputFieldNameConverter.fromDouble(percentile.getPercent()), percentile.getValue())
        );

        return extracted;
    }
}

static class ScriptedMetricAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, String fieldType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@

package org.elasticsearch.xpack.transform.transforms.pivot;

import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
import org.elasticsearch.xpack.transform.utils.OutputFieldNameConverter;

import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -17,6 +24,7 @@ public final class Aggregations {
private static final String DYNAMIC = "_dynamic";
// the field mapping should be determined explicitly from the source field mapping if possible.
private static final String SOURCE = "_source";

private Aggregations() {}

/**
Expand All @@ -40,7 +48,8 @@ enum AggregationType {
SCRIPTED_METRIC("scripted_metric", DYNAMIC),
WEIGHTED_AVG("weighted_avg", DYNAMIC),
BUCKET_SELECTOR("bucket_selector", DYNAMIC),
BUCKET_SCRIPT("bucket_script", DYNAMIC);
BUCKET_SCRIPT("bucket_script", DYNAMIC),
PERCENTILES("percentiles", "double");

private final String aggregationType;
private final String targetMapping;
Expand All @@ -59,8 +68,9 @@ public String getTargetMapping() {
}
}

private static Set<String> aggregationSupported = Stream.of(AggregationType.values()).map(AggregationType::name)
.collect(Collectors.toSet());
private static Set<String> aggregationSupported = Stream.of(AggregationType.values())
.map(AggregationType::name)
.collect(Collectors.toSet());

public static boolean isSupportedByTransform(String aggregationType) {
return aggregationSupported.contains(aggregationType.toUpperCase(Locale.ROOT));
Expand All @@ -74,4 +84,19 @@ public static String resolveTargetMapping(String aggregationType, String sourceT
AggregationType agg = AggregationType.valueOf(aggregationType.toUpperCase(Locale.ROOT));
return agg.getTargetMapping().equals(SOURCE) ? sourceType : agg.getTargetMapping();
}

/**
 * Returns the output field name(s) and mapping type(s) produced by the given
 * aggregation. Most aggregations emit a single field named after the aggregation;
 * percentiles emit one field per configured percent ("name.percent").
 */
public static Map<String, String> getAggregationOutputTypes(AggregationBuilder agg) {
    if (agg instanceof PercentilesAggregationBuilder) {
        PercentilesAggregationBuilder percentilesAgg = (PercentilesAggregationBuilder) agg;
        String outputFieldPrefix = percentilesAgg.getName() + ".";

        // note: eclipse does not like p -> agg.getType()
        // the merge function (first, second) -> first ignores duplicate percents
        return Arrays.stream(percentilesAgg.percentiles())
            .mapToObj(OutputFieldNameConverter::fromDouble)
            .collect(
                Collectors.toMap(
                    suffix -> outputFieldPrefix + suffix,
                    suffix -> { return percentilesAgg.getType(); },
                    (first, second) -> first
                )
            );
    }
    // catch all: one output field, named after the aggregation itself
    return Collections.singletonMap(agg.getName(), agg.getType());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public static void deduceMappings(
) {
// collects the fieldnames used as source for aggregations
Map<String, String> aggregationSourceFieldNames = new HashMap<>();
// collects the aggregation types by source name
// collects the aggregation types by output field name
Map<String, String> aggregationTypes = new HashMap<>();
// collects the fieldnames and target fieldnames used for grouping
Map<String, String> fieldNamesForGrouping = new HashMap<>();
Expand All @@ -79,9 +79,9 @@ public static void deduceMappings(
if (agg instanceof ValuesSourceAggregationBuilder) {
ValuesSourceAggregationBuilder<?, ?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?, ?>) agg;
aggregationSourceFieldNames.put(valueSourceAggregation.getName(), valueSourceAggregation.field());
aggregationTypes.put(valueSourceAggregation.getName(), valueSourceAggregation.getType());
aggregationTypes.putAll(Aggregations.getAggregationOutputTypes(valueSourceAggregation));
} else if (agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) {
aggregationTypes.put(agg.getName(), agg.getType());
aggregationTypes.putAll(Aggregations.getAggregationOutputTypes(agg));
} else {
// execution should not reach this point
listener.onFailure(new RuntimeException("Unsupported aggregation type [" + agg.getType() + "]"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.transform.utils;

/**
 * Converts numeric values into strings that are safe to use as output (document)
 * field names, e.g. for multi-value aggregations like percentiles where each
 * percent (5, 50, 99.9, ...) becomes a sub-field name.
 */
public final class OutputFieldNameConverter {

    private OutputFieldNameConverter() {}

    /**
     * Renders a double as a field-name-safe string: whole numbers lose the
     * trailing ".0" (42.0 -> "42"); otherwise the decimal point is replaced
     * with an underscore (99.9 -> "99_9", 1.0E-100 -> "1_0E-100").
     *
     * NaN and infinities pass through as "NaN"/"Infinity"/"-Infinity";
     * callers are expected to reject such values up front.
     *
     * @param d the value to convert
     * @return a dot-free string representation of {@code d}
     */
    public static String fromDouble(double d) {
        // a whole number survives the round-trip through long unchanged
        if (d == (long) d) {
            return String.valueOf((long) d);
        }
        return String.valueOf(d).replace('.', '_');
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,9 @@ public void testResolveTargetMapping() {
// weighted_avg
assertEquals("_dynamic", Aggregations.resolveTargetMapping("weighted_avg", null));
assertEquals("_dynamic", Aggregations.resolveTargetMapping("weighted_avg", "double"));

// percentile
assertEquals("double", Aggregations.resolveTargetMapping("percentiles", null));
assertEquals("double", Aggregations.resolveTargetMapping("percentiles", "int"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.transform.utils;

import org.elasticsearch.test.ESTestCase;

public class OutputFieldNameConverterTests extends ESTestCase {

    public void testFromDouble() {
        // a fractional value: decimal point turns into an underscore
        String converted = OutputFieldNameConverter.fromDouble(42.42);
        assertEquals("42_42", converted);
        // a whole number: the trailing '.0' is dropped
        converted = OutputFieldNameConverter.fromDouble(42.0);
        assertEquals("42", converted);
        // double precision limits how many digits survive
        converted = OutputFieldNameConverter.fromDouble(42.4242424242424242424242424242424242);
        assertEquals("42_42424242424242", converted);
        // scientific notation keeps the '.0' (as '_0')
        converted = OutputFieldNameConverter.fromDouble(1.0E-100);
        assertEquals("1_0E-100", converted);
        // scientific notation with fractional digits
        converted = OutputFieldNameConverter.fromDouble(1.12345E-100);
        assertEquals("1_12345E-100", converted);
        // NaN passes through verbatim (OutputFieldNameConverter clients should disallow that)
        converted = OutputFieldNameConverter.fromDouble(Double.NaN);
        assertEquals("NaN", converted);
        // infinity passes through verbatim as well
        converted = OutputFieldNameConverter.fromDouble(Double.NEGATIVE_INFINITY);
        assertEquals("-Infinity", converted);
    }
}

0 comments on commit 481b219

Please sign in to comment.