Skip to content

Commit

Permalink
Add sub groups recursively
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmitterdorfer committed May 8, 2024
1 parent 885015b commit 1c20caa
Show file tree
Hide file tree
Showing 5 changed files with 330 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,29 +103,6 @@ public void testGetStackTracesGroupedByServiceName() throws Exception {
assertEquals("vmlinux", response.getExecutables().get("lHp5_WAgpLy2alrUVab6HA"));
}

public void testGetStackTracesGroupedByInvalidField() {
GetStackTracesRequest request = new GetStackTracesRequest(
1000,
600.0d,
1.0d,
1.0d,
null,
null,
null,
// only service.name is supported (note the trailing "s")
"service.names",
null,
null,
null,
null,
null,
null
);
request.setAdjustSampleCount(true);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, client().execute(GetStackTracesAction.INSTANCE, request));
assertEquals("Requested custom event aggregation fields [service.names] but only [service.name] is supported.", e.getMessage());
}

public void testGetStackTracesFromAPMWithMatchNoDownsampling() throws Exception {
BoolQueryBuilder query = QueryBuilders.boolQuery();
query.must().add(QueryBuilders.termQuery("transaction.name", "encodeSha1"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,22 @@ public SubGroup addCount(String name, long count) {
return this;
}

public SubGroup getOrAddChild(String name) {
if (subgroups.containsKey(name) == false) {
this.subgroups.put(name, new SubGroup(name, null, renderLegacyXContent, new HashMap<>()));
}
return this.subgroups.get(name);
}

public Long getCount(String name) {
SubGroup subGroup = this.subgroups.get(name);
return subGroup != null ? subGroup.count : null;
}

public SubGroup getSubGroup(String name) {
return this.subgroups.get(name);
}

public SubGroup copy() {
Map<String, SubGroup> copy = new HashMap<>(subgroups.size());
for (Map.Entry<String, SubGroup> subGroup : subgroups.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.profiling.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;

import java.util.Iterator;

public final class SubGroupCollector {
/**
* Users may provide a custom field via the API that is used to sub-divide profiling events. This is useful in the context of TopN
* where we want to provide additional breakdown of where a certain function has been called (e.g. a certain service or transaction).
*/
static final String CUSTOM_EVENT_SUB_AGGREGATION_NAME = "custom_event_group_";

private static final Logger log = LogManager.getLogger(SubGroupCollector.class);

private final String[] aggregationFields;
private final boolean legacyAggregationField;
private final AbstractAggregationBuilder<?> parentAggregation;

public static SubGroupCollector attach(
AbstractAggregationBuilder<?> parentAggregation,
String[] aggregationFields,
boolean legacyAggregationField
) {
SubGroupCollector c = new SubGroupCollector(parentAggregation, aggregationFields, legacyAggregationField);
c.addAggregations();
return c;
}

private SubGroupCollector(AbstractAggregationBuilder<?> parentAggregation, String[] aggregationFields, boolean legacyAggregationField) {
this.parentAggregation = parentAggregation;
this.aggregationFields = aggregationFields;
this.legacyAggregationField = legacyAggregationField;
}

private boolean hasAggregationFields() {
return aggregationFields != null && aggregationFields.length > 0;
}

private void addAggregations() {
if (hasAggregationFields()) {
// cast to Object to disambiguate this from a varargs call
log.trace("Grouping stacktrace events by {}.", (Object) aggregationFields);
AbstractAggregationBuilder<?> parentAgg = this.parentAggregation;
for (String aggregationField : aggregationFields) {
String aggName = CUSTOM_EVENT_SUB_AGGREGATION_NAME + aggregationField;
TermsAggregationBuilder agg = new TermsAggregationBuilder(aggName).field(aggregationField);
parentAgg.subAggregation(agg);
parentAgg = agg;
}
}
}

void collectResults(MultiBucketsAggregation.Bucket bucket, TraceEvent event) {
collectResults(new BucketAdapter(bucket), event);
}

void collectResults(Bucket bucket, TraceEvent event) {
if (hasAggregationFields()) {
if (event.subGroups == null) {
event.subGroups = SubGroup.root(aggregationFields[0], legacyAggregationField);
}
collectInternal(bucket.getAggregations(), event.subGroups, 0);
}
}

private void collectInternal(Agg parentAgg, SubGroup parentGroup, int aggField) {
if (aggField == aggregationFields.length) {
return;
}
String aggName = CUSTOM_EVENT_SUB_AGGREGATION_NAME + aggregationFields[aggField];
for (Bucket b : parentAgg.getBuckets(aggName)) {
String subGroupName = b.getKey();
parentGroup.addCount(subGroupName, b.getCount());
SubGroup currentGroup = parentGroup.getSubGroup(subGroupName);
int nextAggField = aggField + 1;
if (nextAggField < aggregationFields.length) {
collectInternal(b.getAggregations(), currentGroup.getOrAddChild(aggregationFields[nextAggField]), nextAggField);
}
}
}

// The sole purpose of the code below is to abstract our code from the aggs framework to make it unit-testable
interface Agg {
Iterable<Bucket> getBuckets(String aggName);

}

interface Bucket {
String getKey();

long getCount();

Agg getAggregations();
}

static class InternalAggregationAdapter implements Agg {
private final InternalAggregations agg;

InternalAggregationAdapter(InternalAggregations agg) {
this.agg = agg;
}

@Override
public Iterable<Bucket> getBuckets(String aggName) {
MultiBucketsAggregation multiBucketsAggregation = agg.get(aggName);
return () -> {
Iterator<? extends MultiBucketsAggregation.Bucket> it = multiBucketsAggregation.getBuckets().iterator();
return new Iterator<>() {
@Override
public boolean hasNext() {
return it.hasNext();
}

@Override
public Bucket next() {
return new BucketAdapter(it.next());
}
};
};
}
}

static class BucketAdapter implements Bucket {
private final MultiBucketsAggregation.Bucket bucket;

BucketAdapter(MultiBucketsAggregation.Bucket bucket) {
this.bucket = bucket;
}

@Override
public String getKey() {
return bucket.getKeyAsString();
}

@Override
public long getCount() {
return bucket.getDocCount();
}

@Override
public Agg getAggregations() {
return new InternalAggregationAdapter(bucket.getAggregations());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.countedterms.CountedTermsAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.sampler.random.RandomSamplerAggregationBuilder;
Expand All @@ -55,7 +54,6 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -256,18 +254,11 @@ private void searchGenericEventGroupedByStackTrace(
CountedTermsAggregationBuilder groupByStackTraceId = new CountedTermsAggregationBuilder("group_by").size(
MAX_TRACE_EVENTS_RESULT_SIZE
).field(request.getStackTraceIdsField());
if (request.hasAggregationFields()) {
String[] aggregationFields = request.getAggregationFields();
// cast to Object to disambiguate this from a varargs call
log.trace("Grouping stacktrace events by {}.", (Object) aggregationFields);
AbstractAggregationBuilder<?> parentAgg = groupByStackTraceId;
for (String aggregationField : aggregationFields) {
String aggName = CUSTOM_EVENT_SUB_AGGREGATION_NAME + aggregationField;
TermsAggregationBuilder agg = new TermsAggregationBuilder(aggName).field(aggregationField);
parentAgg.subAggregation(agg);
parentAgg = agg;
}
}
SubGroupCollector subGroups = SubGroupCollector.attach(
groupByStackTraceId,
request.getAggregationFields(),
request.isLegacyAggregationField()
);
RandomSamplerAggregationBuilder randomSampler = new RandomSamplerAggregationBuilder("sample").setSeed(request.hashCode())
.setProbability(responseBuilder.getSamplingRate())
.subAggregation(groupByStackTraceId);
Expand Down Expand Up @@ -308,19 +299,7 @@ private void searchGenericEventGroupedByStackTrace(
stackTraceEvents.put(stackTraceID, event);
}
event.count += count;
if (request.hasAggregationFields()) {
String[] aggregationFields = request.getAggregationFields();
if (event.subGroups == null) {
event.subGroups = SubGroup.root(aggregationFields[0], request.isLegacyAggregationField());
}
// TODO: Recursively add sub groups!
Terms eventSubGroup = stacktraceBucket.getAggregations()
.get(CUSTOM_EVENT_SUB_AGGREGATION_NAME + aggregationFields[0]);
for (Terms.Bucket b : eventSubGroup.getBuckets()) {
String subGroupName = b.getKeyAsString();
event.subGroups.addCount(subGroupName, b.getDocCount());
}
}
subGroups.collectResults(stacktraceBucket, event);
}
responseBuilder.setTotalSamples(totalSamples);
responseBuilder.setHostEventCounts(hostEventCounts);
Expand All @@ -346,19 +325,11 @@ private void searchEventGroupedByStackTrace(
// Especially with high cardinality fields, this makes aggregations really slow.
.executionHint("map")
.subAggregation(new SumAggregationBuilder("count").field("Stacktrace.count"));
if (request.hasAggregationFields()) {
String[] aggregationFields = request.getAggregationFields();
log.trace("Grouping stacktrace events by [{}].", (Object) aggregationFields);
// be strict about the accepted field names to avoid downstream errors or leaking unintended information
if (aggregationFields.length > 1 || aggregationFields[0].equals("service.name") == false) {
throw new IllegalArgumentException(
"Requested custom event aggregation fields "
+ Arrays.toString(aggregationFields)
+ " but only [service.name] is supported."
);
}
groupByStackTraceId.subAggregation(new TermsAggregationBuilder(CUSTOM_EVENT_SUB_AGGREGATION_NAME).field(aggregationFields[0]));
}
SubGroupCollector subGroups = SubGroupCollector.attach(
groupByStackTraceId,
request.getAggregationFields(),
request.isLegacyAggregationField()
);
client.prepareSearch(eventsIndex.getName())
.setTrackTotalHits(false)
.setSize(0)
Expand Down Expand Up @@ -420,18 +391,7 @@ The same stacktraces may come from different hosts (eventually from different da
stackTraceEvents.put(stackTraceID, event);
}
event.count += finalCount;
if (request.hasAggregationFields()) {
String[] aggregationFields = request.getAggregationFields();
if (event.subGroups == null) {
event.subGroups = SubGroup.root(aggregationFields[0], request.isLegacyAggregationField());
}
// recursion is not needed; we only support a single field
Terms eventSubGroup = stacktraceBucket.getAggregations().get(CUSTOM_EVENT_SUB_AGGREGATION_NAME);
for (Terms.Bucket b : eventSubGroup.getBuckets()) {
String subGroupName = b.getKeyAsString();
event.subGroups.addCount(subGroupName, b.getDocCount());
}
}
subGroups.collectResults(stacktraceBucket, event);
}
}
responseBuilder.setTotalSamples(totalFinalCount);
Expand Down
Loading

0 comments on commit 1c20caa

Please sign in to comment.