Skip to content

Commit

Permalink
add tests to verify alias based optimziation scenarios
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
  • Loading branch information
eirsep committed Oct 18, 2024
1 parent 67be543 commit 4de98fc
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ class InputService(
val (amount, unit) = matchResult?.destructured?.let { (a, u) -> a to u }
?: throw IllegalArgumentException("Invalid timeframe format: $timeframeString")
val duration = when (unit) {
"s" -> Duration.ofSeconds(amount.toLong())
"m" -> Duration.ofMinutes(amount.toLong())
"h" -> Duration.ofHours(amount.toLong())
"d" -> Duration.ofDays(amount.toLong())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1257,6 +1257,22 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
}
}

protected fun insertSampleTimeSerializedDataCurrentTime(index: String, data: List<String>) {
data.forEachIndexed { i, value ->
val time = ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time)
val testDoc = """
{
"test_strict_date_time": "$testTime",
"test_field": "$value",
"number": "$i"
}
""".trimIndent()
// Indexing documents with deterministic doc id to allow for easy selected deletion during testing
indexDoc(index, (i + 1).toString(), testDoc)
}
}

protected fun deleteDataWithDocIds(index: String, docIds: List<String>) {
docIds.forEach {
deleteDoc(index, it)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.opensearch.alerting.model.destination.email.Email
import org.opensearch.alerting.model.destination.email.Recipient
import org.opensearch.alerting.util.DestinationType
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.client.Request
import org.opensearch.client.ResponseException
import org.opensearch.client.WarningFailureException
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -1190,7 +1191,16 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
assertEquals("Incorrect search result", 2, buckets.size)
}

fun `test execute bucket-level monitor with alias`() {
fun `test execute bucket-level monitor with alias optimization - indices not skipped`() {
val testIndex = createTestIndex()
insertSampleTimeSerializedDataCurrentTime(
testIndex,
listOf(
"test_value_3",
"test_value_4", // adding duplicate to verify aggregation
"test_value_5"
)
)
val indexMapping = """
"properties" : {
"test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
Expand All @@ -1200,17 +1210,81 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
""".trimIndent()
val alias = createTestAlias(randomAlphaOfLength(10), 10, true, indexMapping)
val aliasName = alias.keys.first()
insertSampleTimeSerializedData(
insertSampleTimeSerializedDataCurrentTime(
aliasName,
listOf(
"test_value_1",
"test_value_1", // adding duplicate to verify aggregation
"test_value_2"
)
)
addIndexToAlias(testIndex, aliasName)
val query = QueryBuilders.rangeQuery("test_strict_date_time")
.gt("{{period_end}}||-10s")
.lte("{{period_end}}")
.format("epoch_millis")
val compositeSources = listOf(
TermsValuesSourceBuilder("test_field").field("test_field")
)
val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
val input = SearchInput(indices = listOf(aliasName), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg))
val triggerScript = """
params.docCount > 0
""".trimIndent()

var trigger = randomBucketLevelTrigger()
trigger = trigger.copy(
bucketSelector = BucketSelectorExtAggregationBuilder(
name = trigger.id,
bucketsPathsMap = mapOf("docCount" to "_count"),
script = Script(triggerScript),
parentBucketPath = "composite_agg",
filter = null
)
)
val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger)))
val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
val output = entityAsMap(response)

assertEquals(monitor.name, output["monitor_name"])
@Suppress("UNCHECKED_CAST")
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List<Map<String, Any>>
assertEquals("Incorrect search result", 5, buckets.size)
}

fun `test execute bucket-level monitor with alias optimization - indices skipped from query`() {
val testIndex = createTestIndex()
insertSampleTimeSerializedDataCurrentTime(
testIndex,
listOf(
"test_value_1",
"test_value_1", // adding duplicate to verify aggregation
"test_value_2"
)
)
Thread.sleep(10000)
val indexMapping = """
"properties" : {
"test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
"test_field" : { "type" : "keyword" },
"number" : { "type" : "keyword" }
}
""".trimIndent()
val alias = createTestAlias(randomAlphaOfLength(10), 10, true, indexMapping)
val aliasName = alias.keys.first()
insertSampleTimeSerializedDataCurrentTime(
aliasName,
listOf(
"test_value_1",
"test_value_1", // adding duplicate to verify aggregation
"test_value_2"
)
)
addIndexToAlias(testIndex, aliasName)
val query = QueryBuilders.rangeQuery("test_strict_date_time")
.gt("{{period_end}}||-10d")
.gt("{{period_end}}||-10s")
.lte("{{period_end}}")
.format("epoch_millis")
val compositeSources = listOf(
Expand Down Expand Up @@ -1241,7 +1315,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List<Map<String, Any>>
assertEquals("Incorrect search result", 2, buckets.size)
Assert.assertTrue(buckets.size <= 2)
}

fun `test execute bucket-level monitor returns search result with multi term agg`() {
Expand Down Expand Up @@ -2240,4 +2314,21 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
private fun Map<String, Any>.objectMap(key: String): Map<String, Map<String, Any>> {
return this[key] as Map<String, Map<String, Any>>
}

fun addIndexToAlias(index: String, alias: String) {
val request = Request("POST", "/_aliases")
request.setJsonEntity(
"""{"actions": [{"add": {"index": "$index","alias": "$alias"}} ]}""".trimIndent()
)

try {
val response = client().performRequest(request)
if (response.statusLine.statusCode != RestStatus.OK.status) {
throw ResponseException(response)
}
} catch (e: Exception) {
// Handle the exception appropriately, e.g., log it or rethrow
throw RuntimeException("Failed to add index to alias", e)
}
}
}

0 comments on commit 4de98fc

Please sign in to comment.