Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handling of failure to create persistent task #114386

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/114386.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 114386
summary: Improve handling of failure to create persistent task
area: Task Management
type: bug
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.persistent;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class PersistentTaskCreationFailureIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(FailingCreationPersistentTasksPlugin.class);
}

private static boolean hasPersistentTask(ClusterState clusterState) {
return findTasks(clusterState, FailingCreationPersistentTaskExecutor.TASK_NAME).isEmpty() == false;
}

public void testPersistentTasksThatFailDuringCreationAreRemovedFromClusterState() {

final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
final var plugins = StreamSupport.stream(internalCluster().getInstances(PluginsService.class).spliterator(), false)
.flatMap(ps -> ps.filterPlugins(FailingCreationPersistentTasksPlugin.class))
.toList();
plugins.forEach(plugin -> plugin.hasFailedToCreateTask.set(false));

final var taskCreatedListener = ClusterServiceUtils.addTemporaryStateListener(
masterClusterService,
PersistentTaskCreationFailureIT::hasPersistentTask
);

taskCreatedListener.andThenAccept(v -> {
// enqueue some higher-priority cluster state updates to check that they do not cause retries of the failing task creation step
for (int i = 0; i < 5; i++) {
masterClusterService.submitUnbatchedStateUpdateTask("test", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public ClusterState execute(ClusterState currentState) {
assertTrue(hasPersistentTask(currentState));

assertTrue(waitUntil(() -> {
final var completePersistentTaskPendingTasksCount = masterClusterService.getMasterService()
.pendingTasks()
.stream()
.filter(
pendingClusterTask -> pendingClusterTask.getSource().string().equals("finish persistent task (failed)")
)
.count();
assertThat(completePersistentTaskPendingTasksCount, lessThanOrEqualTo(1L));
return completePersistentTaskPendingTasksCount == 1L;
}));

return currentState.copyAndUpdateMetadata(
mdb -> mdb.putCustom(
PersistentTasksCustomMetadata.TYPE,
PersistentTasksCustomMetadata.builder(
PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(currentState)
)
// create and remove a fake task just to force a change in lastAllocationId so that
// PersistentTasksNodeService checks for changes and potentially retries
.addTask("test", "test", null, PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to force a change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid the fast-path through PersistentTasksNodeService that avoids the retry bug if this bit of the metadata is unchanged:

if (Objects.equals(tasks, previousTasks) == false || event.nodesChanged()) {

.removeTask("test")
.build()
)
);
}

@Override
public void onFailure(Exception e) {
fail(e);
}
});
}
});

safeAwait(
l -> internalCluster().getInstance(PersistentTasksService.class)
.sendStartRequest(
UUIDs.base64UUID(),
FailingCreationPersistentTaskExecutor.TASK_NAME,
new FailingCreationTaskParams(),
null,
l.map(ignored -> null)
)
);

safeAwait(
taskCreatedListener.<Void>andThen(
(l, v) -> ClusterServiceUtils.addTemporaryStateListener(
masterClusterService,
clusterState -> hasPersistentTask(clusterState) == false
).addListener(l)
)
);

assertEquals(1L, plugins.stream().filter(plugin -> plugin.hasFailedToCreateTask.get()).count());
}

public static class FailingCreationPersistentTasksPlugin extends Plugin implements PersistentTaskPlugin {

private final AtomicBoolean hasFailedToCreateTask = new AtomicBoolean();

@Override
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
ClusterService clusterService,
ThreadPool threadPool,
Client client,
SettingsModule settingsModule,
IndexNameExpressionResolver expressionResolver
) {
return List.of(new FailingCreationPersistentTaskExecutor(hasFailedToCreateTask));
}

@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return List.of(
new NamedWriteableRegistry.Entry(
PersistentTaskParams.class,
FailingCreationPersistentTaskExecutor.TASK_NAME,
FailingCreationTaskParams::new
)
);
}

@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return List.of(
new NamedXContentRegistry.Entry(
PersistentTaskParams.class,
new ParseField(FailingCreationPersistentTaskExecutor.TASK_NAME),
p -> {
p.skipChildren();
return new FailingCreationTaskParams();
}
)
);
}
}

public static class FailingCreationTaskParams implements PersistentTaskParams {
public FailingCreationTaskParams() {}

public FailingCreationTaskParams(StreamInput in) {}

@Override
public String getWriteableName() {
return FailingCreationPersistentTaskExecutor.TASK_NAME;
}

@Override
public TransportVersion getMinimalSupportedVersion() {
return TransportVersion.current();
}

@Override
public void writeTo(StreamOutput out) throws IOException {}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.endObject();
return builder;
}
}

