diff --git a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index 0ecf40e65a1ba..179692cd516c8 100644 --- a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -19,6 +19,8 @@ package org.elasticsearch.discovery; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.MasterService; @@ -36,12 +38,15 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; @@ -62,7 +67,7 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic ClusterApplier clusterApplier, ClusterSettings clusterSettings, List plugins, AllocationService allocationService) { final UnicastHostsProvider hostsProvider; - + final Collection> joinValidators = new ArrayList<>(); Map> hostProviders = new HashMap<>(); for (DiscoveryPlugin plugin : plugins) { plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> { @@ -70,6 +75,10 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic throw new IllegalArgumentException("Cannot register zen hosts provider [" + entry.getKey() + "] twice"); } }); + BiConsumer joinValidator = plugin.getJoinValidator(); + if (joinValidator != null) { + joinValidators.add(joinValidator); + } } Optional hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings); if (hostsProviderName.isPresent()) { @@ -85,7 +94,7 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic Map> discoveryTypes = new HashMap<>(); discoveryTypes.put("zen", () -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, - clusterSettings, hostsProvider, allocationService)); + clusterSettings, hostsProvider, allocationService, Collections.unmodifiableCollection(joinValidators))); discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier)); for (DiscoveryPlugin plugin : plugins) { plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry, diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java index 18cac5818049f..fdfcd8ac29079 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java @@ -39,7 +39,10 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Collection; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Supplier; public class MembershipAction extends AbstractComponent { @@ -63,7 +66,8 @@ public interface MembershipListener { private final MembershipListener listener; - public MembershipAction(Settings settings, TransportService transportService, MembershipListener listener) { + public MembershipAction(Settings settings, TransportService transportService, MembershipListener listener, + Collection> joinValidators) { super(settings); this.transportService = transportService; this.listener = listener; @@ -73,7 +77,7 @@ public MembershipAction(Settings settings, TransportService transportService, Me ThreadPool.Names.GENERIC, new JoinRequestRequestHandler()); transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME, () -> new ValidateJoinRequest(), ThreadPool.Names.GENERIC, - new ValidateJoinRequestRequestHandler()); + new ValidateJoinRequestRequestHandler(transportService::getLocalNode, joinValidators)); transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new, ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler()); } @@ -176,12 +180,20 @@ public void writeTo(StreamOutput out) throws IOException { } static class ValidateJoinRequestRequestHandler implements TransportRequestHandler { + private final Supplier localNodeSupplier; + private final Collection> joinValidators; + + ValidateJoinRequestRequestHandler(Supplier localNodeSupplier, + Collection> joinValidators) { + this.localNodeSupplier = localNodeSupplier; + this.joinValidators = joinValidators; + } @Override public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception { - ensureNodesCompatibility(Version.CURRENT, request.state.getNodes()); - ensureIndexCompatibility(Version.CURRENT, request.state.getMetaData()); - // for now, the mere fact that we can serialize the cluster state acts as validation.... + DiscoveryNode node = localNodeSupplier.get(); + assert node != null : "local node is null"; + joinValidators.stream().forEach(action -> action.accept(node, request.state)); channel.sendResponse(TransportResponse.Empty.INSTANCE); } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index a4817fada36d2..249cce73765be 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -69,6 +69,8 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Set; @@ -78,6 +80,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -146,15 +149,17 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover private final NodeJoinController nodeJoinController; private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; - private final ClusterApplier clusterApplier; private final AtomicReference committedState; // last committed cluster state private final Object stateMutex = new Object(); + private final Collection> onJoinValidators; public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier, - ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService) { + ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService, + Collection> onJoinValidators) { super(settings); + this.onJoinValidators = addBuiltInJoinValidators(onJoinValidators); this.masterService = masterService; this.clusterApplier = clusterApplier; this.transportService = transportService; @@ -211,7 +216,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t namedWriteableRegistry, this, discoverySettings); - this.membership = new MembershipAction(settings, transportService, new MembershipListener()); + this.membership = new MembershipAction(settings, transportService, new MembershipListener(), onJoinValidators); this.joinThreadControl = new JoinThreadControl(); this.nodeJoinController = new NodeJoinController(masterService, allocationService, electMaster, settings); @@ -223,6 +228,17 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler()); } + static Collection> addBuiltInJoinValidators( + Collection> onJoinValidators) { + Collection> validators = new ArrayList<>(); + validators.add((node, state) -> { + MembershipAction.ensureNodesCompatibility(node.getVersion(), state.getNodes()); + MembershipAction.ensureIndexCompatibility(node.getVersion(), state.getMetaData()); + }); + validators.addAll(onJoinValidators); + return Collections.unmodifiableCollection(validators); + } + // protected to allow overriding in tests protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, UnicastHostsProvider hostsProvider) { @@ -885,8 +901,7 @@ void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final } else { // we do this in a couple of places including the cluster update thread. This one here is really just best effort // to ensure we fail as fast as possible. - MembershipAction.ensureNodesCompatibility(node.getVersion(), state.getNodes()); - MembershipAction.ensureIndexCompatibility(node.getVersion(), state.getMetaData()); + onJoinValidators.stream().forEach(a -> a.accept(node, state)); if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) { MembershipAction.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion()); } @@ -898,7 +913,8 @@ void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final try { membership.sendValidateJoinRequestBlocking(node, state, joinTimeout); } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node), e); + logger.warn((Supplier) () -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node), + e); callback.onFailure(new IllegalStateException("failure when sending a validation request to node", e)); return; } @@ -1313,4 +1329,9 @@ public void start() { } } + + public final Collection> getOnJoinValidators() { + return onJoinValidators; + } + } diff --git a/core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java b/core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java index c3af5593cd7c4..912bcdc9d852a 100644 --- a/core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java +++ b/core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java @@ -19,10 +19,14 @@ package org.elasticsearch.plugins; +import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.function.BiConsumer; import java.util.function.Supplier; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.MasterService; @@ -106,4 +110,11 @@ default Map> getZenHostsProviders(Transpo NetworkService networkService) { return Collections.emptyMap(); } + + /** + * Returns a consumer that validate the initial join cluster state. The validator, unless null is called exactly once per + * join attempt but might be called multiple times during the lifetime of a node. Validators are expected to throw a + * {@link IllegalStateException} if the node and the cluster-state are incompatible. + */ + default BiConsumer getJoinValidator() { return null; } } diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java index 39a9dbff959c6..8c2d84cd8c89d 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java @@ -20,6 +20,8 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.MasterService; @@ -40,10 +42,12 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; import java.util.function.Supplier; import static org.mockito.Mockito.mock; @@ -160,7 +164,23 @@ public void testDuplicateHostsProvider() { public void testLazyConstructionHostsProvider() { DummyHostsProviderPlugin plugin = () -> Collections.singletonMap("custom", - () -> { throw new AssertionError("created hosts provider which was not selected"); }); + () -> { + throw new AssertionError("created hosts provider which was not selected"); + }); newModule(Settings.EMPTY, Collections.singletonList(plugin)); } + + public void testJoinValidator() { + BiConsumer consumer = (a, b) -> {}; + DiscoveryModule module = newModule(Settings.EMPTY, Collections.singletonList(new DiscoveryPlugin() { + @Override + public BiConsumer getJoinValidator() { + return consumer; + } + })); + ZenDiscovery discovery = (ZenDiscovery) module.getDiscovery(); + Collection> onJoinValidators = discovery.getOnJoinValidators(); + assertEquals(2, onJoinValidators.size()); + assertTrue(onJoinValidators.contains(consumer)); + } } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index bc653e14e3275..b0dc783349ca8 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -320,7 +320,8 @@ public void onNewClusterState(String source, Supplier clusterState } }; ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), - masterService, clusterApplier, clusterSettings, Collections::emptyList, ESAllocationTestCase.createAllocationService()); + masterService, clusterApplier, clusterSettings, Collections::emptyList, ESAllocationTestCase.createAllocationService(), + Collections.emptyList()); zenDiscovery.start(); return zenDiscovery; } @@ -342,7 +343,10 @@ public void testValidateOnUnsupportedIndexVersionCreated() throws Exception { ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT); final DiscoveryNode otherNode = new DiscoveryNode("other_node", buildNewFakeTransportAddress(), emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT); - MembershipAction.ValidateJoinRequestRequestHandler request = new MembershipAction.ValidateJoinRequestRequestHandler(); + final DiscoveryNode localNode = new DiscoveryNode("other_node", buildNewFakeTransportAddress(), emptyMap(), + EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT); + MembershipAction.ValidateJoinRequestRequestHandler request = new MembershipAction.ValidateJoinRequestRequestHandler + (() -> localNode, ZenDiscovery.addBuiltInJoinValidators(Collections.emptyList())); final boolean incompatible = randomBoolean(); IndexMetaData indexMetaData = IndexMetaData.builder("test").settings(Settings.builder() .put(SETTING_VERSION_CREATED, incompatible ? VersionUtils.getPreviousVersion(Version.CURRENT.minimumIndexCompatibilityVersion()) diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java index 63212cddc39b1..d224d9c519c8a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java @@ -83,7 +83,7 @@ private TestZenDiscovery(Settings settings, ThreadPool threadPool, TransportServ ClusterApplier clusterApplier, ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService) { super(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, clusterSettings, - hostsProvider, allocationService); + hostsProvider, allocationService, Collections.emptyList()); } @Override