Skip to content

Commit

Permalink
Allow sorting keys
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmitterdorfer committed Jan 24, 2024
1 parent 6df3d21 commit 24b8ce5
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ public List<Setting<?>> getSettings() {
PROFILING_CHECK_OUTDATED_INDICES,
TransportGetStackTracesAction.PROFILING_MAX_STACKTRACE_QUERY_SLICES,
TransportGetStackTracesAction.PROFILING_MAX_DETAIL_QUERY_SLICES,
TransportGetStackTracesAction.PROFILING_QUERY_REALTIME
TransportGetStackTracesAction.PROFILING_QUERY_REALTIME,
TransportGetStackTracesAction.PROFILING_QUERY_SORT_KEYS
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
Expand All @@ -75,6 +76,12 @@ public class TransportGetStackTracesAction extends TransportAction<GetStackTrace
Setting.Property.NodeScope
);

// Node-scoped feature flag (default: off): when enabled, stack frame and executable ids are
// collected in lexicographically sorted sets before the mget lookups. Lucene's term dictionary
// is lexicographically sorted, so issuing lookups in the same order reduces the number of page
// faults needed to load it (see the rationale formerly commented on StackTraceHandler's fields).
public static final Setting<Boolean> PROFILING_QUERY_SORT_KEYS = Setting.boolSetting(
"xpack.profiling.stacktrace.sort_keys",
false,
Setting.Property.NodeScope
);

public static final Setting<Integer> PROFILING_MAX_DETAIL_QUERY_SLICES = Setting.intSetting(
"xpack.profiling.query.details.max_slices",
16,
Expand Down Expand Up @@ -119,6 +126,7 @@ public class TransportGetStackTracesAction extends TransportAction<GetStackTrace
private final int desiredSlices;
private final int desiredDetailSlices;
private final boolean realtime;
private final boolean sortKeys;

@Inject
public TransportGetStackTracesAction(
Expand All @@ -141,6 +149,7 @@ public TransportGetStackTracesAction(
this.desiredSlices = PROFILING_MAX_STACKTRACE_QUERY_SLICES.get(settings);
this.desiredDetailSlices = PROFILING_MAX_DETAIL_QUERY_SLICES.get(settings);
this.realtime = PROFILING_QUERY_REALTIME.get(settings);
this.sortKeys = PROFILING_QUERY_SORT_KEYS.get(settings);
}

@Override
Expand Down Expand Up @@ -429,6 +438,7 @@ private void retrieveStackTraces(
// We need to expect a set of slices for each resolved index, plus one for the host metadata.
slicedEventIds.size() * indices.size() + (uniqueHostIDs.isEmpty() ? 0 : 1),
uniqueHostIDs.size(),
sortKeys,
(s, e) -> retrieveStackTraceDetails(submitTask, clusterState, client, responseBuilder, s, e, submitListener)
);
for (List<String> slice : slicedEventIds) {
Expand Down Expand Up @@ -485,12 +495,8 @@ private static final class StackTraceHandler {
private final GetStackTracesResponseBuilder responseBuilder;
private final ActionListener<GetStackTracesResponse> submitListener;
private final Map<String, StackTrace> stackTracePerId;
// sort items lexicographically to access Lucene's term dictionary more efficiently when issuing an mget request.
// The term dictionary is lexicographically sorted and using the same order reduces the number of page faults
// needed to load it.
// TODO: Use a concurrent version for production code - for the prototype we don't need that as it is only one thread
private final Set<String> stackFrameIds = new HashSet<>();
private final Set<String> executableIds = new HashSet<>();
private final Set<String> stackFrameIds;
private final Set<String> executableIds;
private final AtomicInteger totalFrames = new AtomicInteger();
private final StopWatch watch = new StopWatch("retrieveStackTraces");
private final StopWatch hostsWatch = new StopWatch("retrieveHostMetadata");
Expand All @@ -504,9 +510,13 @@ private StackTraceHandler(
int stackTraceCount,
int expectedResponses,
int expectedHosts,
boolean sortKeys,
BiConsumer<List<String>, List<String>> detailsHandler
) {
this.stackTracePerId = new ConcurrentHashMap<>(stackTraceCount);
// pre-size with a bit of headroom so we don't resize too often
this.stackFrameIds = set(sortKeys, stackTraceCount * 5);
this.executableIds = set(sortKeys, stackTraceCount);
this.initialExpectedResponses = expectedResponses;
this.expectedResponses = new AtomicInteger(expectedResponses);
this.responseBuilder = responseBuilder;
Expand All @@ -515,6 +525,10 @@ private StackTraceHandler(
this.detailsHandler = detailsHandler;
}

/**
 * Creates a concurrent set for collecting document ids.
 *
 * @param sortKeys     if {@code true}, return a sorted set so ids are iterated in
 *                     lexicographic order (matching Lucene's term dictionary order);
 *                     otherwise return an unordered, pre-sized concurrent set.
 * @param expectedSize sizing hint for the unordered variant; ignored when sorting,
 *                     as skip lists do not pre-allocate.
 * @return a thread-safe, initially empty {@code Set<String>}
 */
private static Set<String> set(boolean sortKeys, int expectedSize) {
    if (sortKeys) {
        return new ConcurrentSkipListSet<>();
    }
    return Collections.newSetFromMap(new ConcurrentHashMap<>(expectedSize));
}

public void onStackTraceResponse(MultiGetResponse multiGetItemResponses) {
if (expectedResponses.get() == initialExpectedResponses) {
TraceLogger.start(null, "process stacktrace response");
Expand All @@ -530,7 +544,7 @@ public void onStackTraceResponse(MultiGetResponse multiGetItemResponses) {
// Duplicates are expected as we query multiple indices - do a quick pre-check before we deserialize a response
if (stackTracePerId.containsKey(id) == false) {
StackTrace stacktrace = StackTrace.fromSource(trace.getResponse().getSource());
//StackTrace stacktrace = StackTrace.fromBytes(trace.getResponse().getSourceAsBytesRef());
// StackTrace stacktrace = StackTrace.fromBytes(trace.getResponse().getSourceAsBytesRef());
// Guard against concurrent access and ensure we only handle each item once
if (stackTracePerId.putIfAbsent(id, stacktrace) == null) {
totalFrames.addAndGet(stacktrace.frameIds.size());
Expand Down

0 comments on commit 24b8ce5

Please sign in to comment.