
[ML] add new bucket_correlation aggregation with initial count_correlation function #72133

Merged
merged 9 commits into elastic:master on May 10, 2021

Conversation

Member

@benwtrent benwtrent commented Apr 22, 2021

This commit adds a new pipeline aggregation that allows correlating bucketed values within the aggregation framework.

The initial function is a `count_correlation` function. Its purpose is to correlate the count in a consistent number of buckets with a pre-calculated indicator. The indicator and the aggregated buckets should relate to the same metrics within the documents.

Example: correlating terms within `service.version.keyword` with latency percentiles. The percentiles and the provided correlation indicator both refer to the same source data from which the indicator was previously calculated:

```
GET apm-7.12.0-transaction-generated/_search
{
  "size": 0,
  "aggs": {
    "field_terms": {
      "terms": {
        "field": "service.version.keyword",
        "size": 20
      },
      "aggs": {
        "latency_range": {
          "range": {
            "field": "transaction.duration.us",
            "ranges": [<snip>],
            "keyed": true
          }
        },
        "correlation": {
          "bucket_correlation": {
            "buckets_path": "latency_range>_count",
            "count_correlation": {
              "indicator": {
                 "expectations": [<snip>],
                 "doc_count": 20000
               }
            }
          }
        }
      }
    }
  }
}
```
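In rough terms, the function asks how well each term's per-range doc counts track the pre-computed indicator. A minimal Python sketch of that idea (an illustration with made-up numbers, not the actual implementation, which also weights by the indicator's `doc_count`):

```python
import math

def pearson(xs, ys):
    """Plain Pearson correlation between two equal-length series."""
    n = len(xs)
    x_mean, y_mean = sum(xs) / n, sum(ys) / n
    cov = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys)) / n
    x_var = sum((x - x_mean) ** 2 for x in xs) / n
    y_var = sum((y - y_mean) ** 2 for y in ys) / n
    return cov / math.sqrt(x_var * y_var)

# Hypothetical data: the indicator's expectation per latency range,
# and one term bucket's doc counts over the same ranges.
expectations = [10.0, 20.0, 40.0, 80.0]
counts = [5, 11, 19, 41]

score = pearson(expectations, counts)  # close to 1.0: counts track the indicator
```

A term whose counts are concentrated in ranges the indicator flags as interesting scores near 1.0; an unrelated term scores near 0.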

@benwtrent benwtrent force-pushed the feature/ml-bucket-correlation-agg branch 2 times, most recently from 4cadeee to 0f5bf18 on April 23, 2021 18:19
@benwtrent benwtrent marked this pull request as ready for review April 23, 2021 18:19
@elasticmachine elasticmachine added the Team:ML label Apr 23, 2021
@elasticmachine
Collaborator

Pinging @elastic/ml-core (Team:ML)

@benwtrent benwtrent force-pushed the feature/ml-bucket-correlation-agg branch from 0f5bf18 to 0c1970a on April 23, 2021 18:25
@benwtrent benwtrent force-pushed the feature/ml-bucket-correlation-agg branch from 0c1970a to 5216280 on April 23, 2021 20:10
Contributor

@szabosteve szabosteve left a comment

Left a couple of minor suggestions.

[[search-aggregations-bucket-correlation-aggregation]]
=== Bucket Correlation Aggregation
++++
<titleabbrev>Bucket Correlation Aggregation</titleabbrev>
Contributor

As above.

Suggested change
<titleabbrev>Bucket Correlation Aggregation</titleabbrev>
<titleabbrev>Bucket correlation aggregation</titleabbrev>

@benwtrent
Member Author

@elasticmachine update branch

Contributor

@szabosteve szabosteve left a comment

Docs are LGTM! Thanks for writing them!

@nik9000 nik9000 self-requested a review May 5, 2021 13:41
Contributor

@dimitris-athanasiou dimitris-athanasiou left a comment

Good stuff! There are a few things to work through after my first pass.

the correlation of the term values with the latency.
<2> The range aggregation on the latency field. The ranges were created referencing the percentiles of the latency field.
<3> The bucket correlation aggregation that calculates the correlation of the number of term values within each range
and the previously calculated indicator values.
Contributor

I think we should also have an example response. It seems to be present for most other aggs.

Member Author

See below :)

for (BulkItemResponse itemResponse : bulkResponse) {
    if (itemResponse.isFailed()) {
        failures++;
        logger.error("Item response failure [{}]", itemResponse.getFailureMessage());
Contributor

Not sure we should log for each failure here. I think you could just log bulkResponse.buildFailureMessage() to have a single log entry.

@@ -1089,7 +1091,14 @@ protected Clock getClock() {
(parser, name) -> InferencePipelineAggregationBuilder.parse(modelLoadingService, getLicenseState(), name, parser));
spec.addResultReader(InternalInferenceAggregation::new);

return Collections.singletonList(spec);
return Arrays.asList(
Contributor

As this is the first time we're adding a second agg, I think it'd be nice to make this list here read nice and simple. How about we create private methods that create the spec for each one so the list in this method reads nice?


void validate(PipelineAggregationBuilder.ValidationContext context, String bucketPath);

static double sum(double[] xs) {
Contributor

We might want to use MovingFunctions.sum instead. It also deals with NaN. It's worth checking it out.
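For reference, the NaN handling being suggested amounts to skipping non-finite entries rather than letting a single NaN poison the total. A hedged sketch of that behavior (check `MovingFunctions.sum` itself for the exact semantics):

```python
import math

def nan_safe_sum(values):
    """Sum the values, skipping NaN entries instead of propagating them."""
    return sum(v for v in values if not math.isnan(v))

nan_safe_sum([1.0, float("nan"), 2.5])  # the NaN is ignored
```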


@Override
public int hashCode() {
int result = Objects.hash(docCount);
Contributor

Objects.hash(docCount, Arrays.hashCode(expectations), Arrays.hashCode(fractions)) seems to be a good way to do this without the 31s.

Member Author

This is simply the IntelliJ autogenerated one. I can change it :)

@Override
protected void validate(ValidationContext context) {
    final String firstAgg = bucketsPaths[0].split("[>\\.]")[0];
Contributor

The fact that the validation here is the same as in BucketMetricsPipelineAggregationBuilder made me wonder whether the bucket correlation agg should extend that class. I didn't look further if that's possible but I think it's worth checking it out.

Member Author

BucketMetricsPipelineAggregationBuilder

This class supports things we don't want to support, like user-provided formats and gap policies. I could override those methods to prevent them from being set (it can also be prevented in the parser).

I will see if extending that class will work.

Member Author

IIRC, I did not inherit from that class because of an initial design choice. But that has since changed. So, hopefully, it works nicely

Member Author

@dimitris-athanasiou I think to do this, I would need to do a refactoring of BucketMetricsPipelineAggregationBuilder to either allow format and gapPolicy to be accessed directly in the sub-class, or that all accesses go through their respective getters.

Contributor

Could we not pass in the zero gap policy?

Member

I'd feel more comfortable merging with the similar code and performing a more mechanical refactoring in a follow-up than pushing it into this.

Member Author

Could we not pass in the zero gap policy?

I am not sure where we would do that. Also, when serializing to XContent, the gap_policy is written. I suppose we could call gapPolicy(GapPolicy) in the subclass ctor, but that felt...messy (calling a method in a ctor).

}

private final CorrelationFunction correlationFunction;

Contributor

I don't see anything about gap policy here. Should we not support various gap policies? Or is it not applicable for correlation?

Member Author

No, we shouldn't. There are two gap policies: Skip and Zero. Skip breaks things, as we need the two arrays to be the same length.
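The length problem is easy to demonstrate: under a skip policy an empty bucket drops out of the series entirely, while the indicator's expectations array keeps its original length, so the two series can no longer be paired up. A small sketch (`resolve_gaps` is a hypothetical helper, not the actual GapPolicy code):

```python
def resolve_gaps(counts, policy):
    """counts uses None to mark a gap (an empty bucket)."""
    if policy == "zero":
        return [0 if c is None else c for c in counts]  # length preserved
    if policy == "skip":
        return [c for c in counts if c is not None]     # length shrinks!
    raise ValueError(f"unknown gap policy: {policy}")

expectations = [10.0, 20.0, 40.0]   # the indicator always has 3 entries
counts = [5, None, 19]              # the middle bucket is a gap

assert len(resolve_gaps(counts, "zero")) == len(expectations)
assert len(resolve_gaps(counts, "skip")) != len(expectations)
```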

import java.util.List;
import java.util.Objects;

public class CorrelativeValue implements Writeable, ToXContentObject {
Contributor

A comment explaining what this class encapsulates would benefit readability I think.

Contributor

I also wonder if this should be renamed to CorrelationIndicator.

if (indicator.getFractions() == null) {
    double sum = CorrelationFunction.sum(indicator.getExpectations());
    xMean = sum / indicator.getExpectations().length;
    double var = 0;
Contributor

There is MovingFunctions.stdDev which calculates this as part of it. I wonder if it makes sense to add a MovingFunctions.variance method and reuse it from MovingFunctions.stdDev. If that feels a bit risky, I still think we might want to consider NaN handling here.

final double yMean = weight;
final double yVar = (1 - weight) * yMean * yMean + weight * (1 - yMean) * (1 - yMean);
double xyCov = 0;
if (indicator.getFractions() != null) {
Contributor

Nit, but I think this would be easier to read if you inverted the if logic here (or the one above) so that in both cases we deal first with the same case (whether or not there are fractions).
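As an aside on the quoted snippet: the `yVar` expression is just the variance of an indicator (Bernoulli) variable with mean `weight`, since `(1 - w)·w² + w·(1 - w)²` factors to `w·(1 - w)`. A quick numeric check:

```python
def y_var(w):
    """yVar exactly as written in the snippet, for an indicator variable with mean w."""
    return (1 - w) * w * w + w * (1 - w) * (1 - w)

# (1 - w) * w^2 + w * (1 - w)^2 = w * (1 - w) * (w + (1 - w)) = w * (1 - w)
for w in (0.0, 0.25, 0.5, 0.9, 1.0):
    assert abs(y_var(w) - w * (1 - w)) < 1e-12
```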

+ "]. Unable to calculate correlation"
);
}
final double xMean;
Contributor

Is the correlation calculation here based on something known (e.g. Pearson)? If yes, it would be good to add a comment explaining this.

Member Author

See comment on method

}
}
-------------------------------------------------
// NOTCONSOLE
Member

We should make this a fully real example, I think. It'd be a pain to make the setup for it, but without that we can't be sure it works.

Member Author

@benwtrent benwtrent May 6, 2021

@nik9000 there is a non-doc integration test that covers this.

I can attempt to do a setup, but it's gonna take a bit to generate data and write out the 50 ranges.

Member

It's important that we test these so they don't break eventually. I can't tell you the number of times I've broken stuff in the docs without noticing it. I mean, since we made the tests I can tell you it's much rarer, but I still can't tell you.

You can totally use the setup stuff in docs/build.gradle - over there you can write for loops and stuff to emit values.

If we have a response on the page we should assert that it came from a request on the page - but it's totally OK to use stuff like ... and filter_path to shrink it. No one is going to read a huge response anyway.


import static org.hamcrest.Matchers.closeTo;

public class BucketCorrelationAggregationIT extends MlSingleNodeTestCase {
Member

If I had to pick either yaml or single node test I'd pick yaml. It's less expressive which is a pain but we are in the process of using the yaml files to assert things about backwards compatibility that we just can't do with single node tests.

);
}

private double pearsonCorrelation(double[] xs, int[] ys) {
Member

We do this sort of thing in the aggs tests as well from time to time. We often have a simple example we can assert produces the numbers we expect, but other times we fire random numbers into the thing, make an idealized implementation, and assert that they are the same. It's nice to make sure none of the distributed "stuff" got in the way. And there really isn't a substitute for randomized data for finding weird edges.
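That randomized cross-check pattern might look like the following sketch: compare a naive two-pass Pearson implementation against an algebraically equivalent one-pass form on random inputs (both functions here are hypothetical stand-ins for the production and reference code):

```python
import math
import random

def pearson_two_pass(xs, ys):
    """Idealized reference: compute means first, then covariance and variances."""
    n = len(xs)
    xm, ym = sum(xs) / n, sum(ys) / n
    cov = sum((x - xm) * (y - ym) for x, y in zip(xs, ys))
    xv = sum((x - xm) ** 2 for x in xs)
    yv = sum((y - ym) ** 2 for y in ys)
    return cov / math.sqrt(xv * yv)

def pearson_one_pass(xs, ys):
    """Algebraically equivalent single-pass form built from raw sums."""
    n = len(xs)
    sx, sy = sum(xs), sum(ys)
    sxx = sum(x * x for x in xs)
    syy = sum(y * y for y in ys)
    sxy = sum(x * y for x, y in zip(xs, ys))
    num = n * sxy - sx * sy
    den = math.sqrt(n * sxx - sx * sx) * math.sqrt(n * syy - sy * sy)
    return num / den

# Fire random data at both implementations and assert they agree.
random.seed(0)
for _ in range(100):
    n = random.randint(3, 40)
    xs = [random.uniform(-100.0, 100.0) for _ in range(n)]
    ys = [random.uniform(-100.0, 100.0) for _ in range(n)]
    assert abs(pearson_two_pass(xs, ys) - pearson_one_pass(xs, ys)) < 1e-8
```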

return corXY/xs.length;
}

private static double sum(double[] xs) {
Member

Is DoubleStream.of(xs).sum() short enough that you wouldn't need to make a whole function for it? It's up to you.

Member Author

I am simply using MovingFunctions.sum now in a subsequent commit.

private static final ParseField FUNCTION = new ParseField("function");

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<BucketCorrelationAggregationBuilder, String> PARSER = new ConstructingObjectParser<>(
Member

If InstantiatingObjectParser works for you it's generally a little nicer. I'm not sure if it does, but it's worth looking at.

Member Author

Looks like the context isn't passed in anywhere, so I wouldn't be able to use InstantiatingObjectParser.

@Override
protected void validate(ValidationContext context) {
    final String firstAgg = bucketsPaths[0].split("[>\\.]")[0];
Member

I'd feel more comfortable merging with the similar code and performing a more mechanical refactoring in a follow-up than pushing it into this.

public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    if (super.equals(o) == false) return false;
Member

super.equals already does the reference and class comparison so you can probably skip that bit.

- do:
search:
index: store
body: >
Member

I tend to just write these in yaml. If you like it better this way that's fine - it is easier to copy and paste it into curl or whatever.

Contributor

@dimitris-athanasiou dimitris-athanasiou left a comment

LGTM! Just a rogue backtick in a Javadoc comment.

@benwtrent benwtrent merged commit 8069e9b into elastic:master May 10, 2021
@benwtrent benwtrent deleted the feature/ml-bucket-correlation-agg branch May 10, 2021 16:46
benwtrent added a commit to benwtrent/elasticsearch that referenced this pull request May 10, 2021
…ation function (elastic#72133)

benwtrent added a commit that referenced this pull request May 10, 2021
…correlation function (#72133) (#72896)

* [ML] add new bucket_correlation aggregation with initial count_correlation function (#72133)

benwtrent added a commit that referenced this pull request Jul 12, 2021
NamedWriteable Serialization was not declared in the original implementation for 7.x.

This commit fixes this. Relates to: #72133

closes: elastic/kibana#105047
benwtrent added a commit that referenced this pull request Jul 12, 2021
…75234)

NamedWriteable Serialization was not declared in the original implementation for 7.x.

This commit fixes this. Relates to: #72133

closes: elastic/kibana#105047
Labels
>enhancement, :ml (Machine learning), Team:ML, v7.14.0, v8.0.0-alpha1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants