diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 1512794b7..c4d3b801f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -394,6 +394,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin RollupSettings.ROLLUP_ENABLED, RollupSettings.ROLLUP_SEARCH_ENABLED, RollupSettings.ROLLUP_DASHBOARDS, + RollupSettings.ROLLUP_SEARCH_ALL_JOBS, TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT, TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS, TransformSettings.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index 9d8f8585a..4cd68c1b4 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -77,11 +77,15 @@ class RollupInterceptor( private val logger = LogManager.getLogger(javaClass) @Volatile private var searchEnabled = RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings) + @Volatile private var searchAllJobs = RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings) init { clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ENABLED) { searchEnabled = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ALL_JOBS) { + searchAllJobs = it + } } @Suppress("SpreadOperator") @@ -126,12 +130,9 @@ class RollupInterceptor( throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues") } - val matchedRollup = pickRollupJob(matchingRollupJobs.keys) - val fieldNameMappingTypeMap = matchingRollupJobs.getValue(matchedRollup).associateBy({ it.fieldName }, { it.mappingType }) - // only rebuild if there is necessity to rebuild if (fieldMappings.isNotEmpty()) { - request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap)) + rewriteShardSearchForRollupJobs(request, matchingRollupJobs) } } } @@ -298,4 +299,14 @@ class RollupInterceptor( DateHistogramInterval(rollup.getDateHistogram().fixedInterval).estimateMillis() } } + + private fun rewriteShardSearchForRollupJobs(request: ShardSearchRequest, matchingRollupJobs: Map>) { + val matchedRollup = pickRollupJob(matchingRollupJobs.keys) + val fieldNameMappingTypeMap = matchingRollupJobs.getValue(matchedRollup).associateBy({ it.fieldName }, { it.mappingType }) + if (searchAllJobs) { + request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap)) + } else { + request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap)) + } + } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt index c6bd99968..063724df8 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt @@ -33,6 +33,7 @@ class RollupSettings { companion object { const val DEFAULT_ROLLUP_ENABLED = true + const val DEFAULT_SEARCH_ALL_JOBS = false const val DEFAULT_ACQUIRE_LOCK_RETRY_COUNT = 3 const val DEFAULT_ACQUIRE_LOCK_RETRY_DELAY = 1000L const val DEFAULT_RENEW_LOCK_RETRY_COUNT = 3 @@ -89,6 +90,13 @@ class RollupSettings { Setting.Property.Dynamic ) + val ROLLUP_SEARCH_ALL_JOBS: Setting = Setting.boolSetting( + "plugins.rollup.search.search_all_jobs", + DEFAULT_SEARCH_ALL_JOBS, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ) + val ROLLUP_DASHBOARDS: Setting = Setting.boolSetting( "plugins.rollup.dashboards.enabled", LegacyOpenDistroRollupSettings.ROLLUP_DASHBOARDS, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt index bc1c20ba7..a1bcf2f4e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt @@ -384,10 +384,11 @@ fun Rollup.rewriteQueryBuilder(queryBuilder: QueryBuilder, fieldNameMappingTypeM } } -fun Rollup.buildRollupQuery(fieldNameMappingTypeMap: Map, oldQuery: QueryBuilder): QueryBuilder { +fun Set.buildRollupQuery(fieldNameMappingTypeMap: Map, oldQuery: QueryBuilder): QueryBuilder { val wrappedQueryBuilder = BoolQueryBuilder() - wrappedQueryBuilder.must(this.rewriteQueryBuilder(oldQuery, fieldNameMappingTypeMap)) - wrappedQueryBuilder.filter(TermQueryBuilder("rollup._id", this.id)) + wrappedQueryBuilder.must(this.first().rewriteQueryBuilder(oldQuery, fieldNameMappingTypeMap)) + wrappedQueryBuilder.should(TermsQueryBuilder("rollup._id", this.map { it.id })) + wrappedQueryBuilder.minimumShouldMatch(1) return wrappedQueryBuilder } @@ -407,9 +408,10 @@ fun Rollup.populateFieldMappings(): Set { // TODO: Not a fan of this.. but I can't find a way to overwrite the aggregations on the shallow copy or original // so we need to instantiate a new one so we can add the rewritten aggregation builders @Suppress("ComplexMethod") -fun SearchSourceBuilder.rewriteSearchSourceBuilder(job: Rollup, fieldNameMappingTypeMap: Map): SearchSourceBuilder { +fun SearchSourceBuilder.rewriteSearchSourceBuilder(jobs: Set, fieldNameMappingTypeMap: Map): SearchSourceBuilder { val ssb = SearchSourceBuilder() - this.aggregations()?.aggregatorFactories?.forEach { ssb.aggregation(job.rewriteAggregationBuilder(it)) } + // can use first() here as all jobs in the set will have a superset of the query's terms + this.aggregations()?.aggregatorFactories?.forEach { ssb.aggregation(jobs.first().rewriteAggregationBuilder(it)) } if (this.explain() != null) ssb.explain(this.explain()) if (this.ext() != null) ssb.ext(this.ext()) ssb.fetchSource(this.fetchSource()) @@ -421,7 +423,7 @@ fun SearchSourceBuilder.rewriteSearchSourceBuilder(job: Rollup, fieldNameMapping if (this.minScore() != null) ssb.minScore(this.minScore()) if (this.postFilter() != null) ssb.postFilter(this.postFilter()) ssb.profile(this.profile()) - if (this.query() != null) ssb.query(job.buildRollupQuery(fieldNameMappingTypeMap, this.query())) + if (this.query() != null) ssb.query(jobs.buildRollupQuery(fieldNameMappingTypeMap, this.query())) this.rescores()?.forEach { ssb.addRescorer(it) } this.scriptFields()?.forEach { ssb.scriptField(it.fieldName(), it.script(), it.ignoreFailure()) } if (this.searchAfter() != null) ssb.searchAfter(this.searchAfter()) @@ -440,6 +442,10 @@ fun SearchSourceBuilder.rewriteSearchSourceBuilder(job: Rollup, fieldNameMapping return ssb } +fun SearchSourceBuilder.rewriteSearchSourceBuilder(job: Rollup, fieldNameMappingTypeMap: Map): SearchSourceBuilder { + return this.rewriteSearchSourceBuilder(setOf(job), fieldNameMappingTypeMap) +} + fun Rollup.getInitialDocValues(docCount: Long): MutableMap = mutableMapOf( Rollup.ROLLUP_DOC_ID_FIELD to this.id, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt index 0fbeb8c2d..7d5179218 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt @@ -110,6 +110,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() { RollupSettings.ROLLUP_INDEX, RollupSettings.ROLLUP_ENABLED, RollupSettings.ROLLUP_SEARCH_ENABLED, + RollupSettings.ROLLUP_SEARCH_ALL_JOBS, RollupSettings.ROLLUP_DASHBOARDS ) ) @@ -172,6 +173,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() { assertEquals(ManagedIndexSettings.SNAPSHOT_DENY_LIST.get(settings), listOf("1")) assertEquals(RollupSettings.ROLLUP_ENABLED.get(settings), false) assertEquals(RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings), false) + assertEquals(RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings), false) assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1)) assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT.get(settings), 1) assertEquals(RollupSettings.ROLLUP_SEARCH_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1)) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index a234f6902..20e3b382c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -48,6 +48,7 @@ import org.opensearch.indexmanagement.common.model.dimension.Dimension import org.opensearch.indexmanagement.makeRequest import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupMetadata +import org.opensearch.indexmanagement.rollup.settings.RollupSettings import org.opensearch.indexmanagement.util._ID import org.opensearch.indexmanagement.util._PRIMARY_TERM import org.opensearch.indexmanagement.util._SEQ_NO @@ -233,4 +234,20 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { assertEquals("Request failed", RestStatus.OK, response.restStatus()) } + + protected fun updateSearchAllJobsClusterSetting(value: Boolean) { + val formattedValue = "\"${value}\"" + val request = """ + { + "persistent": { + "${RollupSettings.ROLLUP_SEARCH_ALL_JOBS.key}": $formattedValue + } + } + """.trimIndent() + val res = client().makeRequest( + "PUT", "_cluster/settings", emptyMap(), + StringEntity(request, APPLICATION_JSON) + ) + assertEquals("Request failed", RestStatus.OK, res.restStatus()) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index 1d51e0699..872cccb0a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -663,4 +663,151 @@ class RollupInterceptorIT : RollupRestTestCase() { rollupAggRes.getValue("min_passenger_count")["value"] ) } + + fun `test rollup search all jobs`() { + generateNYCTaxiData("source_rollup_search_all_jobs_1") + generateNYCTaxiData("source_rollup_search_all_jobs_2") + val targetIndex = "target_rollup_search_all_jobs" + val rollupHourly = Rollup( + id = "hourly_basic_term_query_rollup_search_all", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = "source_rollup_search_all_jobs_1", + targetIndex = targetIndex, + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), + Terms("RatecodeID", "RatecodeID"), + Terms("PULocationID", "PULocationID") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", targetField = "passenger_count", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ), + RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min())) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollupHourly) + + waitFor { + val rollupJob = getRollup(rollupId = rollupHourly.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + val rollupMinutely = Rollup( + id = "minutely_basic_term_query_rollup_search_all", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = "source_rollup_search_all_jobs_2", + targetIndex = targetIndex, + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1m"), + Terms("RatecodeID", "RatecodeID") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", targetField = "passenger_count", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ), + RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min())) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollupMinutely) + + waitFor { + val rollupJob = getRollup(rollupId = rollupMinutely.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + refreshAllIndices() + + val req = """ + { + "size": 0, + "query": { + "term": { "RatecodeID": 1 } + }, + "aggs": { + "sum_passenger_count": { "sum": { "field": "passenger_count" } }, + "max_passenger_count": { "max": { "field": "passenger_count" } }, + "value_count_passenger_count": { "value_count": { "field": "passenger_count" } } + } + } + """.trimIndent() + val rawRes1 = client().makeRequest("POST", "/source_rollup_search_all_jobs_1/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes1.restStatus() == RestStatus.OK) + val rawRes2 = client().makeRequest("POST", "/source_rollup_search_all_jobs_2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes2.restStatus() == RestStatus.OK) + val rollupResSingle = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupResSingle.restStatus() == RestStatus.OK) + val rawAgg1Res = rawRes1.asMap()["aggregations"] as Map> + val rawAgg2Res = rawRes2.asMap()["aggregations"] as Map> + val rollupAggResSingle = rollupResSingle.asMap()["aggregations"] as Map> + + // When the cluster setting to search all jobs is off, the aggregations will be the same for searching a single job as for searching both + assertEquals( + "Searching single rollup job and rollup target index did not return the same max results", + rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResSingle.getValue("max_passenger_count")["value"] + ) + assertEquals( + "Searching single rollup job and rollup target index did not return the same sum results", + rawAgg1Res.getValue("sum_passenger_count")["value"], rollupAggResSingle.getValue("sum_passenger_count")["value"] + ) + val trueAggCount = rawAgg1Res.getValue("value_count_passenger_count")["value"] as Int + rawAgg2Res.getValue("value_count_passenger_count")["value"] as Int + assertEquals( + "Searching single rollup job and rollup target index did not return the same value count results", + rawAgg1Res.getValue("value_count_passenger_count")["value"], rollupAggResSingle.getValue("value_count_passenger_count")["value"] + ) + + val trueAggSum = rawAgg1Res.getValue("sum_passenger_count")["value"] as Double + rawAgg2Res.getValue("sum_passenger_count")["value"] as Double + updateSearchAllJobsClusterSetting(true) + + val rollupResAll = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupResAll.restStatus() == RestStatus.OK) + val rollupAggResAll = rollupResAll.asMap()["aggregations"] as Map> + + // With search all jobs setting on, the sum, and value_count will now be equal to the sum of the single job search results + assertEquals( + "Searching single rollup job and rollup target index did not return the same sum results", + rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResAll.getValue("max_passenger_count")["value"] + ) + assertEquals( + "Searching rollup target index did not return the sum for all of the rollup jobs on the index", + trueAggSum, rollupAggResAll.getValue("sum_passenger_count")["value"] + ) + assertEquals( + "Searching rollup target index did not return the value count for all of the rollup jobs on the index", + trueAggCount, rollupAggResAll.getValue("value_count_passenger_count")["value"] + ) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtilsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtilsTests.kt index 05bc7a59b..094e87a27 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtilsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtilsTests.kt @@ -35,10 +35,25 @@ import org.opensearch.index.query.RangeQueryBuilder import org.opensearch.index.query.TermQueryBuilder import org.opensearch.index.query.TermsQueryBuilder import org.opensearch.index.search.MatchQuery +import org.opensearch.indexmanagement.common.model.dimension.DateHistogram +import org.opensearch.indexmanagement.common.model.dimension.Dimension +import org.opensearch.indexmanagement.common.model.dimension.Histogram +import org.opensearch.indexmanagement.common.model.dimension.Terms +import org.opensearch.indexmanagement.opensearchapi.convertToMap import org.opensearch.indexmanagement.rollup.model.RollupFieldMapping +import org.opensearch.indexmanagement.rollup.model.RollupMetrics +import org.opensearch.indexmanagement.rollup.randomAverage +import org.opensearch.indexmanagement.rollup.randomMax +import org.opensearch.indexmanagement.rollup.randomMin import org.opensearch.indexmanagement.rollup.randomRollup +import org.opensearch.indexmanagement.rollup.randomSum import org.opensearch.indexmanagement.rollup.randomTermQuery +import org.opensearch.indexmanagement.rollup.randomValueCount +import org.opensearch.indexmanagement.transform.randomAggregationBuilder +import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder +import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder import org.opensearch.test.OpenSearchTestCase +import org.opensearch.test.rest.OpenSearchRestTestCase class RollupUtilsTests : OpenSearchTestCase() { @@ -168,16 +183,32 @@ class RollupUtilsTests : OpenSearchTestCase() { fun `test buildRollupQuery`() { val rollup = randomRollup() val queryBuilder = MatchAllQueryBuilder() - val actual = rollup.buildRollupQuery(mapOf(), queryBuilder) as BoolQueryBuilder - val expectedFilter = TermQueryBuilder("rollup._id", rollup.id) - assertTrue(actual.should().isEmpty()) + val actual = setOf(rollup).buildRollupQuery(mapOf(), queryBuilder) as BoolQueryBuilder + val expectedShould = TermsQueryBuilder("rollup._id", rollup.id) assertTrue(actual.mustNot().isEmpty()) - assertFalse(actual.filter().isEmpty()) - assertFalse(actual.must().isEmpty()) + assertTrue(actual.filter().isEmpty()) + assertEquals("1", actual.minimumShouldMatch()) assertEquals(1, actual.must().size) assertEquals(rollup.rewriteQueryBuilder(queryBuilder, mapOf()), actual.must().first()) - assertEquals(1, actual.filter().size) - assertEquals(expectedFilter, actual.filter().first()) + assertEquals(1, actual.should().size) + assertEquals(1, (actual.should()[0] as TermsQueryBuilder).values().size) + assertEquals(expectedShould, actual.should().first()) + } + + fun `test buildRollupQuery multiple`() { + var rollups = setOf(randomRollup(), randomRollup(), randomRollup(), randomRollup(), randomRollup()) + rollups = OpenSearchRestTestCase.randomSubsetOf(randomIntBetween(2, 5), rollups).toSet() + val queryBuilder = MatchAllQueryBuilder() + val actual = rollups.buildRollupQuery(mapOf(), queryBuilder) as BoolQueryBuilder + val expectedShould = TermsQueryBuilder("rollup._id", rollups.map { it.id }) + assertTrue(actual.mustNot().isEmpty()) + assertTrue(actual.filter().isEmpty()) + assertEquals("1", actual.minimumShouldMatch()) + assertEquals(1, actual.must().size) + assertEquals(rollups.first().rewriteQueryBuilder(queryBuilder, mapOf()), actual.must().first()) + assertEquals(1, actual.should().size) + assertEquals(rollups.size, (actual.should()[0] as TermsQueryBuilder).values().size) + assertEquals(expectedShould, actual.should().first()) } fun `test rewriteQueryBuilder match phrase query`() { @@ -194,4 +225,30 @@ class RollupUtilsTests : OpenSearchTestCase() { assertEquals(matchPhraseQuery.value(), actual.value()) assertNull(actual.analyzer()) } + + fun `test rewriteAggregationBuilder`() { + var rollup = randomRollup() + val aggBuilder = randomAggregationBuilder() + val aggField = ((aggBuilder.convertToMap()[aggBuilder.name] as Map<*, *>).values.first() as Map<*, *>).values.first() as String + val newDims = mutableListOf() + // Make rollup dimensions and metrics contain the aggregation field name and aggregation metrics + rollup.dimensions.forEach { + val dimToAdd = when (it) { + is DateHistogram -> it.copy(sourceField = aggField, targetField = aggField) + is Terms -> it.copy(sourceField = aggField, targetField = aggField) + is Histogram -> it.copy(sourceField = aggField, targetField = aggField) + else -> it + } + newDims.add(dimToAdd) + } + val newMetrics = mutableListOf(RollupMetrics(aggField, aggField, listOf(randomAverage(), randomMax(), randomMin(), randomSum(), randomValueCount()))) + rollup = rollup.copy(dimensions = newDims, metrics = newMetrics) + val rewrittenAgg = rollup.rewriteAggregationBuilder(aggBuilder) + assertEquals("Rewritten aggregation builder does not have the same name", aggBuilder.name, rewrittenAgg.name) + if (aggBuilder is AvgAggregationBuilder || aggBuilder is ValueCountAggregationBuilder) { + assertEquals("Rewritten aggregation builder is not the correct type", "scripted_metric", rewrittenAgg.type) + } else { + assertEquals("Rewritten aggregation builder is not the correct type", aggBuilder.type, rewrittenAgg.type) + } + } }