diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt index 80405878d..91b575a0b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt @@ -12,14 +12,17 @@ import org.opensearch.core.action.ActionListener import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.action.search.SearchPhaseExecutionException import org.opensearch.action.search.SearchResponse +import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.core.common.breaker.CircuitBreakingException import org.opensearch.common.settings.Settings +import org.opensearch.common.unit.TimeValue import org.opensearch.indexmanagement.opensearchapi.retry import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupMetadata +import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.MINIMUM_CANCEL_AFTER_TIME_INTERVAL_MINUTES import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_SEARCH_BACKOFF_COUNT import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_SEARCH_BACKOFF_MILLIS import org.opensearch.indexmanagement.rollup.util.getRollupSearchRequest @@ -44,10 +47,16 @@ class RollupSearchService( @Volatile private var retrySearchPolicy = BackoffPolicy.constantBackoff(ROLLUP_SEARCH_BACKOFF_MILLIS.get(settings), ROLLUP_SEARCH_BACKOFF_COUNT.get(settings)) + @Volatile private var cancelAfterTimeInterval = SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING.get(settings) + init { clusterService.clusterSettings.addSettingsUpdateConsumer(ROLLUP_SEARCH_BACKOFF_MILLIS, ROLLUP_SEARCH_BACKOFF_COUNT) { millis, count -> retrySearchPolicy = BackoffPolicy.constantBackoff(millis, count) } + + clusterService.clusterSettings.addSettingsUpdateConsumer(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING) { + cancelAfterTimeInterval = it + } } // TODO: Failed shouldn't process? How to recover from failed -> how does a user retry a failed rollup @@ -103,7 +112,12 @@ class RollupSearchService( "Composite search failed for rollup, retrying [#${retryCount - 1}] -" + " reducing page size of composite aggregation from ${job.pageSize} to $pageSize" ) - search(job.copy(pageSize = pageSize).getRollupSearchRequest(metadata), listener) + + val searchRequest = job.copy(pageSize = pageSize).getRollupSearchRequest(metadata) + val cancelTimeoutTimeValue = TimeValue.timeValueMinutes(getCancelAfterTimeInterval(cancelAfterTimeInterval.minutes)) + searchRequest.cancelAfterTimeInterval = cancelTimeoutTimeValue + + search(searchRequest, listener) } } ) @@ -132,6 +146,16 @@ class RollupSearchService( RollupSearchResult.Failure(cause = e) } } + + private fun getCancelAfterTimeInterval(givenInterval: Long): Long { + // The default value for the cancelAfterTimeInterval is -1 and so, in this case + // we should ignore processing on the value + if (givenInterval == -1L) { + return givenInterval + } + + return max(cancelAfterTimeInterval.minutes(), MINIMUM_CANCEL_AFTER_TIME_INTERVAL_MINUTES) + } } sealed class RollupSearchResult { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt index d0464bd34..22238fd6d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt @@ -20,6 +20,7 @@ class RollupSettings { const val DEFAULT_RENEW_LOCK_RETRY_DELAY = 1000L const val DEFAULT_CLIENT_REQUEST_RETRY_COUNT = 3 const val DEFAULT_CLIENT_REQUEST_RETRY_DELAY = 1000L + const val MINIMUM_CANCEL_AFTER_TIME_INTERVAL_MINUTES = 10L val ROLLUP_ENABLED: Setting = Setting.boolSetting( "plugins.rollup.enabled",