Skip to content

Commit

Permalink
Add support for aggregation profiler with concurrent aggregation
Browse files Browse the repository at this point in the history
Signed-off-by: Ticheng Lin <ticheng@amazon.com>
  • Loading branch information
ticheng-aws committed Jul 20, 2023
1 parent 27a14c7 commit a89699f
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ private ProfileResult doGetTree(int token) {
breakdown.toBreakdownMap(),
breakdown.toDebugMap(),
breakdown.toNodeTime(),
breakdown.toNodeStartTime(),
childrenProfileResults
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ protected final Map<String, Long> buildBreakdownMap(AbstractProfileBreakdown<T>
Map<String, Long> map = new HashMap<>(breakdown.timings.length * 2);
for (T timingType : breakdown.timingTypes) {
map.put(timingType.toString(), breakdown.timings[timingType.ordinal()].getApproximateTiming());
map.put(timingType.toString() + "_count", breakdown.timings[timingType.ordinal()].getCount());
map.put(timingType + "_count", breakdown.timings[timingType.ordinal()].getCount());
map.put(timingType + "_startTime", breakdown.timings[timingType.ordinal()].getTimerStartTime());
}
return Collections.unmodifiableMap(map);
}
Expand All @@ -103,4 +104,8 @@ public final long toNodeTime() {
}
return total;
}

