
Add enrich processor #41532

Merged
merged 9 commits into from
Apr 30, 2019
Conversation

martijnvg
Member

The enrich processor performs a lookup in a locally allocated
enrich index shard, using a field value from the document being enriched.
If there is a match, the _source of the enrich document is fetched.
The document being enriched then gets the decorate values from the
enrich document, based on the configured decorate fields in the pipeline.

Note that the usage of the _source field is temporary until the enrich
source field that is part of #41521 is merged into the enrich branch.
Using the _source field involves significant decompression, which is not
desired for enrich use cases. Otherwise this PR would also need to include
the enrich source meta field mapper, which would make this change more
difficult to review.

The policy specifies which field in the enrich index to query and which
fields are available to decorate a document being enriched with.

The enrich processor has the following configuration options:

* `policy_name` - the name of the policy this processor should use
* `enrich_key` - the field in the document being enriched that holds the lookup value
* `enrich_key_ignore_missing` - whether to allow the key field to be missing
* `enrich_values` - a list of fields to decorate the document being enriched with.
  Each entry holds a source field and a target field.
  The source field indicates which decorate field, available in the policy, to use.
  The target field controls the field name to use in the document being enriched.
  The source and target fields can be the same.

Example pipeline config:

```
{
   "processors": [
      {
         "policy_name": "my_policy",
         "enrich_key": "host_name",
         "enrich_values": [
            {
              "source": "globalRank",
              "target": "global_rank"
            }
         ]
      }
   ]
}
```

In the above example documents are being enriched with a global rank value.
For each document that has a match in the enrich index based on its `host_name` field,
the document gets a global rank field value, which is fetched from the `globalRank`
field in the enrich index and saved as `global_rank` in the document being enriched.

This PR is part one of #41521.
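The decoration step described above boils down to copying the configured source fields from the matched enrich document into the document being enriched, under their target names. A minimal sketch of that idea, assuming plain maps as documents (the `EnrichSketch` class and `EnrichSpec` record are simplified stand-ins, not the actual processor classes):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EnrichSketch {

    // Simplified stand-in for EnrichProcessorFactory.EnrichSpecification.
    record EnrichSpec(String sourceField, String targetField) {}

    // Copy each configured source field from the matched enrich document
    // into the document being enriched, under the configured target name.
    static Map<String, Object> decorate(Map<String, Object> document,
                                        Map<String, Object> enrichDoc,
                                        List<EnrichSpec> specs) {
        Map<String, Object> result = new HashMap<>(document);
        for (EnrichSpec spec : specs) {
            Object value = enrichDoc.get(spec.sourceField());
            if (value != null) {
                result.put(spec.targetField(), value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>(Map.of("host_name", "elastic.co"));
        Map<String, Object> enrichDoc = Map.of("host_name", "elastic.co", "globalRank", 25);
        Map<String, Object> enriched =
            decorate(doc, enrichDoc, List.of(new EnrichSpec("globalRank", "global_rank")));
        System.out.println(enriched.get("global_rank")); // prints 25
    }
}
```

The real processor additionally has to resolve dotted paths and honor `ignore_missing`; this sketch only illustrates the source-to-target copy.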

@martijnvg martijnvg added >non-issue :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP labels Apr 25, 2019
@elasticmachine
Collaborator

Pinging @elastic/es-core-features

@martijnvg
Member Author

@elasticmachine run elasticsearch-ci/1

@jakelandis jakelandis left a comment (Contributor)

Looking good... a couple of comments:

w/r/t nested objects, we can punt on that till later with a //TODO

is it too early to add in a REST test or two?

private final boolean ignoreEnrichKeyMissing;
private final List<EnrichProcessorFactory.EnrichSpecification> specifications;

ExactMatchProcessor(String tag,
Contributor

should we go ahead and create an AbstractEnrichProcessor and move some of this up a level?

Member Author

Let's create it when we add a second concrete implementation? By then we will have a better idea of what logic should be reused.

}

final LeafReader leafReader = engineSearcher.getDirectoryReader().leaves().get(0).reader();
final Terms terms = leafReader.terms(policy.getEnrichKey());
Contributor

does this handle the case when the key is a dot representation of a nested object?

Member Author

Yes, it should be able to handle fields with dots. At the Lucene level the full path is used as the field name, so if a key field is nested under object fields then this will work out.

Supporting the nested field type (for objects under a JSON array) is a different story, and this code doesn't support that. An enrich index with a nested field type is something that we should not allow, because that adds overhead each time a document gets enriched (a block join would need to be performed in addition to the term lookup that happens here). The policy runner should always build an enrich index that is optimized for speed. The source index can contain a nested field; the policy runner should then de-normalize the source of the document.
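The de-normalization the policy runner would do can be pictured as flattening each object in a nested array into its own flat enrich document, so the enrich index never needs the nested field type (and therefore no block join at enrich time). A hedged, stdlib-only sketch of that idea (the `denormalize` helper and field names are illustrative, not the actual policy runner code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DenormalizeSketch {

    // Turn one source document containing a nested array field into one flat
    // document per array element, hoisting nested keys to dotted top-level names.
    static List<Map<String, Object>> denormalize(Map<String, Object> source, String nestedField) {
        Object nested = source.get(nestedField);
        if (!(nested instanceof List<?> elements)) {
            return List.of(source); // nothing to flatten
        }
        List<Map<String, Object>> flat = new ArrayList<>();
        for (Object element : elements) {
            Map<String, Object> doc = new HashMap<>(source);
            doc.remove(nestedField);
            if (element instanceof Map<?, ?> obj) {
                // e.g. "ranks" array with key "globalRank" becomes "ranks.globalRank"
                obj.forEach((k, v) -> doc.put(nestedField + "." + k, v));
            }
            flat.add(doc);
        }
        return flat;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new HashMap<>();
        source.put("host_name", "elastic.co");
        source.put("ranks", List.of(Map.of("globalRank", 25), Map.of("globalRank", 30)));
        System.out.println(denormalize(source, "ranks").size()); // prints 2
    }
}
```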

Contributor

Thanks, I wasn't sure how this is represented at the Lucene layer. TIL.

I added a line item to #32789 to ensure we don't forget about this requirement. (I added it under the processor, but it sounds more like a concern of the synchronization.)

XContentHelper.convertToMap(encoded, false, XContentType.SMILE).v2();
for (EnrichProcessorFactory.EnrichSpecification specification : specifications) {
Object enrichValue = decoded.get(specification.sourceField);
ingestDocument.setFieldValue(specification.targetField, enrichValue);
Contributor

does this handle nested fields (both from the source values and the ingestDocument target)?

Member Author

Same answer as here: #41532 (comment)


final BytesReference encoded = new BytesArray(source);
final Map<String, Object> decoded =
XContentHelper.convertToMap(encoded, false, XContentType.SMILE).v2();
Contributor

This may be a comment for the other PR, but what are the SMILE binary format upgrade guarantees? For example, if we upgrade Jackson to a major version, is the SMILE binary format guaranteed to be compatible?

AFAIK this would be the first time we persist SMILE.

Member Author

I think the format doesn't change between Jackson versions (the JSON format doesn't change either when Jackson is upgraded). We use SMILE in quite a few places; for example, the cluster state is stored as SMILE in the data directory, and users can send their data as SMILE to ES, in which case the _source field will store the documents in the SMILE format.

Contributor

TIL (again) thanks!

}

// TODO: re-use the engine searcher between enriching documents from the same write request
try (Engine.Searcher engineSearcher = searchProvider.apply(policy.getIndexPattern())) {
Contributor

is this supposed to be the policy's index pattern? I think this should be `.enrich-<policy-name>`? (or whatever we agree on for the enrich index name)

Member Author

Actually, this should be the alias that points to the current enrich index.
The search provider does alias resolution as well.

So this needs to change to use the alias for a given policy. Given that the alias name doesn't change, we can add a method to EnrichPolicy that returns the alias name for that policy.
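The suggested method could look roughly like the following; note the `.enrich-` prefix, class name, and method name here are assumptions based on this discussion, not the actual EnrichPolicy API:

```java
public class EnrichPolicySketch {

    // Assumed naming scheme from the discussion: one fixed alias per policy
    // that always points at the current enrich index for that policy.
    static final String ENRICH_INDEX_PREFIX = ".enrich-";

    // The processor searches this alias; the policy runner swaps the concrete
    // index behind it whenever a new enrich index is built.
    static String aliasForPolicy(String policyName) {
        return ENRICH_INDEX_PREFIX + policyName;
    }

    public static void main(String[] args) {
        System.out.println(aliasForPolicy("my_policy")); // prints .enrich-my_policy
    }
}
```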

}

String enrichKey = ConfigurationUtils.readStringProperty(TYPE, tag, config, "enrich_key", policy.getEnrichKey());
boolean ignoreKeyMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "enrich_key_ignore_missing", false);
Contributor

nit: `enrich_key_ignore_missing` -> `ignore_missing_enrich_key` or simply `ignore_missing`

Member Author

👍 to just `ignore_missing`

@martijnvg
Member Author

@elasticmachine run elasticsearch-ci/1

@martijnvg
Member Author

Thanks for reviewing @jakelandis!

is it too early to add in a REST test or two?

The put policy API didn't exist yesterday :), so I will work on a REST test. We will need to adapt the test once the policy runner and the execute policy API are in.

martijnvg added a commit to martijnvg/elasticsearch that referenced this pull request Apr 26, 2019
move put policy api yaml test to this rest module.

The main benefit is that all tests will then be run when running:
`./gradlew -p x-pack/plugin/enrich check`

The rest qa module starts a node with default distribution and basic
license.

This qa module will also be used for adding different rest tests (not yaml),
for example rest tests needed for elastic#41532

Also, when we work on security integration we can add a security qa module under the qa folder. At some point we should also add a multi-node qa module.
@jakelandis jakelandis left a comment (Contributor)

LGTM
(1 nitpick and 2 //TODO suggestions)

This is looking good. At some point in the future, when everything comes together, we may want to re-evaluate the naming. I found myself getting a bit confused between the enrich index and the enrich document; it could just be me, or maybe documentation will address it.

Function<String, Engine.Searcher> searchProvider) {
this.policyLookup = policyName -> {
ClusterState clusterState = clusterStateSupplier.get();
return EnrichStore.getPolicy(policyName, clusterState);
Contributor

nit: `this.policyLookup = policyName -> EnrichStore.getPolicy(policyName, clusterStateSupplier.get());` reads a bit cleaner IMO

XContentHelper.convertToMap(encoded, false, XContentType.SMILE).v2();
for (EnrichProcessorFactory.EnrichSpecification specification : specifications) {
Object enrichValue = decoded.get(specification.sourceField);
ingestDocument.setFieldValue(specification.targetField, enrichValue);
Contributor

We will probably want to support an override option like in the set processor. Maybe out of scope for this review... maybe a new TODO?

final List<EnrichSpecification> specifications;
final List<Map<?, ?>> specificationConfig = ConfigurationUtils.readList(TYPE, tag, config, "enrich_values");
specifications = specificationConfig.stream()
.map(entry -> new EnrichSpecification((String) entry.get("source"), (String) entry.get("target")))
Contributor

We will probably want to support templating on the target field. Probably out of scope for this review... maybe a TODO.

Member Author

I will add the TODOs and also add them as tasks to the meta issue.

@martijnvg
Member Author

At some point in the future, when everything comes together, we may want to re-evaluate the naming. I found myself getting a bit confused between the enrich index and the enrich document; it could just be me, or maybe documentation will address it.

Agreed, I think naming will improve when things are coming together.

martijnvg added a commit that referenced this pull request Apr 26, 2019
martijnvg added a commit that referenced this pull request Apr 26, 2019
@martijnvg
Member Author

I've also added an integration test under the rest qa module. The test is not complete, because the policy runner hasn't been merged yet, so it just creates an enrich index and then indexes a single enrich document. But this does allow testing the enrich processor.

Also, a small issue was found: the way the cluster state was queried from the enrich processor factory was problematic. The processor factory is called on the cluster state update thread, and calling ClusterService#state() from there is illegal, because the current cluster state update hasn't completed yet. To fix this, we now keep track of the latest applied cluster state and use that in the enrich processor.
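The fix described above, tracking the latest applied cluster state instead of calling ClusterService#state() from the applier thread, can be sketched with a plain AtomicReference. The `ClusterState` record and the listener wiring here are simplified stand-ins, not the actual Elasticsearch classes:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class AppliedStateSketch {

    // Simplified stand-in for org.elasticsearch.cluster.ClusterState.
    record ClusterState(long version) {}

    private final AtomicReference<ClusterState> lastApplied = new AtomicReference<>();

    // Called once a cluster state has been fully applied. Unlike calling
    // ClusterService#state() from the update thread, reading this reference
    // is always safe because it only ever holds completed states.
    void onClusterStateApplied(ClusterState state) {
        lastApplied.set(state);
    }

    // The processor factory captures this supplier instead of the service.
    Supplier<ClusterState> stateSupplier() {
        return lastApplied::get;
    }

    public static void main(String[] args) {
        AppliedStateSketch tracker = new AppliedStateSketch();
        tracker.onClusterStateApplied(new ClusterState(7));
        System.out.println(tracker.stateSupplier().get().version()); // prints 7
    }
}
```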

@martijnvg
Member Author

@elasticmachine run elasticsearch-ci/1

@martijnvg
Member Author

@elasticmachine run elasticsearch-ci/1

@martijnvg
Member Author

@elasticmachine run elasticsearch-ci/1

@martijnvg
Member Author

@elasticmachine run elasticsearch-ci/1

@martijnvg martijnvg merged commit 3c7f463 into elastic:enrich Apr 30, 2019
martijnvg added a commit that referenced this pull request Apr 30, 2019