static class FailingCreationPersistentTaskExecutor extends PersistentTasksExecutor<FailingCreationTaskParams> {
static final String TASK_NAME = "cluster:admin/persistent/test_creation_failure";

private final AtomicBoolean hasFailedToCreateTask;

FailingCreationPersistentTaskExecutor(AtomicBoolean hasFailedToCreateTask) {
super(TASK_NAME, r -> fail("execution is unexpected"));
this.hasFailedToCreateTask = hasFailedToCreateTask;
}

@Override
protected AllocatedPersistentTask createTask(
long id,
String type,
String action,
TaskId parentTaskId,
PersistentTasksCustomMetadata.PersistentTask<FailingCreationTaskParams> taskInProgress,
Map<String, String> headers
) {
assertTrue("already failed before", hasFailedToCreateTask.compareAndSet(false, true));
throw new RuntimeException("simulated");
}

@Override
protected void nodeOperation(AllocatedPersistentTask task, FailingCreationTaskParams params, PersistentTaskState state) {
fail("execution is unexpected");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.tasks.Task;
Expand All @@ -32,6 +33,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;

import static java.util.Objects.requireNonNull;
import static org.elasticsearch.core.Strings.format;
Expand Down Expand Up @@ -172,41 +174,65 @@ private <Params extends PersistentTaskParams> void startTask(PersistentTask<Para
taskInProgress.getTaskName()
);

TaskAwareRequest request = new TaskAwareRequest() {
TaskId parentTaskId = new TaskId("cluster", taskInProgress.getAllocationId());
final var request = new PersistentTaskAwareRequest<>(taskInProgress, executor);
try (var ignored = threadPool.getThreadContext().newTraceContext()) {
doStartTask(taskInProgress, executor, request);
}
}

@Override
public void setParentTask(TaskId taskId) {
throw new UnsupportedOperationException("parent task if for persistent tasks shouldn't change");
}
/**
* A {@link TaskAwareRequest} which creates the relevant task using a {@link PersistentTasksExecutor}.
*/
private static class PersistentTaskAwareRequest<Params extends PersistentTaskParams> implements TaskAwareRequest {
private final PersistentTask<Params> taskInProgress;
private final TaskId parentTaskId;
private final PersistentTasksExecutor<Params> executor;

private PersistentTaskAwareRequest(PersistentTask<Params> taskInProgress, PersistentTasksExecutor<Params> executor) {
this.taskInProgress = taskInProgress;
this.parentTaskId = new TaskId("cluster", taskInProgress.getAllocationId());
this.executor = executor;
}

@Override
public void setRequestId(long requestId) {
throw new UnsupportedOperationException("does not have a request ID");
}
@Override
public void setParentTask(TaskId taskId) {
throw new UnsupportedOperationException("parent task if for persistent tasks shouldn't change");
}

@Override
public TaskId getParentTask() {
return parentTaskId;
}
@Override
public void setRequestId(long requestId) {
throw new UnsupportedOperationException("does not have a request ID");
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return executor.createTask(id, type, action, parentTaskId, taskInProgress, headers);
}
};
@Override
public TaskId getParentTask() {
return parentTaskId;
}

try (var ignored = threadPool.getThreadContext().newTraceContext()) {
doStartTask(taskInProgress, executor, request);
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return executor.createTask(id, type, action, parentTaskId, taskInProgress, headers);
}
}

/**
* A no-op {@link PersistentTasksExecutor} to create a placeholder task if creating the real task fails for some reason.
*/
private static class PersistentTaskStartupFailureExecutor<Params extends PersistentTaskParams> extends PersistentTasksExecutor<Params> {
PersistentTaskStartupFailureExecutor(String taskName, Executor executor) {
super(taskName, executor);
}

@Override
protected void nodeOperation(AllocatedPersistentTask task, Params params, PersistentTaskState state) {}
}

private <Params extends PersistentTaskParams> void doStartTask(
PersistentTask<Params> taskInProgress,
PersistentTasksExecutor<Params> executor,
TaskAwareRequest request
) {
AllocatedPersistentTask task;
final AllocatedPersistentTask task;
try {
task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]", request);
} catch (Exception e) {
Expand All @@ -220,7 +246,21 @@ private <Params extends PersistentTaskParams> void doStartTask(
+ "], removing from persistent tasks",
e
);
notifyMasterOfFailedTask(taskInProgress, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why you choose placeholder approach? Can it be a bug in how master handles task failures?

Copy link
Contributor Author

@DaveCTurner DaveCTurner Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's just that we assume here that the notifyMasterOfFailedTask takes effect immediately and that isn't true always (see linked case). We need to record the fact that we processed this task creation so that we don't keep on retrying until the master gets around to processing the failure notification.


// create a no-op placeholder task so that we don't keep trying to start this task while we wait for the cluster state update
// which handles the failure
final var placeholderTask = (AllocatedPersistentTask) taskManager.register(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this register fail too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how, the creation of this task doesn't do anything interesting enough to fail AFAICT.

"persistent",
taskInProgress.getTaskName() + "[c]",
new PersistentTaskAwareRequest<>(
taskInProgress,
new PersistentTaskStartupFailureExecutor<>(executor.getTaskName(), EsExecutors.DIRECT_EXECUTOR_SERVICE)
)
);
placeholderTask.init(persistentTasksService, taskManager, taskInProgress.getId(), taskInProgress.getAllocationId());
taskManager.unregister(placeholderTask);
runningTasks.put(taskInProgress.getAllocationId(), placeholderTask);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can elaborate more how placeholder will be retried or removed, please?

Copy link
Contributor Author

@DaveCTurner DaveCTurner Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The placeholderTask.markAsFailed(e) call (on the line below this one) tells the master that the task failed, which (eventually) removes the task assignment from the cluster state.

placeholderTask.markAsFailed(e);
return;
}

Expand Down