Skip to content

Commit

Permalink
Adds setting to search all rollup jobs on a target index (#165)
Browse files Browse the repository at this point in the history
* Adds cluster setting to search all rollup jobs

Signed-off-by: Clay Downs <downsrob@amazon.com>
  • Loading branch information
downsrob authored Oct 6, 2021
1 parent 108f625 commit 11883ae
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
RollupSettings.ROLLUP_ENABLED,
RollupSettings.ROLLUP_SEARCH_ENABLED,
RollupSettings.ROLLUP_DASHBOARDS,
RollupSettings.ROLLUP_SEARCH_ALL_JOBS,
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT,
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS,
TransformSettings.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,15 @@ class RollupInterceptor(
private val logger = LogManager.getLogger(javaClass)

@Volatile private var searchEnabled = RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings)
@Volatile private var searchAllJobs = RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings)

init {
    // Refresh the cached flags whenever the cluster settings report a new value.
    val clusterSettings = clusterService.clusterSettings
    clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ENABLED) { enabled ->
        searchEnabled = enabled
    }
    clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ALL_JOBS) { allJobs ->
        searchAllJobs = allJobs
    }
}

@Suppress("SpreadOperator")
Expand Down Expand Up @@ -126,12 +130,9 @@ class RollupInterceptor(
throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues")
}

val matchedRollup = pickRollupJob(matchingRollupJobs.keys)
val fieldNameMappingTypeMap = matchingRollupJobs.getValue(matchedRollup).associateBy({ it.fieldName }, { it.mappingType })

// only rebuild if there is necessity to rebuild
if (fieldMappings.isNotEmpty()) {
request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap))
rewriteShardSearchForRollupJobs(request, matchingRollupJobs)
}
}
}
Expand Down Expand Up @@ -298,4 +299,14 @@ class RollupInterceptor(
DateHistogramInterval(rollup.getDateHistogram().fixedInterval).estimateMillis()
}
}

/**
 * Replaces the source of [request] with a version rewritten for the matching rollup job(s).
 *
 * A single job is chosen with [pickRollupJob] and its field mappings are turned into a
 * field-name -> mapping-type map. When [searchAllJobs] is set, the rewrite is given every
 * matching job; otherwise only the chosen job is used.
 *
 * NOTE(review): the field mapping map always comes from the chosen job, even when all
 * matching jobs are passed to the rewrite - confirm this is intended.
 */
