Skip to content

Commit

Permalink
Merge branch 'master' into alpha-not-ascii
Browse files Browse the repository at this point in the history
* master:
  Fix Javadocs for BootstrapChecks#enforceLimits
  Disable bootstrap checks for single-node discovery
  Add unit tests for the missing aggregator (elastic#23895)
  Add a property to mark setting as final (elastic#23872)
  testDifferentRolesMaintainPathOnRestart - fix broken comment
  testDifferentRolesMaintainPathOnRestart - lower join timeout as split elections are likely
  Introduce single-node discovery
  Await termination after shutting down executors
  • Loading branch information
jasontedor committed Apr 4, 2017
2 parents 84e9e66 + a01f772 commit 8b0e059
Show file tree
Hide file tree
Showing 22 changed files with 893 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.process.ProcessProbe;
import org.elasticsearch.node.Node;
Expand Down Expand Up @@ -73,7 +74,7 @@ static void check(final Settings settings, final BoundTransportAddress boundTran
final List<BootstrapCheck> combinedChecks = new ArrayList<>(builtInChecks);
combinedChecks.addAll(additionalChecks);
check(
enforceLimits(boundTransportAddress),
enforceLimits(boundTransportAddress, DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings)),
Collections.unmodifiableList(combinedChecks),
Node.NODE_NAME_SETTING.get(settings));
}
Expand Down Expand Up @@ -164,13 +165,16 @@ static void log(final Logger logger, final String error) {
* Tests if the checks should be enforced.
*
* @param boundTransportAddress the node network bindings
* @param discoveryType the discovery type
* @return {@code true} if the checks should be enforced
*/
static boolean enforceLimits(final BoundTransportAddress boundTransportAddress) {
Predicate<TransportAddress> isLoopbackOrLinkLocalAddress =
static boolean enforceLimits(final BoundTransportAddress boundTransportAddress, final String discoveryType) {
final Predicate<TransportAddress> isLoopbackOrLinkLocalAddress =
t -> t.address().getAddress().isLinkLocalAddress() || t.address().getAddress().isLoopbackAddress();
return !(Arrays.stream(boundTransportAddress.boundAddresses()).allMatch(isLoopbackOrLinkLocalAddress) &&
final boolean bound =
!(Arrays.stream(boundTransportAddress.boundAddresses()).allMatch(isLoopbackOrLinkLocalAddress) &&
isLoopbackOrLinkLocalAddress.test(boundTransportAddress.publishAddress()));
return bound && !"single-node".equals(discoveryType);
}

// the list of checks to execute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ static Setting<Integer> buildNumberOfShardsSetting() {
throw new IllegalArgumentException("es.index.max_number_of_shards must be > 0");
}
return Setting.intSetting(SETTING_NUMBER_OF_SHARDS, Math.min(5, maxNumShards), 1, maxNumShards,
Property.IndexScope);
Property.IndexScope, Property.Final);
}

public static final String INDEX_SETTING_PREFIX = "index.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,6 @@ public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request
indexScopedSettings.validate(normalizedSettings);
// never allow to change the number of shards
for (Map.Entry<String, String> entry : normalizedSettings.getAsMap().entrySet()) {
if (entry.getKey().equals(IndexMetaData.SETTING_NUMBER_OF_SHARDS)) {
listener.onFailure(new IllegalArgumentException("can't change the number of shards for an index"));
return;
}
Setting setting = indexScopedSettings.get(entry.getKey());
assert setting != null; // we already validated the normalized settings
settingsForClosedIndices.put(entry.getKey(), entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand Down Expand Up @@ -382,11 +380,19 @@ private boolean assertMatcher(String key, int numComplexMatchers) {
/**
* Returns <code>true</code> if the setting for the given key is dynamically updateable. Otherwise <code>false</code>.
*/
public boolean hasDynamicSetting(String key) {
public boolean isDynamicSetting(String key) {
final Setting<?> setting = get(key);
return setting != null && setting.isDynamic();
}

/**
* Returns <code>true</code> if the setting for the given key is final. Otherwise <code>false</code>.
*/
public boolean isFinalSetting(String key) {
final Setting<?> setting = get(key);
return setting != null && setting.isFinal();
}

/**
* Returns a settings object that contains all settings that are not
* already set in the given source. The diff contains either the default value for each
Expand Down Expand Up @@ -465,11 +471,14 @@ private boolean updateSettings(Settings toApply, Settings.Builder target, Settin
boolean changed = false;
final Set<String> toRemove = new HashSet<>();
Settings.Builder settingsBuilder = Settings.builder();
final Predicate<String> canUpdate = (key) -> (onlyDynamic == false && get(key) != null) || hasDynamicSetting(key);
final Predicate<String> canRemove = (key) ->( // we can delete if
onlyDynamic && hasDynamicSetting(key) // it's a dynamicSetting and we only do dynamic settings
|| get(key) == null && key.startsWith(ARCHIVED_SETTINGS_PREFIX) // the setting is not registered AND it's been archived
|| (onlyDynamic == false && get(key) != null)); // if it's not dynamic AND we have a key
final Predicate<String> canUpdate = (key) -> (
isFinalSetting(key) == false && // it's not a final setting
((onlyDynamic == false && get(key) != null) || isDynamicSetting(key)));
final Predicate<String> canRemove = (key) ->(// we can delete if
isFinalSetting(key) == false && // it's not a final setting
(onlyDynamic && isDynamicSetting(key) // it's a dynamicSetting and we only do dynamic settings
|| get(key) == null && key.startsWith(ARCHIVED_SETTINGS_PREFIX) // the setting is not registered AND it's been archived
|| (onlyDynamic == false && get(key) != null))); // if it's not dynamic AND we have a key
for (Map.Entry<String, String> entry : toApply.getAsMap().entrySet()) {
if (entry.getValue() == null && (canRemove.test(entry.getKey()) || entry.getKey().endsWith("*"))) {
// this either accepts null values that suffice the canUpdate test OR wildcard expressions (key ends with *)
Expand All @@ -482,7 +491,11 @@ onlyDynamic && hasDynamicSetting(key) // it's a dynamicSetting and we only do d
updates.put(entry.getKey(), entry.getValue());
changed = true;
} else {
throw new IllegalArgumentException(type + " setting [" + entry.getKey() + "], not dynamically updateable");
if (isFinalSetting(entry.getKey())) {
throw new IllegalArgumentException("final " + type + " setting [" + entry.getKey() + "], not updateable");
} else {
throw new IllegalArgumentException(type + " setting [" + entry.getKey() + "], not dynamically updateable");
}
}
}
changed |= applyDeletes(toRemove, target, canRemove);
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/java/org/elasticsearch/common/settings/Setting.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ public enum Property {
*/
Dynamic,

/**
* mark this setting as final, not updateable even when the context is not dynamic
* ie. Setting this property on an index scoped setting will fail update when the index is closed
*/
Final,

/**
* mark this setting as deprecated
*/
Expand Down Expand Up @@ -135,6 +141,9 @@ private Setting(Key key, @Nullable Setting<T> fallbackSetting, Function<Settings
this.properties = EMPTY_PROPERTIES;
} else {
this.properties = EnumSet.copyOf(Arrays.asList(properties));
if (isDynamic() && isFinal()) {
throw new IllegalArgumentException("final setting [" + key + "] cannot be dynamic");
}
}
}

Expand Down Expand Up @@ -218,6 +227,13 @@ public final boolean isDynamic() {
return properties.contains(Property.Dynamic);
}

/**
* Returns <code>true</code> if this setting is final, otherwise <code>false</code>
*/
public final boolean isFinal() {
return properties.contains(Property.Final);
}

/**
* Returns the setting properties
* @see Property
Expand Down
26 changes: 14 additions & 12 deletions core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,29 @@

package org.elasticsearch.discovery;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.single.SingleNodeDiscovery;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* A module for loading classes for node discovery.
*/
Expand Down Expand Up @@ -83,6 +82,7 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
discoveryTypes.put("zen",
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, clusterService, hostsProvider));
discoveryTypes.put("none", () -> new NoneDiscovery(settings, clusterService, clusterService.getClusterSettings()));
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, clusterService));
for (DiscoveryPlugin plugin : plugins) {
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,
clusterService, hostsProvider).entrySet().forEach(entry -> {
Expand All @@ -96,10 +96,12 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
if (discoverySupplier == null) {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
}
Loggers.getLogger(getClass(), settings).info("using discovery type [{}]", discoveryType);
discovery = Objects.requireNonNull(discoverySupplier.get());
}

public Discovery getDiscovery() {
return discovery;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery.single;

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.PendingClusterStatesQueue;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
* A discovery implementation where the only member of the cluster is the local node.
*/
public class SingleNodeDiscovery extends AbstractLifecycleComponent implements Discovery {

private final ClusterService clusterService;
private final DiscoverySettings discoverySettings;

public SingleNodeDiscovery(final Settings settings, final ClusterService clusterService) {
super(Objects.requireNonNull(settings));
this.clusterService = Objects.requireNonNull(clusterService);
final ClusterSettings clusterSettings =
Objects.requireNonNull(clusterService.getClusterSettings());
this.discoverySettings = new DiscoverySettings(settings, clusterSettings);
}

@Override
public DiscoveryNode localNode() {
return clusterService.localNode();
}

@Override
public String nodeDescription() {
return clusterService.getClusterName().value() + "/" + clusterService.localNode().getId();
}

@Override
public void setAllocationService(final AllocationService allocationService) {

}

@Override
public void publish(final ClusterChangedEvent event, final AckListener listener) {

}

@Override
public DiscoveryStats stats() {
return new DiscoveryStats((PendingClusterStateStats) null);
}

@Override
public DiscoverySettings getDiscoverySettings() {
return discoverySettings;
}

@Override
public void startInitialJoin() {
final ClusterStateTaskExecutor<DiscoveryNode> executor =
new ClusterStateTaskExecutor<DiscoveryNode>() {

@Override
public ClusterTasksResult<DiscoveryNode> execute(
final ClusterState current,
final List<DiscoveryNode> tasks) throws Exception {
assert tasks.size() == 1;
final DiscoveryNodes.Builder nodes =
DiscoveryNodes.builder(current.nodes());
// always set the local node as master, there will not be other nodes
nodes.masterNodeId(localNode().getId());
final ClusterState next =
ClusterState.builder(current).nodes(nodes).build();
final ClusterTasksResult.Builder<DiscoveryNode> result =
ClusterTasksResult.builder();
return result.successes(tasks).build(next);
}

@Override
public boolean runOnlyOnMaster() {
return false;
}

};
final ClusterStateTaskConfig config = ClusterStateTaskConfig.build(Priority.URGENT);
clusterService.submitStateUpdateTasks(
"single-node-start-initial-join",
Collections.singletonMap(localNode(), (s, e) -> {}), config, executor);
}

@Override
public int getMinimumMasterNodes() {
return 1;
}

@Override
protected void doStart() {

}

@Override
protected void doStop() {

}

@Override
protected void doClose() throws IOException {

}

}
Loading

0 comments on commit 8b0e059

Please sign in to comment.