Skip to content

Commit

Permalink
Adds implementation for the delay feature in rollup jobs (#147)
Browse files Browse the repository at this point in the history
* Adds delay implementation for rollup jobs

* Removes non-continuous delay implementation

* Adds additional rollup delay tests

Signed-off-by: Clay Downs <downsrob@amazon.com>
  • Loading branch information
downsrob authored Oct 13, 2021
1 parent 11883ae commit 6041689
Show file tree
Hide file tree
Showing 10 changed files with 268 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ class RollupSearchService(
logger.debug("Non-continuous job [${rollup.id}] is not processing next window [$metadata]")
return false
} else {
return hasNextFullWindow(metadata) // TODO: Behavior when next full window but 0 docs/afterkey is null
return hasNextFullWindow(rollup, metadata) // TODO: Behavior when next full window but 0 docs/afterkey is null
}
}

private fun hasNextFullWindow(metadata: RollupMetadata): Boolean {
return Instant.now().isAfter(metadata.continuous!!.nextWindowEndTime) // TODO: !!
private fun hasNextFullWindow(rollup: Rollup, metadata: RollupMetadata): Boolean {
return Instant.now().isAfter(metadata.continuous!!.nextWindowEndTime.plusMillis(rollup.delay ?: 0)) // TODO: !!
}

@Suppress("ComplexMethod")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ data class Rollup(
val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
val enabled: Boolean,
val schemaVersion: Long,
val jobSchedule: Schedule,
var jobSchedule: Schedule,
val jobLastUpdatedTime: Instant,
val jobEnabledTime: Instant?,
val description: String,
Expand All @@ -83,12 +83,26 @@ data class Rollup(
} else {
require(jobEnabledTime == null) { "Job enabled time must not be present if the job is disabled" }
}
// Copy the delay parameter of the job into the job scheduler for continuous jobs only
if (jobSchedule.delay != delay && continuous) {
jobSchedule = when (jobSchedule) {
is CronSchedule -> {
val cronSchedule = jobSchedule as CronSchedule
CronSchedule(cronSchedule.cronExpression, cronSchedule.timeZone, delay ?: 0)
}
is IntervalSchedule -> {
val intervalSchedule = jobSchedule as IntervalSchedule
IntervalSchedule(intervalSchedule.startTime, intervalSchedule.interval, intervalSchedule.unit, delay ?: 0)
}
else -> jobSchedule
}
}
when (jobSchedule) {
is CronSchedule -> {
// Job scheduler already correctly throws errors for this
}
is IntervalSchedule -> {
require(jobSchedule.interval >= MINIMUM_JOB_INTERVAL) { "Rollup job schedule interval must be greater than 0" }
require((jobSchedule as IntervalSchedule).interval >= MINIMUM_JOB_INTERVAL) { "Rollup job schedule interval must be greater than 0" }
}
}
require(sourceIndex != targetIndex) { "Your source and target index cannot be the same" }
Expand All @@ -97,7 +111,10 @@ data class Rollup(
}
require(dimensions.first().type == Dimension.Type.DATE_HISTOGRAM) { "The first dimension must be a date histogram" }
require(pageSize in MINIMUM_PAGE_SIZE..MAXIMUM_PAGE_SIZE) { "Page size must be between 1 and 10,000" }
if (delay != null) require(delay >= MINIMUM_DELAY) { "Delay must be non-negative if set" }
if (delay != null) {
require(delay >= MINIMUM_DELAY) { "Delay must be non-negative if set" }
require(delay <= Instant.now().toEpochMilli()) { "Delay must be less than the current unix time" }
}
}

override fun isEnabled() = enabled
Expand Down Expand Up @@ -331,7 +348,7 @@ data class Rollup(
// TODO: Make startTime public in Job Scheduler so we can just directly check the value
if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
if (schedule is IntervalSchedule) {
schedule = IntervalSchedule(Instant.now(), schedule.interval, schedule.unit)
schedule = IntervalSchedule(Instant.now(), schedule.interval, schedule.unit, schedule.delay ?: 0)
}
}
return Rollup(
Expand Down
20 changes: 19 additions & 1 deletion src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"_meta" : {
"schema_version": 11
"schema_version": 12
},
"dynamic": "strict",
"properties": {
Expand Down Expand Up @@ -600,6 +600,9 @@
"start_time": {
"type": "date",
"format": "strict_date_time||epoch_millis"
},
"schedule_delay": {
"type": "long"
}
}
},
Expand All @@ -610,6 +613,9 @@
},
"timezone": {
"type": "keyword"
},
"schedule_delay": {
"type": "long"
}
}
}
Expand Down Expand Up @@ -792,6 +798,9 @@
"start_time": {
"type": "date",
"format": "strict_date_time||epoch_millis"
},
"schedule_delay": {
"type": "long"
}
}
},
Expand All @@ -802,6 +811,9 @@
},
"timezone": {
"type": "keyword"
},
"schedule_delay": {
"type": "long"
}
}
}
Expand Down Expand Up @@ -1046,6 +1058,9 @@
"start_time": {
"type": "date",
"format": "strict_date_time||epoch_millis"
},
"schedule_delay": {
"type": "long"
}
}
},
Expand All @@ -1056,6 +1071,9 @@
},
"timezone": {
"type": "keyword"
},
"schedule_delay": {
"type": "long"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import javax.management.remote.JMXServiceURL

abstract class IndexManagementRestTestCase : ODFERestTestCase() {

val configSchemaVersion = 11
val configSchemaVersion = 12
val historySchemaVersion = 3

// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ fun randomRollup(): Rollup {
metadataID = if (OpenSearchRestTestCase.randomBoolean()) null else OpenSearchRestTestCase.randomAlphaOfLength(10),
roles = OpenSearchRestTestCase.randomList(10) { OpenSearchRestTestCase.randomAlphaOfLength(10) },
pageSize = OpenSearchRestTestCase.randomIntBetween(1, 10000),
delay = OpenSearchRestTestCase.randomNonNegativeLong(),
delay = 0,
continuous = OpenSearchRestTestCase.randomBoolean(),
dimensions = randomRollupDimensions(),
metrics = OpenSearchRestTestCase.randomList(20, ::randomRollupMetrics).distinctBy { it.targetField },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
package org.opensearch.indexmanagement.rollup.model

import org.opensearch.indexmanagement.randomInstant
import org.opensearch.indexmanagement.randomSchedule
import org.opensearch.indexmanagement.rollup.randomDateHistogram
import org.opensearch.indexmanagement.rollup.randomRollup
import org.opensearch.indexmanagement.rollup.randomTerms
Expand Down Expand Up @@ -95,9 +96,30 @@ class RollupTests : OpenSearchTestCase() {
assertFailsWith(IllegalArgumentException::class, "Delay was negative") {
randomRollup().copy(delay = -1)
}
assertFailsWith(IllegalArgumentException::class, "Delay was too high") {
randomRollup().copy(delay = Long.MAX_VALUE)
}

// These should successfully parse without exceptions
randomRollup().copy(delay = 0)
randomRollup().copy(delay = 930490)
randomRollup().copy(delay = null)
}

fun `test delay applies to continuous rollups only`() {
// Continuous rollup schedule matches delay
val newDelay: Long = 500
val continuousRollup = randomRollup().copy(
delay = newDelay,
continuous = true
)
assertEquals(newDelay, continuousRollup.jobSchedule.delay)
// Non continuous rollup schedule should have null delay
val nonContinuousRollup = randomRollup().copy(
jobSchedule = randomSchedule(),
delay = newDelay,
continuous = false
)
assertNull(nonContinuousRollup.jobSchedule.delay)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class WriteableTests : OpenSearchTestCase() {
}

fun `test rollup as stream`() {
val rollup = randomRollup()
val rollup = randomRollup().copy(delay = randomLongBetween(0, 60000000))
val out = BytesStreamOutput().also { rollup.writeTo(it) }
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val streamedRollup = Rollup(sin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class XContentTests : OpenSearchTestCase() {
}

fun `test rollup parsing with type`() {
val rollup = randomRollup()
val rollup = randomRollup().copy(delay = randomLongBetween(0, 60000000))
val rollupString = rollup.toJsonString()
val parser = parserWithType(rollupString)
val parsedRollup = parser.parseWithType(rollup.id, rollup.seqNo, rollup.primaryTerm, Rollup.Companion::parse)
Expand All @@ -135,7 +135,7 @@ class XContentTests : OpenSearchTestCase() {
}

fun `test rollup parsing without type`() {
val rollup = randomRollup()
val rollup = randomRollup().copy(delay = randomLongBetween(0, 60000000))
val rollupString = rollup.toJsonString(XCONTENT_WITHOUT_TYPE)
val parsedRollup = Rollup.parse(parser(rollupString), rollup.id, rollup.seqNo, rollup.primaryTerm)
// roles are deprecated and not populated in toXContent and parsed as part of parse
Expand Down
Loading

0 comments on commit 6041689

Please sign in to comment.