Skip to content

Commit

Permalink
Allow plugins to validate cluster-state on join (#26595)
Browse files Browse the repository at this point in the history
Today we don't have a pluggable way to validate whether the cluster state
is compatible with the node that joins. We already apply some checks for index
compatibility that prevent nodes from joining a cluster with indices they don't support,
but for plugins this isn't possible. This change adds a cluster state validator that
allows plugins to prevent a join if the cluster state is incompatible.
  • Loading branch information
s1monw committed Sep 12, 2017
1 parent c7d8eb9 commit 66d5c40
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.discovery;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.MasterService;
Expand All @@ -36,12 +38,15 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -62,14 +67,18 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
AllocationService allocationService) {
final UnicastHostsProvider hostsProvider;

final Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators = new ArrayList<>();
Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
for (DiscoveryPlugin plugin : plugins) {
plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> {
if (hostProviders.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Cannot register zen hosts provider [" + entry.getKey() + "] twice");
}
});
BiConsumer<DiscoveryNode, ClusterState> joinValidator = plugin.getJoinValidator();
if (joinValidator != null) {
joinValidators.add(joinValidator);
}
}
Optional<String> hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
if (hostsProviderName.isPresent()) {
Expand All @@ -85,7 +94,7 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
discoveryTypes.put("zen",
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, hostsProvider, allocationService));
clusterSettings, hostsProvider, allocationService, Collections.unmodifiableCollection(joinValidators)));
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier));
for (DiscoveryPlugin plugin : plugins) {
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class MembershipAction extends AbstractComponent {

Expand All @@ -63,7 +66,8 @@ public interface MembershipListener {

private final MembershipListener listener;

public MembershipAction(Settings settings, TransportService transportService, MembershipListener listener) {
public MembershipAction(Settings settings, TransportService transportService, MembershipListener listener,
Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators) {
super(settings);
this.transportService = transportService;
this.listener = listener;
Expand All @@ -73,7 +77,7 @@ public MembershipAction(Settings settings, TransportService transportService, Me
ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
() -> new ValidateJoinRequest(), ThreadPool.Names.GENERIC,
new ValidateJoinRequestRequestHandler());
new ValidateJoinRequestRequestHandler(transportService::getLocalNode, joinValidators));
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
}
Expand Down Expand Up @@ -176,12 +180,20 @@ public void writeTo(StreamOutput out) throws IOException {
}

/**
 * Transport handler for join-validation requests (registered by MembershipAction for
 * DISCOVERY_JOIN_VALIDATE_ACTION_NAME). It runs every configured join validator against the
 * cluster state carried by the request and answers with an empty response when none of them throws.
 */
static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
// supplies the local node that is handed to every validator as its first argument
private final Supplier<DiscoveryNode> localNodeSupplier;
// validators invoked, in iteration order, for each incoming validation request
private final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators;

/**
 * @param localNodeSupplier supplies the local node; it is queried on every request rather than once up front
 * @param joinValidators validators to run against the received cluster state
 */
ValidateJoinRequestRequestHandler(Supplier<DiscoveryNode> localNodeSupplier,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
this.localNodeSupplier = localNodeSupplier;
this.joinValidators = joinValidators;
}

/**
 * Validates the cluster state in the request. Any exception thrown by a check or validator propagates out of
 * this method before the empty response is sent.
 */
@Override
public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
// NOTE(review): the two explicit checks below look like the same checks that the built-in validator from
// ZenDiscovery.addBuiltInJoinValidators performs with the node's own version; they may be pre-change lines
// of this diff view - confirm before relying on them.
ensureNodesCompatibility(Version.CURRENT, request.state.getNodes());
ensureIndexCompatibility(Version.CURRENT, request.state.getMetaData());
// for now, the mere fact that we can serialize the cluster state acts as validation....
DiscoveryNode node = localNodeSupplier.get();
assert node != null : "local node is null";
// each validator receives the local node and the cluster state sent with the request
joinValidators.stream().forEach(action -> action.accept(node, request.state));
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
Expand All @@ -78,6 +80,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -146,15 +149,17 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover

private final NodeJoinController nodeJoinController;
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;

private final ClusterApplier clusterApplier;
private final AtomicReference<ClusterState> committedState; // last committed cluster state
private final Object stateMutex = new Object();
private final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators;

public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier,
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService) {
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService,
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators) {
super(settings);
this.onJoinValidators = addBuiltInJoinValidators(onJoinValidators);
this.masterService = masterService;
this.clusterApplier = clusterApplier;
this.transportService = transportService;
Expand Down Expand Up @@ -211,7 +216,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
namedWriteableRegistry,
this,
discoverySettings);
this.membership = new MembershipAction(settings, transportService, new MembershipListener());
this.membership = new MembershipAction(settings, transportService, new MembershipListener(), onJoinValidators);
this.joinThreadControl = new JoinThreadControl();

this.nodeJoinController = new NodeJoinController(masterService, allocationService, electMaster, settings);
Expand All @@ -223,6 +228,17 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler());
}

