-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add enrich processor #41532
Add enrich processor #41532
Changes from all commits
fe1fbd9
53b77a0
13f5612
87eceb8
02ebfc1
daf97e0
5897c07
fb38a65
7cf0931
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
package org.elasticsearch.xpack.enrich; | ||
|
||
import org.apache.http.entity.ByteArrayEntity; | ||
import org.apache.http.entity.ContentType; | ||
import org.apache.http.util.EntityUtils; | ||
import org.elasticsearch.client.Request; | ||
import org.elasticsearch.client.Response; | ||
import org.elasticsearch.common.xcontent.XContentBuilder; | ||
import org.elasticsearch.common.xcontent.XContentHelper; | ||
import org.elasticsearch.common.xcontent.XContentType; | ||
import org.elasticsearch.common.xcontent.json.JsonXContent; | ||
import org.elasticsearch.test.rest.ESRestTestCase; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.util.Map; | ||
|
||
import static org.hamcrest.Matchers.equalTo; | ||
|
||
public class EnrichIT extends ESRestTestCase { | ||
|
||
// TODO: update this test when policy runner is ready | ||
public void testBasicFlow() throws Exception { | ||
// Create the policy: | ||
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy"); | ||
putPolicyRequest.setJsonEntity("{\"type\": \"exact_match\",\"index_pattern\": \"my-index*\", \"enrich_key\": \"host\", " + | ||
"\"enrich_values\": [\"globalRank\", \"tldRank\", \"tld\"], \"schedule\": \"0 5 * * *\"}"); | ||
assertOK(client().performRequest(putPolicyRequest)); | ||
|
||
// Add a single enrich document for now and then refresh: | ||
Request indexRequest = new Request("PUT", "/.enrich-my_policy/_doc/elastic.co"); | ||
XContentBuilder document = XContentBuilder.builder(XContentType.SMILE.xContent()); | ||
document.startObject(); | ||
document.field("host", "elastic.co"); | ||
document.field("globalRank", 25); | ||
document.field("tldRank", 7); | ||
document.field("tld", "co"); | ||
document.endObject(); | ||
document.close(); | ||
ByteArrayOutputStream out = (ByteArrayOutputStream) document.getOutputStream(); | ||
indexRequest.setEntity(new ByteArrayEntity(out.toByteArray(), ContentType.create("application/smile"))); | ||
assertOK(client().performRequest(indexRequest)); | ||
Request refreshRequest = new Request("POST", "/.enrich-my_policy/_refresh"); | ||
assertOK(client().performRequest(refreshRequest)); | ||
|
||
// Create pipeline | ||
Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/my_pipeline"); | ||
putPipelineRequest.setJsonEntity("{\"processors\":[" + | ||
"{\"enrich\":{\"policy_name\":\"my_policy\",\"enrich_key\":\"host\",\"enrich_values\":[" + | ||
"{\"source\":\"globalRank\",\"target\":\"global_rank\"}," + | ||
"{\"source\":\"tldRank\",\"target\":\"tld_rank\"}" + | ||
"]}}" + | ||
"]}"); | ||
assertOK(client().performRequest(putPipelineRequest)); | ||
|
||
// Index document using pipeline with enrich processor: | ||
indexRequest = new Request("PUT", "/my-index/_doc/1"); | ||
indexRequest.addParameter("pipeline", "my_pipeline"); | ||
indexRequest.setJsonEntity("{\"host\": \"elastic.co\"}"); | ||
assertOK(client().performRequest(indexRequest)); | ||
|
||
// Check if document has been enriched | ||
Request getRequest = new Request("GET", "/my-index/_doc/1"); | ||
Map<String, Object> response = toMap(client().performRequest(getRequest)); | ||
Map<?, ?> _source = (Map<?, ?>) response.get("_source"); | ||
assertThat(_source.size(), equalTo(3)); | ||
assertThat(_source.get("host"), equalTo("elastic.co")); | ||
assertThat(_source.get("global_rank"), equalTo(25)); | ||
assertThat(_source.get("tld_rank"), equalTo(7)); | ||
} | ||
|
||
private static Map<String, Object> toMap(Response response) throws IOException { | ||
return toMap(EntityUtils.toString(response.getEntity())); | ||
} | ||
|
||
private static Map<String, Object> toMap(String response) { | ||
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
package org.elasticsearch.xpack.enrich; | ||
|
||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.index.engine.Engine; | ||
import org.elasticsearch.ingest.ConfigurationUtils; | ||
import org.elasticsearch.ingest.Processor; | ||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.function.Function; | ||
import java.util.function.Supplier; | ||
import java.util.stream.Collectors; | ||
|
||
final class EnrichProcessorFactory implements Processor.Factory { | ||
|
||
static final String TYPE = "enrich"; | ||
|
||
private final Function<String, EnrichPolicy> policyLookup; | ||
private final Function<String, Engine.Searcher> searchProvider; | ||
|
||
EnrichProcessorFactory(Supplier<ClusterState> clusterStateSupplier, | ||
Function<String, Engine.Searcher> searchProvider) { | ||
this.policyLookup = policyName -> EnrichStore.getPolicy(policyName, clusterStateSupplier.get()); | ||
this.searchProvider = searchProvider; | ||
} | ||
|
||
@Override | ||
public Processor create(Map<String, Processor.Factory> processorFactories, String tag, Map<String, Object> config) throws Exception { | ||
String policyName = ConfigurationUtils.readStringProperty(TYPE, tag, config, "policy_name"); | ||
EnrichPolicy policy = policyLookup.apply(policyName); | ||
if (policy == null) { | ||
throw new IllegalArgumentException("policy [" + policyName + "] does not exists"); | ||
} | ||
|
||
String enrichKey = ConfigurationUtils.readStringProperty(TYPE, tag, config, "enrich_key", policy.getEnrichKey()); | ||
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false); | ||
|
||
final List<EnrichSpecification> specifications; | ||
final List<Map<?, ?>> specificationConfig = ConfigurationUtils.readList(TYPE, tag, config, "enrich_values"); | ||
specifications = specificationConfig.stream() | ||
// TODO: Add templating support in enrich_values source and target options | ||
.map(entry -> new EnrichSpecification((String) entry.get("source"), (String) entry.get("target"))) | ||
.collect(Collectors.toList()); | ||
|
||
for (EnrichSpecification specification : specifications) { | ||
if (policy.getEnrichValues().contains(specification.sourceField) == false) { | ||
throw new IllegalArgumentException("source field [" + specification.sourceField + "] does not exist in policy [" + | ||
policyName + "]"); | ||
} | ||
} | ||
|
||
switch (policy.getType()) { | ||
case EnrichPolicy.EXACT_MATCH_TYPE: | ||
return new ExactMatchProcessor(tag, policyLookup, searchProvider, policyName, enrichKey, ignoreMissing, specifications); | ||
default: | ||
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]"); | ||
} | ||
} | ||
|
||
static final class EnrichSpecification { | ||
|
||
final String sourceField; | ||
final String targetField; | ||
|
||
EnrichSpecification(String sourceField, String targetField) { | ||
this.sourceField = sourceField; | ||
this.targetField = targetField; | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
package org.elasticsearch.xpack.enrich; | ||
|
||
import org.apache.lucene.document.Document; | ||
import org.apache.lucene.index.LeafReader; | ||
import org.apache.lucene.index.PostingsEnum; | ||
import org.apache.lucene.index.Terms; | ||
import org.apache.lucene.index.TermsEnum; | ||
import org.apache.lucene.util.BytesRef; | ||
import org.elasticsearch.common.bytes.BytesArray; | ||
import org.elasticsearch.common.bytes.BytesReference; | ||
import org.elasticsearch.common.xcontent.XContentHelper; | ||
import org.elasticsearch.common.xcontent.XContentType; | ||
import org.elasticsearch.index.engine.Engine; | ||
import org.elasticsearch.index.mapper.SourceFieldMapper; | ||
import org.elasticsearch.ingest.AbstractProcessor; | ||
import org.elasticsearch.ingest.IngestDocument; | ||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy; | ||
import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.function.Function; | ||
|
||
final class ExactMatchProcessor extends AbstractProcessor { | ||
|
||
private final Function<String, EnrichPolicy> policyLookup; | ||
private final Function<String, Engine.Searcher> searchProvider; | ||
|
||
private final String policyName; | ||
private final String enrichKey; | ||
private final boolean ignoreMissing; | ||
private final List<EnrichSpecification> specifications; | ||
|
||
ExactMatchProcessor(String tag, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we go ahead a create an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets create it when we add a second concrete implementation? I think that then we have a better idea what logic should be reused. |
||
Function<String, EnrichPolicy> policyLookup, | ||
Function<String, Engine.Searcher> searchProvider, | ||
String policyName, | ||
String enrichKey, | ||
boolean ignoreMissing, | ||
List<EnrichSpecification> specifications) { | ||
super(tag); | ||
this.policyLookup = policyLookup; | ||
this.searchProvider = searchProvider; | ||
this.policyName = policyName; | ||
this.enrichKey = enrichKey; | ||
this.ignoreMissing = ignoreMissing; | ||
this.specifications = specifications; | ||
} | ||
|
||
@Override | ||
public IngestDocument execute(IngestDocument ingestDocument) throws Exception { | ||
final EnrichPolicy policy = policyLookup.apply(policyName); | ||
if (policy == null) { | ||
throw new IllegalArgumentException("policy [" + policyName + "] does not exists"); | ||
} | ||
|
||
final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing); | ||
if (value == null) { | ||
return ingestDocument; | ||
} | ||
|
||
// TODO: re-use the engine searcher between enriching documents from the same write request | ||
try (Engine.Searcher engineSearcher = searchProvider.apply(policy.getAliasName(policyName))) { | ||
if (engineSearcher.getDirectoryReader().leaves().size() == 0) { | ||
return ingestDocument; | ||
} else if (engineSearcher.getDirectoryReader().leaves().size() != 1) { | ||
throw new IllegalStateException("enrich index must have exactly a single segment"); | ||
} | ||
|
||
final LeafReader leafReader = engineSearcher.getDirectoryReader().leaves().get(0).reader(); | ||
final Terms terms = leafReader.terms(policy.getEnrichKey()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this handle when the key is a dot representation of a nested object ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it should be able to handle field with dots. On the Lucene level the full path is used as field name. So if a key field is nested under object fields then this will work out. Supporting nested field type (for objects under a json array) is a different story and this code doesn't support that. An enrich index with a nested field type is something that we should allow, because that has an overhead each time a document gets enriched. (a block join would need to be performed in addition to the term lookup that happens here). The policy runner should always built an enrich index that is optimized for speed. The source index can contain a nested field, the policy runner should then de-normalize the source of the document. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks I wasn't sure how this is represented at the Lucene layer.. TIL. I added a line item to #32789 to ensure we don't forget out this requirement. (I added it under the processor, but it sounds more like the a concern of the synchronization) |
||
if (terms == null) { | ||
throw new IllegalStateException("enrich key field [" + policy.getEnrichKey() + "] does not exist"); | ||
} | ||
|
||
final TermsEnum tenum = terms.iterator(); | ||
if (tenum.seekExact(new BytesRef(value))) { | ||
PostingsEnum penum = tenum.postings(null, PostingsEnum.NONE); | ||
final int docId = penum.nextDoc(); | ||
assert docId != PostingsEnum.NO_MORE_DOCS : "no matching doc id for [" + enrichKey + "]"; | ||
assert penum.nextDoc() == PostingsEnum.NO_MORE_DOCS : "more than one doc id matching for [" + enrichKey + "]"; | ||
|
||
// TODO: The use of _source is temporarily until enrich source field mapper has been added (see PR #41521) | ||
Document document = leafReader.document(docId, Set.of(SourceFieldMapper.NAME)); | ||
BytesRef source = document.getBinaryValue(SourceFieldMapper.NAME); | ||
assert source != null; | ||
|
||
final BytesReference encoded = new BytesArray(source); | ||
final Map<String, Object> decoded = | ||
XContentHelper.convertToMap(encoded, false, XContentType.SMILE).v2(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This maybe a comment for the other PR ... but what are the SMILE binary format upgrade guarantees ? For example if we upgrade Jackson to a major version is the SMILE binary format guaranteed to be compatible ? AFAIK this would be the first time we persist SMILE. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the format, doesn't change between jackson versions. (json format doesn't change either when jackson is upgraded). We use smile in quite a few changes, for example the cluster state is stored as smile in the data directory and users can send their data as smile to ES and then the _source field will store the documents in the SMILE format. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TIL (again) thanks! |
||
for (EnrichSpecification specification : specifications) { | ||
Object enrichValue = decoded.get(specification.sourceField); | ||
// TODO: add support over overwrite option (like in SetProcessor) | ||
ingestDocument.setFieldValue(specification.targetField, enrichValue); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this handle nested fields (both from the source values and ingestDocument target) ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same answer as here: #41532 (comment) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We will probably want to support an |
||
} | ||
} | ||
} | ||
return ingestDocument; | ||
} | ||
|
||
@Override | ||
public String getType() { | ||
return EnrichProcessorFactory.TYPE; | ||
} | ||
|
||
String getPolicyName() { | ||
return policyName; | ||
} | ||
|
||
String getEnrichKey() { | ||
return enrichKey; | ||
} | ||
|
||
boolean isIgnoreMissing() { | ||
return ignoreMissing; | ||
} | ||
|
||
List<EnrichSpecification> getSpecifications() { | ||
return specifications; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will probably want to support templating on the target field. Probably out of scope for this review...maybe a TODO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add todos and also add it as task to the meta issue.