Merge branch 'main' into step3
wnbts authored Jan 19, 2022
2 parents d39848f + 1c3e523 commit 04c7980
Showing 21 changed files with 751 additions and 123 deletions.
1 change: 0 additions & 1 deletion build-tools/repositories.gradle
@@ -14,5 +14,4 @@ repositories {
     mavenLocal()
     maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" }
     mavenCentral()
-    jcenter()
 }
1 change: 0 additions & 1 deletion build.gradle
@@ -24,7 +24,6 @@ buildscript {
         maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" }
         mavenCentral()
         maven { url "https://plugins.gradle.org/m2/" }
-        jcenter()
     }

     dependencies {
@@ -15,5 +15,6 @@ public enum FunctionName {
     LINEAR_REGRESSION,
     KMEANS,
     SAMPLE_ALGO,
-    LOCAL_SAMPLE_CALCULATOR
+    LOCAL_SAMPLE_CALCULATOR,
+    ANOMALY_LOCALIZATION
 }
@@ -0,0 +1,205 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 *
 */

package org.opensearch.ml.engine.algorithms.anomalylocalization;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.opensearch.common.ParseField;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.ml.common.parameter.FunctionName;
import org.opensearch.ml.common.parameter.Input;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.AggregatorFactories;

import lombok.AllArgsConstructor;
import lombok.Data;

import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;

/**
 * Information about the aggregation, time range, etc. to localize.
 */
@Data
@AllArgsConstructor
public class AnomalyLocalizationInput implements Input {

    public static final String FIELD_INDEX_NAME = "index_name";
    public static final String FIELD_ATTTRIBUTE_FIELD_NAMES = "attribute_field_names";
    public static final String FIELD_AGGREGATIONS = "aggregations";
    public static final String FIELD_TIME_FIELD_NAME = "time_field_name";
    public static final String FIELD_START_TIME = "start_time";
    public static final String FIELD_END_TIME = "end_time";
    public static final String FIELD_MIN_TIME_INTERVAL = "min_time_interval";
    public static final String FIELD_NUM_OUTPUTS = "num_outputs";
    public static final String FIELD_ANOMALY_START_TIME = "anomaly_start_time";
    public static final String FIELD_FILTER_QUERY = "filter_query";
    public static final NamedXContentRegistry.Entry XCONTENT_REGISTRY_ENTRY = new NamedXContentRegistry.Entry(
            org.opensearch.ml.common.parameter.Input.class,
            new ParseField(FunctionName.ANOMALY_LOCALIZATION.name()),
            parser -> parse(parser)
    );

    public static AnomalyLocalizationInput parse(XContentParser parser) throws IOException {
        ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
        String indexName = null;
        List<String> attributeFieldNames = new ArrayList<>();
        List<AggregationBuilder> aggregations = new ArrayList<>();
        String timeFieldName = null;
        long startTime = 0;
        long endTime = 0;
        long minTimeInterval = 0;
        int numOutputs = 0;
        Optional<Long> anomalyStartTime = Optional.empty();
        Optional<QueryBuilder> filterQuery = Optional.empty();

        while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
            switch (parser.currentName()) {
                case FIELD_INDEX_NAME:
                    parser.nextToken();
                    indexName = parser.text();
                    break;
                case FIELD_ATTTRIBUTE_FIELD_NAMES:
                    ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.nextToken(), parser);
                    while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
                        attributeFieldNames.add(parser.text());
                    }
                    ensureExpectedToken(XContentParser.Token.END_ARRAY, parser.currentToken(), parser);
                    break;
                case FIELD_AGGREGATIONS:
                    ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.nextToken(), parser);
                    while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
                        ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
                        aggregations.addAll(AggregatorFactories.parseAggregators(parser).getAggregatorFactories());
                        ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(), parser);
                    }
                    ensureExpectedToken(XContentParser.Token.END_ARRAY, parser.currentToken(), parser);
                    break;
                case FIELD_TIME_FIELD_NAME:
                    parser.nextToken();
                    timeFieldName = parser.text();
                    break;
                case FIELD_START_TIME:
                    parser.nextToken();
                    startTime = parser.longValue();
                    break;
                case FIELD_END_TIME:
                    parser.nextToken();
                    endTime = parser.longValue();
                    break;
                case FIELD_MIN_TIME_INTERVAL:
                    parser.nextToken();
                    minTimeInterval = parser.longValue();
                    break;
                case FIELD_NUM_OUTPUTS:
                    parser.nextToken();
                    numOutputs = parser.intValue();
                    break;
                case FIELD_ANOMALY_START_TIME:
                    parser.nextToken();
                    anomalyStartTime = Optional.of(parser.longValue());
                    break;
                case FIELD_FILTER_QUERY:
                    ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
                    filterQuery = Optional.of(parseInnerQueryBuilder(parser));
                    ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(), parser);
                    break;
                default:
                    parser.skipChildren();
                    break;
            }
        }
        return new AnomalyLocalizationInput(indexName, attributeFieldNames, aggregations, timeFieldName, startTime, endTime,
                minTimeInterval, numOutputs, anomalyStartTime, filterQuery);
    }

    private final String indexName; // name pattern of the data index
    private final List<String> attributeFieldNames; // name of the field to localize/slice with
    private final List<AggregationBuilder> aggregations; // aggregate data to localize/slice on
    private final String timeFieldName; // name of the timestamp field
    private final long startTime; // start of entire time range, including normal and anomaly
    private final long endTime; // end of entire time range, including normal and anomaly
    private final long minTimeInterval; // minimal time interval/bucket
    private final int numOutputs; // max number of values from localization/slicing
    private final Optional<Long> anomalyStartTime; // time when anomaly change starts
    private final Optional<QueryBuilder> filterQuery; // filter of data

    public AnomalyLocalizationInput(StreamInput in) throws IOException {
        this.indexName = in.readString();
        this.attributeFieldNames = Arrays.asList(in.readStringArray());
        this.aggregations = in.readNamedWriteableList(AggregationBuilder.class);
        this.timeFieldName = in.readString();
        this.startTime = in.readLong();
        this.endTime = in.readLong();
        this.minTimeInterval = in.readLong();
        this.numOutputs = in.readInt();
        this.anomalyStartTime = Optional.ofNullable(in.readOptionalLong());
        this.filterQuery = Optional.ofNullable(in.readOptionalNamedWriteable(QueryBuilder.class));
    }

    @Override
    public FunctionName getFunctionName() {
        return FunctionName.ANOMALY_LOCALIZATION;
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.field(FIELD_INDEX_NAME, indexName);
        builder.field(FIELD_ATTTRIBUTE_FIELD_NAMES, attributeFieldNames);
        builder.startArray(FIELD_AGGREGATIONS);
        for (AggregationBuilder agg : aggregations) {
            builder.startObject();
            builder.value(agg);
            builder.endObject();
        }
        builder.endArray();
        builder.field(FIELD_TIME_FIELD_NAME, timeFieldName);
        builder.field(FIELD_START_TIME, startTime);
        builder.field(FIELD_END_TIME, endTime);
        builder.field(FIELD_MIN_TIME_INTERVAL, minTimeInterval);
        builder.field(FIELD_NUM_OUTPUTS, numOutputs);
        if (anomalyStartTime.isPresent()) {
            builder.field(FIELD_ANOMALY_START_TIME, anomalyStartTime.get());
        }
        if (filterQuery.isPresent()) {
            builder.field(FIELD_FILTER_QUERY, filterQuery.get());
        }
        builder.endObject();
        return builder;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeString(indexName);
        out.writeStringArray(attributeFieldNames.toArray(new String[0]));
        out.writeNamedWriteableList(aggregations);
        out.writeString(timeFieldName);
        out.writeLong(startTime);
        out.writeLong(endTime);
        out.writeLong(minTimeInterval);
        out.writeInt(numOutputs);
        out.writeOptionalLong(anomalyStartTime.orElse(null));
        out.writeOptionalNamedWriteable(filterQuery.orElse(null));
    }
}
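
For context, here is a minimal sketch of how this new input class might be built directly in Java (for example in a test), assuming the Lombok-generated all-args constructor follows the field declaration order above. The index pattern, field names, timestamps, and aggregation are illustrative placeholders, not part of this commit.

```java
package org.opensearch.ml.engine.algorithms.anomalylocalization;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;

public class AnomalyLocalizationInputExample {

    // Builds an input equivalent to what parse() would produce from a request body.
    // All concrete values (index pattern, field names, timestamps) are hypothetical.
    public static AnomalyLocalizationInput example() {
        List<String> attributes = Arrays.asList("region", "product");
        List<AggregationBuilder> aggregations =
                Arrays.asList(AggregationBuilders.sum("revenue_sum").field("revenue"));
        Optional<Long> anomalyStartTime = Optional.of(1610668800000L);               // epoch millis
        Optional<QueryBuilder> filterQuery =
                Optional.of(QueryBuilders.termQuery("status", "complete"));

        // Argument order matches the @AllArgsConstructor field declaration order.
        return new AnomalyLocalizationInput(
                "sales-*",          // index_name
                attributes,         // attribute_field_names
                aggregations,       // aggregations
                "@timestamp",       // time_field_name
                1609459200000L,     // start_time
                1612137600000L,     // end_time
                86400000L,          // min_time_interval (one day)
                10,                 // num_outputs
                anomalyStartTime,   // anomaly_start_time
                filterQuery);       // filter_query
    }
}
```

The same fields map one-to-one onto the `FIELD_*` keys consumed by `parse` and emitted by `toXContent`.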
@@ -25,5 +25,5 @@ public interface AnomalyLocalizer {
      * @param input Information about aggregation and metadata.
      * @param listener Listener to localized details or exception.
      */
-    void getLocalizationResults(Input input, ActionListener<AnomalyLocalizationOutput> listener);
+    void getLocalizationResults(AnomalyLocalizationInput input, ActionListener<AnomalyLocalizationOutput> listener);
 }
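
With this signature change, callers pass the concrete AnomalyLocalizationInput rather than the generic Input. A minimal caller sketch, assuming an AnomalyLocalizer implementation is supplied elsewhere and using the AnomalyLocalizationOutput type already referenced by the interface; the listener bodies are placeholders.

```java
package org.opensearch.ml.engine.algorithms.anomalylocalization;

import org.opensearch.action.ActionListener;

public class AnomalyLocalizerCallerExample {

    // Hypothetical caller; `localizer` is any AnomalyLocalizer implementation
    // (this commit only changes the interface signature, not the implementation).
    public static void localize(AnomalyLocalizer localizer, AnomalyLocalizationInput input) {
        localizer.getLocalizationResults(
                input,
                ActionListener.wrap(
                        output -> System.out.println("localization output: " + output),
                        e -> System.err.println("localization failed: " + e.getMessage())));
    }
}
```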