Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow plugins to validate cluster-state on join #26595

Merged
merged 4 commits into from
Sep 12, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.discovery;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.MasterService;
Expand All @@ -36,12 +38,15 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -62,14 +67,18 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
AllocationService allocationService) {
final UnicastHostsProvider hostsProvider;

final Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators = new ArrayList<>();
Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
for (DiscoveryPlugin plugin : plugins) {
plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> {
if (hostProviders.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Cannot register zen hosts provider [" + entry.getKey() + "] twice");
}
});
BiConsumer<DiscoveryNode, ClusterState> joinValidator = plugin.getJoinValidator();
if (joinValidator != null) {
joinValidators.add(joinValidator);
}
}
Optional<String> hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
if (hostsProviderName.isPresent()) {
Expand All @@ -85,7 +94,7 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
discoveryTypes.put("zen",
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, hostsProvider, allocationService));
clusterSettings, hostsProvider, allocationService, Collections.unmodifiableCollection(joinValidators)));
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier));
for (DiscoveryPlugin plugin : plugins) {
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class MembershipAction extends AbstractComponent {

Expand All @@ -63,7 +66,8 @@ public interface MembershipListener {

private final MembershipListener listener;

public MembershipAction(Settings settings, TransportService transportService, MembershipListener listener) {
public MembershipAction(Settings settings, TransportService transportService, MembershipListener listener,
Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators) {
super(settings);
this.transportService = transportService;
this.listener = listener;
Expand All @@ -73,7 +77,7 @@ public MembershipAction(Settings settings, TransportService transportService, Me
ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
() -> new ValidateJoinRequest(), ThreadPool.Names.GENERIC,
new ValidateJoinRequestRequestHandler());
new ValidateJoinRequestRequestHandler(transportService::getLocalNode, joinValidators));
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
}
Expand Down Expand Up @@ -176,12 +180,20 @@ public void writeTo(StreamOutput out) throws IOException {
}

/**
 * Transport request handler for {@link ValidateJoinRequest}. The cluster state carried by each request is passed
 * through the checks below and through every configured join validator; the empty response is only sent if none
 * of them throws.
 */
static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
// supplies the node that is handed to every join validator as its first argument
private final Supplier<DiscoveryNode> localNodeSupplier;
// validators that are run against the cluster state of each incoming validate request
private final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators;

/**
 * @param localNodeSupplier supplier of the local node; it is queried on every request, not at construction time
 * @param joinValidators validators applied to the cluster state of each incoming request
 */
ValidateJoinRequestRequestHandler(Supplier<DiscoveryNode> localNodeSupplier,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
this.localNodeSupplier = localNodeSupplier;
this.joinValidators = joinValidators;
}

/**
 * Validates the cluster state in {@code request} and acknowledges it with {@link TransportResponse.Empty}.
 * Any exception thrown by a check or validator propagates to the caller and no response is sent from here.
 */
@Override
public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
// NOTE(review): this listing comes from a diff view without +/- markers; the two ensure* calls and the
// comment below may be lines that the change removes in favour of the validators - confirm.
ensureNodesCompatibility(Version.CURRENT, request.state.getNodes());
ensureIndexCompatibility(Version.CURRENT, request.state.getMetaData());
// for now, the mere fact that we can serialize the cluster state acts as validation....
DiscoveryNode node = localNodeSupplier.get();
// only checked when the JVM runs with assertions enabled
assert node != null : "local node is null";
// each validator receives the local node together with the incoming cluster state
joinValidators.stream().forEach(action -> action.accept(node, request.state));
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
Expand All @@ -78,6 +80,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -146,15 +149,17 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover

private final NodeJoinController nodeJoinController;
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;

private final ClusterApplier clusterApplier;
private final AtomicReference<ClusterState> committedState; // last committed cluster state
private final Object stateMutex = new Object();
private final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators;

public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier,
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService) {
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService,
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators) {
super(settings);
this.onJoinValidators = addBuiltInJoinValidators(onJoinValidators);
this.masterService = masterService;
this.clusterApplier = clusterApplier;
this.transportService = transportService;
Expand Down Expand Up @@ -211,7 +216,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
namedWriteableRegistry,
this,
discoverySettings);
this.membership = new MembershipAction(settings, transportService, new MembershipListener());
this.membership = new MembershipAction(settings, transportService, new MembershipListener(), onJoinValidators);
this.joinThreadControl = new JoinThreadControl();

this.nodeJoinController = new NodeJoinController(masterService, allocationService, electMaster, settings);
Expand All @@ -223,6 +228,17 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler());
}

