Skip to content

Commit

Permalink
Support forecast tasks in profile API; enable index field modificatio…
Browse files Browse the repository at this point in the history
…ns (#1316)

This PR adds support for forecast task targets in profile API and enables modification of categorical and custom result index fields
* Support forecast task targets in profile API: Previously, the profile API returned empty results for forecast task targets. This update adds support for them. For details, refer to ForecastTask.
* Allow modification of categorical and custom result index fields in forecasting: These fields are not editable in Anomaly Detection (AD). Implemented a lastUiBreakingChangeTime field in the config index to manage changes. Whenever the categorical or custom result index fields are updated, lastUiBreakingChangeTime is refreshed. The UI will not display results before this time to prevent inconsistencies. For details, refer to Config and AbstractTimeSeriesActionHandler.

Testing Done:
* Added ITs for both scenarios.

Signed-off-by: Kaituo Li <kaituo@amazon.com>
  • Loading branch information
kaituo authored Sep 18, 2024
1 parent dbf8785 commit 062db14
Show file tree
Hide file tree
Showing 32 changed files with 1,078 additions and 102 deletions.
4 changes: 0 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -696,15 +696,11 @@ List<String> jacocoExclusions = [

// TODO: add test coverage (kaituo)
'org.opensearch.forecast.*',
'org.opensearch.ad.transport.ADHCImputeNodeResponse',
'org.opensearch.ad.transport.GetAnomalyDetectorTransportAction',
'org.opensearch.timeseries.transport.BooleanNodeResponse',
'org.opensearch.timeseries.ml.TimeSeriesSingleStreamCheckpointDao',
'org.opensearch.timeseries.transport.JobRequest',
'org.opensearch.timeseries.transport.handler.ResultBulkIndexingHandler',
'org.opensearch.timeseries.ml.Inferencer',
'org.opensearch.timeseries.transport.SingleStreamResultRequest',
'org.opensearch.timeseries.transport.BooleanResponse',
'org.opensearch.timeseries.rest.handler.IndexJobActionHandler.1',
'org.opensearch.timeseries.transport.SuggestConfigParamResponse',
'org.opensearch.timeseries.transport.SuggestConfigParamRequest',
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/opensearch/ad/model/ADTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept
detector.getCustomResultIndexMinSize(),
detector.getCustomResultIndexMinAge(),
detector.getCustomResultIndexTTL(),
detector.getFlattenResultIndexMapping()
detector.getFlattenResultIndexMapping(),
detector.getLastBreakingUIChangeTime()
);
return new Builder()
.taskId(parsedTaskId)
Expand Down
17 changes: 14 additions & 3 deletions src/main/java/org/opensearch/ad/model/AnomalyDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ public Integer getShingleSize(Integer customShingleSize) {
* @param customResultIndexMinAge custom result index lifecycle management min age condition
* @param customResultIndexTTL custom result index lifecycle management ttl
* @param flattenResultIndexMapping flag to indicate whether to flatten result index mapping or not
* @param lastBreakingUIChangeTime last update time to configuration that can break UI and we have
* to display updates from the changed time
*/
public AnomalyDetector(
String detectorId,
Expand Down Expand Up @@ -178,7 +180,8 @@ public AnomalyDetector(
Integer customResultIndexMinSize,
Integer customResultIndexMinAge,
Integer customResultIndexTTL,
Boolean flattenResultIndexMapping
Boolean flattenResultIndexMapping,
Instant lastBreakingUIChangeTime
) {
super(
detectorId,
Expand Down Expand Up @@ -206,7 +209,8 @@ public AnomalyDetector(
customResultIndexMinSize,
customResultIndexMinAge,
customResultIndexTTL,
flattenResultIndexMapping
flattenResultIndexMapping,
lastBreakingUIChangeTime
);

checkAndThrowValidationErrors(ValidationAspect.DETECTOR);
Expand Down Expand Up @@ -284,6 +288,7 @@ public AnomalyDetector(StreamInput input) throws IOException {
this.customResultIndexMinAge = input.readOptionalInt();
this.customResultIndexTTL = input.readOptionalInt();
this.flattenResultIndexMapping = input.readOptionalBoolean();
this.lastUIBreakingChangeTime = input.readOptionalInstant();
}

public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
Expand Down Expand Up @@ -350,6 +355,7 @@ public void writeTo(StreamOutput output) throws IOException {
output.writeOptionalInt(customResultIndexMinAge);
output.writeOptionalInt(customResultIndexTTL);
output.writeOptionalBoolean(flattenResultIndexMapping);
output.writeOptionalInstant(lastUIBreakingChangeTime);
}

@Override
Expand Down Expand Up @@ -447,6 +453,7 @@ public static AnomalyDetector parse(
Integer customResultIndexMinAge = null;
Integer customResultIndexTTL = null;
Boolean flattenResultIndexMapping = null;
Instant lastBreakingUIChangeTime = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -584,6 +591,9 @@ public static AnomalyDetector parse(
case FLATTEN_RESULT_INDEX_MAPPING:
flattenResultIndexMapping = onlyParseBooleanValue(parser);
break;
case BREAKING_UI_CHANGE_TIME:
lastBreakingUIChangeTime = ParseUtils.toInstant(parser);
break;
default:
parser.skipChildren();
break;
Expand Down Expand Up @@ -615,7 +625,8 @@ public static AnomalyDetector parse(
customResultIndexMinSize,
customResultIndexMinAge,
customResultIndexTTL,
flattenResultIndexMapping
flattenResultIndexMapping,
lastBreakingUIChangeTime
);
detector.setDetectionDateRange(detectionDateRange);
return detector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
: WriteRequest.RefreshPolicy.IMMEDIATE;
RestRequest.Method method = request.getHttpRequest().method();

if (method == RestRequest.Method.POST && detectorId != AnomalyDetector.NO_ID) {
// reset detector to empty string detectorId is only meant for updating detector
detectorId = AnomalyDetector.NO_ID;
}

IndexAnomalyDetectorRequest indexAnomalyDetectorRequest = new IndexAnomalyDetectorRequest(
detectorId,
seqNo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ protected AnomalyDetector copyConfig(User user, Config config) {
config.getCustomResultIndexMinSize(),
config.getCustomResultIndexMinAge(),
config.getCustomResultIndexTTL(),
config.getFlattenResultIndexMapping()
config.getFlattenResultIndexMapping(),
breakingUIChange ? Instant.now() : config.getLastBreakingUIChangeTime()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,18 @@ public class ForecastTaskProfileRunner implements TaskProfileRunner<ForecastTask

@Override
public void getTaskProfile(ForecastTask configLevelTask, ActionListener<ForecastTaskProfile> listener) {
// return null since forecasting have no in-memory task profiles as AD
listener.onResponse(null);
// return null in other fields since forecasting have no in-memory task profiles as AD
listener
.onResponse(
new ForecastTaskProfile(
configLevelTask,
null,
null,
null,
configLevelTask == null ? null : configLevelTask.getTaskId(),
null
)
);
}

}
9 changes: 6 additions & 3 deletions src/main/java/org/opensearch/forecast/model/ForecastTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ public static ForecastTask parse(XContentParser parser, String taskId) throws IO
forecaster.getCustomResultIndexMinSize(),
forecaster.getCustomResultIndexMinAge(),
forecaster.getCustomResultIndexTTL(),
forecaster.getFlattenResultIndexMapping()
forecaster.getFlattenResultIndexMapping(),
forecaster.getLastBreakingUIChangeTime()
);
return new Builder()
.taskId(parsedTaskId)
Expand Down Expand Up @@ -375,10 +376,12 @@ public static ForecastTask parse(XContentParser parser, String taskId) throws IO
@Generated
@Override
public boolean equals(Object other) {
if (this == other)
if (this == other) {
return true;
if (other == null || getClass() != other.getClass())
}
if (other == null || getClass() != other.getClass()) {
return false;
}
ForecastTask that = (ForecastTask) other;
return super.equals(that)
&& Objects.equal(getForecaster(), that.getForecaster())
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/org/opensearch/forecast/model/Forecaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public Forecaster(
Integer customResultIndexMinSize,
Integer customResultIndexMinAge,
Integer customResultIndexTTL,
Boolean flattenResultIndexMapping
Boolean flattenResultIndexMapping,
Instant lastBreakingUIChangeTime
) {
super(
forecasterId,
Expand Down Expand Up @@ -163,7 +164,8 @@ public Forecaster(
customResultIndexMinSize,
customResultIndexMinAge,
customResultIndexTTL,
flattenResultIndexMapping
flattenResultIndexMapping,
lastBreakingUIChangeTime
);

checkAndThrowValidationErrors(ValidationAspect.FORECASTER);
Expand Down Expand Up @@ -306,6 +308,7 @@ public static Forecaster parse(
Integer customResultIndexMinAge = null;
Integer customResultIndexTTL = null;
Boolean flattenResultIndexMapping = null;
Instant lastBreakingUIChangeTime = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -437,6 +440,9 @@ public static Forecaster parse(
case FLATTEN_RESULT_INDEX_MAPPING:
flattenResultIndexMapping = parser.booleanValue();
break;
case BREAKING_UI_CHANGE_TIME:
lastBreakingUIChangeTime = ParseUtils.toInstant(parser);
break;
default:
parser.skipChildren();
break;
Expand Down Expand Up @@ -468,7 +474,8 @@ public static Forecaster parse(
customResultIndexMinSize,
customResultIndexMinAge,
customResultIndexTTL,
flattenResultIndexMapping
flattenResultIndexMapping,
lastBreakingUIChangeTime
);
return forecaster;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
: WriteRequest.RefreshPolicy.IMMEDIATE;
RestRequest.Method method = request.getHttpRequest().method();

if (method == RestRequest.Method.POST && forecasterId != Config.NO_ID) {
// reset detector to empty string detectorId is only meant for updating detector
forecasterId = Config.NO_ID;
}

IndexForecasterRequest indexAnomalyDetectorRequest = new IndexForecasterRequest(
forecasterId,
seqNo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ protected Config copyConfig(User user, Config config) {
config.getCustomResultIndexMinSize(),
config.getCustomResultIndexMinAge(),
config.getCustomResultIndexTTL(),
config.getFlattenResultIndexMapping()
config.getFlattenResultIndexMapping(),
breakingUIChange ? Instant.now() : config.getLastBreakingUIChangeTime()
);
}

Expand Down
19 changes: 18 additions & 1 deletion src/main/java/org/opensearch/timeseries/model/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public abstract class Config implements Writeable, ToXContentObject {
public static final String RESULT_INDEX_FIELD_MIN_AGE = "result_index_min_age";
public static final String RESULT_INDEX_FIELD_TTL = "result_index_ttl";
public static final String FLATTEN_RESULT_INDEX_MAPPING = "flatten_result_index_mapping";
// Changing categorical field, feature attributes, interval, windowDelay, time field, horizon, indices,
// result index would force us to display results only from the most recent update. Otherwise,
// the UI appear cluttered and unclear.
// We cannot use last update time as it would change whenever other fields like name changes.
public static final String BREAKING_UI_CHANGE_TIME = "last_ui_breaking_change_time";

protected String id;
protected Long version;
Expand Down Expand Up @@ -120,6 +125,7 @@ public abstract class Config implements Writeable, ToXContentObject {
protected Integer customResultIndexMinAge;
protected Integer customResultIndexTTL;
protected Boolean flattenResultIndexMapping;
protected Instant lastUIBreakingChangeTime;

public static String INVALID_RESULT_INDEX_NAME_SIZE = "Result index name size must contains less than "
+ MAX_RESULT_INDEX_NAME_SIZE
Expand Down Expand Up @@ -151,7 +157,8 @@ protected Config(
Integer customResultIndexMinSize,
Integer customResultIndexMinAge,
Integer customResultIndexTTL,
Boolean flattenResultIndexMapping
Boolean flattenResultIndexMapping,
Instant lastBreakingUIChangeTime
) {
if (Strings.isBlank(name)) {
errorMessage = CommonMessages.EMPTY_NAME;
Expand Down Expand Up @@ -291,6 +298,7 @@ protected Config(
this.customResultIndexMinAge = Strings.trimToNull(resultIndex) == null ? null : customResultIndexMinAge;
this.customResultIndexTTL = Strings.trimToNull(resultIndex) == null ? null : customResultIndexTTL;
this.flattenResultIndexMapping = Strings.trimToNull(resultIndex) == null ? null : flattenResultIndexMapping;
this.lastUIBreakingChangeTime = lastBreakingUIChangeTime;
}

public int suggestHistory() {
Expand Down Expand Up @@ -335,6 +343,7 @@ public Config(StreamInput input) throws IOException {
this.customResultIndexMinAge = input.readOptionalInt();
this.customResultIndexTTL = input.readOptionalInt();
this.flattenResultIndexMapping = input.readOptionalBoolean();
this.lastUIBreakingChangeTime = input.readOptionalInstant();
}

/*
Expand Down Expand Up @@ -388,6 +397,7 @@ public void writeTo(StreamOutput output) throws IOException {
output.writeOptionalInt(customResultIndexMinAge);
output.writeOptionalInt(customResultIndexTTL);
output.writeOptionalBoolean(flattenResultIndexMapping);
output.writeOptionalInstant(lastUIBreakingChangeTime);
}

public boolean invalidShingleSizeRange(Integer shingleSizeToTest) {
Expand Down Expand Up @@ -525,6 +535,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (flattenResultIndexMapping != null) {
builder.field(FLATTEN_RESULT_INDEX_MAPPING, flattenResultIndexMapping);
}
if (lastUIBreakingChangeTime != null) {
builder.field(BREAKING_UI_CHANGE_TIME, lastUIBreakingChangeTime.toEpochMilli());
}
return builder;
}

Expand Down Expand Up @@ -737,6 +750,10 @@ public Boolean getFlattenResultIndexMapping() {
return flattenResultIndexMapping;
}

public Instant getLastBreakingUIChangeTime() {
return lastUIBreakingChangeTime;
}

/**
* Identifies redundant feature names.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public abstract class AbstractTimeSeriesActionHandler<T extends ActionResponse,
protected final Clock clock;
protected final Settings settings;
protected final ValidationAspect configValidationAspect;
protected boolean breakingUIChange;

public AbstractTimeSeriesActionHandler(
Config config,
Expand Down Expand Up @@ -203,6 +204,7 @@ public AbstractTimeSeriesActionHandler(
this.settings = settings;
this.handler = new ConfigUpdateConfirmer<>(taskManager, transportService);
this.configValidationAspect = configValidationAspect;
this.breakingUIChange = false;
}

/**
Expand Down Expand Up @@ -456,6 +458,11 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S
);
return;
}
} else {
if (!ParseUtils.listEqualsWithoutConsideringOrder(existingConfig.getCategoryFields(), config.getCategoryFields())
|| !Objects.equals(existingConfig.getCustomResultIndexOrAlias(), config.getCustomResultIndexOrAlias())) {
breakingUIChange = true;
}
}

ActionListener<Void> confirmBatchRunningListener = ActionListener
Expand Down Expand Up @@ -675,7 +682,6 @@ protected void validateCategoricalField(
);
}

@SuppressWarnings("unchecked")
protected void searchConfigInputIndices(String configId, boolean indexingDryRun, ActionListener<T> listener) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.query(QueryBuilders.matchAllQuery())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public boolean isAnswerTrue() {

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(answer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public boolean isAnswerTrue() {

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(answer);
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/mappings/anomaly-detection-state.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"dynamic": false,
"_meta": {
"schema_version": 4
"schema_version": 5
},
"properties": {
"schema_version": {
Expand Down
6 changes: 5 additions & 1 deletion src/main/resources/mappings/config.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"dynamic": false,
"_meta": {
"schema_version": 6
"schema_version": 7
},
"properties": {
"schema_version": {
Expand Down Expand Up @@ -232,6 +232,10 @@
},
"flatten_result_index_mapping": {
"type": "boolean"
},
"last_ui_breaking_change_time" : {
"type": "date",
"format": "strict_date_time||epoch_millis"
}
}
}
Loading

0 comments on commit 062db14

Please sign in to comment.