Skip to content

Commit

Permalink
Do not allow put mapping on follower (#37675)
Browse files Browse the repository at this point in the history
Today, the mapping on the follower is managed and replicated from its
leader index by the ShardFollowTask. Thus, we should prevent users
from modifying the mapping on the follower indices.

Relates #30086
  • Loading branch information
dnhatn committed Jan 28, 2019
1 parent 5ca5104 commit 44b3089
Show file tree
Hide file tree
Showing 10 changed files with 271 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ public class ActionModule extends AbstractModule {
private final AutoCreateIndex autoCreateIndex;
private final DestructiveOperations destructiveOperations;
private final RestController restController;
private final TransportPutMappingAction.RequestValidators mappingRequestValidators;

public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
Expand Down Expand Up @@ -384,6 +385,10 @@ public ActionModule(boolean transportClient, Settings settings, IndexNameExpress
restWrapper = newRestWrapper;
}
}
mappingRequestValidators = new TransportPutMappingAction.RequestValidators(
actionPlugins.stream().flatMap(p -> p.mappingRequestValidators().stream()).collect(Collectors.toList())
);

if (transportClient) {
restController = null;
} else {
Expand Down Expand Up @@ -672,6 +677,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
protected void configure() {
bind(ActionFilters.class).toInstance(actionFilters);
bind(DestructiveOperations.class).toInstance(destructiveOperations);
bind(TransportPutMappingAction.RequestValidators.class).toInstance(mappingRequestValidators);

if (false == transportClient) {
// Supporting classes only used when not a transport client
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.indices.mapping.put;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.index.Index;

/**
* A validator that validates a {@link PutMappingRequest} before executing it.
* @see TransportPutMappingAction.RequestValidators
*/
public interface MappingRequestValidator {

/**
* Validates a given put mapping request with its associated concrete indices and the current state.
*
* @param request the request to validate
* @param state the current cluster state
* @param indices the concrete indices that associated with the given put mapping request
* @return a non-null exception indicates a reason that the given request should be aborted; otherwise returns null.
*/
Exception validateRequest(PutMappingRequest request, ClusterState state, Index[] indices);
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class PutMappingRequest extends AcknowledgedRequest<PutMappingRequest> im
private String type;

private String source;
private String origin = "";

private boolean updateAllTypes = false;
private Index concreteIndex;
Expand Down Expand Up @@ -185,6 +186,16 @@ public PutMappingRequest source(Object... source) {
return source(buildFromSimplifiedDef(type, source));
}

public String origin() {
return origin;
}

public PutMappingRequest origin(String origin) {
// reserve "null" for bwc.
this.origin = Objects.requireNonNull(origin);
return this;
}

/**
* @param type
* the mapping type
Expand Down Expand Up @@ -320,6 +331,11 @@ public void readFrom(StreamInput in) throws IOException {
}
updateAllTypes = in.readBoolean();
concreteIndex = in.readOptionalWriteable(Index::new);
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
origin = in.readOptionalString();
} else {
origin = null;
}
}

@Override
Expand All @@ -331,6 +347,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(source);
out.writeBoolean(updateAllTypes);
out.writeOptionalWriteable(concreteIndex);
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
out.writeOptionalString(origin);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,25 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Collection;

/**
* Put mapping action.
*/
public class TransportPutMappingAction extends TransportMasterNodeAction<PutMappingRequest, AcknowledgedResponse> {

private final MetaDataMappingService metaDataMappingService;
private final RequestValidators requestValidators;

@Inject
public TransportPutMappingAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataMappingService metaDataMappingService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
RequestValidators requestValidators) {
super(settings, PutMappingAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
PutMappingRequest::new);
this.metaDataMappingService = metaDataMappingService;
this.requestValidators = requestValidators;
}

@Override
Expand Down Expand Up @@ -83,6 +88,11 @@ protected void masterOperation(final PutMappingRequest request, final ClusterSta
final Index[] concreteIndices = request.getConcreteIndex() == null ?
indexNameExpressionResolver.concreteIndices(state, request)
: new Index[] {request.getConcreteIndex()};
final Exception validationException = requestValidators.validateRequest(request, state, concreteIndices);
if (validationException != null) {
listener.onFailure(validationException);
return;
}
PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest()
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices).type(request.type())
Expand All @@ -109,4 +119,26 @@ public void onFailure(Exception t) {
throw ex;
}
}


public static class RequestValidators {
private final Collection<MappingRequestValidator> validators;

public RequestValidators(Collection<MappingRequestValidator> validators) {
this.validators = validators;
}

private Exception validateRequest(PutMappingRequest request, ClusterState state, Index[] indices) {
Exception firstException = null;
for (MappingRequestValidator validator : validators) {
final Exception e = validator.validateRequest(request, state, indices);
if (firstException == null) {
firstException = e;
} else {
firstException.addSuppressed(e);
}
}
return firstException;
}
}
}
10 changes: 10 additions & 0 deletions server/src/main/java/org/elasticsearch/plugins/ActionPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator;
import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
Expand Down Expand Up @@ -179,4 +181,12 @@ public int hashCode() {
return Objects.hash(action, transportAction, supportTransportActions);
}
}

/**
* Returns a collection of validators that are used by {@link TransportPutMappingAction.RequestValidators} to
* validate a {@link org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest} before the executing it.
*/
default Collection<MappingRequestValidator> mappingRequestValidators() {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.indices.mapping.put;

import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.Index;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

public class ValidateMappingRequestPluginIT extends ESSingleNodeTestCase {
static final Map<String, Collection<String>> allowedOrigins = ConcurrentCollections.newConcurrentMap();
public static class TestPlugin extends Plugin implements ActionPlugin {
@Override
public Collection<MappingRequestValidator> mappingRequestValidators() {
return Collections.singletonList((request, state, indices) -> {
for (Index index : indices) {
if (allowedOrigins.getOrDefault(index.getName(), Collections.emptySet()).contains(request.origin()) == false) {
return new IllegalStateException("not allowed: index[" + index.getName() + "] origin[" + request.origin() + "]");
}
}
return null;
});
}
}

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singletonList(TestPlugin.class);
}

public void testValidateMappingRequest() {
createIndex("index_1");
createIndex("index_2");
allowedOrigins.put("index_1", Arrays.asList("1", "2"));
allowedOrigins.put("index_2", Arrays.asList("2", "3"));
{
String origin = randomFrom("", "3", "4", "5");
PutMappingRequest request = new PutMappingRequest().indices("index_1").type("doc").source("t1", "type=keyword").origin(origin);
Exception e = expectThrows(IllegalStateException.class, () -> client().admin().indices().putMapping(request).actionGet());
assertThat(e.getMessage(), equalTo("not allowed: index[index_1] origin[" + origin + "]"));
}
{
PutMappingRequest request = new PutMappingRequest().indices("index_1").origin(randomFrom("1", "2"))
.type("doc").source("t1", "type=keyword");
assertAcked(client().admin().indices().putMapping(request).actionGet());
}

{
String origin = randomFrom("", "1", "4", "5");
PutMappingRequest request = new PutMappingRequest().indices("index_2").type("doc").source("t2", "type=keyword").origin(origin);
Exception e = expectThrows(IllegalStateException.class, () -> client().admin().indices().putMapping(request).actionGet());
assertThat(e.getMessage(), equalTo("not allowed: index[index_2] origin[" + origin + "]"));
}
{
PutMappingRequest request = new PutMappingRequest().indices("index_2").origin(randomFrom("2", "3"))
.type("doc").source("t1", "type=keyword");
assertAcked(client().admin().indices().putMapping(request).actionGet());
}

{
String origin = randomFrom("", "1", "3", "4");
PutMappingRequest request = new PutMappingRequest().indices("*").type("doc").source("t3", "type=keyword").origin(origin);
Exception e = expectThrows(IllegalStateException.class, () -> client().admin().indices().putMapping(request).actionGet());
assertThat(e.getMessage(), containsString("not allowed:"));
}
{
PutMappingRequest request = new PutMappingRequest().indices("index_2").origin("2")
.type("doc").source("t3", "type=keyword");
assertAcked(client().admin().indices().putMapping(request).actionGet());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -46,6 +47,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
import org.elasticsearch.xpack.ccr.action.CcrRequests;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
Expand Down Expand Up @@ -321,4 +323,9 @@ public void onIndexModule(IndexModule indexModule) {

protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); }

@Override
public Collection<MappingRequestValidator> mappingRequestValidators() {
return Collections.singletonList(CcrRequests.CCR_PUT_MAPPING_REQUEST_VALIDATOR);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,20 @@
*/
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ccr.CcrSettings;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public final class CcrRequests {

Expand All @@ -24,8 +34,27 @@ public static ClusterStateRequest metaDataRequest(String leaderIndex) {

public static PutMappingRequest putMappingRequest(String followerIndex, MappingMetaData mappingMetaData) {
PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex);
putMappingRequest.origin("ccr");
putMappingRequest.type(mappingMetaData.type());
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
return putMappingRequest;
}

public static final MappingRequestValidator CCR_PUT_MAPPING_REQUEST_VALIDATOR = (request, state, indices) -> {
if (request.origin() == null) {
return null; // a put-mapping-request on old versions does not have origin.
}
final List<Index> followingIndices = Arrays.stream(indices)
.filter(index -> {
final IndexMetaData indexMetaData = state.metaData().index(index);
return indexMetaData != null && CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(indexMetaData.getSettings());
}).collect(Collectors.toList());
if (followingIndices.isEmpty() == false && "ccr".equals(request.origin()) == false) {
final String errorMessage = "can't put mapping to the following indices "
+ "[" + followingIndices.stream().map(Index::getName).collect(Collectors.joining(", ")) + "]; "
+ "the mapping of the following indices are self-replicated from its leader indices";
return new ElasticsearchStatusException(errorMessage, RestStatus.FORBIDDEN);
}
return null;
};
}
Loading

0 comments on commit 44b3089

Please sign in to comment.