Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor parsing logic for Measurement #112

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.MatchQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
Expand Down Expand Up @@ -104,7 +107,9 @@ public List<SearchQueryRecord> read(final String from, final String to) {
try {
SearchResponse searchResponse = client.search(searchRequest).actionGet();
for (SearchHit hit : searchResponse.getHits()) {
SearchQueryRecord record = SearchQueryRecord.getRecord(hit, namedXContentRegistry);
XContentParser parser = XContentType.JSON.xContent()
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString());
SearchQueryRecord record = SearchQueryRecord.fromXContent(parser);
records.add(record);
}
} catch (IndexNotFoundException ignored) {} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,25 @@

import java.io.IOException;
import java.util.Objects;
import org.opensearch.core.common.ParsingException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

/**
* Measurement that is stored in the SearchQueryRecord. Measurement can be of a specific AggregationType
*/
public class Measurement implements ToXContentObject, Writeable {
private static int DEFAULT_COUNT = 1;

private static final String NUMBER = "number";
private static final String COUNT = "count";
private static final String AGGREGATION_TYPE = "aggregationType";

private AggregationType aggregationType;
private Number number;
private int count;
Expand Down Expand Up @@ -55,6 +62,21 @@ public Measurement(Number number) {
this(number, DEFAULT_COUNT, AggregationType.DEFAULT_AGGREGATION_TYPE);
}

private Measurement() {}

/**
* Construct a measurement from {@link XContentParser}
*
* @param parser {@link XContentParser}
* @return {@link Measurement}
* @throws IOException IOException
*/
public static Measurement fromXContent(XContentParser parser) throws IOException {
Measurement builder = new Measurement();
builder.parseXContent(parser);
return builder;
}

/**
* Add measurement number to the current number based on the aggregationType.
* If aggregateType is NONE, replace the number since we are not aggregating in this case.
Expand Down Expand Up @@ -150,13 +172,45 @@ public void setAggregationType(AggregationType aggregationType) {
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field("number", number);
builder.field("count", count);
builder.field("aggregationType", aggregationType.toString());
builder.field(NUMBER, number);
builder.field(COUNT, count);
builder.field(AGGREGATION_TYPE, aggregationType.toString());
builder.endObject();
return builder;
}

/**
* Parse a measurement from {@link XContentParser}
*
* @param parser {@link XContentParser}
* @throws IOException IOException
*/
private void parseXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new ParsingException(
parser.getTokenLocation(),
"Expected [" + XContentParser.Token.START_OBJECT + "] but found [" + token + "]",
parser.getTokenLocation()
);
} else {
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (NUMBER.equals(currentFieldName)) {
this.number = parser.numberValue();
} else if (COUNT.equals(currentFieldName)) {
this.count = parser.intValue();
} else if (AGGREGATION_TYPE.equals(currentFieldName)) {
this.aggregationType = AggregationType.valueOf(parser.text());
}
}
}
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
writeNumber(out, number);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,17 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;

Expand Down Expand Up @@ -141,18 +137,17 @@ public SearchQueryRecord(final long timestamp, Map<MetricType, Measurement> meas
}

/**
* Returns a SearchQueryRecord from a SearchHit
* Construct a SearchQueryRecord from {@link XContentParser}
*
* @param hit SearchHit to parse into SearchQueryRecord
* @param namedXContentRegistry NamedXContentRegistry for parsing purposes
* @return SearchQueryRecord
* @param parser {@link XContentParser}
* @return {@link SearchQueryRecord}
* @throws IOException IOException
*/
public static SearchQueryRecord getRecord(SearchHit hit, NamedXContentRegistry namedXContentRegistry) throws IOException {
public static SearchQueryRecord fromXContent(XContentParser parser) throws IOException {
long timestamp = 0L;
Map<MetricType, Measurement> measurements = new HashMap<>();
Map<Attribute, Object> attributes = new HashMap<>();
XContentParser parser = XContentType.JSON.xContent()
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString());

parser.nextToken();
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -167,7 +162,7 @@ public static SearchQueryRecord getRecord(SearchHit hit, NamedXContentRegistry n
case CPU:
case MEMORY:
MetricType metric = MetricType.fromString(fieldName);
measurements.put(metric, new Measurement(metric.parseValue(parser.longValue())));
measurements.put(metric, Measurement.fromXContent(parser));
break;
case SEARCH_TYPE:
attributes.put(Attribute.SEARCH_TYPE, parser.text());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,19 @@ public static SearchQueryRecord createFixedSearchQueryRecord() {
Map<MetricType, Measurement> measurements = Map.of(MetricType.LATENCY, new Measurement(1L));

Map<String, Long> phaseLatencyMap = new HashMap<>();
phaseLatencyMap.put("expand", 1L);
phaseLatencyMap.put("query", 10L);
phaseLatencyMap.put("fetch", 1L);
Map<Attribute, Object> attributes = new HashMap<>();
attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT));
attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap);
attributes.put(
Attribute.TASK_RESOURCE_USAGES,
List.of(
new TaskResourceInfo("action", 2L, 1L, "id", new TaskResourceUsage(1000L, 2000L)),
new TaskResourceInfo("action2", 3L, 1L, "id2", new TaskResourceUsage(2000L, 1000L))
)
);

return new SearchQueryRecord(timestamp, measurements, attributes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void testSerialize() throws Exception {

public void testToXContent() throws IOException {
char[] expectedXcontent =
"{\"top_queries\":[{\"timestamp\":1706574180000,\"node_id\":\"node_for_top_queries_test\",\"search_type\":\"query_then_fetch\",\"measurements\":{\"latency\":{\"number\":1,\"count\":1,\"aggregationType\":\"NONE\"}}}]}"
"{\"top_queries\":[{\"timestamp\":1706574180000,\"node_id\":\"node_for_top_queries_test\",\"phase_latency_map\":{\"expand\":1,\"query\":10,\"fetch\":1},\"task_resource_usages\":[{\"action\":\"action\",\"taskId\":2,\"parentTaskId\":1,\"nodeId\":\"id\",\"taskResourceUsage\":{\"cpu_time_in_nanos\":1000,\"memory_in_bytes\":2000}},{\"action\":\"action2\",\"taskId\":3,\"parentTaskId\":1,\"nodeId\":\"id2\",\"taskResourceUsage\":{\"cpu_time_in_nanos\":2000,\"memory_in_bytes\":1000}}],\"search_type\":\"query_then_fetch\",\"measurements\":{\"latency\":{\"number\":1,\"count\":1,\"aggregationType\":\"NONE\"}}}]}"
.toCharArray();
TopQueries topQueries = QueryInsightsTestUtils.createFixedTopQueries();
ClusterName clusterName = new ClusterName("test-cluster");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import java.util.List;
import java.util.Set;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.plugin.insights.QueryInsightsTestUtils;
import org.opensearch.test.OpenSearchTestCase;

Expand Down Expand Up @@ -54,6 +56,16 @@ public void testEqual() {
assertEquals(record1, record2);
}

public void testFromXContent() {
SearchQueryRecord record = QueryInsightsTestUtils.createFixedSearchQueryRecord();
try (XContentParser recordParser = createParser(JsonXContent.jsonXContent, record.toString())) {
SearchQueryRecord parsedRecord = SearchQueryRecord.fromXContent(recordParser);
QueryInsightsTestUtils.checkRecordsEquals(List.of(record), List.of(parsedRecord));
} catch (Exception e) {
fail("Test should not throw exceptions when parsing search query record");
}
}

/**
* Serialize and deserialize a SearchQueryRecord.
* @param record A SearchQueryRecord to serialize.
Expand Down
Loading