Skip to content

Commit

Permalink
Improve handling of failure to create persistent task (elastic#114386)
Browse files Browse the repository at this point in the history
Today, if creating a persistent task fails with an exception, we submit a
cluster state update to fail the task. However, until that update
executes, we retry the failing task creation (and the cluster state
submission) on every other cluster state update that changes the
persistent tasks metadata.

With this commit we register a placeholder task on the executing node to
block further attempts to create the real task until the cluster state
update that handles the failure is processed.
  • Loading branch information
DaveCTurner authored Oct 10, 2024
1 parent 84625c6 commit 9ae0140
Show file tree
Hide file tree
Showing 3 changed files with 296 additions and 23 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/114386.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 114386
summary: Improve handling of failure to create persistent task
area: Task Management
type: bug
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.persistent;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.lessThanOrEqualTo;

/**
 * Integration test for persistent tasks whose creation throws on the node they were assigned to. It checks that such a task ends up
 * being removed from the cluster state, and that the failing creation step is not repeated while other cluster state updates touch the
 * persistent tasks metadata.
 */
public class PersistentTaskCreationFailureIT extends ESIntegTestCase {

    /** Source of the pending master task that this test looks for while the interfering updates are running. */
    private static final String FAILED_TASK_COMPLETION_SOURCE = "finish persistent task (failed)";

    /** Number of high-priority cluster state updates submitted once the persistent task shows up in the cluster state. */
    private static final int INTERFERING_UPDATE_COUNT = 5;

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        // every node gets the plugin whose executor always fails to create its task
        return List.of(FailingCreationPersistentTasksPlugin.class);
    }

    /**
     * @return {@code true} iff the given cluster state contains at least one task named
     *         {@link FailingCreationPersistentTaskExecutor#TASK_NAME}
     */
    private static boolean hasPersistentTask(ClusterState clusterState) {
        final var matchingTasks = findTasks(clusterState, FailingCreationPersistentTaskExecutor.TASK_NAME);
        return matchingTasks.isEmpty() == false;
    }

    public void testPersistentTasksThatFailDuringCreationAreRemovedFromClusterState() {
        final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);

        // collect the plugin instance of every node and reset its "creation failed" flag
        final var pluginsServices = internalCluster().getInstances(PluginsService.class);
        final var plugins = StreamSupport.stream(pluginsServices.spliterator(), false)
            .flatMap(pluginsService -> pluginsService.filterPlugins(FailingCreationPersistentTasksPlugin.class))
            .toList();
        for (final var plugin : plugins) {
            plugin.hasFailedToCreateTask.set(false);
        }

        // must be registered before the start request is sent so that the appearance of the task is not missed
        final var taskCreatedListener = ClusterServiceUtils.addTemporaryStateListener(
            masterClusterService,
            PersistentTaskCreationFailureIT::hasPersistentTask
        );

        taskCreatedListener.andThenAccept(ignored -> {
            // enqueue some higher-priority cluster state updates to check that they do not cause retries of the failing task creation step
            for (int remaining = INTERFERING_UPDATE_COUNT; remaining > 0; remaining--) {
                masterClusterService.submitUnbatchedStateUpdateTask("test", newInterferingUpdateTask(masterClusterService));
            }
        });

        final var persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
        safeAwait(
            startListener -> persistentTasksService.sendStartRequest(
                UUIDs.base64UUID(),
                FailingCreationPersistentTaskExecutor.TASK_NAME,
                new FailingCreationTaskParams(),
                null,
                startListener.map(ignored -> null)
            )
        );

        // once the task has been seen in the cluster state, wait for it to disappear again
        safeAwait(
            taskCreatedListener.<Void>andThen(
                (taskRemovedListener, ignored) -> ClusterServiceUtils.addTemporaryStateListener(
                    masterClusterService,
                    clusterState -> hasPersistentTask(clusterState) == false
                ).addListener(taskRemovedListener)
            )
        );

        // exactly one plugin instance must have recorded a failed creation attempt
        final long failedCreationAttempts = plugins.stream().filter(plugin -> plugin.hasFailedToCreateTask.get()).count();
        assertEquals(1L, failedCreationAttempts);
    }

    /**
     * Builds an {@link Priority#IMMEDIATE} cluster state update which asserts that the persistent task is still present, waits for the
     * single pending task with source {@value #FAILED_TASK_COMPLETION_SOURCE}, and then changes the persistent tasks metadata.
     */
    private ClusterStateUpdateTask newInterferingUpdateTask(ClusterService masterClusterService) {
        return new ClusterStateUpdateTask(Priority.IMMEDIATE) {
            @Override
            public ClusterState execute(ClusterState currentState) {
                assertTrue(hasPersistentTask(currentState));
                assertTrue(waitUntil(() -> isSingleFailureCompletionPending(masterClusterService)));
                return withChangedLastAllocationId(currentState);
            }

            @Override
            public void onFailure(Exception e) {
                fail(e);
            }
        };
    }

    /**
     * Counts the master's pending tasks whose source equals {@value #FAILED_TASK_COMPLETION_SOURCE}, asserting that there is never more
     * than one of them.
     *
     * @return {@code true} iff exactly one such task is pending
     */
    private boolean isSingleFailureCompletionPending(ClusterService masterClusterService) {
        long pendingCompletions = 0L;
        for (final var pendingClusterTask : masterClusterService.getMasterService().pendingTasks()) {
            if (pendingClusterTask.getSource().string().equals(FAILED_TASK_COMPLETION_SOURCE)) {
                pendingCompletions++;
            }
        }
        assertThat(pendingCompletions, lessThanOrEqualTo(1L));
        return pendingCompletions == 1L;
    }

    /**
     * @return a copy of the given state in which a fake persistent task was added to, and immediately removed from, the persistent
     *         tasks metadata
     */
    private static ClusterState withChangedLastAllocationId(ClusterState currentState) {
        final var updatedPersistentTasks = PersistentTasksCustomMetadata.builder(
            PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(currentState)
        )
            // create and remove a fake task just to force a change in lastAllocationId so that
            // PersistentTasksNodeService checks for changes and potentially retries
            .addTask("test", "test", null, PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT)
            .removeTask("test")
            .build();
        return currentState.copyAndUpdateMetadata(mdb -> mdb.putCustom(PersistentTasksCustomMetadata.TYPE, updatedPersistentTasks));
    }

    /**
     * Plugin registering a {@link FailingCreationPersistentTaskExecutor} together with the serialization hooks for its params. The flag
     * it owns is shared with the executor and records whether that executor has been asked to create a task.
     */
    public static class FailingCreationPersistentTasksPlugin extends Plugin implements PersistentTaskPlugin {

        private final AtomicBoolean hasFailedToCreateTask = new AtomicBoolean();

        @Override
        public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
            ClusterService clusterService,
            ThreadPool threadPool,
            Client client,
            SettingsModule settingsModule,
            IndexNameExpressionResolver expressionResolver
        ) {
            final var failingExecutor = new FailingCreationPersistentTaskExecutor(hasFailedToCreateTask);
            return List.of(failingExecutor);
        }

        @Override
        public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
            final var paramsEntry = new NamedWriteableRegistry.Entry(
                PersistentTaskParams.class,
                FailingCreationPersistentTaskExecutor.TASK_NAME,
                FailingCreationTaskParams::new
            );
            return List.of(paramsEntry);
        }

        @Override
        public List<NamedXContentRegistry.Entry> getNamedXContent() {
            final var paramsEntry = new NamedXContentRegistry.Entry(
                PersistentTaskParams.class,
                new ParseField(FailingCreationPersistentTaskExecutor.TASK_NAME),
                parser -> {
                    // the params carry no fields, so whatever is nested in the object is skipped
                    parser.skipChildren();
                    return new FailingCreationTaskParams();
                }
            );
            return List.of(paramsEntry);
        }
    }

    /**
     * Params for the failing task. They have no content: nothing is read from or written to the stream, and the XContent form is an
     * empty object.
     */
    public static class FailingCreationTaskParams implements PersistentTaskParams {
        public FailingCreationTaskParams() {}

        public FailingCreationTaskParams(StreamInput in) {}

        @Override
        public String getWriteableName() {
            return FailingCreationPersistentTaskExecutor.TASK_NAME;
        }

        @Override
        public TransportVersion getMinimalSupportedVersion() {
            return TransportVersion.current();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {}

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            return builder.startObject().endObject();
        }
    }

    /**
     * Executor whose {@link #createTask} always throws. The first call flips the shared flag; a second call on the same instance trips
     * an assertion. Neither the executor passed to the superclass nor {@link #nodeOperation} is expected to ever run.
     */
    static class FailingCreationPersistentTaskExecutor extends PersistentTasksExecutor<FailingCreationTaskParams> {
        static final String TASK_NAME = "cluster:admin/persistent/test_creation_failure";

        private final AtomicBoolean hasFailedToCreateTask;

        FailingCreationPersistentTaskExecutor(AtomicBoolean hasFailedToCreateTask) {
            super(TASK_NAME, r -> fail("execution is unexpected"));
            this.hasFailedToCreateTask = hasFailedToCreateTask;
        }

        @Override
        protected AllocatedPersistentTask createTask(
            long id,
            String type,
            String action,
            TaskId parentTaskId,
            PersistentTasksCustomMetadata.PersistentTask<FailingCreationTaskParams> taskInProgress,
            Map<String, String> headers
        ) {
            final boolean isFirstAttempt = hasFailedToCreateTask.compareAndSet(false, true);
            assertTrue("already failed before", isFirstAttempt);
            throw new RuntimeException("simulated");
        }

        @Override
        protected void nodeOperation(AllocatedPersistentTask task, FailingCreationTaskParams params, PersistentTaskState state) {
            fail("execution is unexpected");
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.tasks.Task;
Expand All @@ -32,6 +33,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;

import static java.util.Objects.requireNonNull;
import static org.elasticsearch.core.Strings.format;
Expand Down Expand Up @@ -172,41 +174,65 @@ private <Params extends PersistentTaskParams> void startTask(PersistentTask<Para
taskInProgress.getTaskName()
);

TaskAwareRequest request = new TaskAwareRequest() {
TaskId parentTaskId = new TaskId("cluster", taskInProgress.getAllocationId());
final var request = new PersistentTaskAwareRequest<>(taskInProgress, executor);
try (var ignored = threadPool.getThreadContext().newTraceContext()) {
doStartTask(taskInProgress, executor, request);
}
}

@Override
public void setParentTask(TaskId taskId) {
throw new UnsupportedOperationException("parent task if for persistent tasks shouldn't change");
}
/**
* A {@link TaskAwareRequest} which creates the relevant task using a {@link PersistentTasksExecutor}.
*/
private static class PersistentTaskAwareRequest<Params extends PersistentTaskParams> implements TaskAwareRequest {
private final PersistentTask<Params> taskInProgress;
private final TaskId parentTaskId;
private final PersistentTasksExecutor<Params> executor;

private PersistentTaskAwareRequest(PersistentTask<Params> taskInProgress, PersistentTasksExecutor<Params> executor) {
this.taskInProgress = taskInProgress;
this.parentTaskId = new TaskId("cluster", taskInProgress.getAllocationId());
this.executor = executor;
}

@Override
public void setRequestId(long requestId) {
throw new UnsupportedOperationException("does not have a request ID");
}
@Override
public void setParentTask(TaskId taskId) {
throw new UnsupportedOperationException("parent task if for persistent tasks shouldn't change");
}

@Override
public TaskId getParentTask() {
return parentTaskId;
}
@Override
public void setRequestId(long requestId) {
throw new UnsupportedOperationException("does not have a request ID");
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return executor.createTask(id, type, action, parentTaskId, taskInProgress, headers);
}
};
@Override
public TaskId getParentTask() {
return parentTaskId;
}

try (var ignored = threadPool.getThreadContext().newTraceContext()) {
doStartTask(taskInProgress, executor, request);
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return executor.createTask(id, type, action, parentTaskId, taskInProgress, headers);
}
}

/**
 * Stand-in {@link PersistentTasksExecutor} whose {@link #nodeOperation} does nothing. It backs the placeholder task that is registered
 * when creating the real task failed.
 */
private static class PersistentTaskStartupFailureExecutor<Params extends PersistentTaskParams> extends PersistentTasksExecutor<Params> {

    PersistentTaskStartupFailureExecutor(String taskName, Executor executor) {
        super(taskName, executor);
    }

    @Override
    protected void nodeOperation(AllocatedPersistentTask task, Params params, PersistentTaskState state) {
        // intentionally empty: this executor only exists so that a placeholder task can be created
    }
}

private <Params extends PersistentTaskParams> void doStartTask(
PersistentTask<Params> taskInProgress,
PersistentTasksExecutor<Params> executor,
TaskAwareRequest request
) {
AllocatedPersistentTask task;
final AllocatedPersistentTask task;
try {
task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]", request);
} catch (Exception e) {
Expand All @@ -220,7 +246,21 @@ private <Params extends PersistentTaskParams> void doStartTask(
+ "], removing from persistent tasks",
e
);
notifyMasterOfFailedTask(taskInProgress, e);

// create a no-op placeholder task so that we don't keep trying to start this task while we wait for the cluster state update
// which handles the failure
final var placeholderTask = (AllocatedPersistentTask) taskManager.register(
"persistent",
taskInProgress.getTaskName() + "[c]",
new PersistentTaskAwareRequest<>(
taskInProgress,
new PersistentTaskStartupFailureExecutor<>(executor.getTaskName(), EsExecutors.DIRECT_EXECUTOR_SERVICE)
)
);
placeholderTask.init(persistentTasksService, taskManager, taskInProgress.getId(), taskInProgress.getAllocationId());
taskManager.unregister(placeholderTask);
runningTasks.put(taskInProgress.getAllocationId(), placeholderTask);
placeholderTask.markAsFailed(e);
return;
}

Expand Down

0 comments on commit 9ae0140

Please sign in to comment.