Skip to content

Commit

Permalink
Also validate source index at put enrich policy time. (#48254)
Browse files Browse the repository at this point in the history
This changes tests to create a valid
source index prior to creating the enrich policy.
  • Loading branch information
martijnvg authored Oct 21, 2019
1 parent f155d88 commit 1ef8dc4
Show file tree
Hide file tree
Showing 17 changed files with 184 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ public void cleanup() {

public void testPutPolicy() throws Exception {
RestHighLevelClient client = highLevelClient();
CreateIndexRequest createIndexRequest = new CreateIndexRequest("users")
.mapping(Map.of("properties", Map.of("email", Map.of("type", "keyword"))));
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);

// tag::enrich-put-policy-request
PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "match", List.of("users"),
Expand Down Expand Up @@ -104,6 +108,10 @@ public void testDeletePolicy() throws Exception {
RestHighLevelClient client = highLevelClient();

{
CreateIndexRequest createIndexRequest = new CreateIndexRequest("users")
.mapping(Map.of("properties", Map.of("email", Map.of("type", "keyword"))));
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);

// Add a policy, so that it can be deleted:
PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "match", List.of("users"),
Expand Down Expand Up @@ -155,6 +163,10 @@ public void onFailure(Exception e) {
public void testGetPolicy() throws Exception {
RestHighLevelClient client = highLevelClient();

CreateIndexRequest createIndexRequest = new CreateIndexRequest("users")
.mapping(Map.of("properties", Map.of("email", Map.of("type", "keyword"))));
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);

PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "match", List.of("users"),
"email", List.of("address", "zip", "city", "state"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ Deletes an existing enrich policy and its enrich index.
[source,console]
----
PUT /users
{
"mappings" : {
"properties" : {
"email" : { "type" : "keyword" }
}
}
}
PUT /_enrich/policy/my-policy
{
Expand Down
7 changes: 7 additions & 0 deletions docs/reference/ingest/apis/enrich/get-enrich-policy.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ Returns information about an enrich policy.
[source,console]
----
PUT /users
{
"mappings" : {
"properties" : {
"email" : { "type" : "keyword" }
}
}
}
PUT /_enrich/policy/my-policy
{
Expand Down
7 changes: 7 additions & 0 deletions docs/reference/ingest/apis/enrich/put-enrich-policy.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ Creates an enrich policy.
[source,console]
----
PUT /users
{
"mappings" : {
"properties" : {
"email" : { "type" : "keyword" }
}
}
}
----
////

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
Expand Down Expand Up @@ -38,6 +39,15 @@ public void deletePolicies() throws Exception {
for (Map<?, ?> entry: policies) {
client().performRequest(new Request("DELETE", "/_enrich/policy/" +
XContentMapValues.extractValue("config.match.name", entry)));

List<?> sourceIndices = (List<?>) XContentMapValues.extractValue("config.match.indices", entry);
for (Object sourceIndex : sourceIndices) {
try {
client().performRequest(new Request("DELETE", "/" + sourceIndex));
} catch (ResponseException e) {
// and that is ok
}
}
}
}

Expand All @@ -48,6 +58,8 @@ protected boolean preserveIndicesUponCompletion() {
}

private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception {
// Create source index:
createSourceIndex("my-source-index");
// Create the policy:
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index"));
Expand Down Expand Up @@ -99,6 +111,7 @@ public void testBasicFlow() throws Exception {
}

public void testImmutablePolicy() throws IOException {
createSourceIndex("my-source-index");
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index"));
assertOK(client().performRequest(putPolicyRequest));
Expand All @@ -108,6 +121,7 @@ public void testImmutablePolicy() throws IOException {
}

public void testDeleteIsCaseSensitive() throws Exception {
createSourceIndex("my-source-index");
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index"));
assertOK(client().performRequest(putPolicyRequest));
Expand Down Expand Up @@ -155,6 +169,20 @@ public static String generatePolicySource(String index) throws IOException {
return Strings.toString(source);
}

public static void createSourceIndex(String index) throws IOException {
String mapping = createSourceIndexMapping();
createIndex(index, Settings.EMPTY, mapping);
}

public static String createSourceIndexMapping() {
return "\"properties\":" +
"{\"host\": {\"type\":\"keyword\"}," +
"\"globalRank\":{\"type\":\"keyword\"}," +
"\"tldRank\":{\"type\":\"keyword\"}," +
"\"tld\":{\"type\":\"keyword\"}" +
"}";
}

private static Map<String, Object> toMap(Response response) throws IOException {
return toMap(EntityUtils.toString(response.getEntity()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ protected Settings restAdminSettings() {
public void testInsufficientPermissionsOnNonExistentIndex() throws Exception {
// This test is here because it requires a valid user that has permission to execute policy PUTs but should fail if the user
// does not have access to read the backing indices used to enrich the data.
Request request = new Request("PUT", "/some-other-index");
request.setJsonEntity("{\n \"mappings\" : {" + createSourceIndexMapping() + "} }");
adminClient().performRequest(request);
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
putPolicyRequest.setJsonEntity(generatePolicySource("some-other-index"));
ResponseException exc = expectThrows(ResponseException.class, () -> client().performRequest(putPolicyRequest));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
---
"Test enrich crud apis":

- do:
indices.create:
index: bar
body:
mappings:
properties:
baz:
type: keyword
a:
type: keyword
b:
type: keyword
- is_true: acknowledged

- do:
enrich.put_policy:
name: policy-crud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,27 +134,34 @@ private void validateMappings(final GetIndexResponse getIndexResponse) {
logger.debug("Policy [{}]: Validating [{}] source mappings", policyName, sourceIndices);
for (String sourceIndex : sourceIndices) {
Map<String, Object> mapping = getMappings(getIndexResponse, sourceIndex);
// First ensure mapping is set
if (mapping.get("properties") == null) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed. Could not read mapping for source [{}] included by pattern [{}]",
policyName, sourceIndex, policy.getIndices());
}
// Validate the key and values
try {
validateField(mapping, policy.getMatchField(), true);
for (String valueFieldName : policy.getEnrichFields()) {
validateField(mapping, valueFieldName, false);
}
} catch (ElasticsearchException e) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed while validating field mappings for index [{}]",
e, policyName, sourceIndex);
validateMappings(policyName, policy, sourceIndex, mapping);
}
}

static void validateMappings(final String policyName,
final EnrichPolicy policy,
final String sourceIndex,
final Map<String, Object> mapping) {
// First ensure mapping is set
if (mapping.get("properties") == null) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed. Could not read mapping for source [{}] included by pattern [{}]",
policyName, sourceIndex, policy.getIndices());
}
// Validate the key and values
try {
validateField(mapping, policy.getMatchField(), true);
for (String valueFieldName : policy.getEnrichFields()) {
validateField(mapping, valueFieldName, false);
}
} catch (ElasticsearchException e) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed while validating field mappings for index [{}]",
e, policyName, sourceIndex);
}
}

private void validateField(Map<?, ?> properties, String fieldName, boolean fieldRequired) {
private static void validateField(Map<?, ?> properties, String fieldName, boolean fieldRequired) {
assert Strings.isEmpty(fieldName) == false: "Field name cannot be null or empty";
String[] fieldParts = fieldName.split("\\.");
StringBuilder parent = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -39,7 +43,11 @@ private EnrichStore() {}
* @param policy The policy to store
* @param handler The handler that gets invoked if policy has been stored or a failure has occurred.
*/
public static void putPolicy(String name, EnrichPolicy policy, ClusterService clusterService, Consumer<Exception> handler) {
public static void putPolicy(final String name,
final EnrichPolicy policy,
final ClusterService clusterService,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Consumer<Exception> handler) {
assert clusterService.localNode().isMasterNode();

if (Strings.isNullOrEmpty(name)) {
Expand Down Expand Up @@ -75,6 +83,22 @@ public static void putPolicy(String name, EnrichPolicy policy, ClusterService cl
finalPolicy = policy;
}
updateClusterState(clusterService, handler, current -> {
for (String indexExpression : finalPolicy.getIndices()) {
// indices field in policy can contain wildcards, aliases etc.
String[] concreteIndices =
indexNameExpressionResolver.concreteIndexNames(current, IndicesOptions.strictExpandOpen(), indexExpression);
for (String concreteIndex : concreteIndices) {
IndexMetaData imd = current.getMetaData().index(concreteIndex);
assert imd != null;
MappingMetaData mapping = imd.mapping();
if (mapping == null) {
throw new IllegalArgumentException("source index [" + concreteIndex + "] has no mapping");
}
Map<String, Object> mappingSource = mapping.getSourceAsMap();
EnrichPolicyRunner.validateMappings(name, finalPolicy, concreteIndex, mappingSource);
}
}

final Map<String, EnrichPolicy> policies = getPolicies(current);
if (policies.get(name) != null) {
throw new ResourceAlreadyExistsException("policy [{}] already exists", name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ protected void masterOperation(Task task, PutEnrichPolicyAction.Request request,
}

private void putPolicy(PutEnrichPolicyAction.Request request, ActionListener<AcknowledgedResponse> listener ) {
EnrichStore.putPolicy(request.getName(), request.getPolicy(), clusterService, e -> {
EnrichStore.putPolicy(request.getName(), request.getPolicy(), clusterService, indexNameExpressionResolver, e -> {
if (e == null) {
listener.onResponse(new AcknowledgedResponse(true));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
*/
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
Expand All @@ -24,9 +28,13 @@ protected Collection<Class<? extends Plugin>> getPlugins() {

protected AtomicReference<Exception> saveEnrichPolicy(String name, EnrichPolicy policy,
ClusterService clusterService) throws InterruptedException {
if (policy != null) {
createSourceIndices(policy);
}
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
EnrichStore.putPolicy(name, policy, clusterService, e -> {
EnrichStore.putPolicy(name, policy, clusterService, resolver, e -> {
error.set(e);
latch.countDown();
});
Expand All @@ -46,4 +54,20 @@ protected void deleteEnrichPolicy(String name, ClusterService clusterService) th
throw error.get();
}
}

protected void createSourceIndices(EnrichPolicy policy) {
createSourceIndices(client(), policy);
}

protected static void createSourceIndices(Client client, EnrichPolicy policy) {
for (String sourceIndex : policy.getIndices()) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(sourceIndex);
createIndexRequest.mapping("_doc", policy.getMatchField(), "type=keyword");
try {
client.admin().indices().create(createIndexRequest).actionGet();
} catch (ResourceAlreadyExistsException e) {
// and that is okay
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
Expand All @@ -23,6 +24,7 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.json.JsonXContent;
Expand All @@ -33,6 +35,7 @@
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;

import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;

Expand Down Expand Up @@ -113,12 +116,15 @@ private EnrichPolicy randomPolicy() {
for (int i = 0; i < randomIntBetween(1, 3); i++) {
enrichKeys.add(randomAlphaOfLength(10));
}
return new EnrichPolicy(MATCH_TYPE, null, List.of(randomAlphaOfLength(10)), randomAlphaOfLength(10), enrichKeys);
String sourceIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
return new EnrichPolicy(MATCH_TYPE, null, List.of(sourceIndex), randomAlphaOfLength(10), enrichKeys);
}

private void addPolicy(String policyName, EnrichPolicy policy) throws InterruptedException {
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
createSourceIndices(client(), policy);
doSyncronously((clusterService, exceptionConsumer) ->
EnrichStore.putPolicy(policyName, policy, clusterService, exceptionConsumer));
EnrichStore.putPolicy(policyName, policy, clusterService, resolver, exceptionConsumer));
}

private void removePolicy(String policyName) throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Locale;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -59,7 +61,9 @@ public static EnrichPolicy randomEnrichPolicy(XContentType xContentType) {
return new EnrichPolicy(
randomFrom(EnrichPolicy.SUPPORTED_POLICY_TYPES),
randomBoolean() ? querySource : null,
Arrays.asList(generateRandomStringArray(8, 4, false, false)),
Arrays.stream(generateRandomStringArray(8, 4, false, false))
.map(s -> s.toLowerCase(Locale.ROOT))
.collect(Collectors.toList()),
randomAlphaOfLength(4),
Arrays.asList(generateRandomStringArray(8, 4, false, false))
);
Expand Down
Loading

0 comments on commit 1ef8dc4

Please sign in to comment.