/**
 * Builds the full set of join validators: a built-in validator that runs
 * {@link MembershipAction#ensureNodesCompatibility} and {@link MembershipAction#ensureIndexCompatibility}
 * with the node's version, followed by the supplied validators in their iteration order.
 *
 * @param onJoinValidators additional validators to append after the built-in one
 * @return an unmodifiable collection with the built-in validator first
 */
static Collection<BiConsumer<DiscoveryNode,ClusterState>> addBuiltInJoinValidators(
        Collection<BiConsumer<DiscoveryNode,ClusterState>> onJoinValidators) {
    // built-in check: the node's version must be compatible with the nodes and the indices of the state
    final BiConsumer<DiscoveryNode, ClusterState> versionCompatibility = (joiningNode, clusterState) -> {
        MembershipAction.ensureNodesCompatibility(joiningNode.getVersion(), clusterState.getNodes());
        MembershipAction.ensureIndexCompatibility(joiningNode.getVersion(), clusterState.getMetaData());
    };

    final Collection<BiConsumer<DiscoveryNode, ClusterState>> combined = new ArrayList<>();
    combined.add(versionCompatibility);
    combined.addAll(onJoinValidators);
    return Collections.unmodifiableCollection(combined);
}

// protected to allow overriding in tests
protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
UnicastHostsProvider hostsProvider) {
Expand Down Expand Up @@ -885,8 +901,7 @@ void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final
} else {
// we do this in a couple of places including the cluster update thread. This one here is really just best effort
// to ensure we fail as fast as possible.
MembershipAction.ensureNodesCompatibility(node.getVersion(), state.getNodes());
MembershipAction.ensureIndexCompatibility(node.getVersion(), state.getMetaData());
onJoinValidators.stream().forEach(a -> a.accept(node, state));
if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
MembershipAction.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion());
}
Expand All @@ -898,7 +913,8 @@ void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final
try {
membership.sendValidateJoinRequestBlocking(node, state, joinTimeout);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node), e);
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node),
e);
callback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
return;
}
Expand Down Expand Up @@ -1313,4 +1329,9 @@ public void start() {
}

}

public final Collection<BiConsumer<DiscoveryNode, ClusterState>> getOnJoinValidators() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot to git add one file that uses it. Sorry for the noise... it's in a test

return onJoinValidators;
}

}
11 changes: 11 additions & 0 deletions core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@

package org.elasticsearch.plugins;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.MasterService;
Expand Down Expand Up @@ -106,4 +110,11 @@ default Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(Transpo
NetworkService networkService) {
return Collections.emptyMap();
}

/**
* Returns a consumer that validates the initial join cluster state. The validator, unless <code>null</code>, is called exactly once per
* join attempt but might be called multiple times during the lifetime of a node. Validators are expected to throw an
* {@link IllegalStateException} if the incoming cluster state is invalid.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - since we call it both on the master and the joining node, I think we should say "if the node and the cluster state are incompatible"

*/
default BiConsumer<DiscoveryNode,ClusterState> getJoinValidator() {
    // no validator is contributed unless a plugin overrides this method
    return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ public void onNewClusterState(String source, Supplier<ClusterState> clusterState
}
};
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
masterService, clusterApplier, clusterSettings, Collections::emptyList, ESAllocationTestCase.createAllocationService());
masterService, clusterApplier, clusterSettings, Collections::emptyList, ESAllocationTestCase.createAllocationService(),
Collections.emptyList());
zenDiscovery.start();
return zenDiscovery;
}
Expand All @@ -342,7 +343,10 @@ public void testValidateOnUnsupportedIndexVersionCreated() throws Exception {
ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT);
final DiscoveryNode otherNode = new DiscoveryNode("other_node", buildNewFakeTransportAddress(), emptyMap(),
EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);
MembershipAction.ValidateJoinRequestRequestHandler request = new MembershipAction.ValidateJoinRequestRequestHandler();
final DiscoveryNode localNode = new DiscoveryNode("other_node", buildNewFakeTransportAddress(), emptyMap(),
EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);
MembershipAction.ValidateJoinRequestRequestHandler request = new MembershipAction.ValidateJoinRequestRequestHandler
(() -> localNode, ZenDiscovery.addBuiltInJoinValidators(Collections.emptyList()));
final boolean incompatible = randomBoolean();
IndexMetaData indexMetaData = IndexMetaData.builder("test").settings(Settings.builder()
.put(SETTING_VERSION_CREATED, incompatible ? VersionUtils.getPreviousVersion(Version.CURRENT.minimumIndexCompatibilityVersion())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private TestZenDiscovery(Settings settings, ThreadPool threadPool, TransportServ
ClusterApplier clusterApplier, ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider,
AllocationService allocationService) {
super(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, clusterSettings,
hostsProvider, allocationService);
hostsProvider, allocationService, Collections.emptyList());
}

@Override
Expand Down