diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java index fb8c1c2c719bd..f8b1f91e516f9 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java @@ -1109,6 +1109,70 @@ public void testContinuousDateNanos() throws Exception { deleteIndex(indexName); } + public void testPivotWithPercentile() throws Exception { + String transformId = "percentile_pivot"; + String transformIndex = "percentile_pivot_reviews"; + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex); + final Request createTransformRequest = createRequestWithAuth( + "PUT", + getTransformEndpoint() + transformId, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS + ); + String config = "{" + + " \"source\": {\"index\":\"" + + REVIEWS_INDEX_NAME + + "\"}," + + " \"dest\": {\"index\":\"" + + transformIndex + + "\"}," + + " \"frequency\": \"1s\"," + + " \"pivot\": {" + + " \"group_by\": {" + + " \"reviewer\": {" + + " \"terms\": {" + + " \"field\": \"user_id\"" + + " } } }," + + " \"aggregations\": {" + + " \"avg_rating\": {" + + " \"avg\": {" + + " \"field\": \"stars\"" + + " } }," + + " \"p\": {" + + " \"percentiles\" : {" + + " \"field\": \"stars\", " + + " \"percents\": [5, 50, 90, 99.9]" + + " }" + + " } } }" + + "}"; + createTransformRequest.setJsonEntity(config); + Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); + assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + startAndWaitForTransform(transformId, transformIndex); + assertTrue(indexExists(transformIndex)); + 
// get and check some users + assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_11", 3.846153846); + assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918); + + Map searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4"); + assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); + Number actual = (Number) ((List) XContentMapValues.extractValue("hits.hits._source.p.5", searchResult)).get(0); + assertEquals(1, actual.longValue()); + actual = (Number) ((List) XContentMapValues.extractValue("hits.hits._source.p.50", searchResult)).get(0); + assertEquals(5, actual.longValue()); + actual = (Number) ((List) XContentMapValues.extractValue("hits.hits._source.p.99_9", searchResult)).get(0); + assertEquals(5, actual.longValue()); + + searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_11"); + assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); + actual = (Number) ((List) XContentMapValues.extractValue("hits.hits._source.p.5", searchResult)).get(0); + assertEquals(1, actual.longValue()); + actual = (Number) ((List) XContentMapValues.extractValue("hits.hits._source.p.50", searchResult)).get(0); + assertEquals(4, actual.longValue()); + actual = (Number) ((List) XContentMapValues.extractValue("hits.hits._source.p.99_9", searchResult)).get(0); + assertEquals(5, actual.longValue()); + } + private void createDateNanoIndex(String indexName, int numDocs) throws IOException { // create mapping try (XContentBuilder builder = jsonBuilder()) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java index 25735602a2977..5b5813400807a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java +++ 
b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java @@ -20,11 +20,14 @@ import org.elasticsearch.search.aggregations.metrics.GeoBounds; import org.elasticsearch.search.aggregations.metrics.GeoCentroid; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue; +import org.elasticsearch.search.aggregations.metrics.Percentile; +import org.elasticsearch.search.aggregations.metrics.Percentiles; import org.elasticsearch.search.aggregations.metrics.ScriptedMetric; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig; import org.elasticsearch.xpack.transform.transforms.IDGenerator; +import org.elasticsearch.xpack.transform.utils.OutputFieldNameConverter; import java.util.Arrays; import java.util.Collection; @@ -46,6 +49,7 @@ public final class AggregationResultUtils { tempMap.put(ScriptedMetric.class.getName(), new ScriptedMetricAggExtractor()); tempMap.put(GeoCentroid.class.getName(), new GeoCentroidAggExtractor()); tempMap.put(GeoBounds.class.getName(), new GeoBoundsAggExtractor()); + tempMap.put(Percentiles.class.getName(), new PercentilesAggExtractor()); TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap); } @@ -111,6 +115,8 @@ static AggValueExtractor getExtractor(Aggregation aggregation) { return TYPE_VALUE_EXTRACTOR_MAP.get(GeoCentroid.class.getName()); } else if (aggregation instanceof GeoBounds) { return TYPE_VALUE_EXTRACTOR_MAP.get(GeoBounds.class.getName()); + } else if (aggregation instanceof Percentiles) { + return TYPE_VALUE_EXTRACTOR_MAP.get(Percentiles.class.getName()); } else { // Execution should never reach this point! 
// Creating transforms with unsupported aggregations shall not be possible @@ -190,6 +196,21 @@ public Object value(Aggregation agg, String fieldType) { } } + static class PercentilesAggExtractor implements AggValueExtractor { + @Override + public Object value(Aggregation agg, String fieldType) { + Percentiles aggregation = (Percentiles) agg; + + HashMap percentiles = new HashMap<>(); + + for (Percentile p : aggregation) { + percentiles.put(OutputFieldNameConverter.fromDouble(p.getPercent()), p.getValue()); + } + + return percentiles; + } + } + static class ScriptedMetricAggExtractor implements AggValueExtractor { @Override public Object value(Aggregation agg, String fieldType) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java index effa3eb1e0e5f..79ad791986fae 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java @@ -6,7 +6,14 @@ package org.elasticsearch.xpack.transform.transforms.pivot; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder; +import org.elasticsearch.xpack.transform.utils.OutputFieldNameConverter; + +import java.util.Arrays; +import java.util.Collections; import java.util.Locale; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -17,6 +24,7 @@ public final class Aggregations { private static final String DYNAMIC = "_dynamic"; // the field mapping should be determined explicitly from the source field mapping if possible. 
private static final String SOURCE = "_source"; + private Aggregations() {} /** @@ -40,7 +48,8 @@ enum AggregationType { SCRIPTED_METRIC("scripted_metric", DYNAMIC), WEIGHTED_AVG("weighted_avg", DYNAMIC), BUCKET_SELECTOR("bucket_selector", DYNAMIC), - BUCKET_SCRIPT("bucket_script", DYNAMIC); + BUCKET_SCRIPT("bucket_script", DYNAMIC), + PERCENTILES("percentiles", "double"); private final String aggregationType; private final String targetMapping; @@ -59,8 +68,9 @@ public String getTargetMapping() { } } - private static Set aggregationSupported = Stream.of(AggregationType.values()).map(AggregationType::name) - .collect(Collectors.toSet()); + private static Set aggregationSupported = Stream.of(AggregationType.values()) + .map(AggregationType::name) + .collect(Collectors.toSet()); public static boolean isSupportedByTransform(String aggregationType) { return aggregationSupported.contains(aggregationType.toUpperCase(Locale.ROOT)); @@ -74,4 +84,19 @@ public static String resolveTargetMapping(String aggregationType, String sourceT AggregationType agg = AggregationType.valueOf(aggregationType.toUpperCase(Locale.ROOT)); return agg.getTargetMapping().equals(SOURCE) ? sourceType : agg.getTargetMapping(); } + + public static Map getAggregationOutputTypes(AggregationBuilder agg) { + if (agg instanceof PercentilesAggregationBuilder) { + PercentilesAggregationBuilder percentilesAgg = (PercentilesAggregationBuilder) agg; + + // note: eclipse does not like p -> agg.getType() + // the merge function (p1, p2) -> p1 ignores duplicates + return Arrays.stream(percentilesAgg.percentiles()) + .mapToObj(OutputFieldNameConverter::fromDouble) + .collect(Collectors.toMap(p -> agg.getName() + "." 
+ p, p -> { return agg.getType(); }, (p1, p2) -> p1)); + } + // catch all + return Collections.singletonMap(agg.getName(), agg.getType()); + } + } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java index b9b3d121e5aea..44998d45b60d3 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java @@ -66,7 +66,7 @@ public static void deduceMappings( ) { // collects the fieldnames used as source for aggregations Map aggregationSourceFieldNames = new HashMap<>(); - // collects the aggregation types by source name + // collects the aggregation types by output field name Map aggregationTypes = new HashMap<>(); // collects the fieldnames and target fieldnames used for grouping Map fieldNamesForGrouping = new HashMap<>(); @@ -79,9 +79,9 @@ public static void deduceMappings( if (agg instanceof ValuesSourceAggregationBuilder) { ValuesSourceAggregationBuilder valueSourceAggregation = (ValuesSourceAggregationBuilder) agg; aggregationSourceFieldNames.put(valueSourceAggregation.getName(), valueSourceAggregation.field()); - aggregationTypes.put(valueSourceAggregation.getName(), valueSourceAggregation.getType()); + aggregationTypes.putAll(Aggregations.getAggregationOutputTypes(valueSourceAggregation)); } else if (agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) { - aggregationTypes.put(agg.getName(), agg.getType()); + aggregationTypes.putAll(Aggregations.getAggregationOutputTypes(agg)); } else { // execution should not reach this point listener.onFailure(new RuntimeException("Unsupported aggregation type [" + agg.getType() + "]")); diff --git 
/**
 * Converts values into strings that can be used as output field names.
 */
public final class OutputFieldNameConverter {

    private OutputFieldNameConverter() {}

    /**
     * Renders a double as a field name: whole numbers are written without a fractional part,
     * everything else uses {@link String#valueOf(double)} with {@code '.'} replaced by {@code '_'}.
     *
     * <p>Examples: {@code 42.0 -> "42"}, {@code 99.9 -> "99_9"}, {@code 1.0E-100 -> "1_0E-100"}.
     * {@code NaN} and infinities are passed through as {@code "NaN"}, {@code "Infinity"} and
     * {@code "-Infinity"}.
     *
     * @param d the value to convert
     * @return the field-name representation of {@code d}
     */
    public static String fromDouble(double d) {
        final long asLong = (long) d;
        // The cast saturates at Long.MAX_VALUE, and Long.MAX_VALUE widens back to 2^63. Without the
        // second check 2^63 would compare equal and be printed as 2^63 - 1.
        if (d == asLong && asLong != Long.MAX_VALUE) {
            return String.valueOf(asLong);
        }
        return String.valueOf(d).replace('.', '_');
    }
}
a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/OutputFieldNameConverterTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/OutputFieldNameConverterTests.java new file mode 100644 index 0000000000000..9cd20eb524790 --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/OutputFieldNameConverterTests.java @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.transform.utils; + +import org.elasticsearch.test.ESTestCase; + +public class OutputFieldNameConverterTests extends ESTestCase { + + public void testFromDouble() { + assertEquals("42_42", OutputFieldNameConverter.fromDouble(42.42)); + // remove '.0' if possible + assertEquals("42", OutputFieldNameConverter.fromDouble(42.0)); + // digit limit + assertEquals("42_42424242424242", OutputFieldNameConverter.fromDouble(42.4242424242424242424242424242424242)); + // scientific notation keep the '.0' + assertEquals("1_0E-100", OutputFieldNameConverter.fromDouble(1.0E-100)); + // scientific with digits + assertEquals("1_12345E-100", OutputFieldNameConverter.fromDouble(1.12345E-100)); + // NaN (OutputFieldNameConverter clients should disallow that) + assertEquals("NaN", OutputFieldNameConverter.fromDouble(Double.NaN)); + // infinity + assertEquals("-Infinity", OutputFieldNameConverter.fromDouble(Double.NEGATIVE_INFINITY)); + } +}