[ML] add new bucket_correlation aggregation with initial count_correlation function #72133
Conversation
Force-pushed 4cadeee to 0f5bf18

Pinging @elastic/ml-core (Team:ML)

Force-pushed 0f5bf18 to 0c1970a
Force-pushed 0c1970a to 5216280
Left a couple of minor suggestions.
docs/reference/aggregations/pipeline/bucket-correlation-aggregation.asciidoc (outdated; resolved)
[[search-aggregations-bucket-correlation-aggregation]]
=== Bucket Correlation Aggregation
++++
<titleabbrev>Bucket Correlation Aggregation</titleabbrev>
As above.

- <titleabbrev>Bucket Correlation Aggregation</titleabbrev>
+ <titleabbrev>Bucket correlation aggregation</titleabbrev>
@elasticmachine update branch
Docs are LGTM! Thanks for writing them!
Good stuff! There are a few things to work through after my first pass.
the correlation of the term values with the latency.
<2> The range aggregation on the latency field. The ranges were created referencing the percentiles of the latency field.
<3> The bucket correlation aggregation that calculates the correlation of the number of term values within each range
and the previously calculated indicator values.
I think we should also have an example response. It seems to be present for most other aggs.
See below :)
for (BulkItemResponse itemResponse : bulkResponse) {
    if (itemResponse.isFailed()) {
        failures++;
        logger.error("Item response failure [{}]", itemResponse.getFailureMessage());
Not sure we should log for each failure here. I think you could just log bulkResponse.buildFailureMessage() to have a single log entry.
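A sketch of the single-entry approach (the helper below is a hypothetical stand-in for what `BulkResponse.buildFailureMessage()` assembles; the real code would simply pass that method's result to the logger):

```java
import java.util.List;

// Hypothetical stand-in illustrating the suggestion: collect every item failure
// into one aggregate message so a single logger.error call covers the whole
// bulk request, instead of one log line per failed item.
class BulkFailureMessages {

    // failureMessages.get(i) is null when item i succeeded, otherwise it holds
    // that item's failure message
    static String buildFailureMessage(List<String> failureMessages) {
        StringBuilder sb = new StringBuilder("failure in bulk execution:");
        for (int i = 0; i < failureMessages.size(); i++) {
            if (failureMessages.get(i) != null) {
                sb.append("\n[").append(i).append("]: ").append(failureMessages.get(i));
            }
        }
        return sb.toString();
    }
}
```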
@@ -1089,7 +1091,14 @@ protected Clock getClock() {
(parser, name) -> InferencePipelineAggregationBuilder.parse(modelLoadingService, getLicenseState(), name, parser));
spec.addResultReader(InternalInferenceAggregation::new);

- return Collections.singletonList(spec);
+ return Arrays.asList(
As this is the first time we're adding a second agg, I think it'd be nice to make this list here read nice and simple. How about we create private methods that create the spec for each one so the list in this method reads nice?
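A minimal sketch of that refactor (all type and method names below are hypothetical stand-ins, not the real registration API): each aggregation gets a private factory method so the returned list reads as a simple enumeration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical Spec type standing in for the real pipeline aggregation spec.
class PipelineAggRegistration {

    static final class Spec {
        final String name;
        Spec(String name) { this.name = name; }
    }

    // The public method now reads as a plain list of the registered aggs.
    static List<Spec> getPipelineAggregations() {
        return Arrays.asList(
            inferenceAggSpec(),
            bucketCorrelationAggSpec()
        );
    }

    private static Spec inferenceAggSpec() {
        // parser wiring and result-reader registration would live here
        return new Spec("inference");
    }

    private static Spec bucketCorrelationAggSpec() {
        return new Spec("bucket_correlation");
    }
}
```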
void validate(PipelineAggregationBuilder.ValidationContext context, String bucketPath);

static double sum(double[] xs) {
We might want to use MovingFunctions.sum instead. It also deals with NaN. It's worth checking it out.
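A sketch of the NaN behavior being referred to (illustrative only, assuming `MovingFunctions.sum` skips NaN entries; this is not the real helper):

```java
// Hypothetical illustration of NaN-tolerant summation: NaN entries are
// skipped rather than poisoning the whole sum, which is the property the
// review suggests reusing from MovingFunctions.sum.
class NanAwareMath {

    static double sum(double[] values) {
        double total = 0.0;
        for (double v : values) {
            if (Double.isNaN(v) == false) {
                total += v;
            }
        }
        return total;
    }
}
```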
@Override
public int hashCode() {
    int result = Objects.hash(docCount);
Objects.hash(docCount, Arrays.hashCode(expectations), Arrays.hashCode(fractions)) seems to be a good way to do this without the 31s.
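Sketched out on a hypothetical indicator class (field names assumed from the surrounding discussion): `Arrays.hashCode` folds each array's contents, and `Objects.hash` combines the pieces, so no hand-rolled `31 * result` accumulation is needed.

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical sketch of the class under review, showing the suggested
// hashCode pattern paired with a consistent equals.
final class CorrelationIndicatorSketch {
    final long docCount;
    final double[] expectations;
    final double[] fractions;

    CorrelationIndicatorSketch(long docCount, double[] expectations, double[] fractions) {
        this.docCount = docCount;
        this.expectations = expectations;
        this.fractions = fractions;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        CorrelationIndicatorSketch other = (CorrelationIndicatorSketch) o;
        return docCount == other.docCount
            && Arrays.equals(expectations, other.expectations)
            && Arrays.equals(fractions, other.fractions);
    }

    @Override
    public int hashCode() {
        // Arrays.hashCode handles the array contents (and null arrays);
        // Objects.hash combines everything without manual 31-multiplication.
        return Objects.hash(docCount, Arrays.hashCode(expectations), Arrays.hashCode(fractions));
    }
}
```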
This is simply the IntelliJ autogenerated one. I can change it :)
@Override
protected void validate(ValidationContext context) {

final String firstAgg = bucketsPaths[0].split("[>\\.]")[0];
The fact that the validation here is the same as in BucketMetricsPipelineAggregationBuilder made me wonder whether the bucket correlation agg should extend that class. I didn't look further if that's possible but I think it's worth checking it out.
BucketMetricsPipelineAggregationBuilder supports things we don't want to support, like user-provided formats and gap policies. I could override those methods to prevent them from being set internally (it can be prevented in the parser).
I will see if extending that class will work.
IIRC, I did not inherit from that class because of an initial design choice. But that has since changed. So, hopefully, it works nicely
@dimitris-athanasiou I think to do this, I would need to refactor BucketMetricsPipelineAggregationBuilder to either allow format and gapPolicy to be accessed directly in the sub-class, or have all accesses go through their respective getters.
Could we not pass in the zero gap policy?
I'd feel more comfortable merging with the similar code and performing a more mechanical refactoring in a follow-up than pushing it into this.
> Could we not pass in the zero gap policy?

I am not sure where we would do that. Also, when serializing to XContent the gap_policy is written. I suppose we could call gapPolicy(GapPolicy) in the subclass ctor, but that felt... messy (accessing a method in a ctor).
}

private final CorrelationFunction correlationFunction;
I don't see anything about gap policy here. Should we not support various gap policies? Or is it not applicable for correlation?
No, we shouldn't. There are two gap policies: Skip and Zero. Skip breaks things, as we need the two arrays to be the same length.
import java.util.List;
import java.util.Objects;

public class CorrelativeValue implements Writeable, ToXContentObject {
A comment explaining what this class encapsulates would benefit readability I think.
I also wonder if this should be renamed to CorrelationIndicator.
if (indicator.getFractions() == null) {
    double sum = CorrelationFunction.sum(indicator.getExpectations());
    xMean = sum / indicator.getExpectations().length;
    double var = 0;
There is MovingFunctions.stdDev which calculates this as part of it. I wonder if it makes sense to add a MovingFunctions.variance method and reuse it from MovingFunctions.stdDev. If that feels a bit risky, I still think we might want to consider NaN handling here.
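A sketch of what a NaN-aware variance helper could look like (names are illustrative, not the actual MovingFunctions API): skip NaN entries in both passes so one bad value doesn't poison the result.

```java
// Hypothetical two-pass population variance that ignores NaN entries, in the
// spirit of the suggested MovingFunctions.variance helper.
class NanAwareVariance {

    static double variance(double[] values) {
        double sum = 0.0;
        long count = 0;
        for (double v : values) {
            if (Double.isNaN(v) == false) {
                sum += v;
                count++;
            }
        }
        double mean = sum / count;
        double sumOfSqrDiff = 0.0;
        for (double v : values) {
            if (Double.isNaN(v) == false) {
                double diff = v - mean;
                sumOfSqrDiff += diff * diff;
            }
        }
        return sumOfSqrDiff / count;
    }
}
```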
final double yMean = weight;
final double yVar = (1 - weight) * yMean * yMean + weight * (1 - yMean) * (1 - yMean);
double xyCov = 0;
if (indicator.getFractions() != null) {
Nit, but I think it'd be easier to read this if you invert the if logic here (or the one above) so that in both cases we deal first with the same case (whether there are fractions or not).
+ "]. Unable to calculate correlation"
);
}
final double xMean;
The correlation calculation here is based on something known (e.g. Pearson)? If yes, it would be good to add a comment explaining this.
See comment on method
}
}
-------------------------------------------------
// NOTCONSOLE
We should make this a fully real example, I think. It'd be a pain to make the setup for it, but without that we can't be sure it works.
@nik9000 there is a non-doc integration test that covers this.
I can attempt to do a setup, but it's gonna take a bit to generate data and write out the 50 ranges.
It's important that we test these so they don't break eventually. I can't tell you the number of times I've broken stuff in the docs without noticing it. I mean, since we made the tests I can tell you it's much rarer, but I still can't tell you.
You can totally use the setup stuff in docs/build.gradle - over there you can write for loops and stuff to emit values.
If we have a response on the page we should assert that it came from a request on the page - but it's totally ok to use stuff like ... and filter_path to shrink it. No one is going to read a huge response anyway.
import static org.hamcrest.Matchers.closeTo;

public class BucketCorrelationAggregationIT extends MlSingleNodeTestCase {
If I had to pick either yaml or single node test I'd pick yaml. It's less expressive which is a pain but we are in the process of using the yaml files to assert things about backwards compatibility that we just can't do with single node tests.
);
}

private double pearsonCorrelation(double[] xs, int[] ys) {
We do this sort of thing in the aggs tests as well from time to time. We often have a simple example we can assert produces the numbers we expect, but other times we fire random numbers into the thing, make an idealized implementation, and assert that they are the same. It's nice to make sure none of the distributed "stuff" got in the way. And there really isn't a substitute for randomized data to find weird edges.
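A minimal idealized Pearson correlation of the kind described, for cross-checking the aggregation's result against randomized inputs (a sketch, not the test's actual helper):

```java
// Reference Pearson correlation coefficient: covariance of x and y divided by
// the product of their standard deviations. The 1/n factors cancel, so raw
// sums of squared deviations suffice.
class PearsonReference {

    static double pearson(double[] xs, double[] ys) {
        int n = xs.length;
        double xMean = 0, yMean = 0;
        for (int i = 0; i < n; i++) {
            xMean += xs[i];
            yMean += ys[i];
        }
        xMean /= n;
        yMean /= n;
        double cov = 0, xVar = 0, yVar = 0;
        for (int i = 0; i < n; i++) {
            double dx = xs[i] - xMean;
            double dy = ys[i] - yMean;
            cov += dx * dy;
            xVar += dx * dx;
            yVar += dy * dy;
        }
        return cov / Math.sqrt(xVar * yVar);
    }
}
```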
return corXY / xs.length;
}

private static double sum(double[] xs) {
Is DoubleStream.of(xs).sum() short enough that you wouldn't need to make a whole function for it? It's up to you.
I am simply using MovingFunctions.sum now in a subsequent commit.
private static final ParseField FUNCTION = new ParseField("function");

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<BucketCorrelationAggregationBuilder, String> PARSER = new ConstructingObjectParser<>(
If InstantiatingObjectParser works for you it's generally a little nicer. I'm not sure if it does, but it's worth looking at.
Looks like the context isn't passed in anywhere, so I wouldn't be able to use InstantiatingObjectParser.
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    if (super.equals(o) == false) return false;
super.equals already does the reference and class comparison so you can probably skip that bit.
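A sketch of the simplification with hypothetical class names: the base class performs the reference and class checks once, so the subclass only compares its own fields after calling super.equals.

```java
import java.util.Objects;

// Hypothetical base class whose equals already covers the reference, null and
// class checks, as BucketMetricsPipelineAggregationBuilder's does.
class BaseAggSketch {
    final String name;
    BaseAggSketch(String name) { this.name = name; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        return name.equals(((BaseAggSketch) o).name);
    }

    @Override
    public int hashCode() { return name.hashCode(); }
}

class CorrelationAggSketch extends BaseAggSketch {
    final String bucketsPath;

    CorrelationAggSketch(String name, String bucketsPath) {
        super(name);
        this.bucketsPath = bucketsPath;
    }

    @Override
    public boolean equals(Object o) {
        // No reference/null/class checks here: super.equals already did them.
        if (super.equals(o) == false) return false;
        return bucketsPath.equals(((CorrelationAggSketch) o).bucketsPath);
    }

    @Override
    public int hashCode() { return Objects.hash(super.hashCode(), bucketsPath); }
}
```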
- do:
    search:
      index: store
      body: >
I tend to just write these in yaml. If you like it better this way that's fine - it is easier to copy and paste it into curl or whatever.
…ent/elasticsearch into feature/ml-bucket-correlation-agg
LGTM! Just a rogue backtick in a javadoc comment.
...n/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/correlation/CountCorrelationFunction.java (outdated; resolved)
…/correlation/CountCorrelationFunction.java
…ation function (elastic#72133)

This commit adds a new pipeline aggregation that allows correlation within the aggregation framework on bucketed values. The initial function is a `count_correlation` function, the purpose of which is to correlate the count in a consistent number of buckets with a pre-calculated indicator. The indicator and the aggregated buckets should relate to the same metrics within documents.

Example for correlating terms within `service.version.keyword` with latency percentiles, where the percentiles and the provided correlation indicator both refer to the same source data and the indicator was previously calculated:

```
GET apm-7.12.0-transaction-generated/_search
{
  "size": 0,
  "aggs": {
    "field_terms": {
      "terms": {
        "field": "service.version.keyword",
        "size": 20
      },
      "aggs": {
        "latency_range": {
          "range": {
            "field": "transaction.duration.us",
            "ranges": [<snip>],
            "keyed": true
          }
        },
        "correlation": {
          "bucket_correlation": {
            "buckets_path": "latency_range>_count",
            "count_correlation": {
              "indicator": {
                "expectations": [<snip>],
                "doc_count": 20000
              }
            }
          }
        }
      }
    }
  }
}
```
…correlation function (#72133) (#72896) - 7.x backport of #72133; the commit message repeats the one above.
…75234) NamedWriteable serialization was not declared in the original implementation for 7.x. This commit fixes this. Relates to: #72133. Closes: elastic/kibana#105047.
This commit adds a new pipeline aggregation that allows correlation within the aggregation framework on bucketed values.

The initial function is a `count_correlation` function, the purpose of which is to correlate the count in a consistent number of buckets with a pre-calculated indicator. The indicator and the aggregated buckets should relate to the same metrics within documents.

Example for correlating terms within `service.version.keyword` with latency percentiles, where the percentiles and the provided correlation indicator both refer to the same source data and the indicator was previously calculated.