Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integration step 1 and 2 for anomaly localization #113

Merged
merged 2 commits into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ public enum FunctionName {
LINEAR_REGRESSION,
KMEANS,
SAMPLE_ALGO,
LOCAL_SAMPLE_CALCULATOR
LOCAL_SAMPLE_CALCULATOR,
ANOMALY_LOCALIZATION
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,121 @@

package org.opensearch.ml.engine.algorithms.anomalylocalization;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.opensearch.common.ParseField;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.ml.common.parameter.FunctionName;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.AggregatorFactories;

import lombok.AllArgsConstructor;
import lombok.Data;

import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;

/**
* Information about the aggregations, time range, etc. to localize.
*/
@Data
public class Input {
@AllArgsConstructor
public class Input implements org.opensearch.ml.common.parameter.Input {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about renaming this class so that we can avoid the fully qualified name org.opensearch.ml.common.parameter.Input? The two classes currently share the same simple name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, renamed.


// Field names of the XContent representation of this input.
// parse(...) reads these names and toXContent(...) writes them.
public static final String FIELD_INDEX_NAME = "index_name";
// NOTE(review): the identifier is spelled with a triple "T" (ATTTRIBUTE); it is public, so a rename
// would have to be coordinated with any external users. The serialized name itself is spelled correctly.
public static final String FIELD_ATTTRIBUTE_FIELD_NAMES = "attribute_field_names";
public static final String FIELD_AGGREGATIONS = "aggregations";
public static final String FIELD_TIME_FIELD_NAME = "time_field_name";
public static final String FIELD_START_TIME = "start_time";
public static final String FIELD_END_TIME = "end_time";
public static final String FIELD_MIN_TIME_INTERVAL = "min_time_interval";
public static final String FIELD_NUM_OUTPUTS = "num_outputs";
// The two names below are optional: toXContent(...) only emits them when a value is present.
public static final String FIELD_ANOMALY_START_TIME = "anomaly_start_time";
public static final String FIELD_FILTER_QUERY = "filter_query";
// Registry entry that maps the ANOMALY_LOCALIZATION function name to parse(...) for the
// common org.opensearch.ml.common.parameter.Input category.
public static final NamedXContentRegistry.Entry XCONTENT_REGISTRY_ENTRY = new NamedXContentRegistry.Entry(
org.opensearch.ml.common.parameter.Input.class,
new ParseField(FunctionName.ANOMALY_LOCALIZATION.name()),
parser -> parse(parser)
);

public static Input parse(XContentParser parser) throws IOException {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
String indexName = null;
List<String> attributeFieldNames = new ArrayList<>();
List<AggregationBuilder> aggregations = new ArrayList<>();
String timeFieldName = null;
long startTime = 0;
long endTime = 0;
long minTimeInterval = 0;
int numOutputs = 0;
Optional<Long> anomalyStartTime = Optional.empty();
Optional<QueryBuilder> filterQuery = Optional.empty();

while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
switch (parser.currentName()) {
case FIELD_INDEX_NAME:
parser.nextToken();
indexName = parser.text();
break;
case FIELD_ATTTRIBUTE_FIELD_NAMES:
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
attributeFieldNames.add(parser.text());
}
ensureExpectedToken(XContentParser.Token.END_ARRAY, parser.currentToken(), parser);
break;
case FIELD_AGGREGATIONS:
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
aggregations.addAll(AggregatorFactories.parseAggregators(parser).getAggregatorFactories());
ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(), parser);
}
ensureExpectedToken(XContentParser.Token.END_ARRAY, parser.currentToken(), parser);
break;
case FIELD_TIME_FIELD_NAME:
parser.nextToken();
timeFieldName = parser.text();
break;
case FIELD_START_TIME:
parser.nextToken();
startTime = parser.longValue();
break;
case FIELD_END_TIME:
parser.nextToken();
endTime = parser.longValue();
break;
case FIELD_MIN_TIME_INTERVAL:
parser.nextToken();
minTimeInterval = parser.longValue();
break;
case FIELD_NUM_OUTPUTS:
parser.nextToken();
numOutputs = parser.intValue();
break;
case FIELD_ANOMALY_START_TIME:
parser.nextToken();
anomalyStartTime = Optional.of(parser.longValue());
break;
case FIELD_FILTER_QUERY:
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
filterQuery = Optional.of(parseInnerQueryBuilder(parser));
ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(), parser);
break;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skip children for default case?

                default:
                    parser.skipChildren();
                    break;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, added.

}
}
return new Input(indexName, attributeFieldNames, aggregations, timeFieldName, startTime, endTime, minTimeInterval, numOutputs,
anomalyStartTime, filterQuery);
}

