From 7069045cd420ed524914686bd47dade26414c1df Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 22 Nov 2019 12:50:02 -0700 Subject: [PATCH] Add the simple strategy to cluster settings (#49414) This is related to #49067. This commit adds the simple connection strategy settings and strategy mode setting to the cluster settings registry. With these changes, the simple connection mode can be used. Additionally, it adds validation to ensure that settings cannot be misconfigured. --- .../100_connection_mode_configuration.yml | 212 ++++++++++++++++++ .../common/settings/ClusterSettings.java | 7 +- .../common/settings/Setting.java | 81 +++++-- .../transport/RemoteClusterAware.java | 13 +- .../transport/RemoteClusterConnection.java | 4 +- .../transport/RemoteClusterService.java | 46 ++-- .../transport/RemoteConnectionStrategy.java | 57 ++++- .../transport/SimpleConnectionStrategy.java | 12 +- .../transport/SniffConnectionStrategy.java | 79 ++++--- .../RemoteClusterConnectionTests.java | 2 +- .../transport/RemoteClusterServiceTests.java | 7 +- .../transport/RemoteClusterSettingsTests.java | 2 +- .../RemoteConnectionStrategyTests.java | 16 +- .../SimpleConnectionStrategyTests.java | 36 +++ .../SniffConnectionStrategyTests.java | 35 +++ .../core/security/authc/RealmSettings.java | 2 +- .../70_connection_mode_configuration.yml | 212 ++++++++++++++++++ 17 files changed, 721 insertions(+), 102 deletions(-) create mode 100644 qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_connection_mode_configuration.yml create mode 100644 x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_connection_mode_configuration.yml diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_connection_mode_configuration.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_connection_mode_configuration.yml new file mode 100644 index 0000000000000..ed639b3655ed5 --- /dev/null +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_connection_mode_configuration.yml @@ -0,0 +1,212 @@ +--- +"Add transient remote cluster in simple mode with invalid sniff settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.node_connections: "5" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.node_connections\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + +--- +"Add transient remote cluster in sniff mode with invalid simple settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.simple.socket_connections: "20" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.socket_connections\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.addresses\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" } + +--- +"Add transient remote cluster using simple connection mode using valid settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.simple.socket_connections: "3" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "3"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + +--- +"Add transient remote cluster using sniff connection mode using valid settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "sniff" + cluster.remote.test_remote_cluster.sniff.node_connections: "3" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.node_connections: "3"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + +--- +"Switch connection mode for configured cluster": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "sniff" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.seeds: null + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index c2aeeac3f49d9..f51dfb62e0593 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -102,6 +102,8 @@ import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.RemoteConnectionStrategy; +import org.elasticsearch.transport.SimpleConnectionStrategy; import org.elasticsearch.transport.SniffConnectionStrategy; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.watcher.ResourceWatcherService; @@ -281,12 +283,15 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE, - RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, + SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER, RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING, RemoteClusterService.REMOTE_NODE_ATTRIBUTE, RemoteClusterService.ENABLE_REMOTE_CLUSTERS, RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, RemoteClusterService.REMOTE_CLUSTER_COMPRESS, + RemoteConnectionStrategy.REMOTE_CONNECTION_MODE, + SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, + SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index 8848392aade82..2e6bfdc635cd2 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -53,6 +53,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.regex.Matcher; @@ -445,6 +446,7 @@ private T get(Settings settings, boolean validate) { } validator.validate(parsed); validator.validate(parsed, map); + validator.validate(parsed, map, exists(settings)); } return parsed; } catch (ElasticsearchParseException ex) { @@ -671,10 +673,11 @@ public String toString() { public static class AffixSetting extends Setting { private final AffixKey key; - private final Function> delegateFactory; + private final BiFunction> delegateFactory; private final Set dependencies; - public AffixSetting(AffixKey key, Setting delegate, Function> delegateFactory, AffixSetting... dependencies) { + public AffixSetting(AffixKey key, Setting delegate, BiFunction> delegateFactory, + AffixSetting... dependencies) { super(key, delegate.defaultValue, delegate.parser, delegate.properties.toArray(new Property[0])); this.key = key; this.delegateFactory = delegateFactory; @@ -689,6 +692,7 @@ private Stream matchStream(Settings settings) { return settings.keySet().stream().filter(this::match).map(key::getConcreteString); } + @Override public Set> getSettingsDependencies(String settingsKey) { if (dependencies.isEmpty()) { return Collections.emptySet(); @@ -713,7 +717,7 @@ public Map, T> getValue(Settings curren final Map, T> result = new IdentityHashMap<>(); Stream.concat(matchStream(current), matchStream(previous)).distinct().forEach(aKey -> { String namespace = key.getNamespace(aKey); - Setting concreteSetting = getConcreteSetting(aKey); + Setting concreteSetting = getConcreteSetting(namespace, aKey); AbstractScopedSettings.SettingUpdater updater = concreteSetting.newUpdater((v) -> consumer.accept(namespace, v), logger, (v) -> validator.accept(namespace, v)); @@ -751,7 +755,7 @@ public Map getValue(Settings current, Settings previous) { final Map result = new IdentityHashMap<>(); Stream.concat(matchStream(current), matchStream(previous)).distinct().forEach(aKey -> { String namespace = key.getNamespace(aKey); - Setting concreteSetting = getConcreteSetting(aKey); + Setting concreteSetting = getConcreteSetting(namespace, aKey); AbstractScopedSettings.SettingUpdater updater = concreteSetting.newUpdater((v) -> {}, logger, (v) -> validator.accept(namespace, v)); if (updater.hasChanged(current, previous)) { @@ -786,7 +790,16 @@ public String innerGetRaw(final Settings settings) { @Override public Setting getConcreteSetting(String key) { if (match(key)) { - return delegateFactory.apply(key); + String namespace = this.key.getNamespace(key); + return delegateFactory.apply(namespace, key); + } else { + throw new IllegalArgumentException("key [" + key + "] must match [" + getKey() + "] but didn't."); + } + } + + private Setting getConcreteSetting(String namespace, String key) { + if (match(key)) { + return delegateFactory.apply(namespace, key); } else { throw new IllegalArgumentException("key [" + key + "] must match [" + getKey() + "] but didn't."); } @@ -797,7 +810,7 @@ public Setting getConcreteSetting(String key) { */ public Setting getConcreteSettingForNamespace(String namespace) { String fullKey = key.toConcreteKey(namespace).toString(); - return getConcreteSetting(fullKey); + return getConcreteSetting(namespace, fullKey); } @Override @@ -834,8 +847,9 @@ public Set getNamespaces(Settings settings) { public Map getAsMap(Settings settings) { Map map = new HashMap<>(); matchStream(settings).distinct().forEach(key -> { - Setting concreteSetting = getConcreteSetting(key); - map.put(getNamespace(concreteSetting), concreteSetting.get(settings)); + String namespace = this.key.getNamespace(key); + Setting concreteSetting = getConcreteSetting(namespace, key); + map.put(namespace, concreteSetting.get(settings)); }); return Collections.unmodifiableMap(map); } @@ -843,9 +857,9 @@ public Map getAsMap(Settings settings) { /** * Represents a validator for a setting. The {@link #validate(Object)} method is invoked early in the update setting process with the - * value of this setting for a fail-fast validation. Later on, the {@link #validate(Object, Map)} method is invoked with the value of - * this setting and a map from the settings specified by {@link #settings()}} to their values. All these values come from the same - * {@link Settings} instance. + * value of this setting for a fail-fast validation. Later on, the {@link #validate(Object, Map)} and + * {@link #validate(Object, Map, boolean)} methods are invoked with the value of this setting and a map from the settings specified by + * {@link #settings()}} to their values. All these values come from the same {@link Settings} instance. * * @param the type of the {@link Setting} */ @@ -869,6 +883,18 @@ public interface Validator { default void validate(T value, Map, Object> settings) { } + /** + * Validate this setting against its dependencies, specified by {@link #settings()}. This method allows validation logic + * to evaluate whether the setting will be present in the {@link Settings} after the update. The default implementation + * does nothing, accepting any value as valid as long as it passes the validation in {@link #validate(Object)}. + * + * @param value the value of this setting + * @param settings a map from the settings specified by {@link #settings()}} to their values + * @param isPresent boolean indicating if this setting is present + */ + default void validate(T value, Map, Object> settings, boolean isPresent) { + } + /** * The settings on which the validity of this setting depends. The values of the specified settings are passed to * {@link #validate(Object, Map)}. By default this returns an empty iterator, indicating that this setting does not depend on any @@ -1066,6 +1092,12 @@ public static Setting intSetting(String key, int defaultValue, int minV properties); } + public static Setting intSetting(String key, int defaultValue, int minValue, Validator validator, + Property... properties) { + return new Setting<>(key, Integer.toString(defaultValue), (s) -> parseInt(s, minValue, key, isFiltered(properties)), validator, + properties); + } + public static Setting intSetting(String key, Setting fallbackSetting, int minValue, Property... properties) { return new Setting<>(key, fallbackSetting, (s) -> parseInt(s, minValue, key, isFiltered(properties)), properties); } @@ -1317,6 +1349,15 @@ public static Setting> listSetting( return listSetting(key, null, singleValueParser, defaultStringValue, properties); } + public static Setting> listSetting( + final String key, + final Function singleValueParser, + final Function> defaultStringValue, + final Validator> validator, + final Property... properties) { + return listSetting(key, null, singleValueParser, defaultStringValue, validator, properties); + } + public static Setting> listSetting( final String key, final @Nullable Setting> fallbackSetting, @@ -1326,7 +1367,7 @@ public static Setting> listSetting( return listSetting(key, fallbackSetting, singleValueParser, defaultStringValue, v -> {}, properties); } - static Setting> listSetting( + public static Setting> listSetting( final String key, final @Nullable Setting> fallbackSetting, final Function singleValueParser, @@ -1584,7 +1625,8 @@ public int hashCode() { * {@link #getConcreteSetting(String)} is used to pull the updater. */ public static AffixSetting prefixKeySetting(String prefix, Function> delegateFactory) { - return affixKeySetting(new AffixKey(prefix), delegateFactory); + BiFunction> delegateFactoryWithNamespace = (ns, k) -> delegateFactory.apply(k); + return affixKeySetting(new AffixKey(prefix), delegateFactoryWithNamespace); } /** @@ -1594,12 +1636,19 @@ public static AffixSetting prefixKeySetting(String prefix, Function AffixSetting affixKeySetting(String prefix, String suffix, Function> delegateFactory, AffixSetting... dependencies) { - return affixKeySetting(new AffixKey(prefix, suffix), delegateFactory, dependencies); + BiFunction> delegateFactoryWithNamespace = (ns, k) -> delegateFactory.apply(k); + return affixKeySetting(new AffixKey(prefix, suffix), delegateFactoryWithNamespace, dependencies); + } + + public static AffixSetting affixKeySetting(String prefix, String suffix, BiFunction> delegateFactory, + AffixSetting... dependencies) { + Setting delegate = delegateFactory.apply("_na_", "_na_"); + return new AffixSetting<>(new AffixKey(prefix, suffix), delegate, delegateFactory, dependencies); } - private static AffixSetting affixKeySetting(AffixKey key, Function> delegateFactory, + private static AffixSetting affixKeySetting(AffixKey key, BiFunction> delegateFactory, AffixSetting... dependencies) { - Setting delegate = delegateFactory.apply("_na_"); + Setting delegate = delegateFactory.apply("_na_", "_na_"); return new AffixSetting<>(key, delegate, delegateFactory, dependencies); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 2212525411101..be1ca9a1a2c43 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -102,9 +102,16 @@ void validateAndUpdateRemoteCluster(String clusterAlias, Settings settings) { * Registers this instance to listen to updates on the cluster settings. */ public void listenForUpdates(ClusterSettings clusterSettings) { - List> remoteClusterSettings = Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY, - SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, RemoteClusterService.REMOTE_CLUSTER_COMPRESS, - RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS); + List> remoteClusterSettings = Arrays.asList( + RemoteClusterService.REMOTE_CLUSTER_COMPRESS, + RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, + RemoteConnectionStrategy.REMOTE_CONNECTION_MODE, + SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY, + SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, + SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, + SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS, + SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, + SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS); clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 16f13e57e7223..8b89e8f8f8905 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -47,7 +47,7 @@ * in the remote cluster and connects to all eligible nodes, for details see {@link RemoteClusterService#REMOTE_NODE_ATTRIBUTE}. * * In the case of a disconnection, this class will issue a re-connect task to establish at most - * {@link RemoteClusterService#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of + * {@link SniffConnectionStrategy#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of * connections per cluster has been reached. */ final class RemoteClusterConnection implements Closeable { @@ -238,7 +238,7 @@ ConnectionManager getConnectionManager() { return remoteConnectionManager.getConnectionManager(); } - public boolean shouldRebuildConnection(Settings newSettings) { + boolean shouldRebuildConnection(Settings newSettings) { return connectionStrategy.shouldRebuildConnection(newSettings); } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 3a1ab055a05cf..2bfe3980ed8d3 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -64,17 +64,6 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl private static final ActionListener noopListener = ActionListener.wrap((x) -> {}, (x) -> {}); - /** - * The maximum number of connections that will be established to a remote cluster. For instance if there is only a single - * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3. - */ - public static final Setting REMOTE_CONNECTIONS_PER_CLUSTER = - Setting.intSetting( - "cluster.remote.connections_per_cluster", - 3, - 1, - Setting.Property.NodeScope); - /** * The initial connect timeout for remote cluster connections */ @@ -237,22 +226,18 @@ private synchronized void updateSkipUnavailable(String clusterAlias, Boolean ski @Override protected void updateRemoteCluster(String clusterAlias, Settings settings) { - if (remoteClusters.containsKey(clusterAlias) == false) { - CountDownLatch latch = new CountDownLatch(1); - updateRemoteCluster(clusterAlias, settings, ActionListener.wrap(latch::countDown)); + CountDownLatch latch = new CountDownLatch(1); + updateRemoteCluster(clusterAlias, settings, ActionListener.wrap(latch::countDown)); - try { - // Wait 10 seconds for a new cluster. We must use a latch instead of a future because we - // are on the cluster state thread and our custom future implementation will throw an - // assertion. - if (latch.await(10, TimeUnit.SECONDS) == false) { - logger.warn("failed to connect to new remote cluster {} within {}", clusterAlias, TimeValue.timeValueSeconds(10)); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + try { + // Wait 10 seconds for a connections. We must use a latch instead of a future because we + // are on the cluster state thread and our custom future implementation will throw an + // assertion. + if (latch.await(10, TimeUnit.SECONDS) == false) { + logger.warn("failed to connect to new remote cluster {} within {}", clusterAlias, TimeValue.timeValueSeconds(10)); } - } else { - updateRemoteCluster(clusterAlias, settings, noopListener); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } @@ -280,13 +265,14 @@ synchronized void updateRemoteCluster(String clusterAlias, Settings newSettings, return; } - // this is a new cluster we have to add a new representation if (remote == null) { + // this is a new cluster we have to add a new representation Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build(); remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService); remoteClusters.put(clusterAlias, remote); + remote.ensureConnected(listener); } else if (remote.shouldRebuildConnection(newSettings)) { - // New ConnectionProfile. Must tear down existing connection + // Changes to connection configuration. Must tear down existing connection try { IOUtils.close(remote); } catch (IOException e) { @@ -296,9 +282,11 @@ synchronized void updateRemoteCluster(String clusterAlias, Settings newSettings, Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build(); remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService); remoteClusters.put(clusterAlias, remote); + remote.ensureConnected(listener); + } else { + // No changes to connection configuration. + listener.onResponse(null); } - - remote.ensureConnected(listener); } /** diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 78d831b878bd3..d8a459a79a56e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -40,13 +40,16 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -58,11 +61,15 @@ enum ConnectionStrategy { SIMPLE(SimpleConnectionStrategy.CHANNELS_PER_CONNECTION, SimpleConnectionStrategy::enablementSettings); private final int numberOfChannels; - private final Supplier>> enabledSettings; + private final Supplier>> enablementSettings; - ConnectionStrategy(int numberOfChannels, Supplier>> enabledSettings) { + ConnectionStrategy(int numberOfChannels, Supplier>> enablementSettings) { this.numberOfChannels = numberOfChannels; - this.enabledSettings = enabledSettings; + this.enablementSettings = enablementSettings; + } + + public int getNumberOfChannels() { + return numberOfChannels; } } @@ -71,6 +78,7 @@ enum ConnectionStrategy { key, ConnectionStrategy.SNIFF.name(), value -> ConnectionStrategy.valueOf(value.toUpperCase(Locale.ROOT)), + Setting.Property.NodeScope, Setting.Property.Dynamic)); @@ -121,7 +129,7 @@ static RemoteConnectionStrategy buildStrategy(String clusterAlias, TransportServ static Set getRemoteClusters(Settings settings) { final Stream> enablementSettings = Arrays.stream(ConnectionStrategy.values()) - .flatMap(strategy -> strategy.enabledSettings.get()); + .flatMap(strategy -> strategy.enablementSettings.get()); return enablementSettings.flatMap(s -> getClusterAlias(settings, s)).collect(Collectors.toSet()); } @@ -319,4 +327,45 @@ private boolean connectionProfileChanged(ConnectionProfile oldProfile, Connectio return Objects.equals(oldProfile.getCompressionEnabled(), newProfile.getCompressionEnabled()) == false || Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false; } + + static class StrategyValidator implements Setting.Validator { + + private final String key; + private final ConnectionStrategy expectedStrategy; + private final String namespace; + private final Consumer valueChecker; + + StrategyValidator(String namespace, String key, ConnectionStrategy expectedStrategy) { + this(namespace, key, expectedStrategy, (v) -> {}); + } + + StrategyValidator(String namespace, String key, ConnectionStrategy expectedStrategy, Consumer valueChecker) { + this.namespace = namespace; + this.key = key; + this.expectedStrategy = expectedStrategy; + this.valueChecker = valueChecker; + } + + @Override + public void validate(T value) { + valueChecker.accept(value); + } + + @Override + public void validate(T value, Map, Object> settings, boolean isPresent) { + Setting concrete = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(namespace); + ConnectionStrategy modeType = (ConnectionStrategy) settings.get(concrete); + if (isPresent && modeType.equals(expectedStrategy) == false) { + throw new IllegalArgumentException("Setting \"" + key + "\" cannot be used with the configured \"" + concrete.getKey() + + "\" [required=" + expectedStrategy.name() + ", configured=" + modeType.name() + "]"); + } + } + + @Override + public Iterator> settings() { + Setting concrete = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(namespace); + Stream> settingStream = Stream.of(concrete); + return settingStream.iterator(); + } + } } diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index 97e0d8a36a00c..839a1d19285b7 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -50,12 +50,13 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { */ public static final Setting.AffixSetting> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting( "cluster.remote.", - "addresses", - key -> Setting.listSetting(key, Collections.emptyList(), s -> { + "simple.addresses", + (ns, key) -> Setting.listSetting(key, Collections.emptyList(), s -> { // validate address parsePort(s); return s; - }, Setting.Property.Dynamic, Setting.Property.NodeScope)); + }, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE), + Setting.Property.Dynamic, Setting.Property.NodeScope)); /** * The maximum number of socket connections that will be established to a remote cluster. The default is 18. @@ -63,7 +64,8 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { public static final Setting.AffixSetting REMOTE_SOCKET_CONNECTIONS = Setting.affixKeySetting( "cluster.remote.", "simple.socket_connections", - key -> intSetting(key, 18, 1, Setting.Property.Dynamic, Setting.Property.NodeScope)); + (ns, key) -> intSetting(key, 18, 1, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE), + Setting.Property.Dynamic, Setting.Property.NodeScope)); static final int CHANNELS_PER_CONNECTION = 1; @@ -78,7 +80,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { private final ConnectionManager.ConnectionValidator clusterNameValidator; SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, - Settings settings) { + Settings settings) { this( clusterAlias, transportService, diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 725c6a6e6fb16..ee56629ebf0aa 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -65,18 +65,19 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { * A list of initial seed nodes to discover eligible nodes from the remote cluster */ public static final Setting.AffixSetting> REMOTE_CLUSTER_SEEDS_OLD = Setting.affixKeySetting( - "cluster.remote.", - "seeds", - key -> Setting.listSetting( - key, - Collections.emptyList(), - s -> { - // validate seed address - parsePort(s); - return s; - }, - Setting.Property.Dynamic, - Setting.Property.NodeScope)); + "cluster.remote.", + "seeds", + (ns, key) -> Setting.listSetting( + key, + Collections.emptyList(), + s -> { + // validate seed address + parsePort(s); + return s; + }, + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF), + Setting.Property.Dynamic, + Setting.Property.NodeScope)); /** * A list of initial seed nodes to discover eligible nodes from the remote cluster @@ -84,14 +85,19 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { public static final Setting.AffixSetting> REMOTE_CLUSTER_SEEDS = Setting.affixKeySetting( "cluster.remote.", "sniff.seeds", - key -> Setting.listSetting(key, - "_na_".equals(key) ? REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(key) - : REMOTE_CLUSTER_SEEDS_OLD.getConcreteSetting(key.replaceAll("sniff\\.seeds$", "seeds")), + (ns, key) -> Setting.listSetting(key, + REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(ns), s -> { // validate seed address parsePort(s); return s; - }, Setting.Property.Dynamic, Setting.Property.NodeScope)); + }, + s -> REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(ns).get(s), + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF), + Setting.Property.Dynamic, + Setting.Property.NodeScope)); + + /** * A proxy address for the remote cluster. By default this is not set, meaning that Elasticsearch will connect directly to the nodes in * the remote cluster using their publish addresses. If this setting is set to an IP address or hostname then Elasticsearch will connect @@ -99,19 +105,29 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { * undocumented as it does not work well with all proxies. */ public static final Setting.AffixSetting REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting( - "cluster.remote.", - "proxy", - key -> Setting.simpleString( - key, - s -> { - if (Strings.hasLength(s)) { - parsePort(s); - } - }, - Setting.Property.Dynamic, - Setting.Property.NodeScope), + "cluster.remote.", + "proxy", + (ns, key) -> Setting.simpleString( + key, + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF, s -> { + if (Strings.hasLength(s)) { + parsePort(s); + } + }), + Setting.Property.Dynamic, + Setting.Property.NodeScope), REMOTE_CLUSTER_SEEDS); + /** + * The maximum number of connections that will be established to a remote cluster. For instance if there is only a single + * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3. + */ + public static final Setting REMOTE_CONNECTIONS_PER_CLUSTER = + intSetting( + "cluster.remote.connections_per_cluster", + 3, + 1, + Setting.Property.NodeScope); /** * The maximum number of node connections that will be established to a remote cluster. For instance if there is only a single * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3. @@ -119,8 +135,13 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { public static final Setting.AffixSetting REMOTE_NODE_CONNECTIONS = Setting.affixKeySetting( "cluster.remote.", "sniff.node_connections", - key -> intSetting(key, RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, 1, - Setting.Property.Dynamic, Setting.Property.NodeScope)); + (ns, key) -> intSetting( + key, + REMOTE_CONNECTIONS_PER_CLUSTER, + 1, + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF), + Setting.Property.Dynamic, + Setting.Property.NodeScope)); static final int CHANNELS_PER_CONNECTION = 6; diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index b9038e8101ed6..d74a8daa98d61 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -330,7 +330,7 @@ public void testGetConnectionInfo() throws Exception { int maxNumConnections = randomIntBetween(1, 5); String clusterAlias = "test-cluster"; Settings settings = Settings.builder().put(buildSniffSettings(clusterAlias, seedNodes)) - .put(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER.getKey(), maxNumConnections).build(); + .put(SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER.getKey(), maxNumConnections).build(); try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 60e7a848bbb18..0fdc797b8b377 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -76,15 +76,18 @@ private MockTransportService startTransport( } public void testSettingsAreRegistered() { - assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)); - assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_COMPRESS)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS)); } public void testRemoteClusterSeedSetting() { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java index 80206eaf2b21d..af855314278f7 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java @@ -29,7 +29,7 @@ import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD; import static org.elasticsearch.transport.RemoteClusterService.ENABLE_REMOTE_CLUSTERS; import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE; -import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER; +import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER; import static org.elasticsearch.transport.RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING; import static org.elasticsearch.transport.RemoteClusterService.REMOTE_NODE_ATTRIBUTE; import static org.hamcrest.Matchers.emptyCollectionOf; diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index 3bae8b4c9559b..5ea54c7356b94 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -72,14 +72,14 @@ public void testChangeInConnectionProfileMeansTheStrategyMustBeRebuilt() { public void testCorrectChannelNumber() { String clusterAlias = "cluster-alias"; - String settingKey = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(); - Settings simpleSettings = Settings.builder().put(settingKey, "simple").build(); - ConnectionProfile simpleProfile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, simpleSettings); - assertEquals(1, simpleProfile.getNumConnections()); - - Settings sniffSettings = Settings.builder().put(settingKey, "sniff").build(); - ConnectionProfile sniffProfile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, sniffSettings); - assertEquals(6, sniffProfile.getNumConnections()); + + for (RemoteConnectionStrategy.ConnectionStrategy strategy : RemoteConnectionStrategy.ConnectionStrategy.values()) { + String settingKey = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(); + Settings simpleSettings = Settings.builder().put(settingKey, strategy.name()).build(); + ConnectionProfile simpleProfile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, simpleSettings); + assertEquals("Incorrect number of channels for " + strategy.name(), + strategy.getNumberOfChannels(), simpleProfile.getNumConnections()); + } } private static class FakeConnectionStrategy extends RemoteConnectionStrategy { diff --git a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java index 95297bf33e931..35a6b7a6758ac 100644 --- a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java @@ -22,6 +22,10 @@ import org.elasticsearch.Version; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.AbstractScopedSettings; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; @@ -32,7 +36,9 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -297,6 +303,36 @@ numOfConnections, addresses(address), Collections.singletonList(addressSupplier } } + public void testModeSettingsCannotBeUsedWhenInDifferentMode() { + List, String>> restrictedSettings = Arrays.asList( + new Tuple<>(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, "192.168.0.1:8080"), + new Tuple<>(SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, "3")); + + RemoteConnectionStrategy.ConnectionStrategy sniff = RemoteConnectionStrategy.ConnectionStrategy.SNIFF; + + String clusterName = "cluster_name"; + Settings settings = Settings.builder() + .put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterName).getKey(), sniff.name()) + .build(); + + Set> clusterSettings = new HashSet<>(); + clusterSettings.add(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE); + clusterSettings.addAll(restrictedSettings.stream().map(Tuple::v1).collect(Collectors.toList())); + AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, clusterSettings); + + // Should validate successfully + service.validate(settings, true); + + for (Tuple, String> restrictedSetting : restrictedSettings) { + Setting concreteSetting = restrictedSetting.v1().getConcreteSettingForNamespace(clusterName); + Settings invalid = Settings.builder().put(settings).put(concreteSetting.getKey(), restrictedSetting.v2()).build(); + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(invalid, true)); + String expected = "Setting \"" + concreteSetting.getKey() + "\" cannot be used with the configured " + + "\"cluster.remote.cluster_name.mode\" [required=SIMPLE, configured=SNIFF]"; + assertEquals(expected, iae.getMessage()); + } + } + private static List addresses(final TransportAddress... addresses) { return Arrays.stream(addresses).map(TransportAddress::toString).collect(Collectors.toList()); } diff --git a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java index 758b5dca101e5..721055a9c20f7 100644 --- a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java @@ -30,6 +30,9 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.AbstractScopedSettings; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -42,6 +45,7 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; @@ -637,6 +641,37 @@ public void testGetNodePredicatesCombination() { } } + public void testModeSettingsCannotBeUsedWhenInDifferentMode() { + List, String>> restrictedSettings = Arrays.asList( + new Tuple<>(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, "192.168.0.1:8080"), + new Tuple<>(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, "192.168.0.1:8080"), + new Tuple<>(SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS, "2")); + + RemoteConnectionStrategy.ConnectionStrategy simple = RemoteConnectionStrategy.ConnectionStrategy.SIMPLE; + + String clusterName = "cluster_name"; + Settings settings = Settings.builder() + .put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterName).getKey(), simple.name()) + .build(); + + Set> clusterSettings = new HashSet<>(); + clusterSettings.add(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE); + clusterSettings.addAll(restrictedSettings.stream().map(Tuple::v1).collect(Collectors.toList())); + AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, clusterSettings); + + // Should validate successfully + service.validate(settings, true); + + for (Tuple, String> restrictedSetting : restrictedSettings) { + Setting concreteSetting = restrictedSetting.v1().getConcreteSettingForNamespace(clusterName); + Settings invalid = Settings.builder().put(settings).put(concreteSetting.getKey(), restrictedSetting.v2()).build(); + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(invalid, true)); + String expected = "Setting \"" + concreteSetting.getKey() + "\" cannot be used with the configured " + + "\"cluster.remote.cluster_name.mode\" [required=SNIFF, configured=SIMPLE]"; + assertEquals(expected, iae.getMessage()); + } + } + private static List seedNodes(final DiscoveryNode... seedNodes) { return Arrays.stream(seedNodes).map(s -> s.getAddress().toString()).collect(Collectors.toList()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/RealmSettings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/RealmSettings.java index 6ab511df90eed..fda2cf614c8bd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/RealmSettings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/RealmSettings.java @@ -74,7 +74,7 @@ public static Setting.AffixSetting secureString(String realmType, * The {@code Function} takes the realm-type as an argument. * @param suffix The suffix of the setting (everything following the realm name in the affix setting) * @param delegateFactory A factory to produce the concrete setting. - * See {@link Setting#affixKeySetting(Setting.AffixKey, Function, Setting.AffixSetting[])} + * See {@link Setting#affixKeySetting(String, String, Function, Setting.AffixSetting[])} */ public static Function> affixSetting(String suffix, Function> delegateFactory) { return realmType -> Setting.affixKeySetting(realmSettingPrefix(realmType), suffix, delegateFactory); diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_connection_mode_configuration.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_connection_mode_configuration.yml new file mode 100644 index 0000000000000..ed639b3655ed5 --- /dev/null +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_connection_mode_configuration.yml @@ -0,0 +1,212 @@ +--- +"Add transient remote cluster in simple mode with invalid sniff settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.node_connections: "5" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.node_connections\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + +--- +"Add transient remote cluster in sniff mode with invalid simple settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.simple.socket_connections: "20" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.socket_connections\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.addresses\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" } + +--- +"Add transient remote cluster using simple connection mode using valid settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.simple.socket_connections: "3" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "3"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + +--- +"Add transient remote cluster using sniff connection mode using valid settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "sniff" + cluster.remote.test_remote_cluster.sniff.node_connections: "3" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.node_connections: "3"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + +--- +"Switch connection mode for configured cluster": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "sniff" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.seeds: null + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" }