private fun rewriteShardSearchForRollupJobs(request: ShardSearchRequest, matchingRollupJobs: Map<Rollup, Set<RollupFieldMapping>>) {
    val chosenJob = pickRollupJob(matchingRollupJobs.keys)
    val mappingTypeByField = matchingRollupJobs
        .getValue(chosenJob)
        .associateBy({ it.fieldName }, { it.mappingType })
    val originalSource = request.source()
    val rewrittenSource = if (searchAllJobs) {
        originalSource.rewriteSearchSourceBuilder(matchingRollupJobs.keys, mappingTypeByField)
    } else {
        originalSource.rewriteSearchSourceBuilder(chosenJob, mappingTypeByField)
    }
    request.source(rewrittenSource)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class RollupSettings {

companion object {
const val DEFAULT_ROLLUP_ENABLED = true
const val DEFAULT_SEARCH_ALL_JOBS = false
const val DEFAULT_ACQUIRE_LOCK_RETRY_COUNT = 3
const val DEFAULT_ACQUIRE_LOCK_RETRY_DELAY = 1000L
const val DEFAULT_RENEW_LOCK_RETRY_COUNT = 3
Expand Down Expand Up @@ -89,6 +90,13 @@ class RollupSettings {
Setting.Property.Dynamic
)

/**
 * Boolean setting registered under the key "plugins.rollup.search.search_all_jobs".
 * Defaults to [DEFAULT_SEARCH_ALL_JOBS] and is declared with the NodeScope and Dynamic properties.
 */
val ROLLUP_SEARCH_ALL_JOBS: Setting<Boolean> = Setting.boolSetting(
"plugins.rollup.search.search_all_jobs",
DEFAULT_SEARCH_ALL_JOBS,
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

val ROLLUP_DASHBOARDS: Setting<Boolean> = Setting.boolSetting(
"plugins.rollup.dashboards.enabled",
LegacyOpenDistroRollupSettings.ROLLUP_DASHBOARDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,10 +384,11 @@ fun Rollup.rewriteQueryBuilder(queryBuilder: QueryBuilder, fieldNameMappingTypeM
}
}

fun Rollup.buildRollupQuery(fieldNameMappingTypeMap: Map<String, String>, oldQuery: QueryBuilder): QueryBuilder {
fun Set<Rollup>.buildRollupQuery(fieldNameMappingTypeMap: Map<String, String>, oldQuery: QueryBuilder): QueryBuilder {
val wrappedQueryBuilder = BoolQueryBuilder()
wrappedQueryBuilder.must(this.rewriteQueryBuilder(oldQuery, fieldNameMappingTypeMap))
wrappedQueryBuilder.filter(TermQueryBuilder("rollup._id", this.id))
wrappedQueryBuilder.must(this.first().rewriteQueryBuilder(oldQuery, fieldNameMappingTypeMap))
wrappedQueryBuilder.should(TermsQueryBuilder("rollup._id", this.map { it.id }))
wrappedQueryBuilder.minimumShouldMatch(1)
return wrappedQueryBuilder
}

Expand All @@ -407,9 +408,10 @@ fun Rollup.populateFieldMappings(): Set<RollupFieldMapping> {
// TODO: Not a fan of this.. but I can't find a way to overwrite the aggregations on the shallow copy or original
// so we need to instantiate a new one so we can add the rewritten aggregation builders
@Suppress("ComplexMethod")
fun SearchSourceBuilder.rewriteSearchSourceBuilder(job: Rollup, fieldNameMappingTypeMap: Map<String, String>): SearchSourceBuilder {
fun SearchSourceBuilder.rewriteSearchSourceBuilder(jobs: Set<Rollup>, fieldNameMappingTypeMap: Map<String, String>): SearchSourceBuilder {
val ssb = SearchSourceBuilder()
this.aggregations()?.aggregatorFactories?.forEach { ssb.aggregation(job.rewriteAggregationBuilder(it)) }
// can use first() here as all jobs in the set will have a superset of the query's terms
this.aggregations()?.aggregatorFactories?.forEach { ssb.aggregation(jobs.first().rewriteAggregationBuilder(it)) }
if (this.explain() != null) ssb.explain(this.explain())
if (this.ext() != null) ssb.ext(this.ext())
ssb.fetchSource(this.fetchSource())
Expand All @@ -421,7 +423,7 @@ fun SearchSourceBuilder.rewriteSearchSourceBuilder(job: Rollup, fieldNameMapping
if (this.minScore() != null) ssb.minScore(this.minScore())
if (this.postFilter() != null) ssb.postFilter(this.postFilter())
ssb.profile(this.profile())
if (this.query() != null) ssb.query(job.buildRollupQuery(fieldNameMappingTypeMap, this.query()))
if (this.query() != null) ssb.query(jobs.buildRollupQuery(fieldNameMappingTypeMap, this.query()))
this.rescores()?.forEach { ssb.addRescorer(it) }
this.scriptFields()?.forEach { ssb.scriptField(it.fieldName(), it.script(), it.ignoreFailure()) }
if (this.searchAfter() != null) ssb.searchAfter(this.searchAfter())
Expand All @@ -440,6 +442,10 @@ fun SearchSourceBuilder.rewriteSearchSourceBuilder(job: Rollup, fieldNameMapping
return ssb
}

/** Single-job convenience overload: wraps [job] in a one-element set and delegates to the set-based rewrite. */
fun SearchSourceBuilder.rewriteSearchSourceBuilder(job: Rollup, fieldNameMappingTypeMap: Map<String, String>): SearchSourceBuilder =
    rewriteSearchSourceBuilder(setOf(job), fieldNameMappingTypeMap)

fun Rollup.getInitialDocValues(docCount: Long): MutableMap<String, Any?> =
mutableMapOf(
Rollup.ROLLUP_DOC_ID_FIELD to this.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
RollupSettings.ROLLUP_INDEX,
RollupSettings.ROLLUP_ENABLED,
RollupSettings.ROLLUP_SEARCH_ENABLED,
RollupSettings.ROLLUP_SEARCH_ALL_JOBS,
RollupSettings.ROLLUP_DASHBOARDS
)
)
Expand Down Expand Up @@ -172,6 +173,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
assertEquals(ManagedIndexSettings.SNAPSHOT_DENY_LIST.get(settings), listOf("1"))
assertEquals(RollupSettings.ROLLUP_ENABLED.get(settings), false)
assertEquals(RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings), false)
assertEquals(RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings), false)
assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1))
assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT.get(settings), 1)
assertEquals(RollupSettings.ROLLUP_SEARCH_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import org.opensearch.indexmanagement.common.model.dimension.Dimension
import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupMetadata
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.util._ID
import org.opensearch.indexmanagement.util._PRIMARY_TERM
import org.opensearch.indexmanagement.util._SEQ_NO
Expand Down Expand Up @@ -233,4 +234,20 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {

assertEquals("Request failed", RestStatus.OK, response.restStatus())
}

/**
 * Sends a persistent cluster-settings update for [RollupSettings.ROLLUP_SEARCH_ALL_JOBS]
 * with the given [value] and asserts that the request returned OK.
 */
protected fun updateSearchAllJobsClusterSetting(value: Boolean) {
    // The value is sent as a quoted JSON string, e.g. "true".
    val body = """
        {
        "persistent": {
        "${RollupSettings.ROLLUP_SEARCH_ALL_JOBS.key}": "$value"
        }
        }
    """.trimIndent()
    val response = client().makeRequest(
        "PUT", "_cluster/settings", emptyMap(),
        StringEntity(body, APPLICATION_JSON)
    )
    assertEquals("Request failed", RestStatus.OK, response.restStatus())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -663,4 +663,151 @@ class RollupInterceptorIT : RollupRestTestCase() {
rollupAggRes.getValue("min_passenger_count")["value"]
)
}

/**
 * Runs two rollup jobs (hourly and minutely) from two source indices into one target index,
 * then checks the aggregation results of a search on the target index with the
 * search-all-jobs cluster setting off and on.
 */
fun `test rollup search all jobs`() {
    generateNYCTaxiData("source_rollup_search_all_jobs_1")
    generateNYCTaxiData("source_rollup_search_all_jobs_2")
    val targetIndex = "target_rollup_search_all_jobs"
    val rollupHourly = Rollup(
        id = "hourly_basic_term_query_rollup_search_all",
        enabled = true,
        schemaVersion = 1L,
        jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
        jobLastUpdatedTime = Instant.now(),
        jobEnabledTime = Instant.now(),
        description = "basic search test",
        sourceIndex = "source_rollup_search_all_jobs_1",
        targetIndex = targetIndex,
        metadataID = null,
        roles = emptyList(),
        pageSize = 10,
        delay = 0,
        continuous = false,
        dimensions = listOf(
            DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
            Terms("RatecodeID", "RatecodeID"),
            Terms("PULocationID", "PULocationID")
        ),
        metrics = listOf(
            RollupMetrics(
                sourceField = "passenger_count", targetField = "passenger_count",
                metrics = listOf(
                    Sum(), Min(), Max(),
                    ValueCount(), Average()
                )
            ),
            RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min()))
        )
    ).let { createRollup(it, it.id) }

    updateRollupStartTime(rollupHourly)

    // Wait for the hourly job to finish before starting the second job on the same target index
    waitFor {
        val rollupJob = getRollup(rollupId = rollupHourly.id)
        assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
        val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
        assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
    }

    val rollupMinutely = Rollup(
        id = "minutely_basic_term_query_rollup_search_all",
        enabled = true,
        schemaVersion = 1L,
        jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
        jobLastUpdatedTime = Instant.now(),
        jobEnabledTime = Instant.now(),
        description = "basic search test",
        sourceIndex = "source_rollup_search_all_jobs_2",
        targetIndex = targetIndex,
        metadataID = null,
        roles = emptyList(),
        pageSize = 10,
        delay = 0,
        continuous = false,
        dimensions = listOf(
            DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1m"),
            Terms("RatecodeID", "RatecodeID")
        ),
        metrics = listOf(
            RollupMetrics(
                sourceField = "passenger_count", targetField = "passenger_count",
                metrics = listOf(
                    Sum(), Min(), Max(),
                    ValueCount(), Average()
                )
            ),
            RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min()))
        )
    ).let { createRollup(it, it.id) }

    updateRollupStartTime(rollupMinutely)

    waitFor {
        val rollupJob = getRollup(rollupId = rollupMinutely.id)
        assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
        val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
        assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
    }

    refreshAllIndices()

    val req = """
        {
        "size": 0,
        "query": {
        "term": { "RatecodeID": 1 }
        },
        "aggs": {
        "sum_passenger_count": { "sum": { "field": "passenger_count" } },
        "max_passenger_count": { "max": { "field": "passenger_count" } },
        "value_count_passenger_count": { "value_count": { "field": "passenger_count" } }
        }
        }
    """.trimIndent()
    val rawRes1 = client().makeRequest("POST", "/source_rollup_search_all_jobs_1/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
    assertTrue(rawRes1.restStatus() == RestStatus.OK)
    val rawRes2 = client().makeRequest("POST", "/source_rollup_search_all_jobs_2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
    assertTrue(rawRes2.restStatus() == RestStatus.OK)
    val rollupResSingle = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
    assertTrue(rollupResSingle.restStatus() == RestStatus.OK)
    val rawAgg1Res = rawRes1.asMap()["aggregations"] as Map<String, Map<String, Any>>
    val rawAgg2Res = rawRes2.asMap()["aggregations"] as Map<String, Map<String, Any>>
    val rollupAggResSingle = rollupResSingle.asMap()["aggregations"] as Map<String, Map<String, Any>>

    // When the cluster setting to search all jobs is off, searching the target index should return the same
    // aggregation values as searching the first source index alone
    assertEquals(
        "Searching single rollup job and rollup target index did not return the same max results",
        rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResSingle.getValue("max_passenger_count")["value"]
    )
    assertEquals(
        "Searching single rollup job and rollup target index did not return the same sum results",
        rawAgg1Res.getValue("sum_passenger_count")["value"], rollupAggResSingle.getValue("sum_passenger_count")["value"]
    )
    assertEquals(
        "Searching single rollup job and rollup target index did not return the same value count results",
        rawAgg1Res.getValue("value_count_passenger_count")["value"], rollupAggResSingle.getValue("value_count_passenger_count")["value"]
    )

    // Expected totals across both source indices, used once the search-all-jobs setting is switched on
    val trueAggCount = rawAgg1Res.getValue("value_count_passenger_count")["value"] as Int + rawAgg2Res.getValue("value_count_passenger_count")["value"] as Int
    val trueAggSum = rawAgg1Res.getValue("sum_passenger_count")["value"] as Double + rawAgg2Res.getValue("sum_passenger_count")["value"] as Double
    updateSearchAllJobsClusterSetting(true)

    val rollupResAll = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
    assertTrue(rollupResAll.restStatus() == RestStatus.OK)
    val rollupAggResAll = rollupResAll.asMap()["aggregations"] as Map<String, Map<String, Any>>

    // With the search all jobs setting on, the sum and value_count should equal the totals of the two source
    // index searches, while the max is still expected to match the first source index
    assertEquals(
        "Searching single rollup job and rollup target index did not return the same max results",
        rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResAll.getValue("max_passenger_count")["value"]
    )
    assertEquals(
        "Searching rollup target index did not return the sum for all of the rollup jobs on the index",
        trueAggSum, rollupAggResAll.getValue("sum_passenger_count")["value"]
    )
    assertEquals(
        "Searching rollup target index did not return the value count for all of the rollup jobs on the index",
        trueAggCount, rollupAggResAll.getValue("value_count_passenger_count")["value"]
    )
}
}
Loading

0 comments on commit 11883ae

Please sign in to comment.