From d98fd5308f6302077ceae0834d861f84298f7ea9 Mon Sep 17 00:00:00 2001 From: Joshua Au Date: Tue, 31 Oct 2023 16:12:57 -0400 Subject: [PATCH 1/5] Added minimum for search.cancel_after_time_interval setting for rollups Signed-off-by: Joshua Au --- .../indexmanagement/rollup/RollupSearchService.kt | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt index 80405878d..d08052df1 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt @@ -12,10 +12,12 @@ import org.opensearch.core.action.ActionListener import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.action.search.SearchPhaseExecutionException import org.opensearch.action.search.SearchResponse +import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.core.common.breaker.CircuitBreakingException import org.opensearch.common.settings.Settings +import org.opensearch.common.unit.TimeValue import org.opensearch.indexmanagement.opensearchapi.retry import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.rollup.model.Rollup @@ -44,10 +46,16 @@ class RollupSearchService( @Volatile private var retrySearchPolicy = BackoffPolicy.constantBackoff(ROLLUP_SEARCH_BACKOFF_MILLIS.get(settings), ROLLUP_SEARCH_BACKOFF_COUNT.get(settings)) + @Volatile private var cancelAfterTimeInterval = SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING.get(settings) + init { clusterService.clusterSettings.addSettingsUpdateConsumer(ROLLUP_SEARCH_BACKOFF_MILLIS, ROLLUP_SEARCH_BACKOFF_COUNT) { millis, count -> retrySearchPolicy = BackoffPolicy.constantBackoff(millis, count) } + + clusterService.clusterSettings.addSettingsUpdateConsumer(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING) { + cancelAfterTimeInterval = it + } } // TODO: Failed shouldn't process? How to recover from failed -> how does a user retry a failed rollup @@ -103,7 +111,12 @@ class RollupSearchService( "Composite search failed for rollup, retrying [#${retryCount - 1}] -" + " reducing page size of composite aggregation from ${job.pageSize} to $pageSize" ) - search(job.copy(pageSize = pageSize).getRollupSearchRequest(metadata), listener) + + val searchRequest = job.copy(pageSize = pageSize).getRollupSearchRequest(metadata) + val timeoutMins = max(cancelAfterTimeInterval.minutes(), 10) + searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(timeoutMins) + + search(searchRequest, listener) } } ) From 166c303e0e42d10d9b1fb5188126bd32079a0d20 Mon Sep 17 00:00:00 2001 From: Joshua Au Date: Tue, 31 Oct 2023 16:12:57 -0400 Subject: [PATCH 2/5] Added constant for cancel_after_time_interval for rollup search Signed-off-by: Joshua Au --- .../opensearch/indexmanagement/rollup/RollupSearchService.kt | 3 ++- .../indexmanagement/rollup/settings/RollupSettings.kt | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt index d08052df1..52573cfa4 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt @@ -22,6 +22,7 @@ import org.opensearch.indexmanagement.opensearchapi.retry import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupMetadata +import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.MINIMUM_CANCEL_AFTER_TIME_INTERVAL_MINUTES import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_SEARCH_BACKOFF_COUNT import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_SEARCH_BACKOFF_MILLIS import org.opensearch.indexmanagement.rollup.util.getRollupSearchRequest @@ -113,7 +114,7 @@ class RollupSearchService( ) val searchRequest = job.copy(pageSize = pageSize).getRollupSearchRequest(metadata) - val timeoutMins = max(cancelAfterTimeInterval.minutes(), 10) + val timeoutMins = max(cancelAfterTimeInterval.minutes(), MINIMUM_CANCEL_AFTER_TIME_INTERVAL_MINUTES) searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(timeoutMins) search(searchRequest, listener) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt index d0464bd34..22238fd6d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt @@ -20,6 +20,7 @@ class RollupSettings { const val DEFAULT_RENEW_LOCK_RETRY_DELAY = 1000L const val DEFAULT_CLIENT_REQUEST_RETRY_COUNT = 3 const val DEFAULT_CLIENT_REQUEST_RETRY_DELAY = 1000L + const val MINIMUM_CANCEL_AFTER_TIME_INTERVAL_MINUTES = 10L val ROLLUP_ENABLED: Setting = Setting.boolSetting( "plugins.rollup.enabled", From 687c7e78ecd543d91fc3e490cac227148df722a2 Mon Sep 17 00:00:00 2001 From: Joshua Au Date: Wed, 8 Nov 2023 00:35:09 -0500 Subject: [PATCH 3/5] Handled case of default value for cancel interval Signed-off-by: Joshua Au --- .../indexmanagement/rollup/RollupSearchService.kt | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt index 52573cfa4..cf9f62eac 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt @@ -114,8 +114,7 @@ class RollupSearchService( ) val searchRequest = job.copy(pageSize = pageSize).getRollupSearchRequest(metadata) - val timeoutMins = max(cancelAfterTimeInterval.minutes(), MINIMUM_CANCEL_AFTER_TIME_INTERVAL_MINUTES) - searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(timeoutMins) + searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(getCancelAfterTimeInterval(cancelAfterTimeInterval.minutes)) search(searchRequest, listener) } @@ -146,6 +145,14 @@ class RollupSearchService( RollupSearchResult.Failure(cause = e) } } + + private fun getCancelAfterTimeInterval(givenInterval: Long): Long { + if(givenInterval == -1L) { + return givenInterval + } + + return max(cancelAfterTimeInterval.minutes(), MINIMUM_CANCEL_AFTER_TIME_INTERVAL_MINUTES) + } } sealed class RollupSearchResult { From a259d1c1c3285f67d4172257f1f295453cba0916 Mon Sep 17 00:00:00 2001 From: Joshua Au Date: Wed, 8 Nov 2023 00:37:41 -0500 Subject: [PATCH 4/5] Added comment explanation for default rollup cancel after time interval Signed-off-by: Joshua Au --- .../opensearch/indexmanagement/rollup/RollupSearchService.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt index cf9f62eac..cf5370998 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt @@ -147,7 +147,9 @@ class RollupSearchService( } private fun getCancelAfterTimeInterval(givenInterval: Long): Long { - if(givenInterval == -1L) { + // The default value for the cancelAfterTimeInterval is -1 and so, in this case + // we should ignore processing on the value + if (givenInterval == -1L) { return givenInterval } From a37c2487cbb7b20df20b851a8837802fcfc8b6c1 Mon Sep 17 00:00:00 2001 From: Joshua Au Date: Wed, 8 Nov 2023 13:14:57 -0500 Subject: [PATCH 5/5] Fixed github workflow checks Signed-off-by: Joshua Au --- .../opensearch/indexmanagement/rollup/RollupSearchService.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt index cf5370998..91b575a0b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt @@ -114,7 +114,8 @@ class RollupSearchService( ) val searchRequest = job.copy(pageSize = pageSize).getRollupSearchRequest(metadata) - searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(getCancelAfterTimeInterval(cancelAfterTimeInterval.minutes)) + val cancelTimeoutTimeValue = TimeValue.timeValueMinutes(getCancelAfterTimeInterval(cancelAfterTimeInterval.minutes)) + searchRequest.cancelAfterTimeInterval = cancelTimeoutTimeValue search(searchRequest, listener) }