Add enrich processor #41532
The enrich processor performs a lookup in a locally allocated enrich index shard using a field value from the document being enriched. If there is a match, the _source of the enrich document is fetched. The document being enriched then gets the decorate values from the enrich document, based on the decorate fields configured in the pipeline.

Note that the usage of the _source field is temporary until the enrich source field that is part of elastic#41521 is merged into the enrich branch. Using the _source field involves significant decompression, which is not desired for enrich use cases.

The policy specifies which field in the enrich index to query and which fields are available to decorate a document being enriched.

The enrich processor has the following configuration options:
* `policy_name` - the name of the policy this processor should use
* `enrich_key` - the field in the document being enriched that holds the lookup value
* `enrich_key_ignore_missing` - whether to allow the key field to be missing
* `enrich_values` - a list of fields to decorate the document being enriched with. Each entry holds a source field and a target field. The source field indicates which of the decorate fields available in the policy to use. The target field controls the field name to use in the document being enriched. The source and target fields can be the same.

Example pipeline config:
```
{
  "processors": [
    {
      "policy_name": "my_policy",
      "enrich_key": "host_name",
      "enrich_values": [
        {
          "source": "globalRank",
          "target": "global_rank"
        }
      ]
    }
  ]
}
```
In the above example documents are being enriched with a global rank value. For each document that has a match in the enrich index based on its host_name field, the document gets a global rank field value, which is fetched from the `globalRank` field in the enrich index and saved as `global_rank` in the document being enriched.

This PR is part one of elastic#41521
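As a rough illustration of the behaviour described above, here is a minimal in-memory sketch. It is not the code from this PR: the class `EnrichSketch`, the nested `Spec` class and the `Map`-based enrich index are hypothetical stand-ins for the Lucene term lookup and for `IngestDocument`, and the handling of a missing key is an assumption based on the option description.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the enrich processor; not the PR's implementation.
final class EnrichSketch {

    // One entry of `enrich_values`: copy `source` from the enrich document into `target`.
    static final class Spec {
        final String source;
        final String target;

        Spec(String source, String target) {
            this.source = source;
            this.target = target;
        }
    }

    // enrichIndex maps a key value (for example a host name) to the matching enrich document.
    static Map<String, Object> enrich(Map<String, Object> document,
                                      String enrichKey,
                                      boolean ignoreMissing,
                                      List<Spec> specs,
                                      Map<String, Map<String, Object>> enrichIndex) {
        Map<String, Object> result = new HashMap<>(document);
        Object keyValue = document.get(enrichKey);
        if (keyValue == null) {
            if (ignoreMissing) {
                return result; // nothing to look up, leave the document untouched
            }
            throw new IllegalArgumentException("field [" + enrichKey + "] is missing");
        }
        Map<String, Object> enrichDoc = enrichIndex.get(keyValue.toString());
        if (enrichDoc == null) {
            return result; // no match in the enrich index
        }
        for (Spec spec : specs) {
            // Decorate the document: read the source field, write it under the target name.
            result.put(spec.target, enrichDoc.get(spec.source));
        }
        return result;
    }
}
```

With an enrich document `{"globalRank": 25}` stored under the key `elastic.co`, a document with `host_name` set to `elastic.co` would come back with an additional `global_rank` field.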
Pinging @elastic/es-core-features
@elasticmachine run elasticsearch-ci/1
Looking good... a couple of comments...
W/r/t nested objects, we can punt on that till later with a //TODO.
Is it too early to add in a REST test or two?
private final boolean ignoreEnrichKeyMissing;
private final List<EnrichProcessorFactory.EnrichSpecification> specifications;

ExactMatchProcessor(String tag,
Should we go ahead and create an AbstractEnrichProcessor and move some of this up a level?
Let's create it when we add a second concrete implementation? I think we will then have a better idea of what logic should be reused.
}

final LeafReader leafReader = engineSearcher.getDirectoryReader().leaves().get(0).reader();
final Terms terms = leafReader.terms(policy.getEnrichKey());
Does this handle the case where the key is a dot representation of a nested object?
Yes, it should be able to handle fields with dots. At the Lucene level the full path is used as the field name, so if a key field is nested under object fields then this will work out.
Supporting the nested field type (for objects under a JSON array) is a different story, and this code doesn't support that. An enrich index with a nested field type is something that we should not allow, because it adds overhead each time a document gets enriched (a block join would need to be performed in addition to the term lookup that happens here). The policy runner should always build an enrich index that is optimized for speed. The source index can contain a nested field; the policy runner should then de-normalize the source of the document.
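To make the point about full paths concrete, the sketch below flattens object fields into dotted field names, which is the naming the comment above describes at the Lucene level. It only illustrates the naming: the class `DottedPaths` is hypothetical, it is not how Elasticsearch parses documents, and it deliberately ignores arrays and the nested field type.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that only illustrates dotted field naming; not Elasticsearch code.
final class DottedPaths {

    // Turns {"host": {"name": "x"}} into {"host.name": "x"}.
    static Map<String, Object> flatten(Map<String, Object> source) {
        Map<String, Object> out = new LinkedHashMap<>();
        flatten("", source, out);
        return out;
    }

    @SuppressWarnings("unchecked")
    private static void flatten(String prefix, Map<String, Object> node, Map<String, Object> out) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String path = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            if (entry.getValue() instanceof Map) {
                // Object field: descend and extend the path.
                flatten(path, (Map<String, Object>) entry.getValue(), out);
            } else {
                // Leaf value: the full path becomes the field name.
                out.put(path, entry.getValue());
            }
        }
    }
}
```

Under this naming, an enrich key configured as `host.name` refers to a single field, so a plain term lookup on that field name is enough.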
Thanks, I wasn't sure how this is represented at the Lucene layer. TIL.
I added a line item to #32789 to ensure we don't forget about this requirement. (I added it under the processor, but it sounds more like a concern of the synchronization.)
XContentHelper.convertToMap(encoded, false, XContentType.SMILE).v2();
for (EnrichProcessorFactory.EnrichSpecification specification : specifications) {
Object enrichValue = decoded.get(specification.sourceField);
ingestDocument.setFieldValue(specification.targetField, enrichValue);
does this handle nested fields (both from the source values and ingestDocument target) ?
Same answer as here: #41532 (comment)

final BytesReference encoded = new BytesArray(source);
final Map<String, Object> decoded =
XContentHelper.convertToMap(encoded, false, XContentType.SMILE).v2();
This may be a comment for the other PR, but what are the SMILE binary format upgrade guarantees? For example, if we upgrade Jackson to a new major version, is the SMILE binary format guaranteed to be compatible?
AFAIK this would be the first time we persist SMILE.
I think the format doesn't change between Jackson versions (the JSON format doesn't change either when Jackson is upgraded). We use SMILE in quite a few places: for example, the cluster state is stored as SMILE in the data directory, and users can send their data as SMILE to ES, in which case the _source field will store the documents in the SMILE format.
TIL (again) thanks!
}

// TODO: re-use the engine searcher between enriching documents from the same write request
try (Engine.Searcher engineSearcher = searchProvider.apply(policy.getIndexPattern())) {
Is this supposed to be the policy's index pattern? I think this should be .enrich-<policy-name> (or whatever we agree on for the enrich index name).
Actually this should be the alias that points to the current enrich index.
The search provider does alias resolution as well.
So this needs to change to use the alias for a given policy. Given that the alias name doesn't change, we can add a method to EnrichPolicy that returns the alias name for that policy.
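A possible shape for such a method is sketched below. Everything in it is an assumption for illustration: the class name `EnrichPolicySketch`, the method name `getAliasName` and the `.enrich-` prefix (taken from the naming floated in the question above, which had not been agreed on) are not from the PR.

```java
// Hypothetical sketch; the real EnrichPolicy class and the final alias naming may differ.
final class EnrichPolicySketch {

    // Assumed prefix, based on the ".enrich-<policy-name>" suggestion in the review.
    static final String ENRICH_ALIAS_PREFIX = ".enrich-";

    private final String name;

    EnrichPolicySketch(String name) {
        this.name = name;
    }

    // The alias name is derived only from the policy name, so it stays stable
    // while the concrete enrich index behind the alias is replaced.
    String getAliasName() {
        return ENRICH_ALIAS_PREFIX + name;
    }
}
```

The processor could then pass the result of this method to the search provider instead of the policy's index pattern.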
}

String enrichKey = ConfigurationUtils.readStringProperty(TYPE, tag, config, "enrich_key", policy.getEnrichKey());
boolean ignoreKeyMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "enrich_key_ignore_missing", false);
nit: enrich_key_ignore_missing
-> ignore_missing_enrich_key
or simply ignore_missing
👍 to just ignore_missing
@elasticmachine run elasticsearch-ci/1
Thanks for reviewing @jakelandis!
The put policy api didn't exist yesterday :), I will work on a rest test. We will need to adapt the test when the policy runner and the execute policy api are in.
move put policy api yaml test to this rest module. The main benefit is that all tests will then be run when running: `./gradlew -p x-pack/plugin/enrich check` The rest qa module starts a node with default distribution and basic license. This qa module will also be used for adding different rest tests (not yaml), for example rest tests needed for elastic#41532 Also when we are going to work on security integration then we can add a security qa module under the qa folder. Also at some point we should add a multi node qa module.
LGTM
(1 nitpick and 2 //TODO suggestions)
This is looking good. At some point in the future when everything comes together we may want to re-evaluate the naming. I found myself getting a bit confused between enrich index vs. document, it could just be me or maybe doc will address.
Function<String, Engine.Searcher> searchProvider) {
this.policyLookup = policyName -> {
ClusterState clusterState = clusterStateSupplier.get();
return EnrichStore.getPolicy(policyName, clusterState);
nit: this.policyLookup = policyName -> EnrichStore.getPolicy(policyName, clusterStateSupplier.get());
reads a bit cleaner IMO
XContentHelper.convertToMap(encoded, false, XContentType.SMILE).v2();
for (EnrichProcessorFactory.EnrichSpecification specification : specifications) {
Object enrichValue = decoded.get(specification.sourceField);
ingestDocument.setFieldValue(specification.targetField, enrichValue);
We will probably want to support an override option like in the set processor. Maybe out of scope for this review... maybe a new TODO?
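A minimal sketch of what such an option could look like follows. It assumes the semantics of the set processor's `override` flag (when false, an existing non-null value is left alone); the class `OverrideSketch` and the use of a plain `Map` in place of `IngestDocument` are hypothetical.

```java
import java.util.Map;

// Hypothetical sketch of an `override` option; a Map stands in for IngestDocument.
final class OverrideSketch {

    // Writes the enrich value unless override is false and the target already has a non-null value.
    static void setEnrichValue(Map<String, Object> document, String targetField,
                               Object enrichValue, boolean override) {
        if (override || document.get(targetField) == null) {
            document.put(targetField, enrichValue);
        }
    }
}
```

With `override` set to false, a document that already carries a `global_rank` value would keep it, and only documents without one would be decorated.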
final List<EnrichSpecification> specifications;
final List<Map<?, ?>> specificationConfig = ConfigurationUtils.readList(TYPE, tag, config, "enrich_values");
specifications = specificationConfig.stream()
.map(entry -> new EnrichSpecification((String) entry.get("source"), (String) entry.get("target")))
We will probably want to support templating on the target field. Probably out of scope for this review...maybe a TODO
I will add todos and also add it as task to the meta issue.
Agreed, I think naming will improve when things are coming together.
I've also added an integration test under the rest qa module. The test is not complete yet, because the policy runner hasn't been merged, so it just creates an enrich index and then indexes a single enrich document. But this does allow testing the enrich processor. Also, a small issue was found: the way the cluster state was queried from the enrich processor factory was problematic. The processor factory is called on cluster state update thread, and calling
@elasticmachine run elasticsearch-ci/1
The enrich processor performs a lookup in a locally allocated
enrich index shard using a field value from the document being enriched.
If there is a match then the _source of the enrich document is fetched.
The document being enriched then gets the decorate values from the
enrich document based on the configured decorate fields in the pipeline.
Note that the usage of the _source field is temporary until the enrich
source field that is part of #41521 is merged into the enrich branch.
Using the _source field involves significant decompression, which is not
desired for enrich use cases. Otherwise this PR would need to include
the enrich source meta field mapper too and then reviewing this change
becomes more difficult.
The policy contains the information what field in the enrich index
to query and what fields are available to decorate a document being
enriched with.
The enrich processor has the following configuration options:
* `policy_name` - the name of the policy this processor should use
* `enrich_key` - the field in the document being enriched that holds the lookup value
* `enrich_key_ignore_missing` - whether to allow the key field to be missing
* `enrich_values` - a list of fields to decorate the document being enriched with.
  Each entry holds a source field and a target field.
  The source field indicates what decorate field to use that is available in the policy.
  The target field controls the field name to use in the document being enriched.
  The source and target fields can be the same.
Example pipeline config: `{ "processors": [ { "policy_name": "my_policy", "enrich_key": "host_name", "enrich_values": [ { "source": "globalRank", "target": "global_rank" } ] } ] }`
In the above example documents are being enriched with a global rank value.
For each document that has a match in the enrich index based on its host_name field,
the document gets a global rank field value, which is fetched from the
`globalRank` field in the enrich index and saved as `global_rank`
in the document being enriched.
This PR is part one of #41521