private final String indexName; // name pattern of the data index
private final List<String> attributeFieldNames; // name of the field to localize/slice with
Expand All @@ -36,4 +138,63 @@ public class Input {
private final int numOutputs; // max number of values from localization/slicing
private final Optional<Long> anomalyStartTime; // time when anomaly change starts
private final Optional<QueryBuilder> filterQuery; // filter of data

public Input(StreamInput in) throws IOException {
this.indexName = in.readString();
this.attributeFieldNames = Arrays.asList(in.readStringArray());
this.aggregations = in.readNamedWriteableList(AggregationBuilder.class);
this.timeFieldName = in.readString();
this.startTime = in.readLong();
this.endTime = in.readLong();
this.minTimeInterval = in.readLong();
this.numOutputs = in.readInt();
this.anomalyStartTime = Optional.ofNullable(in.readOptionalLong());
this.filterQuery = Optional.ofNullable(in.readOptionalNamedWriteable(QueryBuilder.class));
}

/**
 * Identifies the function this input belongs to.
 *
 * @return always {@link FunctionName#ANOMALY_LOCALIZATION}
 */
@Override
public FunctionName getFunctionName() {
return FunctionName.ANOMALY_LOCALIZATION;
}

/**
 * Writes this input as a single XContent object using the {@code FIELD_*} names of this class.
 * Each aggregation is emitted as its own object inside the aggregations array. The anomaly start
 * time and the filter query are written only when they are present.
 *
 * @param builder builder to write to
 * @param params  not used by this method
 * @return the same builder that was passed in
 * @throws IOException if writing to the builder fails
 */
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
    builder.startObject()
        .field(FIELD_INDEX_NAME, indexName)
        .field(FIELD_ATTTRIBUTE_FIELD_NAMES, attributeFieldNames)
        .startArray(FIELD_AGGREGATIONS);
    for (AggregationBuilder aggregation : aggregations) {
        // One wrapping object per aggregation.
        builder.startObject().value(aggregation).endObject();
    }
    builder.endArray()
        .field(FIELD_TIME_FIELD_NAME, timeFieldName)
        .field(FIELD_START_TIME, startTime)
        .field(FIELD_END_TIME, endTime)
        .field(FIELD_MIN_TIME_INTERVAL, minTimeInterval)
        .field(FIELD_NUM_OUTPUTS, numOutputs);

    // Optional values are omitted entirely when absent.
    final Long anomalyStart = anomalyStartTime.orElse(null);
    if (anomalyStart != null) {
        builder.field(FIELD_ANOMALY_START_TIME, anomalyStart);
    }
    final QueryBuilder filter = filterQuery.orElse(null);
    if (filter != null) {
        builder.field(FIELD_FILTER_QUERY, filter);
    }
    return builder.endObject();
}

