Skip to content

Commit

Permalink
[ML] Don't install empty ML metadata on startup
Browse files Browse the repository at this point in the history
This change is to support rolling upgrade from a pre-6.3 default
distribution (i.e. without X-Pack) to a 6.3+ default distribution
(i.e. with X-Pack).

The ML metadata is no longer eagerly added to the cluster state
as soon as the master node has X-Pack available.  Instead, it
is added when the first ML job is created.

As a result all methods that get the ML metadata need to be able
to handle the situation where there is no ML metadata in the
current cluster state.  They do this by behaving as though an
empty ML metadata was present.  This logic is encapsulated by
always asking for the current ML metadata using a static method
on the MlMetadata class.

Relates elastic#30731
  • Loading branch information
droberts195 committed May 21, 2018
1 parent ea09667 commit 04eb564
Show file tree
Hide file tree
Showing 34 changed files with 99 additions and 256 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.NamedDiff;
Expand Down Expand Up @@ -467,6 +468,14 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis
}
}

public static MlMetadata getMlMetadata(ClusterState state) {
MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(MLMetadataField.TYPE);
if (mlMetadata == null) {
return EMPTY_METADATA;
}
return mlMetadata;
}

public static class JobAlreadyMarkedAsDeletedException extends RuntimeException {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.elasticsearch.xpack.core.ml.job.persistence;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;

/**
Expand Down Expand Up @@ -47,8 +46,7 @@ public static String resultsWriteAlias(String jobId) {
* @return The index name
*/
public static String getPhysicalIndexFromState(ClusterState state, String jobId) {
MlMetadata meta = state.getMetaData().custom(MLMetadataField.TYPE);
return meta.getJobs().get(jobId).getResultsIndexName();
return MlMetadata.getMlMetadata(state).getJobs().get(jobId).getResultsIndexName();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
Expand Down Expand Up @@ -132,15 +131,7 @@ public Map<String, Object> nativeCodeInfo() {
@Override
public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
ClusterState state = clusterService.state();
MlMetadata mlMetadata = state.getMetaData().custom(MLMetadataField.TYPE);

// Handle case when usage is called but MlMetadata has not been installed yet
if (mlMetadata == null) {
listener.onResponse(new MachineLearningFeatureSetUsage(available(), enabled,
Collections.emptyMap(), Collections.emptyMap()));
} else {
new Retriever(client, mlMetadata, available(), enabled()).execute(listener);
}
new Retriever(client, MlMetadata.getMlMetadata(state), available(), enabled()).execute(listener);
}

public static class Retriever {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
Expand Down Expand Up @@ -90,8 +89,7 @@ public void clusterChanged(ClusterChangedEvent event) {
}
} else if (StartDatafeedAction.TASK_NAME.equals(currentTask.getTaskName())) {
String datafeedId = ((StartDatafeedAction.DatafeedParams) currentTask.getParams()).getDatafeedId();
MlMetadata mlMetadata = event.state().getMetaData().custom(MLMetadataField.TYPE);
DatafeedConfig datafeedConfig = mlMetadata.getDatafeed(datafeedId);
DatafeedConfig datafeedConfig = MlMetadata.getMlMetadata(event.state()).getDatafeed(datafeedId);
if (currentAssignment.getExecutorNode() == null) {
String msg = "No node found to start datafeed [" + datafeedId +"]. Reasons [" +
currentAssignment.getExplanation() + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,20 @@

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;

import java.util.concurrent.atomic.AtomicBoolean;

class MlInitializationService extends AbstractComponent implements ClusterStateListener {

private final ThreadPool threadPool;
private final ClusterService clusterService;
private final Client client;

private final AtomicBoolean installMlMetadataCheck = new AtomicBoolean(false);

private volatile MlDailyMaintenanceService mlDailyMaintenanceService;

MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client) {
Expand All @@ -48,45 +39,12 @@ public void clusterChanged(ClusterChangedEvent event) {
}

if (event.localNodeMaster()) {
MetaData metaData = event.state().metaData();
installMlMetadata(metaData);
installDailyMaintenanceService();
} else {
uninstallDailyMaintenanceService();
}
}

private void installMlMetadata(MetaData metaData) {
if (metaData.custom(MLMetadataField.TYPE) == null) {
if (installMlMetadataCheck.compareAndSet(false, true)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() ->
clusterService.submitStateUpdateTask("install-ml-metadata", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// If the metadata has been added already don't try to update
if (currentState.metaData().custom(MLMetadataField.TYPE) != null) {
return currentState;
}
ClusterState.Builder builder = new ClusterState.Builder(currentState);
MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
metadataBuilder.putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA);
builder.metaData(metadataBuilder.build());
return builder.build();
}

@Override
public void onFailure(String source, Exception e) {
installMlMetadataCheck.set(false);
logger.error("unable to install ml metadata", e);
}
})
);
}
} else {
installMlMetadataCheck.set(false);
}
}

