
Partitionable aggregations #21487

Closed
markharwood opened this issue Nov 11, 2016 · 16 comments

@markharwood (Contributor)

Currently users frequently run into memory/circuit-breaker issues trying to perform analytics on high-cardinality fields, e.g. when finding IP addresses that have had more than 3 sessions.
The combination of expensive aggs such as cardinality under fields with many terms has an explosive effect. Entity-centric indexing or collect_mode:breadth_first can help but aren't always a solution.
The proposal is that the terms agg should allow include clauses that help partition high-cardinality fields so that a client request can focus on just a subset of the overall data, e.g.:

"terms": {
  "field": "IP_ADDRESS",
  "include":{
	"partition":1,
	"of":20
  }

The client could then make repeated requests for partition 1, then 2, and so on. Internally the terms agg would filter out any term whose hash-modulo-N did not match the chosen partition number.
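For illustration, a minimal sketch of that partition test, assuming Lucene's murmur3 implementation (StringHelper.murmurhash3_x86_32); the class and method names here are hypothetical, not the final implementation:

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.StringHelper;

// Hypothetical sketch: accept a term iff its hash maps to the requested partition.
class PartitionFilter {
    private final int partition;     // "partition" in the proposed syntax
    private final int numPartitions; // "of" in the proposed syntax

    PartitionFilter(int partition, int numPartitions) {
        this.partition = partition;
        this.numPartitions = numPartitions;
    }

    boolean accept(BytesRef term) {
        int hash = StringHelper.murmurhash3_x86_32(term, 0);
        // floorMod keeps the result non-negative even when the hash is negative
        return Math.floorMod(hash, numPartitions) == partition;
    }
}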

The fuller "IP addresses with many sessions" example would then look like this (using a pipeline agg to remove uninteresting results):

{
  "aggs": {
    "anomalousIPs": {
      "terms": {
        "field": "IP_ADDRESS",
        "size": 10000,
        "order": {
          "numSessions": "desc"
        },
        "include": {
          "partition": 1,
          "of": 20
        }
      },
      "aggs": {
        "numSessions": {
          "cardinality": {
            "field": "session_id",
            "precision_threshold": 100
          }
        },
        "tooManySessions": {
          "bucket_selector": {
            "buckets_path": {
              "numSessions": "numSessions"
            },
            "script": "params.numSessions > 3"
          }
        }
      }
    }
  }
}

Users today could of course index hashed forms of values and query ranges of those hashes to achieve the same effect (perhaps more efficiently), but this new syntax is arguably less work for the client and works with existing indices; a sketch of that workaround follows below. Thoughts?
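For reference, a sketch of that index-time workaround; the hash choice and the ip_hash field name are made up for illustration. The client stores hashOf(value) in an extra numeric field (say ip_hash) when indexing, and each "partition" request then filters one slice of the hash space with a range query:

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.StringHelper;

class HashedFieldWorkaround {
    // Value to index alongside the original, e.g. in a hypothetical "ip_hash" field.
    static long hashOf(String value) {
        // widen to an unsigned 32-bit range so the slice arithmetic below stays simple
        return StringHelper.murmurhash3_x86_32(new BytesRef(value), 0) & 0xFFFFFFFFL;
    }

    // [from, to) slice of the hash space covered by one of numPartitions partitions.
    static long[] rangeFor(int partition, int numPartitions) {
        long span = (1L << 32) / numPartitions;
        long from = partition * span;
        long to = (partition == numPartitions - 1) ? (1L << 32) : from + span;
        return new long[] { from, to };
    }
}

Querying ranges rather than storing a partition number means the number of partitions can vary per request; the downside is the re-indexing, which is exactly what the proposed syntax avoids.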

@markharwood (Contributor, Author)

@jpountz we discussed this on FixItFriday but didn't reach a conclusion; we said we'd hold off for your input.

@jpountz (Contributor) commented Nov 14, 2016

This kind of feature would help compute exhaustive results, which can't be done with the current API. However it's not clear to me which aggregations would benefit from it in terms of memory usage (e.g. the example agg should work pretty well with breadth_first?). Currently I tend to see it more as a solution to #4915 (with less scope, which is needed anyway since pagination in general is not something we can implement) than to memory-usage issues.

Reading your proposal makes me wonder whether we could achieve the same result without modifying aggregations but by extending the current slice parameter to also work with non-scroll requests and non-numeric fields.

@markharwood (Contributor, Author) commented Nov 14, 2016

e.g. the example agg should work pretty well with breadth_first?

A breadth_first request would be ignored (or error?) because the terms agg relies on results from the child agg.

extending the current slice parameter to also work with non-scroll requests and non-numeric fields

It can be more complex than that; sometimes we are talking about multi-value fields, e.g. an exhaustive analysis of all tags used in StackOverflow articles.

without modifying aggregations

Aggs themselves wouldn't change - the proposal is isolated to adding another type of filter to the existing IncludeExclude class.

@markharwood (Contributor, Author)

I prototyped this in IncludeExclude and benchmarked a high-cardinality query (finding which of the 2.7m user accounts on StackOverflow look like they haven't been active since 2010). Each question doc has many user accounts associated with it. Example query:

GET stackq3/_search
{
   "size": 0,
   "aggs": {
      "accountRetirementCandidates": {
         "terms": {
            "field": "user",
            "size": 10000,
            "include": {
               "partition": 1,
               "of": 100
            },
            "order": {
               "lastVisit": "asc"
            }
         },
         "aggs": {
            "lastVisit": {
               "max": {
                  "field": "lastUpdateDate"
               }
            },
            "lastActiveBefore2010": {
               "bucket_selector": {
                  "buckets_path": {
                     "lastVisit": "lastVisit"
                  },
                  "script": "params.lastVisit<1262445274573l"
               }
            }
         }
      }
   }
}

Without term partitioning, getting an exhaustive list simply would not be feasible as a single request on the existing data model; it would require resorting to entity-centric indexing.
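The client-side harness is then just a loop over the partitions. A hedged sketch (Java 15+ text blocks, zero-based partition numbers assumed, JSON parsing and error handling elided; the bucket_selector is dropped for brevity):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PartitionWalker {
    public static void main(String[] args) throws Exception {
        int numPartitions = 100;
        HttpClient client = HttpClient.newHttpClient();
        for (int p = 0; p < numPartitions; p++) {
            // Same terms agg as the example above, one partition per request
            String body = """
                {"size":0,"aggs":{"accountRetirementCandidates":{
                  "terms":{"field":"user","size":10000,
                    "include":{"partition":%d,"of":%d},
                    "order":{"lastVisit":"asc"}},
                  "aggs":{"lastVisit":{"max":{"field":"lastUpdateDate"}}}}}}
                """.formatted(p, numPartitions);
            HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/stackq3/_search"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
            HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
            // accumulate the returned buckets from response.body() here
        }
    }
}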

@jimczi (Contributor) commented Nov 17, 2016

A breadth_first request would be ignored (or error?) because the terms agg relies on results from the child agg.

The breadth_first mode would work; it's just that in this case the child agg (bucket_selector) would be executed with the terms aggregation, and the second round would compute the cardinality for the selected terms.

Isn't it possible to do this with a scripted terms agg? The execution time should be similar since you need to access the term (and not just the global ordinals) to compute the hash?

@markharwood (Contributor, Author)

the second round would compute the cardinality for the selected terms.

Breadth first is ignored in my first example because the top-level terms agg is sorted on the child numSessions cardinality agg. This code is where it decides NOT to do breadth_first on a child agg, because it sees the child agg is responsible for the sort order.

Isn't it possible to do this with a scripted terms aggs ?

Yes, but being a script:

  1. I expect it to be slower
  2. It needs to get hold of a decent hash algo e.g. MurmurHash
  3. It would need to work with multi-value fields (see my last StackOverflow example query)
  4. It doesn't have (faster) access to ordinals.

@jpountz (Contributor) commented Nov 17, 2016

I think Jim still made a good point that it would be nice to figure out whether we could do the partitioning differently so that we do not have to access values to know whether they are part of the current partition.

One way would be to use the ordinal range [partition*maxOrd/numPartitions, (partition+1)*maxOrd/numPartitions), even though it also has drawbacks, e.g. the partitions would change if some terms are added or removed before we have had time to retrieve data for all partitions.

Another way would be to use ranges of terms. For instance if there are 256 partitions in total, we could partition based on the first byte (we could figure out the ordinal range by calling lookupTerm on the min/max values). But this has a different issue: partitions can have very different sizes (especially in the worst case where the terms all share a common prefix, like URLs).

Something I like about this proposal in general is that hopefully it would solve some of the use-cases behind the "paging support for aggregations" request (#4915).
Based on the pros/cons of all the options, I think I would lean towards using ordinal ranges ([partition*maxOrd/numPartitions, (partition+1)*maxOrd/numPartitions)) for strings and hashing for numbers?
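Concretely, the ordinal-range test would be pure arithmetic, something like this sketch (names illustrative; maxOrd would come from the global ordinals map at search time):

class OrdinalRangePartition {
    // Partition i of N owns global ordinals in [i*maxOrd/N, (i+1)*maxOrd/N).
    static boolean accept(long globalOrd, long maxOrd, int partition, int numPartitions) {
        long from = partition * maxOrd / numPartitions;
        long to = (partition + 1) * maxOrd / numPartitions; // exclusive
        return globalOrd >= from && globalOrd < to;
    }
}

The integer division makes the numPartitions ranges tile [0, maxOrd) exactly, so every ordinal falls in exactly one partition.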

@markharwood (Contributor, Author)

I think we're on the same page with the ordinal support? Rough PR here: #21626

@jimczi (Contributor) commented Nov 17, 2016

Breadth first is ignored in my first example because the top-level terms agg is sorted on the child numSessions cardinality agg. This code is where it decides NOT to do breadth_first on a child agg, because it sees the child agg is responsible for the sort order.

Right, I missed the fact that the bucket_selector needs to access the result of the cardinality ;).

I think Jim still made a good point that it would be nice to figure out whether we could do the partitioning differently so that we do not have to access values to know whether they are part of the current partition.

I like the global ords proposal, but how can we ensure that maxOrd does not change between requests?

@jpountz (Contributor) commented Nov 17, 2016

I like the global ords proposal, but how can we ensure that maxOrd does not change between requests?

Indeed we can't. So this would be best-effort only, similarly to pagination when not using the scroll API.

@markharwood (Contributor, Author)

Note: my benchmark tests show there's a sweet spot in the number of partitions you select, because bigger numbers mean lower-memory requests and faster (individual) responses, but you have to make more requests to process all the data. The costs start to accumulate at high partition counts because there's a fixed-cost element to running each request: feeding all the terms through the Include filter. (Roughly, total cost is numPartitions times that fixed cost plus the data-dependent work, so once the per-partition work is small relative to the fixed cost, adding partitions only adds overhead.)

@jimczi (Contributor) commented Nov 17, 2016

Indeed we can't. So this would be best-effort only, similarly to pagination when not using the scroll API.

The problem (as with pagination) is that there is no way to check whether a response is valid or not. This makes the feature usable only on static indices; otherwise the results could be completely wrong.
Since this is intended to exhaust a terms aggregation with high cardinality, I think that precision should come before speed. Could we just rely on the term itself for the partitioning? I know it will be slower but at least it will always work. Maybe we can figure out afterwards how to speed things up, but the ordinal-based partitioning seems risky to me.

@markharwood (Contributor, Author)

The client can use this to avoid ordinals:

     "execution_hint":"map",

@jimczi (Contributor) commented Nov 17, 2016

@markharwood sorry, I jumped from this issue to the PR and realized that afterwards. Though I agree with Adrien here: we should be able to use the ordinals for the terms agg and the strings for the partitioning. That way the aggregation is still fast and always accurate? I really don't know how to use this feature if I need to ensure that my index is not updated in the meantime.
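Something like this sketch of the hybrid, as I understand the doc-values APIs (illustrative only): hash each unique term once up front to build an accepted-ordinals bitset, so the per-document hot path only ever tests ordinals:

import java.io.IOException;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LongBitSet;
import org.apache.lucene.util.StringHelper;

class AcceptedOrdinals {
    static LongBitSet build(SortedSetDocValues values, int partition, int numPartitions)
            throws IOException {
        long maxOrd = values.getValueCount();
        LongBitSet accepted = new LongBitSet(maxOrd);
        for (long ord = 0; ord < maxOrd; ord++) {
            // one string lookup and one hash per unique term, not per document
            BytesRef term = values.lookupOrd(ord);
            int hash = StringHelper.murmurhash3_x86_32(term, 0);
            if (Math.floorMod(hash, numPartitions) == partition) {
                accepted.set(ord);
            }
        }
        return accepted;
    }
}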

@markharwood (Contributor, Author)

Refresh problems aside, presumably there's a more fundamental issue: partitioning on global ordinals would give wrong results in a multi-shard system, because "global" only means spanning segments, not shards. The same term can have different global ordinals on different shards, so it could land in different partitions. For this reason we have to partition on values.

@jimczi (Contributor) commented Nov 18, 2016

@markharwood lol !

markharwood added a commit to markharwood/elasticsearch that referenced this issue Nov 24, 2016
…ions so that multiple requests can be done without trying to compute everything in one request.

Closes elastic#21487
markharwood added a commit that referenced this issue Nov 24, 2016
…ions so that multiple requests can be done without trying to compute everything in one request.

Closes #21487