Skip to content

Commit

Permalink
Add support for aggregation profiler with concurrent aggregation (opensearch-project#8801)
Browse files Browse the repository at this point in the history

Signed-off-by: Ticheng Lin <ticheng@amazon.com>
  • Loading branch information
ticheng-aws committed Jul 25, 2023
1 parent b9b5e5c commit 4d7b996
Show file tree
Hide file tree
Showing 12 changed files with 670 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Exclude 'benchmarks' from codecov report ([#8805](https://github.com/opensearch-project/OpenSearch/pull/8805))
- [Refactor] MediaTypeParser to MediaTypeParserRegistry ([#8636](https://github.com/opensearch-project/OpenSearch/pull/8636))
- Create separate SourceLookup instance per segment slice in SignificantTextAggregatorFactory ([#8807](https://github.com/opensearch-project/OpenSearch/pull/8807))
- Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801))

### Deprecated

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ private ProfileResult doGetTree(int token) {
breakdown.toBreakdownMap(),
breakdown.toDebugMap(),
breakdown.toNodeTime(),
breakdown.toNodeStartTime(),
childrenProfileResults
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ protected final Map<String, Long> buildBreakdownMap(AbstractProfileBreakdown<T>
Map<String, Long> map = new HashMap<>(breakdown.timings.length * 2);
for (T timingType : breakdown.timingTypes) {
map.put(timingType.toString(), breakdown.timings[timingType.ordinal()].getApproximateTiming());
map.put(timingType.toString() + "_count", breakdown.timings[timingType.ordinal()].getCount());
map.put(timingType + "_count", breakdown.timings[timingType.ordinal()].getCount());
map.put(timingType + "_startTime", breakdown.timings[timingType.ordinal()].getEarliestTimerStartTime());
}
return Collections.unmodifiableMap(map);
}
Expand All @@ -103,4 +104,8 @@ public final long toNodeTime() {
}
return total;
}

/**
 * Returns this node's start time in nanoseconds, taken as the earliest
 * timer start recorded for the first timing type.
 */
public final long toNodeStartTime() {
    final int firstTypeOrdinal = timingTypes[0].ordinal();
    return timings[firstTypeOrdinal].getEarliestTimerStartTime();
}
}
132 changes: 131 additions & 1 deletion server/src/main/java/org/opensearch/search/profile/ProfileResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.search.profile;

import org.opensearch.Version;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand All @@ -44,8 +45,10 @@

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

Expand All @@ -69,15 +72,29 @@ public final class ProfileResult implements Writeable, ToXContentObject {
static final ParseField BREAKDOWN = new ParseField("breakdown");
static final ParseField DEBUG = new ParseField("debug");
static final ParseField NODE_TIME = new ParseField("time");
static final ParseField SLICE_START_TIME = new ParseField("slice_start_time");
static final ParseField MAX_SLICE_NODE_TIME = new ParseField("max_slice_time");
static final ParseField MIN_SLICE_NODE_TIME = new ParseField("min_slice_time");
static final ParseField AVG_SLICE_NODE_TIME = new ParseField("avg_slice_time");
static final ParseField NODE_TIME_RAW = new ParseField("time_in_nanos");
static final ParseField SLICE_START_TIME_RAW = new ParseField("slice_start_time_in_nanos");
static final ParseField MAX_SLICE_NODE_TIME_RAW = new ParseField("max_slice_time_in_nanos");
static final ParseField MIN_SLICE_NODE_TIME_RAW = new ParseField("min_slice_time_in_nanos");
static final ParseField AVG_SLICE_NODE_TIME_RAW = new ParseField("avg_slice_time_in_nanos");
static final ParseField CHILDREN = new ParseField("children");
static final ParseField CONCURRENT = new ParseField("concurrent");

private final String type;
private final String description;
private final Map<String, Long> breakdown;
private final Map<String, Object> debug;
private final long nodeTime;
private final long nodeStartTime;
private final long maxSliceNodeTime;
private final long minSliceNodeTime;
private final long avgSliceNodeTime;
private final List<ProfileResult> children;
private final boolean concurrent;

public ProfileResult(
String type,
Expand All @@ -86,13 +103,63 @@ public ProfileResult(
Map<String, Object> debug,
long nodeTime,
List<ProfileResult> children
) {
this(type, description, breakdown, debug, nodeTime, -1, -1, -1, -1, children, false);
}

/**
 * Constructor for the non-concurrent case that also carries the node's start
 * time. All three slice times are populated with the node time itself, since
 * there is exactly one implicit slice, and {@code concurrent} is set to false.
 *
 * @param type          type of the profiled element
 * @param description   human-readable description of the profiled element
 * @param breakdown     per-timing-type breakdown map (must not be null)
 * @param debug         optional debug information (may be null)
 * @param nodeTime      total time spent in this node, in nanoseconds
 * @param nodeStartTime start time of this node, in nanoseconds
 * @param children      profile results of child elements (may be null)
 */
public ProfileResult(
String type,
String description,
Map<String, Long> breakdown,
Map<String, Object> debug,
long nodeTime,
long nodeStartTime,
List<ProfileResult> children
) {
this(type, description, breakdown, debug, nodeTime, nodeStartTime, nodeTime, nodeTime, nodeTime, children, false);
}

/**
 * Constructor for a result produced by concurrent segment search: carries the
 * max/min/avg per-slice node times and marks the result as concurrent. The
 * slice start time is left unset (-1) in this case.
 *
 * @param type             type of the profiled element
 * @param description      human-readable description of the profiled element
 * @param breakdown        per-timing-type breakdown map (must not be null)
 * @param debug            optional debug information (may be null)
 * @param nodeTime         total time spent in this node, in nanoseconds
 * @param maxSliceNodeTime maximum node time across slices, in nanoseconds
 * @param minSliceNodeTime minimum node time across slices, in nanoseconds
 * @param avgSliceNodeTime average node time across slices, in nanoseconds
 * @param children         profile results of child elements (may be null)
 */
public ProfileResult(
String type,
String description,
Map<String, Long> breakdown,
Map<String, Object> debug,
long nodeTime,
long maxSliceNodeTime,
long minSliceNodeTime,
long avgSliceNodeTime,
List<ProfileResult> children
) {
this(type, description, breakdown, debug, nodeTime, -1, maxSliceNodeTime, minSliceNodeTime, avgSliceNodeTime, children, true);
}

/**
 * Canonical constructor that all other constructors delegate to.
 *
 * @param type             type of the profiled element
 * @param description      human-readable description of the profiled element
 * @param breakdown        per-timing-type breakdown map; must not be null
 * @param debug            optional debug information; null is replaced by an empty map
 * @param nodeTime         total time spent in this node, in nanoseconds
 * @param nodeStartTime    start time of this node, in nanoseconds (-1 when not recorded)
 * @param maxSliceNodeTime maximum node time across slices, in nanoseconds
 * @param minSliceNodeTime minimum node time across slices, in nanoseconds
 * @param avgSliceNodeTime average node time across slices, in nanoseconds
 * @param children         profile results of child elements; null is replaced by an empty list
 * @param concurrent       whether this result was produced by concurrent segment search
 */
public ProfileResult(
String type,
String description,
Map<String, Long> breakdown,
Map<String, Object> debug,
long nodeTime,
long nodeStartTime,
long maxSliceNodeTime,
long minSliceNodeTime,
long avgSliceNodeTime,
List<ProfileResult> children,
boolean concurrent
) {
this.type = type;
this.description = description;
this.breakdown = Objects.requireNonNull(breakdown, "required breakdown argument missing");
// Null-tolerant: absent debug/children collapse to immutable empties.
this.debug = debug == null ? Map.of() : debug;
this.children = children == null ? List.of() : children;
this.nodeTime = nodeTime;
this.nodeStartTime = nodeStartTime;
this.maxSliceNodeTime = maxSliceNodeTime;
this.minSliceNodeTime = minSliceNodeTime;
this.avgSliceNodeTime = avgSliceNodeTime;
this.concurrent = concurrent;
}

/**
Expand All @@ -105,6 +172,19 @@ public ProfileResult(StreamInput in) throws IOException {
breakdown = in.readMap(StreamInput::readString, StreamInput::readLong);
debug = in.readMap(StreamInput::readString, StreamInput::readGenericValue);
children = in.readList(ProfileResult::new);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.nodeStartTime = in.readLong();
this.maxSliceNodeTime = in.readLong();
this.minSliceNodeTime = in.readLong();
this.avgSliceNodeTime = in.readLong();
this.concurrent = in.readBoolean();
} else {
this.nodeStartTime = -1;
this.maxSliceNodeTime = this.nodeTime;
this.minSliceNodeTime = this.nodeTime;
this.avgSliceNodeTime = this.nodeTime;
this.concurrent = false;
}
}

@Override
Expand All @@ -115,6 +195,13 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(breakdown, StreamOutput::writeString, StreamOutput::writeLong);
out.writeMap(debug, StreamOutput::writeString, StreamOutput::writeGenericValue);
out.writeList(children);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeLong(nodeStartTime);
out.writeLong(maxSliceNodeTime);
out.writeLong(minSliceNodeTime);
out.writeLong(avgSliceNodeTime);
out.writeBoolean(concurrent);
}
}

/**
Expand Down Expand Up @@ -154,6 +241,26 @@ public long getTime() {
return nodeTime;
}

/** Returns the slice start time in nanoseconds, or -1 when it was not recorded. */
public long getSliceStartTime() {
return nodeStartTime;
}

/** Returns the maximum node time across slices, in nanoseconds; equals the node time when not concurrent. */
public long getMaxSliceTime() {
return maxSliceNodeTime;
}

/** Returns the minimum node time across slices, in nanoseconds; equals the node time when not concurrent. */
public long getMinSliceTime() {
return minSliceNodeTime;
}

/** Returns the average node time across slices, in nanoseconds; equals the node time when not concurrent. */
public long getAvgSliceTime() {
return avgSliceNodeTime;
}

/** Returns true when this result was produced by concurrent segment search. */
public boolean isConcurrent() {
return concurrent;
}

/**
* Returns a list of all profiled children queries
*/
Expand All @@ -168,9 +275,22 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(DESCRIPTION.getPreferredName(), description);
if (builder.humanReadable()) {
builder.field(NODE_TIME.getPreferredName(), new TimeValue(getTime(), TimeUnit.NANOSECONDS).toString());
if (concurrent) {
builder.field(MAX_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMaxSliceTime(), TimeUnit.NANOSECONDS).toString());
builder.field(MIN_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMinSliceTime(), TimeUnit.NANOSECONDS).toString());
builder.field(AVG_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getAvgSliceTime(), TimeUnit.NANOSECONDS).toString());
}
}
builder.field(NODE_TIME_RAW.getPreferredName(), getTime());
builder.field(BREAKDOWN.getPreferredName(), breakdown);
Map<String, Long> modifiedBreakdown = new LinkedHashMap<>(breakdown);
if (concurrent) {
builder.field(MAX_SLICE_NODE_TIME_RAW.getPreferredName(), getMaxSliceTime());
builder.field(MIN_SLICE_NODE_TIME_RAW.getPreferredName(), getMinSliceTime());
builder.field(AVG_SLICE_NODE_TIME_RAW.getPreferredName(), getAvgSliceTime());
} else {
removeStartTimeFields(modifiedBreakdown);
}
builder.field(BREAKDOWN.getPreferredName(), modifiedBreakdown);
if (false == debug.isEmpty()) {
builder.field(DEBUG.getPreferredName(), debug);
}
Expand All @@ -186,6 +306,16 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder.endObject();
}

/**
 * Strips the per-timing-type "_startTime" entries from a breakdown map.
 * These entries are only rendered for the concurrent case; for the
 * non-concurrent response they are removed before serialization.
 *
 * @param modifiedBreakdown mutable breakdown map, modified in place
 */
static void removeStartTimeFields(Map<String, Long> modifiedBreakdown) {
    // removeIf on the key view is the idiomatic, iteration-safe way to
    // delete matching entries while traversing the map.
    modifiedBreakdown.keySet().removeIf(key -> key.endsWith("_startTime"));
}

private static final InstantiatingObjectParser<ProfileResult, Void> PARSER;
static {
InstantiatingObjectParser.Builder<ProfileResult, Void> parser = InstantiatingObjectParser.builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public final class Profilers {
/**
 * Creates the profilers for one search request: an initial query profiler
 * plus a single aggregation profiler.
 *
 * @param searcher the searcher used for this request
 */
public Profilers(ContextIndexSearcher searcher) {
    this.searcher = searcher;
    this.queryProfilers = new ArrayList<>();
    // A non-null executor indicates concurrent segment search, so the
    // aggregation profiler is created in concurrent mode.
    this.aggProfiler = new AggregationProfiler(searcher.getExecutor() != null);
    addQueryProfiler();
}

Expand Down
16 changes: 14 additions & 2 deletions server/src/main/java/org/opensearch/search/profile/Timer.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
*/
public class Timer {

private boolean doTiming;
private long timing, count, lastCount, start;
private boolean doTiming, isStarted;
private long timing, count, lastCount, start, earliestTimerStartTime;

/** pkg-private for testing */
long nanoTime() {
Expand All @@ -71,6 +71,10 @@ public final void start() {
doTiming = (count - lastCount) >= Math.min(lastCount >>> 8, 1024);
if (doTiming) {
start = nanoTime();
if (isStarted == false) {
earliestTimerStartTime = start;
isStarted = true;
}
}
count++;
}
Expand All @@ -92,6 +96,14 @@ public final long getCount() {
return count;
}

/**
 * Return the earliest start time, in nanoseconds, among all recorded
 * {@code #start} calls of this timer.
 *
 * @throws IllegalStateException if the timer is still running, i.e. a
 *         {@code #start} call has no matching {@code #stop} call yet
 */
public final long getEarliestTimerStartTime() {
// NOTE(review): a non-zero `start` is treated as "timer still running";
// presumably #stop resets it to 0 (not visible here) — same guard as
// getApproximateTiming.
if (start != 0) {
throw new IllegalStateException("#start call misses a matching #stop call");
}
return earliestTimerStartTime;
}

/** Return an approximation of the total time spent between consecutive calls of #start and #stop. */
public final long getApproximateTiming() {
if (start != 0) {
Expand Down
Loading

0 comments on commit 4d7b996

Please sign in to comment.