/**
 * Combines the built-in join validator with the supplied ones.
 *
 * The built-in validator runs {@link MembershipAction#ensureNodesCompatibility} and
 * {@link MembershipAction#ensureIndexCompatibility} using the version of the node it is given. It is placed first,
 * followed by all elements of {@code onJoinValidators}.
 *
 * @param onJoinValidators additional validators to append after the built-in one
 * @return an unmodifiable collection holding the built-in validator followed by the supplied validators
 */
static Collection<BiConsumer<DiscoveryNode,ClusterState>> addBuiltInJoinValidators(
        Collection<BiConsumer<DiscoveryNode,ClusterState>> onJoinValidators) {
    final BiConsumer<DiscoveryNode, ClusterState> builtInValidator = (candidate, clusterState) -> {
        MembershipAction.ensureNodesCompatibility(candidate.getVersion(), clusterState.getNodes());
        MembershipAction.ensureIndexCompatibility(candidate.getVersion(), clusterState.getMetaData());
    };
    final Collection<BiConsumer<DiscoveryNode, ClusterState>> combined = new ArrayList<>();
    // the built-in compatibility checks always run before any externally supplied validator
    combined.add(builtInValidator);
    combined.addAll(onJoinValidators);
    return Collections.unmodifiableCollection(combined);
}

// protected to allow overriding in tests
protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
UnicastHostsProvider hostsProvider) {
Expand Down Expand Up @@ -885,8 +901,7 @@ void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final
} else {
// we do this in a couple of places including the cluster update thread. This one here is really just best effort
// to ensure we fail as fast as possible.
MembershipAction.ensureNodesCompatibility(node.getVersion(), state.getNodes());
MembershipAction.ensureIndexCompatibility(node.getVersion(), state.getMetaData());
onJoinValidators.stream().forEach(a -> a.accept(node, state));
if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
MembershipAction.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion());
}
Expand All @@ -898,7 +913,8 @@ void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final
try {
membership.sendValidateJoinRequestBlocking(node, state, joinTimeout);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node), e);
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node),
e);
callback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
return;
}
Expand Down Expand Up @@ -1313,4 +1329,9 @@ public void start() {
}

}

/**
 * Returns the join validators held by this discovery instance, i.e. the collection produced by
 * {@code addBuiltInJoinValidators} in the constructor.
 *
 * @return the collection of join validators
 */
public final Collection<BiConsumer<DiscoveryNode, ClusterState>> getOnJoinValidators() {
    return this.onJoinValidators;
}

}
11 changes: 11 additions & 0 deletions core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@

package org.elasticsearch.plugins;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.MasterService;
Expand Down Expand Up @@ -106,4 +110,11 @@ default Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(Transpo
NetworkService networkService) {
return Collections.emptyMap();
}

