-
Notifications
You must be signed in to change notification settings - Fork 114
/
Copy pathRollupRunner.kt
473 lines (435 loc) · 24.3 KB
/
RollupRunner.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.indexmanagement.rollup
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.core.action.ActionListener
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.support.WriteRequest
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution
import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.opensearchapi.withClosableContext
import org.opensearch.indexmanagement.rollup.action.get.GetRollupAction
import org.opensearch.indexmanagement.rollup.action.get.GetRollupRequest
import org.opensearch.indexmanagement.rollup.action.get.GetRollupResponse
import org.opensearch.indexmanagement.rollup.action.index.IndexRollupAction
import org.opensearch.indexmanagement.rollup.action.index.IndexRollupRequest
import org.opensearch.indexmanagement.rollup.action.index.IndexRollupResponse
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupJobValidationResult
import org.opensearch.indexmanagement.rollup.model.RollupMetadata
import org.opensearch.indexmanagement.rollup.model.RollupStats
import org.opensearch.indexmanagement.rollup.model.incrementStats
import org.opensearch.indexmanagement.rollup.model.mergeStats
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.util.acquireLockForScheduledJob
import org.opensearch.indexmanagement.util.releaseLockForScheduledJob
import org.opensearch.indexmanagement.util.renewLockForScheduledJob
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.LockModel
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
import org.opensearch.script.ScriptService
import org.opensearch.search.aggregations.bucket.composite.InternalComposite
import org.opensearch.threadpool.ThreadPool
@Suppress("TooManyFunctions")
object RollupRunner :
ScheduledJobRunner,
CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("RollupRunner")) {
private val logger = LogManager.getLogger(javaClass)
private val backoffPolicy = BackoffPolicy.exponentialBackoff(
TimeValue.timeValueMillis(RollupSettings.DEFAULT_ACQUIRE_LOCK_RETRY_DELAY),
RollupSettings.DEFAULT_ACQUIRE_LOCK_RETRY_COUNT
)
private lateinit var clusterService: ClusterService
private lateinit var client: Client
private lateinit var xContentRegistry: NamedXContentRegistry
private lateinit var scriptService: ScriptService
private lateinit var settings: Settings
private lateinit var threadPool: ThreadPool
private lateinit var rollupMapperService: RollupMapperService
private lateinit var rollupIndexer: RollupIndexer
private lateinit var rollupSearchService: RollupSearchService
private lateinit var rollupMetadataService: RollupMetadataService
private lateinit var clusterConfigurationProvider: SkipExecution
fun registerClusterService(clusterService: ClusterService): RollupRunner {
this.clusterService = clusterService
return this
}
fun registerClient(client: Client): RollupRunner {
this.client = client
return this
}
fun registerNamedXContentRegistry(xContentRegistry: NamedXContentRegistry): RollupRunner {
this.xContentRegistry = xContentRegistry
return this
}
fun registerScriptService(scriptService: ScriptService): RollupRunner {
this.scriptService = scriptService
return this
}
fun registerSettings(settings: Settings): RollupRunner {
this.settings = settings
return this
}
fun registerThreadPool(threadPool: ThreadPool): RollupRunner {
this.threadPool = threadPool
return this
}
fun registerMapperService(rollupMapperService: RollupMapperService): RollupRunner {
this.rollupMapperService = rollupMapperService
return this
}
fun registerIndexer(rollupIndexer: RollupIndexer): RollupRunner {
this.rollupIndexer = rollupIndexer
return this
}
fun registerSearcher(rollupSearchService: RollupSearchService): RollupRunner {
this.rollupSearchService = rollupSearchService
return this
}
fun registerMetadataServices(
rollupMetadataService: RollupMetadataService
): RollupRunner {
this.rollupMetadataService = rollupMetadataService
return this
}
fun registerClusterConfigurationProvider(clusterConfigurationProvider: SkipExecution): RollupRunner {
this.clusterConfigurationProvider = clusterConfigurationProvider
return this
}
fun registerConsumers(): RollupRunner {
return this
}
@Suppress("ComplexMethod")
override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
if (job !is Rollup) {
throw IllegalArgumentException("Invalid job type, found ${job.javaClass.simpleName} with id: ${context.jobId}")
}
launch {
var metadata: RollupMetadata? = null
try {
// Get Metadata does a get request to the config index which the role will not have access to. This is an internal
// call used by the plugin to populate the metadata itself so do not run this with role's context
if (job.metadataID != null) {
metadata = when (val getMetadataResult = rollupMetadataService.getExistingMetadata(job)) {
is MetadataResult.Success -> getMetadataResult.metadata
is MetadataResult.NoMetadata -> null
is MetadataResult.Failure ->
throw RollupMetadataException("Failed to get existing rollup metadata [${job.metadataID}]", getMetadataResult.cause)
}
}
} catch (e: RollupMetadataException) {
// If the metadata was not able to be retrieved, the exception will be logged and the job run will be a no-op
logger.error(e.message, e.cause)
return@launch
}
// Check if rollup should be processed before acquiring the lock
// If metadata does not exist, it will either be initialized for the first time or it will be recreated to communicate the failed state
if (rollupSearchService.shouldProcessRollup(job, metadata)) {
val lock = acquireLockForScheduledJob(job, context, backoffPolicy)
if (lock == null) {
logger.debug("Could not acquire lock for ${job.id}")
} else {
runRollupJob(job, context, lock)
releaseLockForScheduledJob(context, lock)
}
} else if (job.isEnabled) {
// We are doing this outside of ShouldProcess as schedule job interval can be more frequent than rollup and we want to fail
// validation as soon as possible
when (val jobValidity = isJobValid(job)) {
is RollupJobValidationResult.Invalid -> {
val lock = acquireLockForScheduledJob(job, context, backoffPolicy)
if (lock != null) {
setFailedMetadataAndDisableJob(job, jobValidity.reason)
logger.info("updating metadata service to disable the job [${job.id}]")
releaseLockForScheduledJob(context, lock)
}
}
else -> {}
}
}
}
}
// TODO: Clean up runner
// TODO: Scenario: The rollup job is finished, but I (the user) want to redo it all again
/*
* TODO situations:
* There is a rollup.metadataID and doc but theres no job in target index?
* -> index was deleted and recreated as rollup -> just recreate (but we would have to start over)? Or move to FAILED?
* */
@Suppress("ReturnCount", "NestedBlockDepth", "ComplexMethod", "LongMethod", "ThrowsCount")
private suspend fun runRollupJob(job: Rollup, context: JobExecutionContext, lock: LockModel) {
logger.debug("Running rollup job [${job.id}]")
var updatableLock = lock
try {
when (val jobValidity = isJobValid(job)) {
is RollupJobValidationResult.Invalid -> {
logger.error("Invalid job [${job.id}]: [${jobValidity.reason}]")
setFailedMetadataAndDisableJob(job, jobValidity.reason)
return
}
is RollupJobValidationResult.Failure -> {
logger.error("Failed to validate [${job.id}]: [${jobValidity.message}]")
setFailedMetadataAndDisableJob(job, jobValidity.message)
return
}
else -> {}
}
// Anything related to creating, reading, and deleting metadata should not require role's context
var metadata = when (val initMetadataResult = rollupMetadataService.init(job)) {
is MetadataResult.Success -> initMetadataResult.metadata
is MetadataResult.NoMetadata -> {
logger.info("Init metadata NoMetadata returning early")
return
} // No-op this execution
is MetadataResult.Failure ->
throw RollupMetadataException("Failed to initialize rollup metadata", initMetadataResult.cause)
}
if (metadata.status == RollupMetadata.Status.FAILED) {
logger.info("Metadata status is FAILED, disabling job $metadata")
disableJob(job, metadata)
return
}
// If metadata was created for the first time, update job with the id
var updatableJob = job
if (updatableJob.metadataID == null && metadata.status == RollupMetadata.Status.INIT) {
when (val updateRollupJobResult = updateRollupJob(updatableJob.copy(metadataID = metadata.id), metadata)) {
is RollupJobResult.Success -> updatableJob = updateRollupJobResult.rollup
is RollupJobResult.Failure -> {
logger.error(
"Failed to update the rollup job [${updatableJob.id}] with metadata id [${metadata.id}]", updateRollupJobResult.cause
)
return // Exit runner early
}
}
}
val result = withClosableContext(
IndexManagementSecurityContext(job.id, settings, threadPool.threadContext, job.user)
) {
rollupMapperService.attemptCreateRollupTargetIndex(updatableJob, clusterConfigurationProvider.hasLegacyPlugin)
}
when (result) {
is RollupJobValidationResult.Failure -> {
setFailedMetadataAndDisableJob(updatableJob, result.message, metadata)
return
}
is RollupJobValidationResult.Invalid -> {
setFailedMetadataAndDisableJob(updatableJob, result.reason, metadata)
return
}
else -> {}
}
while (rollupSearchService.shouldProcessRollup(updatableJob, metadata)) {
do {
try {
val rollupSearchResult = withClosableContext(
IndexManagementSecurityContext(job.id, settings, threadPool.threadContext, job.user)
) {
rollupSearchService.executeCompositeSearch(updatableJob, metadata)
}
val rollupResult = when (rollupSearchResult) {
is RollupSearchResult.Success -> {
val compositeRes: InternalComposite = rollupSearchResult.searchResponse.aggregations.get(updatableJob.id)
metadata = metadata.incrementStats(rollupSearchResult.searchResponse, compositeRes)
val rollupIndexResult = withClosableContext(
IndexManagementSecurityContext(job.id, settings, threadPool.threadContext, job.user)
) {
rollupIndexer.indexRollups(updatableJob, compositeRes)
}
when (rollupIndexResult) {
is RollupIndexResult.Success -> RollupResult.Success(compositeRes, rollupIndexResult.stats)
is RollupIndexResult.Failure -> RollupResult.Failure(rollupIndexResult.message, rollupIndexResult.cause)
}
}
is RollupSearchResult.Failure -> {
RollupResult.Failure(rollupSearchResult.message, rollupSearchResult.cause)
}
}
when (rollupResult) {
is RollupResult.Success -> {
metadata = rollupMetadataService.updateMetadata(
updatableJob,
metadata.mergeStats(rollupResult.stats), rollupResult.internalComposite
)
updatableJob = withClosableContext(
IndexManagementSecurityContext(job.id, settings, threadPool.threadContext, null)
) {
client.suspendUntil { listener: ActionListener<GetRollupResponse> ->
execute(GetRollupAction.INSTANCE, GetRollupRequest(updatableJob.id, null, "_local"), listener)
}.rollup ?: error("Unable to get rollup job")
}
}
is RollupResult.Failure -> {
rollupMetadataService.updateMetadata(
metadata.copy(status = RollupMetadata.Status.FAILED, failureReason = rollupResult.cause.message)
)
}
}
val renewedLock = renewLockForScheduledJob(context, updatableLock, backoffPolicy)
if (renewedLock == null) {
// If we fail to renew the lock it doesn't mean we need to perm fail the job, we can just return early
// and let the next execution try to process the data from where this one left off
releaseLockForScheduledJob(context, updatableLock)
return
} else {
updatableLock = renewedLock
}
} catch (e: RollupMetadataException) {
// Rethrow this exception so it doesn't get consumed here
logger.info("RollupMetadataException being thrown", e)
throw e
} catch (e: Exception) {
// TODO: Should update metadata and disable job here instead of allowing the rollup to keep going
logger.error("Failed to rollup ", e)
releaseLockForScheduledJob(context, updatableLock)
return
}
} while (metadata.afterKey != null)
}
if (!updatableJob.continuous) {
if (listOf(RollupMetadata.Status.STOPPED, RollupMetadata.Status.FINISHED, RollupMetadata.Status.FAILED).contains(metadata.status)) {
disableJob(updatableJob, metadata)
}
}
// If we have been constantly renewing the lock then the seqNo/primaryTerm will have changed
// and the releaseLock call outside of runRollupJob will fail, so release here with updatableLock
// and outside just in case we returned early at a different point (attempting to release twice won't hurt)
releaseLockForScheduledJob(context, updatableLock)
} catch (e: RollupMetadataException) {
// In most scenarios in the runner, the metadata will be used to communicate the result to the user
// If change to the metadata itself fails, there is nothing else to relay state change
// In these cases, the cause of the metadata operation will be logged here and the runner execution will exit
logger.error(e.message, e.cause)
releaseLockForScheduledJob(context, updatableLock)
}
}
/**
* Updates a rollup job.
*
* Takes in the metadata for the rollup job as well. This metadata should be create/updated
* before passing it to this method as it will not create/update the metadata during normal conditions.
*
* However, in the case that the update to the rollup job fails, the provided metadata will be set to FAILED
* status and updated to reflect the change.
*/
private suspend fun updateRollupJob(job: Rollup, metadata: RollupMetadata): RollupJobResult {
try {
return withClosableContext(
IndexManagementSecurityContext(job.id, settings, threadPool.threadContext, null)
) {
val req = IndexRollupRequest(rollup = job, refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE)
val res: IndexRollupResponse = client.suspendUntil { execute(IndexRollupAction.INSTANCE, req, it) }
// TODO: Verify the seqNo/primterm got updated
return@withClosableContext RollupJobResult.Success(res.rollup)
}
} catch (e: Exception) {
// TODO: Catching general exceptions for now, can make more granular
// Set metadata to failed since update to rollup job failed
val errorMessage = "An error occurred when updating rollup job [${job.id}]"
return when (val setFailedMetadataResult = rollupMetadataService.setFailedMetadata(job, errorMessage, metadata)) {
is MetadataResult.Success -> RollupJobResult.Failure(errorMessage, e)
// If the metadata update failed as well, throw an exception to end the runner execution
is MetadataResult.Failure ->
throw RollupMetadataException(setFailedMetadataResult.message, setFailedMetadataResult.cause)
// Should not get NoMetadata here
is MetadataResult.NoMetadata ->
throw RollupMetadataException("Unexpected state when updating metadata", null)
}
}
}
// TODO: Source index could be a pattern but it's used at runtime so it could match new indices which weren't matched before
// which means we always need to validate the source index on every execution?
@Suppress("ReturnCount", "ComplexMethod")
private suspend fun isJobValid(job: Rollup): RollupJobValidationResult {
return withClosableContext(
IndexManagementSecurityContext(job.id, settings, threadPool.threadContext, job.user)
) {
var metadata: RollupMetadata? = null
if (job.metadataID != null) {
logger.debug("Fetching associated metadata for rollup job [${job.id}]")
metadata = when (val getMetadataResult = rollupMetadataService.getExistingMetadata(job)) {
is MetadataResult.Success -> getMetadataResult.metadata
is MetadataResult.NoMetadata -> null
is MetadataResult.Failure ->
throw RollupMetadataException("Failed to get existing rollup metadata [${job.metadataID}]", getMetadataResult.cause)
}
}
logger.debug("Validating source index [${job.sourceIndex}] for rollup job [${job.id}]")
when (val sourceIndexValidationResult = rollupMapperService.isSourceIndexValid(job)) {
is RollupJobValidationResult.Valid -> {
} // No action taken when valid
else -> return@withClosableContext sourceIndexValidationResult
}
// we validate target index only if there is metadata document in the rollup
if (metadata != null) {
logger.debug("Attempting to create/validate target index [${job.targetIndex}] for rollup job [${job.id}]")
return@withClosableContext rollupMapperService.attemptCreateRollupTargetIndex(job, clusterConfigurationProvider.hasLegacyPlugin)
}
return@withClosableContext RollupJobValidationResult.Valid
}
}
/**
* Sets a failed metadata (updating an existing metadata if provided, otherwise creating a new one) and disables the job.
*
* Returns true if disabling the job was successful. If any metadata operations fail along the way, RollupMetadataException
* is thrown to be caught by the runner.
*/
private suspend fun setFailedMetadataAndDisableJob(job: Rollup, reason: String, existingMetadata: RollupMetadata? = null): Boolean {
val updatedMetadata = when (val setFailedMetadataResult = rollupMetadataService.setFailedMetadata(job, reason, existingMetadata)) {
is MetadataResult.Success -> setFailedMetadataResult.metadata
is MetadataResult.Failure ->
throw RollupMetadataException(setFailedMetadataResult.message, setFailedMetadataResult.cause)
// Should not get NoMetadata here
is MetadataResult.NoMetadata ->
throw RollupMetadataException("Unexpected state when setting failed metadata", null)
}
return disableJob(job, updatedMetadata)
}
/**
* Disables a given job. Also takes in an existing metadata to replace the metadataID of the job
* if this was a newly created metadata.
*
* This method will return true or false based on whether or not updating the job was successful. When
* calling this method, it can be assumed that when the response is false, the failed metadata was updated
* to relay the failure. If the metadata update failed, a RollupMetadataException would be thrown which gets
* caught in the runner so that case does not need to be explicitly handled by the caller of the method.
*
* Note: Set metadata to failed and ensure it's updated before passing it to this method because it
* will not update the metadata (unless updateRollupJob job fails).
*/
private suspend fun disableJob(job: Rollup, metadata: RollupMetadata): Boolean {
val updatedRollupJob = if (metadata.id != job.metadataID) {
job.copy(metadataID = metadata.id, enabled = false, jobEnabledTime = null)
} else {
job.copy(enabled = false, jobEnabledTime = null)
}
return when (val updateRollupJobResult = updateRollupJob(updatedRollupJob, metadata)) {
is RollupJobResult.Success -> true
is RollupJobResult.Failure -> {
logger.error("Failed to disable rollup job [${job.id}]", updateRollupJobResult.cause)
false
}
}
}
}
sealed class RollupJobResult {
data class Success(val rollup: Rollup) : RollupJobResult()
data class Failure(val message: String = "An error occurred for rollup job", val cause: Exception) : RollupJobResult()
}
sealed class RollupResult {
data class Success(val internalComposite: InternalComposite, val stats: RollupStats) : RollupResult()
data class Failure(val message: String = "An error occurred while rolling up", val cause: Exception) : RollupResult()
}