public final long toNodeStartTime() {
return timings[timingTypes[0].ordinal()].getTimerStartTime();
}
}
115 changes: 115 additions & 0 deletions server/src/main/java/org/opensearch/search/profile/ProfileResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -69,15 +70,29 @@ public final class ProfileResult implements Writeable, ToXContentObject {
static final ParseField BREAKDOWN = new ParseField("breakdown");
static final ParseField DEBUG = new ParseField("debug");
static final ParseField NODE_TIME = new ParseField("time");
static final ParseField SLICE_START_TIME = new ParseField("slice_start_time");
static final ParseField MAX_SLICE_NODE_TIME = new ParseField("max_slice_time");
static final ParseField MIN_SLICE_NODE_TIME = new ParseField("min_slice_time");
static final ParseField AVG_SLICE_NODE_TIME = new ParseField("avg_slice_time");
static final ParseField NODE_TIME_RAW = new ParseField("time_in_nanos");
static final ParseField SLICE_START_TIME_RAW = new ParseField("slice_start_time_in_nanos");
static final ParseField MAX_SLICE_NODE_TIME_RAW = new ParseField("max_slice_time_in_nanos");
static final ParseField MIN_SLICE_NODE_TIME_RAW = new ParseField("min_slice_time_in_nanos");
static final ParseField AVG_SLICE_NODE_TIME_RAW = new ParseField("avg_slice_time_in_nanos");
static final ParseField CHILDREN = new ParseField("children");
static final ParseField CONCURRENT = new ParseField("concurrent");

private final String type;
private final String description;
private final Map<String, Long> breakdown;
private final Map<String, Object> debug;
private final long nodeTime;
private final long nodeStartTime;
private final long maxSliceNodeTime;
private final long minSliceNodeTime;
private final long avgSliceNodeTime;
private final List<ProfileResult> children;
private final boolean concurrent;

public ProfileResult(
String type,
Expand All @@ -86,13 +101,60 @@ public ProfileResult(
Map<String, Object> debug,
long nodeTime,
List<ProfileResult> children
) {
this(type, description, breakdown, debug, nodeTime, -1, nodeTime, nodeTime, nodeTime, children, false);
}

public ProfileResult(
String type,
String description,
Map<String, Long> breakdown,
Map<String, Object> debug,
long nodeTime,
long nodeStartTime,
List<ProfileResult> children
) {
this(type, description, breakdown, debug, nodeTime, nodeStartTime, nodeTime, nodeTime, nodeTime, children, false);
}

public ProfileResult(
String type,
String description,
Map<String, Long> breakdown,
Map<String, Object> debug,
long nodeTime,
long maxSliceNodeTime,
long minSliceNodeTime,
long avgSliceNodeTime,
List<ProfileResult> children
) {
this(type, description, breakdown, debug, nodeTime, -1, maxSliceNodeTime, minSliceNodeTime, avgSliceNodeTime, children, true);
}

public ProfileResult(
String type,
String description,
Map<String, Long> breakdown,
Map<String, Object> debug,
long nodeTime,
long nodeStartTime,
long maxSliceNodeTime,
long minSliceNodeTime,
long avgSliceNodeTime,
List<ProfileResult> children,
boolean concurrent
) {
this.type = type;
this.description = description;
this.breakdown = Objects.requireNonNull(breakdown, "required breakdown argument missing");
this.debug = debug == null ? Map.of() : debug;
this.children = children == null ? List.of() : children;
this.nodeTime = nodeTime;
this.nodeStartTime = nodeStartTime;
this.maxSliceNodeTime = maxSliceNodeTime;
this.minSliceNodeTime = minSliceNodeTime;
this.avgSliceNodeTime = avgSliceNodeTime;
this.concurrent = concurrent;
}

/**
Expand All @@ -102,19 +164,29 @@ public ProfileResult(StreamInput in) throws IOException {
this.type = in.readString();
this.description = in.readString();
this.nodeTime = in.readLong();
this.nodeStartTime = in.readLong();
this.maxSliceNodeTime = in.readLong();
this.minSliceNodeTime = in.readLong();
this.avgSliceNodeTime = in.readLong();
breakdown = in.readMap(StreamInput::readString, StreamInput::readLong);
debug = in.readMap(StreamInput::readString, StreamInput::readGenericValue);
children = in.readList(ProfileResult::new);
this.concurrent = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(type);
out.writeString(description);
out.writeLong(nodeTime); // not Vlong because can be negative
out.writeLong(nodeStartTime);
out.writeLong(maxSliceNodeTime);
out.writeLong(minSliceNodeTime);
out.writeLong(avgSliceNodeTime);
out.writeMap(breakdown, StreamOutput::writeString, StreamOutput::writeLong);
out.writeMap(debug, StreamOutput::writeString, StreamOutput::writeGenericValue);
out.writeList(children);
out.writeBoolean(concurrent);
}

/**
Expand Down Expand Up @@ -154,6 +226,26 @@ public long getTime() {
return nodeTime;
}

public long getSliceStartTime() {
return nodeStartTime;
}

public long getMaxSliceTime() {
return maxSliceNodeTime;
}

public long getMinSliceTime() {
return minSliceNodeTime;
}

public long getAvgSliceTime() {
return avgSliceNodeTime;
}

public boolean isConcurrent() {
return concurrent;
}

/**
* Returns a list of all profiled children queries
*/
Expand All @@ -168,8 +260,26 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(DESCRIPTION.getPreferredName(), description);
if (builder.humanReadable()) {
builder.field(NODE_TIME.getPreferredName(), new TimeValue(getTime(), TimeUnit.NANOSECONDS).toString());
if (concurrent) {
builder.field(MAX_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMaxSliceTime(), TimeUnit.NANOSECONDS).toString());
builder.field(MIN_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMinSliceTime(), TimeUnit.NANOSECONDS).toString());
builder.field(AVG_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getAvgSliceTime(), TimeUnit.NANOSECONDS).toString());
}
}
builder.field(NODE_TIME_RAW.getPreferredName(), getTime());
if (concurrent) {
builder.field(MAX_SLICE_NODE_TIME_RAW.getPreferredName(), getMaxSliceTime());
builder.field(MIN_SLICE_NODE_TIME_RAW.getPreferredName(), getMinSliceTime());
builder.field(AVG_SLICE_NODE_TIME_RAW.getPreferredName(), getAvgSliceTime());
} else {
Iterator<Map.Entry<String, Long>> iterator = breakdown.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Long> entry = iterator.next();
if (entry.getKey().endsWith("_startTime")) {
iterator.remove();
}
}
}
builder.field(BREAKDOWN.getPreferredName(), breakdown);
if (false == debug.isEmpty()) {
builder.field(DEBUG.getPreferredName(), debug);
Expand Down Expand Up @@ -198,7 +308,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
parser.declareObject(constructorArg(), (p, c) -> p.map(), BREAKDOWN);
parser.declareObject(optionalConstructorArg(), (p, c) -> p.map(), DEBUG);
parser.declareLong(constructorArg(), NODE_TIME_RAW);
parser.declareLong(constructorArg(), SLICE_START_TIME_RAW);
parser.declareLong(constructorArg(), MAX_SLICE_NODE_TIME_RAW);
parser.declareLong(constructorArg(), MIN_SLICE_NODE_TIME_RAW);
parser.declareLong(constructorArg(), AVG_SLICE_NODE_TIME_RAW);
parser.declareObjectArray(optionalConstructorArg(), (p, c) -> fromXContent(p), CHILDREN);
parser.declareBoolean(constructorArg(), CONCURRENT);
PARSER = parser.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public final class Profilers {
public Profilers(ContextIndexSearcher searcher) {
this.searcher = searcher;
this.queryProfilers = new ArrayList<>();
this.aggProfiler = new AggregationProfiler();
this.aggProfiler = new AggregationProfiler(searcher.getExecutor() != null);
addQueryProfiler();
}

Expand Down
16 changes: 14 additions & 2 deletions server/src/main/java/org/opensearch/search/profile/Timer.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
*/
public class Timer {

private boolean doTiming;
private long timing, count, lastCount, start;
private boolean doTiming, isStarted;
private long timing, count, lastCount, start, timerStartTime;

/** pkg-private for testing */
long nanoTime() {
Expand All @@ -71,6 +71,10 @@ public final void start() {
doTiming = (count - lastCount) >= Math.min(lastCount >>> 8, 1024);
if (doTiming) {
start = nanoTime();
if (isStarted == false) {
timerStartTime = start;
isStarted = true;
}
}
count++;
}
Expand All @@ -92,6 +96,14 @@ public final long getCount() {
return count;
}

/** Return the timer start time in nanoseconds.*/
public final long getTimerStartTime() {
if (start != 0) {
throw new IllegalStateException("#start call misses a matching #stop call");
}
return timerStartTime;
}

/** Return an approximation of the total time spent between consecutive calls of #start and #stop. */
public final long getApproximateTiming() {
if (start != 0) {
Expand Down
Loading

0 comments on commit a89699f

Please sign in to comment.