/**
 * Returns a consumer that validates the initial join cluster state, or <code>null</code> (the default) if this
 * plugin does not contribute a validator. A non-null validator is called exactly once per join attempt, but it
 * might be called multiple times during the lifetime of a node. Validators are expected to throw an
 * {@link IllegalStateException} if the node and the cluster state are incompatible.
 */
default BiConsumer<DiscoveryNode,ClusterState> getJoinValidator() {
    return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.MasterService;
Expand All @@ -40,10 +42,12 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -160,7 +164,23 @@ public void testDuplicateHostsProvider() {

public void testLazyConstructionHostsProvider() {
DummyHostsProviderPlugin plugin = () -> Collections.singletonMap("custom",
() -> { throw new AssertionError("created hosts provider which was not selected"); });
() -> {
throw new AssertionError("created hosts provider which was not selected");
});
newModule(Settings.EMPTY, Collections.singletonList(plugin));
}

/**
 * Verifies that a join validator contributed by a plugin ends up in the validators exposed by the
 * {@link ZenDiscovery} instance the module creates.
 */
public void testJoinValidator() {
    final BiConsumer<DiscoveryNode, ClusterState> pluginValidator = (node, state) -> {};
    final DiscoveryPlugin validatingPlugin = new DiscoveryPlugin() {
        @Override
        public BiConsumer<DiscoveryNode, ClusterState> getJoinValidator() {
            return pluginValidator;
        }
    };
    final DiscoveryModule discoveryModule = newModule(Settings.EMPTY, Collections.singletonList(validatingPlugin));
    final ZenDiscovery zenDiscovery = (ZenDiscovery) discoveryModule.getDiscovery();
    final Collection<BiConsumer<DiscoveryNode, ClusterState>> registered = zenDiscovery.getOnJoinValidators();
    // two entries expected: the built-in validator added by ZenDiscovery plus the plugin's validator
    assertEquals(2, registered.size());
    assertTrue(registered.contains(pluginValidator));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ public void onNewClusterState(String source, Supplier<ClusterState> clusterState
}
};
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
masterService, clusterApplier, clusterSettings, Collections::emptyList, ESAllocationTestCase.createAllocationService());
masterService, clusterApplier, clusterSettings, Collections::emptyList, ESAllocationTestCase.createAllocationService(),
Collections.emptyList());
zenDiscovery.start();
return zenDiscovery;
}
Expand All @@ -342,7 +343,10 @@ public void testValidateOnUnsupportedIndexVersionCreated() throws Exception {
ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT);
final DiscoveryNode otherNode = new DiscoveryNode("other_node", buildNewFakeTransportAddress(), emptyMap(),
EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);
MembershipAction.ValidateJoinRequestRequestHandler request = new MembershipAction.ValidateJoinRequestRequestHandler();
final DiscoveryNode localNode = new DiscoveryNode("other_node", buildNewFakeTransportAddress(), emptyMap(),
EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);
MembershipAction.ValidateJoinRequestRequestHandler request = new MembershipAction.ValidateJoinRequestRequestHandler
(() -> localNode, ZenDiscovery.addBuiltInJoinValidators(Collections.emptyList()));
final boolean incompatible = randomBoolean();
IndexMetaData indexMetaData = IndexMetaData.builder("test").settings(Settings.builder()
.put(SETTING_VERSION_CREATED, incompatible ? VersionUtils.getPreviousVersion(Version.CURRENT.minimumIndexCompatibilityVersion())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private TestZenDiscovery(Settings settings, ThreadPool threadPool, TransportServ
ClusterApplier clusterApplier, ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider,
AllocationService allocationService) {
super(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, clusterSettings,
hostsProvider, allocationService);
hostsProvider, allocationService, Collections.emptyList());
}

@Override
Expand Down

0 comments on commit 66d5c40

Please sign in to comment.