From a9a036ddae736175716eb7b9745fec5a52de1573 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 1 Nov 2023 18:15:30 +0000 Subject: [PATCH] Transform pipeline aggr test (#1027) * tester code: pipeline aggr. transform job Signed-off-by: n-dohrmann * made test case for pipeline aggregator in transform job Signed-off-by: n-dohrmann * removed unnec. test lines Signed-off-by: n-dohrmann * re-added method call on Transform obj Signed-off-by: n-dohrmann --------- Signed-off-by: n-dohrmann (cherry picked from commit 4eaf3650a49990768a7bb1ce27fceeacac147587) Signed-off-by: github-actions[bot] --- .../transform/TransformRunnerIT.kt | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index b21fc3cc2..3f4dfdcdb 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -27,9 +27,11 @@ import org.opensearch.script.ScriptType import org.opensearch.search.aggregations.AggregationBuilders import org.opensearch.search.aggregations.AggregatorFactories import org.opensearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder +import org.opensearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder import java.lang.Integer.min import java.time.Instant import java.time.temporal.ChronoUnit +import kotlin.test.assertFailsWith class TransformRunnerIT : TransformRestTestCase() { @@ -688,6 +690,42 @@ class TransformRunnerIT : TransformRestTestCase() { assertTrue("Expected failure message to be present", !metadata.failureReason.isNullOrBlank()) } + fun `test transform with invalid pipeline aggregation triggering search failure`() { + assertFailsWith(IllegalArgumentException::class, "Bucket-script aggregation must fail!") { + validateSourceIndex("transform-source-index") + + val aggregatorFactories = AggregatorFactories.builder() + aggregatorFactories.addPipelineAggregator( + BucketScriptPipelineAggregationBuilder( + "test_pipeline_aggregation", + Script("1") + ) + ) + + val transform = Transform( + id = "id_17", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform", + metadataId = null, + sourceIndex = "transform-source-index", + targetIndex = "transform-target-index", + roles = emptyList(), + pageSize = 1, + groups = listOf( + Terms(sourceField = "store_and_fwd_flag", targetField = "flag"), + Histogram(sourceField = "passenger_count", targetField = "count", interval = 2.0), + DateHistogram(sourceField = "tpep_pickup_datetime", targetField = "date", fixedInterval = "1d") + ), + aggregations = aggregatorFactories + ).let { createTransform(it, it.id) } + updateTransformStartTime(transform) + } + } + fun `test transform with data stream`() { // Create a data stream. val dataStreamName = "transform-data-stream"