Skip to content

Commit

Permalink
Add support for aggregation profiler with concurrent aggregation (ope…
Browse files Browse the repository at this point in the history
…nsearch-project#8801)

Signed-off-by: Ticheng Lin <ticheng@amazon.com>
  • Loading branch information
ticheng-aws committed Jul 26, 2023
1 parent b9b5e5c commit 1fc0b4b
Show file tree
Hide file tree
Showing 14 changed files with 749 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Exclude 'benchmarks' from codecov report ([#8805](https://github.com/opensearch-project/OpenSearch/pull/8805))
- [Refactor] MediaTypeParser to MediaTypeParserRegistry ([#8636](https://github.com/opensearch-project/OpenSearch/pull/8636))
- Create separate SourceLookup instance per segment slice in SignificantTextAggregatorFactory ([#8807](https://github.com/opensearch-project/OpenSearch/pull/8807))
- Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801))

### Deprecated

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,10 @@ public DirectoryReader getDirectoryReader() {
return (DirectoryReader) reader;
}

public SearchContext getSearchContext() {
return searchContext;
}

private static class MutableQueryTimeout implements ExitableDirectoryReader.QueryCancellation {

private final Set<Runnable> runnables = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public abstract class AbstractProfileBreakdown<T extends Enum<T>> {
/**
* The accumulated timings for this query node
*/
private final Timer[] timings;
private final T[] timingTypes;
public final Timer[] timings;
public final T[] timingTypes;

/** Sole constructor. */
public AbstractProfileBreakdown(Class<T> clazz) {
Expand Down Expand Up @@ -80,11 +80,11 @@ public Map<String, Long> toBreakdownMap() {
/**
* Build a timing count breakdown for arbitrary instance
*/
protected final Map<String, Long> buildBreakdownMap(AbstractProfileBreakdown<T> breakdown) {
Map<String, Long> map = new HashMap<>(breakdown.timings.length * 2);
protected Map<String, Long> buildBreakdownMap(AbstractProfileBreakdown<T> breakdown) {
Map<String, Long> map = new HashMap<>(breakdown.timings.length * 3);
for (T timingType : breakdown.timingTypes) {
map.put(timingType.toString(), breakdown.timings[timingType.ordinal()].getApproximateTiming());
map.put(timingType.toString() + "_count", breakdown.timings[timingType.ordinal()].getCount());
map.put(timingType + "_count", breakdown.timings[timingType.ordinal()].getCount());
}
return Collections.unmodifiableMap(map);
}
Expand All @@ -103,4 +103,8 @@ public final long toNodeTime() {
}
return total;
}

public final long toNodeStartTime() {
return timings[timingTypes[0].ordinal()].getEarliestTimerStartTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.search.profile;

import org.opensearch.Version;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand All @@ -44,8 +45,10 @@

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

Expand All @@ -69,15 +72,25 @@ public final class ProfileResult implements Writeable, ToXContentObject {
static final ParseField BREAKDOWN = new ParseField("breakdown");
static final ParseField DEBUG = new ParseField("debug");
static final ParseField NODE_TIME = new ParseField("time");
static final ParseField MAX_SLICE_NODE_TIME = new ParseField("max_slice_time");
static final ParseField MIN_SLICE_NODE_TIME = new ParseField("min_slice_time");
static final ParseField AVG_SLICE_NODE_TIME = new ParseField("avg_slice_time");
static final ParseField NODE_TIME_RAW = new ParseField("time_in_nanos");
static final ParseField MAX_SLICE_NODE_TIME_RAW = new ParseField("max_slice_time_in_nanos");
static final ParseField MIN_SLICE_NODE_TIME_RAW = new ParseField("min_slice_time_in_nanos");
static final ParseField AVG_SLICE_NODE_TIME_RAW = new ParseField("avg_slice_time_in_nanos");
static final ParseField CHILDREN = new ParseField("children");

private final String type;
private final String description;
private final Map<String, Long> breakdown;
private final Map<String, Object> debug;
private final long nodeTime;
private final long maxSliceNodeTime;
private final long minSliceNodeTime;
private final long avgSliceNodeTime;
private final List<ProfileResult> children;
private final boolean concurrent;

public ProfileResult(
String type,
Expand All @@ -86,13 +99,32 @@ public ProfileResult(
Map<String, Object> debug,
long nodeTime,
List<ProfileResult> children
) {
this(type, description, breakdown, debug, nodeTime, children, false, -1, -1, -1);
}

public ProfileResult(
String type,
String description,
Map<String, Long> breakdown,
Map<String, Object> debug,
long nodeTime,
List<ProfileResult> children,
boolean concurrent,
long maxSliceNodeTime,
long minSliceNodeTime,
long avgSliceNodeTime
) {
this.type = type;
this.description = description;
this.breakdown = Objects.requireNonNull(breakdown, "required breakdown argument missing");
this.debug = debug == null ? Map.of() : debug;
this.children = children == null ? List.of() : children;
this.nodeTime = nodeTime;
this.concurrent = concurrent;
this.maxSliceNodeTime = maxSliceNodeTime;
this.minSliceNodeTime = minSliceNodeTime;
this.avgSliceNodeTime = avgSliceNodeTime;
}

/**
Expand All @@ -105,6 +137,23 @@ public ProfileResult(StreamInput in) throws IOException {
breakdown = in.readMap(StreamInput::readString, StreamInput::readLong);
debug = in.readMap(StreamInput::readString, StreamInput::readGenericValue);
children = in.readList(ProfileResult::new);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.concurrent = in.readBoolean();
if (concurrent) {
this.maxSliceNodeTime = in.readLong();
this.minSliceNodeTime = in.readLong();
this.avgSliceNodeTime = in.readLong();
} else {
this.maxSliceNodeTime = this.nodeTime;
this.minSliceNodeTime = this.nodeTime;
this.avgSliceNodeTime = this.nodeTime;
}
} else {
this.concurrent = false;
this.maxSliceNodeTime = this.nodeTime;
this.minSliceNodeTime = this.nodeTime;
this.avgSliceNodeTime = this.nodeTime;
}
}

@Override
Expand All @@ -115,6 +164,14 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(breakdown, StreamOutput::writeString, StreamOutput::writeLong);
out.writeMap(debug, StreamOutput::writeString, StreamOutput::writeGenericValue);
out.writeList(children);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(concurrent);
if (concurrent) {
out.writeLong(maxSliceNodeTime);
out.writeLong(minSliceNodeTime);
out.writeLong(avgSliceNodeTime);
}
}
}

/**
Expand Down Expand Up @@ -154,6 +211,22 @@ public long getTime() {
return nodeTime;
}

public long getMaxSliceTime() {
return maxSliceNodeTime;
}

public long getMinSliceTime() {
return minSliceNodeTime;
}

public long getAvgSliceTime() {
return avgSliceNodeTime;
}

public boolean isConcurrent() {
return concurrent;
}

/**
* Returns a list of all profiled children queries
*/
Expand All @@ -168,9 +241,22 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(DESCRIPTION.getPreferredName(), description);
if (builder.humanReadable()) {
builder.field(NODE_TIME.getPreferredName(), new TimeValue(getTime(), TimeUnit.NANOSECONDS).toString());
if (concurrent) {
builder.field(MAX_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMaxSliceTime(), TimeUnit.NANOSECONDS).toString());
builder.field(MIN_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMinSliceTime(), TimeUnit.NANOSECONDS).toString());
builder.field(AVG_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getAvgSliceTime(), TimeUnit.NANOSECONDS).toString());
}
}
builder.field(NODE_TIME_RAW.getPreferredName(), getTime());
builder.field(BREAKDOWN.getPreferredName(), breakdown);
Map<String, Long> modifiedBreakdown = new LinkedHashMap<>(breakdown);
if (concurrent) {
builder.field(MAX_SLICE_NODE_TIME_RAW.getPreferredName(), getMaxSliceTime());
builder.field(MIN_SLICE_NODE_TIME_RAW.getPreferredName(), getMinSliceTime());
builder.field(AVG_SLICE_NODE_TIME_RAW.getPreferredName(), getAvgSliceTime());
} else {
removeStartTimeFields(modifiedBreakdown);
}
builder.field(BREAKDOWN.getPreferredName(), modifiedBreakdown);
if (false == debug.isEmpty()) {
builder.field(DEBUG.getPreferredName(), debug);
}
Expand All @@ -186,6 +272,16 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder.endObject();
}

static void removeStartTimeFields(Map<String, Long> modifiedBreakdown) {
Iterator<Map.Entry<String, Long>> iterator = modifiedBreakdown.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Long> entry = iterator.next();
if (entry.getKey().endsWith("_startTime")) {
iterator.remove();
}
}
}

private static final InstantiatingObjectParser<ProfileResult, Void> PARSER;
static {
InstantiatingObjectParser.Builder<ProfileResult, Void> parser = InstantiatingObjectParser.builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.profile.aggregation.AggregationProfiler;
import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler;
import org.opensearch.search.profile.query.QueryProfiler;

import java.util.ArrayList;
Expand All @@ -55,13 +56,15 @@ public final class Profilers {
public Profilers(ContextIndexSearcher searcher) {
this.searcher = searcher;
this.queryProfilers = new ArrayList<>();
this.aggProfiler = new AggregationProfiler();
this.aggProfiler = searcher.getSearchContext().isConcurrentSegmentSearchEnabled()
? new ConcurrentAggregationProfiler()
: new AggregationProfiler();
addQueryProfiler();
}

/** Switch to a new profile. */
public QueryProfiler addQueryProfiler() {
QueryProfiler profiler = new QueryProfiler(searcher.getExecutor() != null);
QueryProfiler profiler = new QueryProfiler(searcher.getSearchContext().isConcurrentSegmentSearchEnabled());
searcher.setProfiler(profiler);
queryProfilers.add(profiler);
return profiler;
Expand Down
13 changes: 12 additions & 1 deletion server/src/main/java/org/opensearch/search/profile/Timer.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
public class Timer {

private boolean doTiming;
private long timing, count, lastCount, start;
private long timing, count, lastCount, start, earliestTimerStartTime;

/** pkg-private for testing */
long nanoTime() {
Expand All @@ -71,6 +71,9 @@ public final void start() {
doTiming = (count - lastCount) >= Math.min(lastCount >>> 8, 1024);
if (doTiming) {
start = nanoTime();
if (count == 0) {
earliestTimerStartTime = start;
}
}
count++;
}
Expand All @@ -92,6 +95,14 @@ public final long getCount() {
return count;
}

/** Return the timer start time in nanoseconds.*/
public final long getEarliestTimerStartTime() {
if (start != 0) {
throw new IllegalStateException("#start call misses a matching #stop call");
}
return earliestTimerStartTime;
}

/** Return an approximation of the total time spent between consecutive calls of #start and #stop. */
public final long getApproximateTiming() {
if (start != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.search.profile.AbstractProfileBreakdown;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -62,4 +63,18 @@ public void addDebugInfo(String key, Object value) {
protected Map<String, Object> toDebugMap() {
return unmodifiableMap(extra);
}

/**
* Build a timing count startTime breakdown for aggregation timing types
*/
@Override
protected final Map<String, Long> buildBreakdownMap(AbstractProfileBreakdown<AggregationTimingType> breakdown) {
Map<String, Long> map = new HashMap<>(breakdown.timings.length * 3);
for (AggregationTimingType timingType : breakdown.timingTypes) {
map.put(timingType.toString(), breakdown.timings[timingType.ordinal()].getApproximateTiming());
map.put(timingType + "_count", breakdown.timings[timingType.ordinal()].getCount());
map.put(timingType + "_startTime", breakdown.timings[timingType.ordinal()].getEarliestTimerStartTime());
}
return Collections.unmodifiableMap(map);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import org.opensearch.search.profile.AbstractProfiler;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
Expand All @@ -47,29 +45,24 @@
*/
public class AggregationProfiler extends AbstractProfiler<AggregationProfileBreakdown, Aggregator> {

private final Map<List<String>, AggregationProfileBreakdown> profileBreakdownLookup = new HashMap<>();
private final Map<Aggregator, AggregationProfileBreakdown> profileBreakdownLookup = new HashMap<>();

public AggregationProfiler() {
super(new InternalAggregationProfileTree());
}

/**
* This method does not need to be thread safe for concurrent search use case as well.
* The `AggregationProfileBreakdown` for each Aggregation operator is created in sync path when `preCollection` is
* called on the Aggregation collector instances during construction.
*/
@Override
public AggregationProfileBreakdown getQueryBreakdown(Aggregator agg) {
List<String> path = getAggregatorPath(agg);
AggregationProfileBreakdown aggregationProfileBreakdown = profileBreakdownLookup.get(path);
AggregationProfileBreakdown aggregationProfileBreakdown = profileBreakdownLookup.get(agg);
if (aggregationProfileBreakdown == null) {
aggregationProfileBreakdown = super.getQueryBreakdown(agg);
profileBreakdownLookup.put(path, aggregationProfileBreakdown);
profileBreakdownLookup.put(agg, aggregationProfileBreakdown);
}
return aggregationProfileBreakdown;
}

public static List<String> getAggregatorPath(Aggregator agg) {
LinkedList<String> path = new LinkedList<>();
while (agg != null) {
path.addFirst(agg.name());
agg = agg.parent();
}
return path;
}
}
Loading

0 comments on commit 1fc0b4b

Please sign in to comment.