/**
 * Serializes this input to a transport stream.
 * The write order is the order in which the {@code Input(StreamInput)} constructor reads.
 *
 * @param out stream to write the fields to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
    // Required fields.
    out.writeString(indexName);
    final String[] attributeNames = attributeFieldNames.toArray(new String[0]);
    out.writeStringArray(attributeNames);
    out.writeNamedWriteableList(aggregations);
    out.writeString(timeFieldName);
    out.writeLong(startTime);
    out.writeLong(endTime);
    out.writeLong(minTimeInterval);
    out.writeInt(numOutputs);

    // Optional fields: an empty Optional is written as a null optional value.
    final Long anomalyStart = anomalyStartTime.isPresent() ? anomalyStartTime.get() : null;
    out.writeOptionalLong(anomalyStart);
    final QueryBuilder filter = filterQuery.isPresent() ? filterQuery.get() : null;
    out.writeOptionalNamedWriteable(filter);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
*/

package org.opensearch.ml.engine.algorithms.anomalylocalization;

import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;

import org.junit.Test;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchModule;
import org.opensearch.search.aggregations.AggregationBuilders;

import static org.junit.Assert.assertEquals;

/**
 * Round-trip tests for {@link Input}: XContent (JSON) serialization followed by parsing, and
 * transport-stream serialization followed by deserialization. Each test expects the restored
 * instance to equal the original one.
 */
public class InputTests {

    /** Full input, including the optional anomaly start time and filter query. */
    @Test
    public void testXContentFullObject() throws Exception {
        Input input = new Input("indexName", Arrays.asList("attribute"), Arrays.asList(AggregationBuilders.max("max").field("field"),
            AggregationBuilders.min("min").field("field")), "@timestamp", 0L, 10L, 1L, 2, Optional.of(3L),
            Optional.of(QueryBuilders.matchAllQuery()));

        assertEquals(input, xContentRoundTrip(input));
    }

    /** Input without the optional anomaly start time and filter query. */
    @Test
    public void testXContentMissingAnomalyStartFilter() throws Exception {
        Input input = new Input("indexName", Arrays.asList("attribute"), Arrays.asList(AggregationBuilders.max("max").field("field")),
            "@timestamp", 0L, 10L, 1L, 2, Optional.empty(), Optional.empty());

        assertEquals(input, xContentRoundTrip(input));
    }

    /** Full input written to and read back from a transport stream. */
    @Test
    public void testWriteable() throws Exception {
        Input input = new Input("indexName", Arrays.asList("attribute"), Arrays.asList(AggregationBuilders.max("max").field("field"),
            AggregationBuilders.min("min").field("field")), "@timestamp", 0L, 10L, 1L, 2, Optional.of(3L),
            Optional.of(QueryBuilders.matchAllQuery()));

        NamedWriteableRegistry registry = new NamedWriteableRegistry(searchModule().getNamedWriteables());
        // Close both streams deterministically instead of leaving them to the garbage collector.
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            input.writeTo(out);
            try (StreamInput in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), registry)) {
                assertEquals(input, new Input(in));
            }
        }
    }

    /** Creates the search module whose registries supply the aggregation and query entries. */
    private static SearchModule searchModule() {
        return new SearchModule(Settings.EMPTY, false, Collections.emptyList());
    }

    /** Serializes the input to JSON and parses it back, closing the parser afterwards. */
    private static Input xContentRoundTrip(Input input) throws Exception {
        XContentBuilder builder = input.toXContent(XContentFactory.jsonBuilder(), null);
        String json = Strings.toString(builder);

        NamedXContentRegistry registry = new NamedXContentRegistry(searchModule().getNamedXContents());
        try (XContentParser parser = XContentType.JSON.xContent().createParser(registry, null, json)) {
            // Input.parse expects the parser to be positioned on the START_OBJECT token.
            parser.nextToken();
            return Input.parse(parser);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
KMeansParams.XCONTENT_REGISTRY,
LinearRegressionParams.XCONTENT_REGISTRY,
SampleAlgoParams.XCONTENT_REGISTRY,
LocalSampleCalculatorInput.XCONTENT_REGISTRY
LocalSampleCalculatorInput.XCONTENT_REGISTRY,
org.opensearch.ml.engine.algorithms.anomalylocalization.Input.XCONTENT_REGISTRY_ENTRY
);
}
}