Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport: also validate source index at put enrich policy time #48311

Merged
merged 2 commits into from
Oct 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.singletonMap;

public class EnrichDocumentationIT extends ESRestHighLevelClientTestCase {

@After
Expand All @@ -59,6 +61,10 @@ public void cleanup() {

public void testPutPolicy() throws Exception {
RestHighLevelClient client = highLevelClient();
CreateIndexRequest createIndexRequest = new CreateIndexRequest("users")
.mapping(singletonMap("properties", singletonMap("email", singletonMap("type", "keyword"))));
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);

// tag::enrich-put-policy-request
PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "match", Arrays.asList("users"),
Expand Down Expand Up @@ -106,6 +112,10 @@ public void testDeletePolicy() throws Exception {
RestHighLevelClient client = highLevelClient();

{
CreateIndexRequest createIndexRequest = new CreateIndexRequest("users")
.mapping(singletonMap("properties", singletonMap("email", singletonMap("type", "keyword"))));
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);

// Add a policy, so that it can be deleted:
PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "match", Arrays.asList("users"),
Expand Down Expand Up @@ -158,6 +168,10 @@ public void onFailure(Exception e) {
public void testGetPolicy() throws Exception {
RestHighLevelClient client = highLevelClient();

CreateIndexRequest createIndexRequest = new CreateIndexRequest("users")
.mapping(singletonMap("properties", singletonMap("email", singletonMap("type", "keyword"))));
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);

PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "match", Collections.singletonList("users"),
"email", Arrays.asList("address", "zip", "city", "state"));
Expand Down Expand Up @@ -259,8 +273,8 @@ public void testExecutePolicy() throws Exception {

{
CreateIndexRequest createIndexRequest = new CreateIndexRequest("users")
.mapping(Collections.singletonMap("properties", Collections.singletonMap("email",
Collections.singletonMap("type", "keyword"))));
.mapping(singletonMap("properties", singletonMap("email",
singletonMap("type", "keyword"))));
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "match", Collections.singletonList("users"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ Deletes an existing enrich policy and its enrich index.
[source,console]
----
PUT /users
{
"mappings" : {
"properties" : {
"email" : { "type" : "keyword" }
}
}
}

PUT /_enrich/policy/my-policy
{
Expand Down
7 changes: 7 additions & 0 deletions docs/reference/ingest/apis/enrich/get-enrich-policy.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ Returns information about an enrich policy.
[source,console]
----
PUT /users
{
"mappings" : {
"properties" : {
"email" : { "type" : "keyword" }
}
}
}

PUT /_enrich/policy/my-policy
{
Expand Down
7 changes: 7 additions & 0 deletions docs/reference/ingest/apis/enrich/put-enrich-policy.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ Creates an enrich policy.
[source,console]
----
PUT /users
{
"mappings" : {
"properties" : {
"email" : { "type" : "keyword" }
}
}
}
----
////

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
Expand Down Expand Up @@ -38,6 +39,15 @@ public void deletePolicies() throws Exception {
for (Map<?, ?> entry: policies) {
client().performRequest(new Request("DELETE", "/_enrich/policy/" +
XContentMapValues.extractValue("config.match.name", entry)));

List<?> sourceIndices = (List<?>) XContentMapValues.extractValue("config.match.indices", entry);
for (Object sourceIndex : sourceIndices) {
try {
client().performRequest(new Request("DELETE", "/" + sourceIndex));
} catch (ResponseException e) {
// and that is ok
}
}
}
}

Expand All @@ -48,6 +58,8 @@ protected boolean preserveIndicesUponCompletion() {
}

private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception {
// Create source index:
createSourceIndex("my-source-index");
// Create the policy:
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index"));
Expand Down Expand Up @@ -99,6 +111,7 @@ public void testBasicFlow() throws Exception {
}

public void testImmutablePolicy() throws IOException {
createSourceIndex("my-source-index");
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index"));
assertOK(client().performRequest(putPolicyRequest));
Expand All @@ -108,6 +121,7 @@ public void testImmutablePolicy() throws IOException {
}

public void testDeleteIsCaseSensitive() throws Exception {
createSourceIndex("my-source-index");
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index"));
assertOK(client().performRequest(putPolicyRequest));
Expand Down Expand Up @@ -155,6 +169,20 @@ public static String generatePolicySource(String index) throws IOException {
return Strings.toString(source);
}

public static void createSourceIndex(String index) throws IOException {
String mapping = createSourceIndexMapping();
createIndex(index, Settings.EMPTY, mapping);
}

public static String createSourceIndexMapping() {
return "\"properties\":" +
"{\"host\": {\"type\":\"keyword\"}," +
"\"globalRank\":{\"type\":\"keyword\"}," +
"\"tldRank\":{\"type\":\"keyword\"}," +
"\"tld\":{\"type\":\"keyword\"}" +
"}";
}

private static Map<String, Object> toMap(Response response) throws IOException {
return toMap(EntityUtils.toString(response.getEntity()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ protected Settings restAdminSettings() {
public void testInsufficientPermissionsOnNonExistentIndex() throws Exception {
// This test is here because it requires a valid user that has permission to execute policy PUTs but should fail if the user
// does not have access to read the backing indices used to enrich the data.
Request request = new Request("PUT", "/some-other-index");
request.setJsonEntity("{\n \"mappings\" : {" + createSourceIndexMapping() + "} }");
adminClient().performRequest(request);
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
putPolicyRequest.setJsonEntity(generatePolicySource("some-other-index"));
ResponseException exc = expectThrows(ResponseException.class, () -> client().performRequest(putPolicyRequest));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
---
"Test enrich crud apis":

- do:
indices.create:
index: bar
body:
mappings:
properties:
baz:
type: keyword
a:
type: keyword
b:
type: keyword
- is_true: acknowledged

- do:
enrich.put_policy:
name: policy-crud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,27 +136,34 @@ private void validateMappings(final GetIndexResponse getIndexResponse) {
logger.debug("Policy [{}]: Validating [{}] source mappings", policyName, sourceIndices);
for (String sourceIndex : sourceIndices) {
Map<String, Object> mapping = getMappings(getIndexResponse, sourceIndex);
// First ensure mapping is set
if (mapping.get("properties") == null) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed. Could not read mapping for source [{}] included by pattern [{}]",
policyName, sourceIndex, policy.getIndices());
}
// Validate the key and values
try {
validateField(mapping, policy.getMatchField(), true);
for (String valueFieldName : policy.getEnrichFields()) {
validateField(mapping, valueFieldName, false);
}
} catch (ElasticsearchException e) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed while validating field mappings for index [{}]",
e, policyName, sourceIndex);
validateMappings(policyName, policy, sourceIndex, mapping);
}
}

static void validateMappings(final String policyName,
final EnrichPolicy policy,
final String sourceIndex,
final Map<String, Object> mapping) {
// First ensure mapping is set
if (mapping.get("properties") == null) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed. Could not read mapping for source [{}] included by pattern [{}]",
policyName, sourceIndex, policy.getIndices());
}
// Validate the key and values
try {
validateField(mapping, policy.getMatchField(), true);
for (String valueFieldName : policy.getEnrichFields()) {
validateField(mapping, valueFieldName, false);
}
} catch (ElasticsearchException e) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed while validating field mappings for index [{}]",
e, policyName, sourceIndex);
}
}

private void validateField(Map<?, ?> properties, String fieldName, boolean fieldRequired) {
private static void validateField(Map<?, ?> properties, String fieldName, boolean fieldRequired) {
assert Strings.isEmpty(fieldName) == false: "Field name cannot be null or empty";
String[] fieldParts = fieldName.split("\\.");
StringBuilder parent = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -40,7 +44,11 @@ private EnrichStore() {}
* @param policy The policy to store
* @param handler The handler that gets invoked if policy has been stored or a failure has occurred.
*/
public static void putPolicy(String name, EnrichPolicy policy, ClusterService clusterService, Consumer<Exception> handler) {
public static void putPolicy(final String name,
final EnrichPolicy policy,
final ClusterService clusterService,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Consumer<Exception> handler) {
assert clusterService.localNode().isMasterNode();

if (Strings.isNullOrEmpty(name)) {
Expand Down Expand Up @@ -76,6 +84,22 @@ public static void putPolicy(String name, EnrichPolicy policy, ClusterService cl
finalPolicy = policy;
}
updateClusterState(clusterService, handler, current -> {
for (String indexExpression : finalPolicy.getIndices()) {
// indices field in policy can contain wildcards, aliases etc.
String[] concreteIndices =
indexNameExpressionResolver.concreteIndexNames(current, IndicesOptions.strictExpandOpen(), indexExpression);
for (String concreteIndex : concreteIndices) {
IndexMetaData imd = current.getMetaData().index(concreteIndex);
assert imd != null;
MappingMetaData mapping = imd.mapping();
if (mapping == null) {
throw new IllegalArgumentException("source index [" + concreteIndex + "] has no mapping");
}
Map<String, Object> mappingSource = mapping.getSourceAsMap();
EnrichPolicyRunner.validateMappings(name, finalPolicy, concreteIndex, mappingSource);
}
}

final Map<String, EnrichPolicy> policies = getPolicies(current);
if (policies.get(name) != null) {
throw new ResourceAlreadyExistsException("policy [{}] already exists", name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ protected void masterOperation(PutEnrichPolicyAction.Request request, ClusterSta
}

private void putPolicy(PutEnrichPolicyAction.Request request, ActionListener<AcknowledgedResponse> listener ) {
EnrichStore.putPolicy(request.getName(), request.getPolicy(), clusterService, e -> {
EnrichStore.putPolicy(request.getName(), request.getPolicy(), clusterService, indexNameExpressionResolver, e -> {
if (e == null) {
listener.onResponse(new AcknowledgedResponse(true));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
*/
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
Expand All @@ -24,9 +28,13 @@ protected Collection<Class<? extends Plugin>> getPlugins() {

protected AtomicReference<Exception> saveEnrichPolicy(String name, EnrichPolicy policy,
ClusterService clusterService) throws InterruptedException {
if (policy != null) {
createSourceIndices(policy);
}
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
EnrichStore.putPolicy(name, policy, clusterService, e -> {
EnrichStore.putPolicy(name, policy, clusterService, resolver, e -> {
error.set(e);
latch.countDown();
});
Expand All @@ -46,4 +54,20 @@ protected void deleteEnrichPolicy(String name, ClusterService clusterService) th
throw error.get();
}
}

protected void createSourceIndices(EnrichPolicy policy) {
createSourceIndices(client(), policy);
}

protected static void createSourceIndices(Client client, EnrichPolicy policy) {
for (String sourceIndex : policy.getIndices()) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(sourceIndex);
createIndexRequest.mapping("_doc", policy.getMatchField(), "type=keyword");
try {
client.admin().indices().create(createIndexRequest).actionGet();
} catch (ResourceAlreadyExistsException e) {
// and that is okay
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
Expand All @@ -24,6 +25,7 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.json.JsonXContent;
Expand All @@ -34,6 +36,7 @@
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;

import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;

Expand Down Expand Up @@ -114,13 +117,16 @@ private EnrichPolicy randomPolicy() {
for (int i = 0; i < randomIntBetween(1, 3); i++) {
enrichKeys.add(randomAlphaOfLength(10));
}
return new EnrichPolicy(MATCH_TYPE, null, Collections.singletonList(randomAlphaOfLength(10)), randomAlphaOfLength(10),
String sourceIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
return new EnrichPolicy(MATCH_TYPE, null, Collections.singletonList(sourceIndex), randomAlphaOfLength(10),
enrichKeys);
}

private void addPolicy(String policyName, EnrichPolicy policy) throws InterruptedException {
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
createSourceIndices(client(), policy);
doSyncronously((clusterService, exceptionConsumer) ->
EnrichStore.putPolicy(policyName, policy, clusterService, exceptionConsumer));
EnrichStore.putPolicy(policyName, policy, clusterService, resolver, exceptionConsumer));
}

private void removePolicy(String policyName) throws InterruptedException {
Expand Down
Loading