diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java index 06d74222b171f..b65007f437a28 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java @@ -28,7 +28,7 @@ */ public final class EnrichPolicy implements Writeable, ToXContentFragment { - static final String EXACT_MATCH_TYPE = "exact_match"; + public static final String EXACT_MATCH_TYPE = "exact_match"; public static final String[] SUPPORTED_POLICY_TYPES = new String[]{EXACT_MATCH_TYPE}; static final ParseField TYPE = new ParseField("type"); @@ -125,6 +125,11 @@ public String getSchedule() { return schedule; } + public String getAliasName(String policyName) { + // #41553 (list policy api) will add name to policy, so that we don't have to provide the name via a parameter. + return ".enrich-" + policyName; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(type); diff --git a/x-pack/plugin/enrich/qa/rest/src/test/java/org/elasticsearch/xpack/enrich/EnrichIT.java b/x-pack/plugin/enrich/qa/rest/src/test/java/org/elasticsearch/xpack/enrich/EnrichIT.java new file mode 100644 index 0000000000000..cce65a53a3d14 --- /dev/null +++ b/x-pack/plugin/enrich/qa/rest/src/test/java/org/elasticsearch/xpack/enrich/EnrichIT.java @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.test.rest.ESRestTestCase; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class EnrichIT extends ESRestTestCase { + + // TODO: update this test when policy runner is ready + public void testBasicFlow() throws Exception { + // Create the policy: + Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy"); + putPolicyRequest.setJsonEntity("{\"type\": \"exact_match\",\"index_pattern\": \"my-index*\", \"enrich_key\": \"host\", " + + "\"enrich_values\": [\"globalRank\", \"tldRank\", \"tld\"], \"schedule\": \"0 5 * * *\"}"); + assertOK(client().performRequest(putPolicyRequest)); + + // Add a single enrich document for now and then refresh: + Request indexRequest = new Request("PUT", "/.enrich-my_policy/_doc/elastic.co"); + XContentBuilder document = XContentBuilder.builder(XContentType.SMILE.xContent()); + document.startObject(); + document.field("host", "elastic.co"); + document.field("globalRank", 25); + document.field("tldRank", 7); + document.field("tld", "co"); + document.endObject(); + document.close(); + ByteArrayOutputStream out = (ByteArrayOutputStream) document.getOutputStream(); + indexRequest.setEntity(new ByteArrayEntity(out.toByteArray(), ContentType.create("application/smile"))); + assertOK(client().performRequest(indexRequest)); + Request refreshRequest = new Request("POST", "/.enrich-my_policy/_refresh"); + assertOK(client().performRequest(refreshRequest)); + + // Create pipeline + Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/my_pipeline"); + putPipelineRequest.setJsonEntity("{\"processors\":[" + + "{\"enrich\":{\"policy_name\":\"my_policy\",\"enrich_key\":\"host\",\"enrich_values\":[" + + "{\"source\":\"globalRank\",\"target\":\"global_rank\"}," + + "{\"source\":\"tldRank\",\"target\":\"tld_rank\"}" + + "]}}" + + "]}"); + assertOK(client().performRequest(putPipelineRequest)); + + // Index document using pipeline with enrich processor: + indexRequest = new Request("PUT", "/my-index/_doc/1"); + indexRequest.addParameter("pipeline", "my_pipeline"); + indexRequest.setJsonEntity("{\"host\": \"elastic.co\"}"); + assertOK(client().performRequest(indexRequest)); + + // Check if document has been enriched + Request getRequest = new Request("GET", "/my-index/_doc/1"); + Map response = toMap(client().performRequest(getRequest)); + Map _source = (Map) response.get("_source"); + assertThat(_source.size(), equalTo(3)); + assertThat(_source.get("host"), equalTo("elastic.co")); + assertThat(_source.get("global_rank"), equalTo(25)); + assertThat(_source.get("tld_rank"), equalTo(7)); + } + + private static Map toMap(Response response) throws IOException { + return toMap(EntityUtils.toString(response.getEntity())); + } + + private static Map toMap(String response) { + return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false); + } + +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java index 5e8212b795e47..d0c47418cb625 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java @@ -7,8 +7,10 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -31,6 +33,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import static java.util.Collections.emptyList; @@ -48,7 +51,14 @@ public EnrichPlugin(final Settings settings) { @Override public Map getProcessors(Processor.Parameters parameters) { - return Collections.emptyMap(); + final ClusterService clusterService = parameters.ingestService.getClusterService(); + // Pipelines are created from cluster state update thead and calling ClusterService#state() from that thead is illegal + // (because the current cluster state update is in progress) + // So with the below atomic reference we keep track of the latest updated cluster state: + AtomicReference reference = new AtomicReference<>(); + clusterService.addStateApplier(event -> reference.set(event.state())); + + return Map.of(EnrichProcessorFactory.TYPE, new EnrichProcessorFactory(reference::get, parameters.localShardSearcher)); } public List> getActions() { diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java new file mode 100644 index 0000000000000..91a2236cec6fc --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -0,0 +1,77 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +final class EnrichProcessorFactory implements Processor.Factory { + + static final String TYPE = "enrich"; + + private final Function policyLookup; + private final Function searchProvider; + + EnrichProcessorFactory(Supplier clusterStateSupplier, + Function searchProvider) { + this.policyLookup = policyName -> EnrichStore.getPolicy(policyName, clusterStateSupplier.get()); + this.searchProvider = searchProvider; + } + + @Override + public Processor create(Map processorFactories, String tag, Map config) throws Exception { + String policyName = ConfigurationUtils.readStringProperty(TYPE, tag, config, "policy_name"); + EnrichPolicy policy = policyLookup.apply(policyName); + if (policy == null) { + throw new IllegalArgumentException("policy [" + policyName + "] does not exists"); + } + + String enrichKey = ConfigurationUtils.readStringProperty(TYPE, tag, config, "enrich_key", policy.getEnrichKey()); + boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false); + + final List specifications; + final List> specificationConfig = ConfigurationUtils.readList(TYPE, tag, config, "enrich_values"); + specifications = specificationConfig.stream() + // TODO: Add templating support in enrich_values source and target options + .map(entry -> new EnrichSpecification((String) entry.get("source"), (String) entry.get("target"))) + .collect(Collectors.toList()); + + for (EnrichSpecification specification : specifications) { + if (policy.getEnrichValues().contains(specification.sourceField) == false) { + throw new IllegalArgumentException("source field [" + specification.sourceField + "] does not exist in policy [" + + policyName + "]"); + } + } + + switch (policy.getType()) { + case EnrichPolicy.EXACT_MATCH_TYPE: + return new ExactMatchProcessor(tag, policyLookup, searchProvider, policyName, enrichKey, ignoreMissing, specifications); + default: + throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]"); + } + } + + static final class EnrichSpecification { + + final String sourceField; + final String targetField; + + EnrichSpecification(String sourceField, String targetField) { + this.sourceField = sourceField; + this.targetField = targetField; + } + } + +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java new file mode 100644 index 0000000000000..88f5385eec6f6 --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +final class ExactMatchProcessor extends AbstractProcessor { + + private final Function policyLookup; + private final Function searchProvider; + + private final String policyName; + private final String enrichKey; + private final boolean ignoreMissing; + private final List specifications; + + ExactMatchProcessor(String tag, + Function policyLookup, + Function searchProvider, + String policyName, + String enrichKey, + boolean ignoreMissing, + List specifications) { + super(tag); + this.policyLookup = policyLookup; + this.searchProvider = searchProvider; + this.policyName = policyName; + this.enrichKey = enrichKey; + this.ignoreMissing = ignoreMissing; + this.specifications = specifications; + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + final EnrichPolicy policy = policyLookup.apply(policyName); + if (policy == null) { + throw new IllegalArgumentException("policy [" + policyName + "] does not exists"); + } + + final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing); + if (value == null) { + return ingestDocument; + } + + // TODO: re-use the engine searcher between enriching documents from the same write request + try (Engine.Searcher engineSearcher = searchProvider.apply(policy.getAliasName(policyName))) { + if (engineSearcher.getDirectoryReader().leaves().size() == 0) { + return ingestDocument; + } else if (engineSearcher.getDirectoryReader().leaves().size() != 1) { + throw new IllegalStateException("enrich index must have exactly a single segment"); + } + + final LeafReader leafReader = engineSearcher.getDirectoryReader().leaves().get(0).reader(); + final Terms terms = leafReader.terms(policy.getEnrichKey()); + if (terms == null) { + throw new IllegalStateException("enrich key field [" + policy.getEnrichKey() + "] does not exist"); + } + + final TermsEnum tenum = terms.iterator(); + if (tenum.seekExact(new BytesRef(value))) { + PostingsEnum penum = tenum.postings(null, PostingsEnum.NONE); + final int docId = penum.nextDoc(); + assert docId != PostingsEnum.NO_MORE_DOCS : "no matching doc id for [" + enrichKey + "]"; + assert penum.nextDoc() == PostingsEnum.NO_MORE_DOCS : "more than one doc id matching for [" + enrichKey + "]"; + + // TODO: The use of _source is temporarily until enrich source field mapper has been added (see PR #41521) + Document document = leafReader.document(docId, Set.of(SourceFieldMapper.NAME)); + BytesRef source = document.getBinaryValue(SourceFieldMapper.NAME); + assert source != null; + + final BytesReference encoded = new BytesArray(source); + final Map decoded = + XContentHelper.convertToMap(encoded, false, XContentType.SMILE).v2(); + for (EnrichSpecification specification : specifications) { + Object enrichValue = decoded.get(specification.sourceField); + // TODO: add support over overwrite option (like in SetProcessor) + ingestDocument.setFieldValue(specification.targetField, enrichValue); + } + } + } + return ingestDocument; + } + + @Override + public String getType() { + return EnrichProcessorFactory.TYPE; + } + + String getPolicyName() { + return policyName; + } + + String getEnrichKey() { + return enrichKey; + } + + boolean isIgnoreMissing() { + return ignoreMissing; + } + + List getSpecifications() { + return specifications; + } +} diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java new file mode 100644 index 0000000000000..fed7bfe0309b8 --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java @@ -0,0 +1,185 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +public class EnrichProcessorFactoryTests extends ESTestCase { + + public void testCreateProcessorInstance() throws Exception { + List enrichValues = List.of("globalRank", "tldRank", "tld"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "source_index", "my_key", enrichValues, "schedule"); + EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier("majestic", policy), null); + + Map config = new HashMap<>(); + config.put("policy_name", "majestic"); + config.put("enrich_key", "host"); + boolean keyIgnoreMissing = randomBoolean(); + if (keyIgnoreMissing || randomBoolean()) { + config.put("ignore_missing", keyIgnoreMissing); + } + + int numRandomValues = randomIntBetween(1, 8); + List> randomValues = new ArrayList<>(numRandomValues); + for (int i = 0; i < numRandomValues; i++) { + randomValues.add(new Tuple<>(randomFrom(enrichValues), randomAlphaOfLength(4))); + } + + List> valuesConfig = new ArrayList<>(numRandomValues); + for (Tuple tuple : randomValues) { + valuesConfig.add(Map.of("source", tuple.v1(), "target", tuple.v2())); + } + config.put("enrich_values", valuesConfig); + + ExactMatchProcessor result = (ExactMatchProcessor) factory.create(Collections.emptyMap(), "_tag", config); + assertThat(result, notNullValue()); + assertThat(result.getPolicyName(), equalTo("majestic")); + assertThat(result.getEnrichKey(), equalTo("host")); + assertThat(result.isIgnoreMissing(), is(keyIgnoreMissing)); + assertThat(result.getSpecifications().size(), equalTo(numRandomValues)); + for (int i = 0; i < numRandomValues; i++) { + EnrichSpecification actual = result.getSpecifications().get(i); + Tuple expected = randomValues.get(i); + assertThat(actual.sourceField, equalTo(expected.v1())); + assertThat(actual.targetField, equalTo(expected.v2())); + } + } + + public void testPolicyDoesNotExist() { + List enrichValues = List.of("globalRank", "tldRank", "tld"); + EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier(), null); + + Map config = new HashMap<>(); + config.put("policy_name", "majestic"); + config.put("enrich_key", "host"); + boolean keyIgnoreMissing = randomBoolean(); + if (keyIgnoreMissing || randomBoolean()) { + config.put("ignore_missing", keyIgnoreMissing); + } + + int numRandomValues = randomIntBetween(1, 8); + List> randomValues = new ArrayList<>(numRandomValues); + for (int i = 0; i < numRandomValues; i++) { + randomValues.add(new Tuple<>(randomFrom(enrichValues), randomAlphaOfLength(4))); + } + + List> valuesConfig = new ArrayList<>(numRandomValues); + for (Tuple tuple : randomValues) { + valuesConfig.add(Map.of("source", tuple.v1(), "target", tuple.v2())); + } + config.put("enrich_values", valuesConfig); + + Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config)); + assertThat(e.getMessage(), equalTo("policy [majestic] does not exists")); + } + + public void testPolicyNameMissing() { + List enrichValues = List.of("globalRank", "tldRank", "tld"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "source_index", "my_key", enrichValues, "schedule"); + EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier("_name", policy), null); + + Map config = new HashMap<>(); + config.put("enrich_key", "host"); + boolean keyIgnoreMissing = randomBoolean(); + if (keyIgnoreMissing || randomBoolean()) { + config.put("ignore_missing", keyIgnoreMissing); + } + + int numRandomValues = randomIntBetween(1, 8); + List> randomValues = new ArrayList<>(numRandomValues); + for (int i = 0; i < numRandomValues; i++) { + randomValues.add(new Tuple<>(randomFrom(enrichValues), randomAlphaOfLength(4))); + } + + List> valuesConfig = new ArrayList<>(numRandomValues); + for (Tuple tuple : randomValues) { + valuesConfig.add(Map.of("source", tuple.v1(), "target", tuple.v2())); + } + config.put("enrich_values", valuesConfig); + + Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(Collections.emptyMap(), "_tag", config)); + assertThat(e.getMessage(), equalTo("[policy_name] required property is missing")); + } + + public void testUnsupportedPolicy() { + List enrichValues = List.of("globalRank", "tldRank", "tld"); + EnrichPolicy policy = new EnrichPolicy("unsupported", null, "source_index", "my_key", enrichValues, "schedule"); + EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier("majestic", policy), null); + + Map config = new HashMap<>(); + config.put("policy_name", "majestic"); + config.put("enrich_key", "host"); + boolean keyIgnoreMissing = randomBoolean(); + if (keyIgnoreMissing || randomBoolean()) { + config.put("ignore_missing", keyIgnoreMissing); + } + + int numRandomValues = randomIntBetween(1, 8); + List> randomValues = new ArrayList<>(numRandomValues); + for (int i = 0; i < numRandomValues; i++) { + randomValues.add(new Tuple<>(randomFrom(enrichValues), randomAlphaOfLength(4))); + } + + List> valuesConfig = new ArrayList<>(numRandomValues); + for (Tuple tuple : randomValues) { + valuesConfig.add(Map.of("source", tuple.v1(), "target", tuple.v2())); + } + config.put("enrich_values", valuesConfig); + + Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config)); + assertThat(e.getMessage(), equalTo("unsupported policy type [unsupported]")); + } + + public void testNonExistingDecorateField() throws Exception { + List enrichValues = List.of("globalRank", "tldRank", "tld"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "source_index", "my_key", enrichValues, "schedule"); + EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier("majestic", policy), null); + + Map config = new HashMap<>(); + config.put("policy_name", "majestic"); + config.put("enrich_key", "host"); + List> valuesConfig = List.of(Map.of("source", "rank", "target", "rank")); + config.put("enrich_values", valuesConfig); + + Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config)); + assertThat(e.getMessage(), equalTo("source field [rank] does not exist in policy [majestic]")); + } + + private static Supplier createClusterStateSupplier(String policyName, EnrichPolicy policy) { + EnrichMetadata enrichMetadata = new EnrichMetadata(Map.of(policyName, policy)); + ClusterState state = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(EnrichMetadata.TYPE, enrichMetadata).build()) + .build(); + return () -> state; + } + + private static Supplier createClusterStateSupplier() { + EnrichMetadata enrichMetadata = new EnrichMetadata(Map.of()); + ClusterState state = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(EnrichMetadata.TYPE, enrichMetadata).build()) + .build(); + return () -> state; + } + +} diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java new file mode 100644 index 0000000000000..a1c1fb5013564 --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java @@ -0,0 +1,252 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.store.Directory; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class ExactMatchProcessorTests extends ESTestCase { + + public void testBasics() throws Exception { + try (Directory directory = newDirectory()) { + IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random())); + iwConfig.setMergePolicy(NoMergePolicy.INSTANCE); + try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) { + indexWriter.addDocument(createEnrichDocument("google.com", Map.of("globalRank", 1, "tldRank", 1, "tld", "com"))); + indexWriter.addDocument(createEnrichDocument("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co"))); + indexWriter.addDocument(createEnrichDocument("bbc.co.uk", Map.of("globalRank", 45, "tldRank", 14, "tld", "co.uk"))); + indexWriter.addDocument(createEnrichDocument("eops.nl", Map.of("globalRank", 4567, "tldRank", 80, "tld", "nl"))); + indexWriter.commit(); + + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "majestic_index", "key", List.of(), "schedule"); + Function policyLookup = policyName -> policy; + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = new IndexSearcher(indexReader); + Function searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader); + + ExactMatchProcessor processor = + new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false, + List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, + Map.of("domain", "elastic.co")); + assertThat(processor.execute(ingestDocument), notNullValue()); + assertThat(ingestDocument.getFieldValue("tld_rank", Integer.class), equalTo(23)); + assertThat(ingestDocument.getFieldValue("tld", String.class), equalTo("co")); + } + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = new IndexSearcher(indexReader); + Function searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader); + + ExactMatchProcessor processor = + new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false, + List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, + Map.of("domain", "eops.nl")); + assertThat(processor.execute(ingestDocument), notNullValue()); + assertThat(ingestDocument.getFieldValue("tld_rank", Integer.class), equalTo(80)); + assertThat(ingestDocument.getFieldValue("tld", String.class), equalTo("nl")); + } + } + } + } + + public void testNoMatch() throws Exception { + try (Directory directory = newDirectory()) { + IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random())); + iwConfig.setMergePolicy(NoMergePolicy.INSTANCE); + try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) { + indexWriter.addDocument(createEnrichDocument("google.com", Map.of("globalRank", 1, "tldRank", 1, "tld", "com"))); + indexWriter.addDocument(createEnrichDocument("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co"))); + indexWriter.addDocument(createEnrichDocument("bbc.co.uk", Map.of("globalRank", 45, "tldRank", 14, "tld", "co.uk"))); + indexWriter.addDocument(createEnrichDocument("eops.nl", Map.of("globalRank", 4567, "tldRank", 80, "tld", "nl"))); + indexWriter.commit(); + + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "majestic_index", "key", List.of(), "schedule"); + Function policyLookup = policyName -> policy; + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = new IndexSearcher(indexReader); + Function searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader); + + ExactMatchProcessor processor = + new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false, + List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, + Map.of("domain", "elastic.com")); + int numProperties = ingestDocument.getSourceAndMetadata().size(); + assertThat(processor.execute(ingestDocument), notNullValue()); + assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(numProperties)); + } + } + } + } + + public void testMoreThanOneSegment() throws Exception { + try (Directory directory = newDirectory()) { + IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random())); + iwConfig.setMergePolicy(NoMergePolicy.INSTANCE); + try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) { + indexWriter.addDocument(createEnrichDocument("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co"))); + indexWriter.commit(); + indexWriter.addDocument(createEnrichDocument("eops.nl", Map.of("globalRank", 4567, "tldRank", 80, "tld", "nl"))); + indexWriter.commit(); + + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "majestic_index", "key", List.of(), "schedule"); + Function policyLookup = policyName -> policy; + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = new IndexSearcher(indexReader); + Function searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader); + + ExactMatchProcessor processor = + new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false, + List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, + Map.of("domain", "elastic.co")); + Exception e = expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument)); + assertThat(e.getMessage(), equalTo("enrich index must have exactly a single segment")); + } + } + } + } + + public void testEmptyIndex() throws Exception { + try (Directory directory = newDirectory()) { + IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random())); + iwConfig.setMergePolicy(NoMergePolicy.INSTANCE); + try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) { + indexWriter.commit(); + + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "majestic_index", "key", List.of(), "schedule"); + Function policyLookup = policyName -> policy; + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = new IndexSearcher(indexReader); + Function searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader); + + ExactMatchProcessor processor = + new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false, + List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, + Map.of("domain", "elastic.co")); + int numProperties = ingestDocument.getSourceAndMetadata().size(); + assertThat(processor.execute(ingestDocument), notNullValue()); + assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(numProperties)); + } + } + } + } + + public void testEnrichKeyFieldMissing() throws Exception { + try (Directory directory = newDirectory()) { + IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random())); + iwConfig.setMergePolicy(NoMergePolicy.INSTANCE); + try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) { + Document document = new Document(); + document.add(new StringField("different_key", "elastic.co", Field.Store.NO)); + indexWriter.addDocument(document); + indexWriter.commit(); + + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "majestic_index", "key", List.of(), "schedule"); + Function policyLookup = policyName -> policy; + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = new IndexSearcher(indexReader); + Function searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader); + + ExactMatchProcessor processor = + new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false, + List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, + Map.of("domain", "elastic.co")); + Exception e = expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument)); + assertThat(e.getMessage(), equalTo("enrich key field [key] does not exist")); + } + } + } + } + + public void testPolicyMissing() { + Function policyLookup = policyName -> null; + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", policyLookup, indexExpression -> null, "_name", "domain", + true, List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of()); + expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); + } + + public void testIgnoreKeyMissing() throws Exception { + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "majestic_index", "key", List.of(), "schedule"); + Function policyLookup = policyName -> policy; + { + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", policyLookup, indexExpression -> null, "_name", "domain", + true, List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of()); + + assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6)); + assertThat(processor.execute(ingestDocument), notNullValue()); + assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6)); + } + { + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", policyLookup, indexExpression -> null, "_name", "domain", + false, List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of()); + expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); + } + } + + private static Document createEnrichDocument(String key, Map decorateValues) throws IOException { + BytesReference decorateContent; + try (XContentBuilder builder = XContentBuilder.builder(XContentType.SMILE.xContent())) { + builder.map(decorateValues); + builder.flush(); + ByteArrayOutputStream outputStream = (ByteArrayOutputStream) builder.getOutputStream(); + decorateContent = new BytesArray(outputStream.toByteArray()); + } + Document document = new Document(); + document.add(new StringField("key", key, Field.Store.NO)); + document.add(new StoredField(SourceFieldMapper.NAME, decorateContent.toBytesRef())); + return document; + } + +}