Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reload secret store #28244

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.elasticsearch.action.admin.cluster.settings;

import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -44,14 +46,15 @@ public class ClusterUpdateSettingsRequest extends AcknowledgedRequest<ClusterUpd

private Settings transientSettings = EMPTY_SETTINGS;
private Settings persistentSettings = EMPTY_SETTINGS;
private String secretStorePassword;

public ClusterUpdateSettingsRequest() {
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (transientSettings.isEmpty() && persistentSettings.isEmpty()) {
if (transientSettings.isEmpty() && persistentSettings.isEmpty() && Strings.isNullOrEmpty(secretStorePassword)) {
validationException = addValidationError("no settings to update", validationException);
}
return validationException;
Expand All @@ -65,6 +68,10 @@ public Settings persistentSettings() {
return persistentSettings;
}

public String secretStorePassword() {
return secretStorePassword;
}

/**
* Sets the transient settings to be updated. They will not survive a full cluster restart
*/
Expand Down Expand Up @@ -95,10 +102,10 @@ public ClusterUpdateSettingsRequest transientSettings(String source, XContentTyp
@SuppressWarnings("unchecked")
public ClusterUpdateSettingsRequest transientSettings(Map source) {
try {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
final XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
builder.map(source);
transientSettings(builder.string(), builder.contentType());
} catch (IOException e) {
} catch (final IOException e) {
throw new ElasticsearchGenerationException("Failed to generate [" + source + "]", e);
}
return this;
Expand Down Expand Up @@ -134,26 +141,39 @@ public ClusterUpdateSettingsRequest persistentSettings(String source, XContentTy
@SuppressWarnings("unchecked")
public ClusterUpdateSettingsRequest persistentSettings(Map source) {
try {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
final XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
builder.map(source);
persistentSettings(builder.string(), builder.contentType());
} catch (IOException e) {
} catch (final IOException e) {
throw new ElasticsearchGenerationException("Failed to generate [" + source + "]", e);
}
return this;
}

public ClusterUpdateSettingsRequest secretStorePassword(String secretStorePassword) {
this.secretStorePassword = secretStorePassword;
return this;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
transientSettings = readSettingsFromStream(in);
persistentSettings = readSettingsFromStream(in);
// TODO version condition
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
secretStorePassword = in.readOptionalString();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeSettingsToStream(transientSettings, out);
writeSettingsToStream(persistentSettings, out);
// TODO version condition
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeOptionalString(secretStorePassword);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,69 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.KeyStoreWrapper;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class TransportClusterUpdateSettingsAction extends
TransportMasterNodeAction<ClusterUpdateSettingsRequest, ClusterUpdateSettingsResponse> {

private final AllocationService allocationService;

private final ClusterSettings clusterSettings;
private final TransportService transportService;
private final Environment environment;
private final String transportNodeActionName = "cluster:admin/settings/update_secure[n]";

@Inject
public TransportClusterUpdateSettingsAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, AllocationService allocationService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, ClusterSettings clusterSettings) {
IndexNameExpressionResolver indexNameExpressionResolver, ClusterSettings clusterSettings,
Environment environment) {
super(settings, ClusterUpdateSettingsAction.NAME, false, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, ClusterUpdateSettingsRequest::new);
this.allocationService = allocationService;
this.clusterSettings = clusterSettings;
this.transportService = transportService;
this.environment = environment;
transportService.registerRequestHandler(transportNodeActionName, NodeRequest::new, ThreadPool.Names.GENERIC,
new NodeTransportHandler());
}

@Override
Expand Down Expand Up @@ -122,10 +153,9 @@ private void reroute(final boolean updateSettingsAcked) {
// We're about to send a second update task, so we need to check if we're still the elected master
// For example the minimum_master_node could have been breached and we're no longer elected master,
// so we should *not* execute the reroute.
if (!clusterService.state().nodes().isLocalNodeElectedMaster()) {
if (clusterService.state().nodes().isLocalNodeElectedMaster() == false) {
logger.debug("Skipping reroute after cluster update settings, because node is no longer master");
listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, updater.getTransientUpdates(),
updater.getPersistentUpdate()));
maybeTriggerKeystoreReload(request, updateSettingsAcked);
return;
}

Expand All @@ -149,12 +179,21 @@ protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
return new ClusterUpdateSettingsResponse(updateSettingsAcked && acknowledged, updater.getTransientUpdates(),
updater.getPersistentUpdate());
}

@Override
public void onAllNodesAcked(@Nullable Exception e) {
maybeTriggerKeystoreReload(request, updateSettingsAcked);
}

@Override
public void onAckTimeout() {
maybeTriggerKeystoreReload(request, false);
}

@Override
public void onNoLongerMaster(String source) {
logger.debug("failed to preform reroute after cluster settings were updated - current node is no longer a master");
listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, updater.getTransientUpdates(),
updater.getPersistentUpdate()));
logger.debug("failed to perform reroute after cluster settings were updated - current node is no longer a master");
maybeTriggerKeystoreReload(request, updateSettingsAcked);
}

@Override
Expand All @@ -180,11 +219,168 @@ public void onFailure(String source, Exception e) {

@Override
public ClusterState execute(final ClusterState currentState) {
ClusterState clusterState = updater.updateSettings(currentState, request.transientSettings(), request.persistentSettings());
final ClusterState clusterState = updater.updateSettings(currentState, request.transientSettings(), request.persistentSettings());
changed = clusterState != currentState;
return clusterState;
}

private void maybeTriggerKeystoreReload(ClusterUpdateSettingsRequest request, boolean updateSettingsAcked) {
if (request.secretStorePassword() == null) {
listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, updater.getTransientUpdates(),
updater.getPersistentUpdate()));
return;
}
final TransportRequestOptions.Builder optionsBuilder = TransportRequestOptions.builder();
if (request.timeout() != null) {
optionsBuilder.withTimeout(request.timeout());
}
final DiscoveryNodes nodes = clusterService.state().nodes();
final AtomicReference<Map<String, byte[]>> secretSettingHashes = new AtomicReference<>();
final AtomicBoolean success = new AtomicBoolean(true);
final AtomicInteger counter = new AtomicInteger(nodes.getSize());
for (final DiscoveryNode node : nodes) {
final TransportRequest nodeRequest = new NodeRequest(node.getId(), request.secretStorePassword());
transportService.sendRequest(node, transportNodeActionName, nodeRequest, optionsBuilder.build(),
new TransportResponseHandler<NodeResponse>() {
@Override
public NodeResponse newInstance() {
return new NodeResponse();
}

@Override
public void handleResponse(NodeResponse response) {
secretSettingHashes.compareAndSet(null, response.secretSettingHashes);
if (compareSecretSettingHashes(secretSettingHashes.get(), response.secretSettingHashes) == false) {
success.set(false);
}
if (counter.decrementAndGet() == 0) {
listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked && success.get(),
updater.getTransientUpdates(), updater.getPersistentUpdate()));
}
}

@Override
public void handleException(TransportException exp) {
success.set(false);
if (counter.decrementAndGet() == 0) {
listener.onResponse(new ClusterUpdateSettingsResponse(false, updater.getTransientUpdates(),
updater.getPersistentUpdate()));
}
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
}

private boolean compareSecretSettingHashes(Map<String, byte[]> m1, Map<String, byte[]> m2) {
assert m1 != null && m2 != null;
if (m1.size() != m2.size()) {
return false;
}
for (final Map.Entry<String, byte[]> e : m1.entrySet()) {
if (Arrays.equals(e.getValue(), m2.get(e.getKey())) == false) {
return false;
}
}
return true;
}
});
}

private class NodeTransportHandler implements TransportRequestHandler<NodeRequest> {

@Override
public void messageReceived(NodeRequest request, TransportChannel channel, Task task) throws Exception {
messageReceived(request, channel);
}

@Override
public void messageReceived(NodeRequest request, TransportChannel channel) throws Exception {
final Map<String, byte[]> secretHashes = new HashMap<>();
// TODO Search for abstract components using SecureSetting s and pass them the
// newly decrypted keystore
KeyStoreWrapper keystore = null;
try {
keystore = KeyStoreWrapper.load(environment.configFile());
keystore.decrypt(new char[0] /* use password from request */);
for (final String settingName : keystore.getSettingNames()) {
if (settingName.equals(KeyStoreWrapper.SEED_SETTING.getKey())) {
continue;
}
// assume all secure setting values are string typed
final MessageDigest valueDigest = MessageDigest.getInstance("SHA-256");
try (SecureString settingValue = keystore.getString(settingName)) {
for (final char c : settingValue.getChars()) {
valueDigest.update((byte)c);
}
}
secretHashes.put(settingName, valueDigest.digest());
}
} finally {
if (keystore != null) {
keystore.close();
}
}
channel.sendResponse(new NodeResponse(clusterService.localNode(), secretHashes));
}

}

private static class NodeRequest extends BaseNodeRequest {
private String secretStorePassword;

NodeRequest() {
}

NodeRequest(String nodeId, String secretStorePassword) {
super(nodeId);
this.secretStorePassword = secretStorePassword;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.secretStorePassword = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(this.secretStorePassword);
}
}

private static class NodeResponse extends BaseNodeResponse {

private Map<String, byte[]> secretSettingHashes;

NodeResponse() {
}

public NodeResponse(DiscoveryNode node, Map<String, byte[]> secretSettingHashes) {
super(node);
this.secretSettingHashes = secretSettingHashes;
}

public Map<String, byte[]> secretSettingHashes() {
return this.secretSettingHashes;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.secretSettingHashes = in.readMap(StreamInput::readString, StreamInput::readByteArray);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(this.secretSettingHashes, StreamOutput::writeString, StreamOutput::writeByteArray);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
if (source.containsKey("persistent")) {
clusterUpdateSettingsRequest.persistentSettings((Map) source.get("persistent"));
}
if (source.containsKey("secretStorePassword")) {
clusterUpdateSettingsRequest.secretStorePassword((String) source.get("secretStorePassword"));
}

return channel -> client.admin().cluster().updateSettings(clusterUpdateSettingsRequest,
new AcknowledgedRestListener<ClusterUpdateSettingsResponse>(channel) {
Expand Down