private void installDailyMaintenanceService() {
if (mlDailyMaintenanceService == null) {
mlDailyMaintenanceService = new MlDailyMaintenanceService(clusterService.getClusterName(), threadPool, client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
Expand Down Expand Up @@ -92,8 +91,7 @@ public TransportCloseJobAction(Settings settings, TransportService transportServ
static void resolveAndValidateJobId(CloseJobAction.Request request, ClusterState state, List<String> openJobIds,
List<String> closingJobIds) {
PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
MlMetadata maybeNull = state.metaData().custom(MLMetadataField.TYPE);
final MlMetadata mlMetadata = (maybeNull == null) ? MlMetadata.EMPTY_METADATA : maybeNull;
final MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);

List<String> failedJobs = new ArrayList<>();

Expand All @@ -107,7 +105,7 @@ static void resolveAndValidateJobId(CloseJobAction.Request request, ClusterState
};

Set<String> expandedJobIds = mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs());
expandedJobIds.stream().forEach(jobIdProcessor::accept);
expandedJobIds.forEach(jobIdProcessor::accept);
if (request.isForce() == false && failedJobs.size() > 0) {
if (expandedJobIds.size() == 1) {
throw ExceptionsHelper.conflictStatusException("cannot close job [{}] because it failed, use force close",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ protected DeleteDatafeedAction.Response newResponse(boolean acknowledged) {
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata currentMetadata = currentState.getMetaData().custom(MLMetadataField.TYPE);
public ClusterState execute(ClusterState currentState) {
MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState);
PersistentTasksCustomMetaData persistentTasks =
currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
Expand Down Expand Up @@ -60,8 +59,7 @@ protected void doExecute(DeleteFilterAction.Request request, ActionListener<Dele

final String filterId = request.getFilterId();
ClusterState state = clusterService.state();
MlMetadata currentMlMetadata = state.metaData().custom(MLMetadataField.TYPE);
Map<String, Job> jobs = currentMlMetadata.getJobs();
Map<String, Job> jobs = MlMetadata.getMlMetadata(state).getJobs();
List<String> currentlyUsedBy = new ArrayList<>();
for (Job job : jobs.values()) {
List<Detector> detectors = job.getAnalysisConfig().getDetectors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,9 @@ public void onFailure(Exception e) {
void markJobAsDeleting(String jobId, ActionListener<Boolean> listener, boolean force) {
clusterService.submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata currentMlMetadata = currentState.metaData().custom(MLMetadataField.TYPE);
public ClusterState execute(ClusterState currentState) {
PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata);
MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState));
builder.markJobAsDeleted(jobId, tasks, force);
return buildNewClusterState(currentState, builder);
}
Expand Down Expand Up @@ -248,11 +247,7 @@ public void onTimeout(TimeValue timeout) {
}

static boolean jobIsDeletedFromState(String jobId, ClusterState clusterState) {
MlMetadata metadata = clusterState.metaData().custom(MLMetadataField.TYPE);
if (metadata == null) {
return true;
}
return !metadata.getJobs().containsKey(jobId);
return !MlMetadata.getMlMetadata(clusterState).getJobs().containsKey(jobId);
}

private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ protected void masterOperation(FinalizeJobExecutionAction.Request request, Clust
logger.debug("finalizing jobs [{}]", jobIdString);
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata mlMetadata = currentState.metaData().custom(MLMetadataField.TYPE);
public ClusterState execute(ClusterState currentState) {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
Date finishedTime = new Date();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetCalendarEventsAction;
import org.elasticsearch.xpack.core.ml.action.GetCalendarsAction;
Expand Down Expand Up @@ -70,7 +69,7 @@ protected void doExecute(GetCalendarEventsAction.Request request,

if (request.getJobId() != null) {
ClusterState state = clusterService.state();
MlMetadata currentMlMetadata = state.metaData().custom(MLMetadataField.TYPE);
MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(state);

List<String> jobGroups;
String requestId = request.getJobId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
Expand Down Expand Up @@ -52,10 +51,7 @@ protected void masterOperation(GetDatafeedsAction.Request request, ClusterState
ActionListener<GetDatafeedsAction.Response> listener) throws Exception {
logger.debug("Get datafeed '{}'", request.getDatafeedId());

MlMetadata mlMetadata = state.metaData().custom(MLMetadataField.TYPE);
if (mlMetadata == null) {
mlMetadata = MlMetadata.EMPTY_METADATA;
}
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());
List<DatafeedConfig> datafeedConfigs = new ArrayList<>();
for (String expandedDatafeedId : expandedDatafeedIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
Expand Down Expand Up @@ -56,11 +55,7 @@ protected void masterOperation(GetDatafeedsStatsAction.Request request, ClusterS
ActionListener<GetDatafeedsStatsAction.Response> listener) throws Exception {
logger.debug("Get stats for datafeed '{}'", request.getDatafeedId());

MlMetadata mlMetadata = state.metaData().custom(MLMetadataField.TYPE);
if (mlMetadata == null) {
mlMetadata = MlMetadata.EMPTY_METADATA;
}

MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());

PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
Expand Down Expand Up @@ -69,8 +68,7 @@ public TransportGetJobsStatsAction(Settings settings, TransportService transport

@Override
protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> listener) {
MlMetadata clusterMlMetadata = clusterService.state().metaData().custom(MLMetadataField.TYPE);
MlMetadata mlMetadata = (clusterMlMetadata == null) ? MlMetadata.EMPTY_METADATA : clusterMlMetadata;
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state());
request.setExpandedJobsIds(new ArrayList<>(mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs())));
ActionListener<GetJobsStatsAction.Response> finalListener = listener;
listener = ActionListener.wrap(response -> gatherStatsForClosedJobs(mlMetadata,
Expand Down
Loading

0 comments on commit 04eb564

Please sign in to comment.