Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added minimum timeout for transforms search of 10 minutes #1033

Merged
merged 25 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
139b63e
Added minimum timeout for transforms search of 10 minutes
Joshua152 Nov 2, 2023
4a05706
Extracted cancel minimum code to function
Joshua152 Nov 8, 2023
a294816
Fixed transform code to use cluster setting
Joshua152 Dec 8, 2023
8253da7
Removed log statements
Joshua152 Dec 8, 2023
a79d754
Changed timeout logic
Joshua152 Dec 8, 2023
d60bf2f
Switched to basing off seconds
Joshua152 Dec 9, 2023
21fa899
[Feature] Support Transform as an ISM action (#760)
tanqiuliu Oct 11, 2023
7d8e4cc
[Test] increase the wait time after transform job triggered (#999)
bowenlan-amzn Oct 11, 2023
bf4266b
Drafted 2.11 release notes. (#1004)
AWSHurneyt Oct 11, 2023
1e3e529
Refactor change policy API and the policy in managed index to be non-…
bowenlan-amzn Oct 12, 2023
43346cb
Add more error notification at fail points (#1000)
bowenlan-amzn Oct 13, 2023
be9d898
fix the race condition in test reset action start time (#1007)
bowenlan-amzn Oct 13, 2023
5ddb80c
Bump bwc version after 2.11 release (#1015)
bowenlan-amzn Oct 19, 2023
0e0aa30
added type check for pipeline aggregator types in Transform initializ…
n-dohrmann Oct 19, 2023
2b5eeae
Improve security plugin enabling check (#1017)
Hailong-am Oct 20, 2023
5af2fce
Onboard jenkins prod docker images to github actions (#1025)
peterzhuamazon Oct 31, 2023
28946fe
Support switch aliases in shrink action. (#987)
ikibo Nov 1, 2023
f640e60
Transform pipeline aggr test (#1027)
n-dohrmann Nov 1, 2023
deb7117
Added unit test for switchAliases method. (#1035)
ikibo Nov 9, 2023
e5d5097
Interval schedule should take start time from the request, should not…
ikibo Nov 9, 2023
60f9abb
Added minimum for search.cancel_after_time_interval setting for rollu…
Joshua152 Nov 19, 2023
e5d02fb
Update 2.11.1 release note (#1042)
bowenlan-amzn Nov 20, 2023
457190e
Interval schedule should take start time from the request, should not…
ikibo Nov 29, 2023
4414a2a
Removed unused imports
Joshua152 Dec 12, 2023
a2354fe
Merge branch 'main' into issue710-transform
bowenlan-amzn Dec 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.service.ClusterService
Expand Down Expand Up @@ -47,6 +48,7 @@
import org.opensearch.indexmanagement.util.IndexUtils.Companion.ODFE_MAGIC_NULL
import org.opensearch.indexmanagement.util.IndexUtils.Companion.hashToFixedSize
import org.opensearch.core.rest.RestStatus
import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.MINIMUM_CANCEL_AFTER_TIME_INTERVAL_SECONDS
import org.opensearch.search.aggregations.Aggregation
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
Expand Down Expand Up @@ -78,11 +80,17 @@
@Volatile private var backoffPolicy =
BackoffPolicy.constantBackoff(TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS.get(settings), TRANSFORM_JOB_SEARCH_BACKOFF_COUNT.get(settings))

@Volatile private var cancelAfterTimeInterval = SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS, TRANSFORM_JOB_SEARCH_BACKOFF_COUNT) {
millis, count ->
backoffPolicy = BackoffPolicy.constantBackoff(millis, count)
}

clusterService.clusterSettings.addSettingsUpdateConsumer(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING) {
cancelAfterTimeInterval = it

Check warning on line 92 in src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt#L92

Added line #L92 was not covered by tests
}
}

@Suppress("RethrowCaughtException")
Expand Down Expand Up @@ -187,7 +195,11 @@
val searchStart = Instant.now().epochSecond
val searchResponse = backoffPolicy.retryTransformSearch(logger, transformContext.transformLockManager) {
val pageSizeDecay = 2f.pow(retryAttempt++)
val searchRequestTimeoutInSeconds = transformContext.getMaxRequestTimeoutInSeconds()

var searchRequestTimeoutInSeconds = transformContext.getMaxRequestTimeoutInSeconds()
if (searchRequestTimeoutInSeconds == null) {
searchRequestTimeoutInSeconds = getCancelAfterTimeIntervalSeconds(cancelAfterTimeInterval.seconds)

Check warning on line 201 in src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt#L201

Added line #L201 was not covered by tests
}

client.suspendUntil { listener: ActionListener<SearchResponse> ->
// If the previous request of the current transform job execution was successful, take the page size of previous request.
Expand Down Expand Up @@ -224,6 +236,16 @@
}
}

private fun getCancelAfterTimeIntervalSeconds(givenIntervalSeconds: Long): Long {
// The default value for the cancelAfterTimeInterval is -1 and so, in this case
// we should ignore processing on the value
if (givenIntervalSeconds == -1L) {
return -1

Check warning on line 243 in src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt#L243

Added line #L243 was not covered by tests
}

return max(givenIntervalSeconds, MINIMUM_CANCEL_AFTER_TIME_INTERVAL_SECONDS)

Check warning on line 246 in src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt#L246

Added line #L246 was not covered by tests
}

companion object {
const val failedSearchErrorMessage = "Failed to search data in source indices"
const val modifiedBucketsErrorMessage = "Failed to get the modified buckets in source indices"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class TransformSettings {
companion object {
const val DEFAULT_RENEW_LOCK_RETRY_COUNT = 3
const val DEFAULT_RENEW_LOCK_RETRY_DELAY = 1000L
const val MINIMUM_CANCEL_AFTER_TIME_INTERVAL_SECONDS = 600L

val TRANSFORM_JOB_SEARCH_BACKOFF_COUNT: Setting<Int> = Setting.intSetting(
"plugins.transform.internal.search.backoff_count",
Expand Down
Loading