Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not allow put mapping on follower #37675

Merged
merged 7 commits into from
Jan 24, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ public class ActionModule extends AbstractModule {
private final AutoCreateIndex autoCreateIndex;
private final DestructiveOperations destructiveOperations;
private final RestController restController;
private final TransportPutMappingAction.RequestValidators mappingRequestValidators;

public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
Expand Down Expand Up @@ -388,6 +389,10 @@ public ActionModule(boolean transportClient, Settings settings, IndexNameExpress
restWrapper = newRestWrapper;
}
}
mappingRequestValidators = new TransportPutMappingAction.RequestValidators(
actionPlugins.stream().flatMap(p -> p.mappingRequestValidators().stream()).collect(Collectors.toList())
);

if (transportClient) {
restController = null;
} else {
Expand Down Expand Up @@ -678,6 +683,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
protected void configure() {
bind(ActionFilters.class).toInstance(actionFilters);
bind(DestructiveOperations.class).toInstance(destructiveOperations);
bind(TransportPutMappingAction.RequestValidators.class).toInstance(mappingRequestValidators);

if (false == transportClient) {
// Supporting classes only used when not a transport client
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.indices.mapping.put;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.index.Index;

/**
* A validator that validates a {@link PutMappingRequest} before executing it.
* @see TransportPutMappingAction.RequestValidators
*/
public interface MappingRequestValidator {

/**
* Validates a given put mapping request with its associated concrete indices and the current state.
*
* @param request the request to validate
* @param state the current cluster state
* @param indices the concrete indices that associated with the given put mapping request
* @return a non-null exception indicates a reason that the given request should be aborted; otherwise returns null.
*/
Exception validateRequest(PutMappingRequest request, ClusterState state, Index[] indices);
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class PutMappingRequest extends AcknowledgedRequest<PutMappingRequest> im
private String type;

private String source;
private String origin = "";

private Index concreteIndex;

Expand Down Expand Up @@ -184,6 +185,16 @@ public PutMappingRequest source(Object... source) {
return source(buildFromSimplifiedDef(type, source));
}

public String origin() {
return origin;
}

public PutMappingRequest origin(String origin) {
// reserve "null" for bwc.
this.origin = Objects.requireNonNull(origin);
return this;
}

/**
* @param type
* the mapping type
Expand Down Expand Up @@ -301,6 +312,11 @@ public void readFrom(StreamInput in) throws IOException {
in.readBoolean(); // updateAllTypes
}
concreteIndex = in.readOptionalWriteable(Index::new);
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
origin = in.readOptionalString();
} else {
origin = null;
}
}

@Override
Expand All @@ -314,6 +330,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(true); // updateAllTypes
}
out.writeOptionalWriteable(concreteIndex);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeOptionalString(origin);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,25 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Collection;

/**
* Put mapping action.
*/
public class TransportPutMappingAction extends TransportMasterNodeAction<PutMappingRequest, AcknowledgedResponse> {

private final MetaDataMappingService metaDataMappingService;
private final RequestValidators requestValidators;

@Inject
public TransportPutMappingAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataMappingService metaDataMappingService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
RequestValidators requestValidators) {
super(PutMappingAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
PutMappingRequest::new);
this.metaDataMappingService = metaDataMappingService;
this.requestValidators = requestValidators;
}

@Override
Expand Down Expand Up @@ -82,6 +87,11 @@ protected void masterOperation(final PutMappingRequest request, final ClusterSta
final Index[] concreteIndices = request.getConcreteIndex() == null ?
indexNameExpressionResolver.concreteIndices(state, request)
: new Index[] {request.getConcreteIndex()};
final Exception validationException = requestValidators.validateRequest(request, state, concreteIndices);
if (validationException != null) {
listener.onFailure(validationException);
return;
}
PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest()
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices).type(request.type())
Expand All @@ -107,4 +117,26 @@ public void onFailure(Exception t) {
throw ex;
}
}


public static class RequestValidators {
private final Collection<MappingRequestValidator> validators;

public RequestValidators(Collection<MappingRequestValidator> validators) {
this.validators = validators;
}

private Exception validateRequest(PutMappingRequest request, ClusterState state, Index[] indices) {
Exception firstException = null;
for (MappingRequestValidator validator : validators) {
final Exception e = validator.validateRequest(request, state, indices);
if (firstException == null) {
firstException = e;
} else {
firstException.addSuppressed(e);
}
}
return firstException;
}
}
}
10 changes: 10 additions & 0 deletions server/src/main/java/org/elasticsearch/plugins/ActionPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator;
import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
Expand Down Expand Up @@ -179,4 +181,12 @@ public int hashCode() {
return Objects.hash(action, transportAction, supportTransportActions);
}
}

/**
* Returns a collection of validators that are used by {@link TransportPutMappingAction.RequestValidators} to
* validate a {@link org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest} before the executing it.
*/
default Collection<MappingRequestValidator> mappingRequestValidators() {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.indices.mapping.put;

import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.Index;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

public class ValidateMappingRequestPluginIT extends ESSingleNodeTestCase {
static final Map<String, Collection<String>> allowedOrigins = ConcurrentCollections.newConcurrentMap();
public static class TestPlugin extends Plugin implements ActionPlugin {
@Override
public Collection<MappingRequestValidator> mappingRequestValidators() {
return Collections.singletonList((request, state, indices) -> {
for (Index index : indices) {
if (allowedOrigins.getOrDefault(index.getName(), Collections.emptySet()).contains(request.origin()) == false) {
return new IllegalStateException("not allowed: index[" + index.getName() + "] origin[" + request.origin() + "]");
}
}
return null;
});
}
}

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singletonList(TestPlugin.class);
}

public void testValidateMappingRequest() {
createIndex("index_1");
createIndex("index_2");
allowedOrigins.put("index_1", Arrays.asList("1", "2"));
allowedOrigins.put("index_2", Arrays.asList("2", "3"));
{
String origin = randomFrom("", "3", "4", "5");
PutMappingRequest request = new PutMappingRequest().indices("index_1").type("doc").source("t1", "type=keyword").origin(origin);
Exception e = expectThrows(IllegalStateException.class, () -> client().admin().indices().putMapping(request).actionGet());
assertThat(e.getMessage(), equalTo("not allowed: index[index_1] origin[" + origin + "]"));
}
{
PutMappingRequest request = new PutMappingRequest().indices("index_1").origin(randomFrom("1", "2"))
.type("doc").source("t1", "type=keyword");
assertAcked(client().admin().indices().putMapping(request).actionGet());
}

{
String origin = randomFrom("", "1", "4", "5");
PutMappingRequest request = new PutMappingRequest().indices("index_2").type("doc").source("t2", "type=keyword").origin(origin);
Exception e = expectThrows(IllegalStateException.class, () -> client().admin().indices().putMapping(request).actionGet());
assertThat(e.getMessage(), equalTo("not allowed: index[index_2] origin[" + origin + "]"));
}
{
PutMappingRequest request = new PutMappingRequest().indices("index_2").origin(randomFrom("2", "3"))
.type("doc").source("t1", "type=keyword");
assertAcked(client().admin().indices().putMapping(request).actionGet());
}

{
String origin = randomFrom("", "1", "3", "4");
PutMappingRequest request = new PutMappingRequest().indices("*").type("doc").source("t3", "type=keyword").origin(origin);
Exception e = expectThrows(IllegalStateException.class, () -> client().admin().indices().putMapping(request).actionGet());
assertThat(e.getMessage(), containsString("not allowed:"));
}
{
PutMappingRequest request = new PutMappingRequest().indices("index_2").origin("2")
.type("doc").source("t3", "type=keyword");
assertAcked(client().admin().indices().putMapping(request).actionGet());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -45,6 +46,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
import org.elasticsearch.xpack.ccr.action.CcrRequests;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
Expand Down Expand Up @@ -312,4 +314,9 @@ public void onIndexModule(IndexModule indexModule) {

protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); }

@Override
public Collection<MappingRequestValidator> mappingRequestValidators() {
return Collections.singletonList(CcrRequests.CCR_PUT_MAPPING_REQUEST_VALIDATOR);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,20 @@
*/
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ccr.CcrSettings;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public final class CcrRequests {

Expand All @@ -24,8 +34,27 @@ public static ClusterStateRequest metaDataRequest(String leaderIndex) {

public static PutMappingRequest putMappingRequest(String followerIndex, MappingMetaData mappingMetaData) {
PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex);
putMappingRequest.origin("ccr");
putMappingRequest.type(mappingMetaData.type());
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
return putMappingRequest;
}

public static final MappingRequestValidator CCR_PUT_MAPPING_REQUEST_VALIDATOR = (request, state, indices) -> {
if (request.origin() == null) {
return null; // a put-mapping-request on old versions does not have origin.
}
final List<Index> followingIndices = Arrays.stream(indices)
.filter(index -> {
final IndexMetaData indexMetaData = state.metaData().index(index);
return indexMetaData != null && CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(indexMetaData.getSettings());
}).collect(Collectors.toList());
if (followingIndices.isEmpty() == false && "ccr".equals(request.origin()) == false) {
final String errorMessage = "can't put mapping to the following indices "
+ "[" + followingIndices.stream().map(Index::getName).collect(Collectors.joining(", ")) + "]; "
+ "the mapping of the following indices are self-replicated from its leader indices";
return new ElasticsearchStatusException(errorMessage, RestStatus.FORBIDDEN);
}
return null;
};
}
Loading