-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve handling of failure to create persistent task #114386
Changes from all commits
93c5cc1
96c3717
9008b11
b0397fe
2fdb089
47fe955
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pr: 114386 | ||
summary: Improve handling of failure to create persistent task | ||
area: Task Management | ||
type: bug | ||
issues: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,228 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.persistent; | ||
|
||
import org.elasticsearch.TransportVersion; | ||
import org.elasticsearch.client.internal.Client; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.ClusterStateUpdateTask; | ||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.Priority; | ||
import org.elasticsearch.common.UUIDs; | ||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.settings.SettingsModule; | ||
import org.elasticsearch.plugins.PersistentTaskPlugin; | ||
import org.elasticsearch.plugins.Plugin; | ||
import org.elasticsearch.plugins.PluginsService; | ||
import org.elasticsearch.tasks.TaskId; | ||
import org.elasticsearch.test.ClusterServiceUtils; | ||
import org.elasticsearch.test.ESIntegTestCase; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.xcontent.NamedXContentRegistry; | ||
import org.elasticsearch.xcontent.ParseField; | ||
import org.elasticsearch.xcontent.XContentBuilder; | ||
|
||
import java.io.IOException; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.stream.StreamSupport; | ||
|
||
import static org.hamcrest.Matchers.lessThanOrEqualTo; | ||
|
||
public class PersistentTaskCreationFailureIT extends ESIntegTestCase { | ||
// Returns the plugin classes for this test's nodes: only FailingCreationPersistentTasksPlugin, whose executor | ||
// throws from createTask. | ||
@Override | ||
protected Collection<Class<? extends Plugin>> nodePlugins() { | ||
return List.of(FailingCreationPersistentTasksPlugin.class); | ||
} | ||
|
||
// Returns true if findTasks reports at least one task named FailingCreationPersistentTaskExecutor.TASK_NAME in the | ||
// given cluster state, and false otherwise. | ||
private static boolean hasPersistentTask(ClusterState clusterState) { | ||
return findTasks(clusterState, FailingCreationPersistentTaskExecutor.TASK_NAME).isEmpty() == false; | ||
} | ||
|
||
// Starts a persistent task whose creation always throws, waits for it to appear in the master's cluster state and then | ||
// to disappear again, and finally asserts that exactly one plugin instance recorded a failed creation attempt. | ||
public void testPersistentTasksThatFailDuringCreationAreRemovedFromClusterState() { | ||
|
||
final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); | ||
// collect the FailingCreationPersistentTasksPlugin instance from every PluginsService and reset its failure flag | ||
final var plugins = StreamSupport.stream(internalCluster().getInstances(PluginsService.class).spliterator(), false) | ||
.flatMap(ps -> ps.filterPlugins(FailingCreationPersistentTasksPlugin.class)) | ||
.toList(); | ||
plugins.forEach(plugin -> plugin.hasFailedToCreateTask.set(false)); | ||
|
||
// NOTE(review): presumably completes once the master observes a cluster state for which hasPersistentTask is true; | ||
// confirm against ClusterServiceUtils.addTemporaryStateListener | ||
final var taskCreatedListener = ClusterServiceUtils.addTemporaryStateListener( | ||
masterClusterService, | ||
PersistentTaskCreationFailureIT::hasPersistentTask | ||
); | ||
|
||
taskCreatedListener.andThenAccept(v -> { | ||
// enqueue some higher-priority cluster state updates to check that they do not cause retries of the failing task creation step | ||
for (int i = 0; i < 5; i++) { | ||
masterClusterService.submitUnbatchedStateUpdateTask("test", new ClusterStateUpdateTask(Priority.IMMEDIATE) { | ||
@Override | ||
public ClusterState execute(ClusterState currentState) { | ||
// the failing task must still be present while these IMMEDIATE updates run | ||
assertTrue(hasPersistentTask(currentState)); | ||
|
||
// hold this update until exactly one "finish persistent task (failed)" task is pending on the master; | ||
// the inner assertion fails the test if more than one is ever queued | ||
assertTrue(waitUntil(() -> { | ||
final var completePersistentTaskPendingTasksCount = masterClusterService.getMasterService() | ||
.pendingTasks() | ||
.stream() | ||
.filter( | ||
pendingClusterTask -> pendingClusterTask.getSource().string().equals("finish persistent task (failed)") | ||
) | ||
.count(); | ||
assertThat(completePersistentTaskPendingTasksCount, lessThanOrEqualTo(1L)); | ||
return completePersistentTaskPendingTasksCount == 1L; | ||
})); | ||
|
||
return currentState.copyAndUpdateMetadata( | ||
mdb -> mdb.putCustom( | ||
PersistentTasksCustomMetadata.TYPE, | ||
PersistentTasksCustomMetadata.builder( | ||
PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(currentState) | ||
) | ||
// create and remove a fake task just to force a change in lastAllocationId so that | ||
// PersistentTasksNodeService checks for changes and potentially retries | ||
.addTask("test", "test", null, PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT) | ||
.removeTask("test") | ||
.build() | ||
) | ||
); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
fail(e); | ||
} | ||
}); | ||
} | ||
}); | ||
|
||
// send the start request for the failing task under a random id and wait for the request listener to complete | ||
safeAwait( | ||
l -> internalCluster().getInstance(PersistentTasksService.class) | ||
.sendStartRequest( | ||
UUIDs.base64UUID(), | ||
FailingCreationPersistentTaskExecutor.TASK_NAME, | ||
new FailingCreationTaskParams(), | ||
null, | ||
l.map(ignored -> null) | ||
) | ||
); | ||
|
||
// once the task has been seen in the cluster state, wait for a state in which it is no longer present | ||
safeAwait( | ||
taskCreatedListener.<Void>andThen( | ||
(l, v) -> ClusterServiceUtils.addTemporaryStateListener( | ||
masterClusterService, | ||
clusterState -> hasPersistentTask(clusterState) == false | ||
).addListener(l) | ||
) | ||
); | ||
|
||
// exactly one plugin instance must have had its createTask invoked (the flag is set there just before throwing) | ||
assertEquals(1L, plugins.stream().filter(plugin -> plugin.hasFailedToCreateTask.get()).count()); | ||
} | ||
|
||
// Test plugin that registers FailingCreationPersistentTaskExecutor together with the named-writeable and named-xcontent | ||
// entries for FailingCreationTaskParams. | ||
public static class FailingCreationPersistentTasksPlugin extends Plugin implements PersistentTaskPlugin { | ||
|
||
// handed to the executor, which sets it in createTask; the test resets and reads it per plugin instance | ||
private final AtomicBoolean hasFailedToCreateTask = new AtomicBoolean(); | ||
|
||
// Returns a single executor that shares this plugin's failure flag; the other arguments are unused. | ||
@Override | ||
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor( | ||
ClusterService clusterService, | ||
ThreadPool threadPool, | ||
Client client, | ||
SettingsModule settingsModule, | ||
IndexNameExpressionResolver expressionResolver | ||
) { | ||
return List.of(new FailingCreationPersistentTaskExecutor(hasFailedToCreateTask)); | ||
} | ||
|
||
// Registers the StreamInput constructor of FailingCreationTaskParams under the task name. | ||
@Override | ||
public List<NamedWriteableRegistry.Entry> getNamedWriteables() { | ||
return List.of( | ||
new NamedWriteableRegistry.Entry( | ||
PersistentTaskParams.class, | ||
FailingCreationPersistentTaskExecutor.TASK_NAME, | ||
FailingCreationTaskParams::new | ||
) | ||
); | ||
} | ||
|
||
// Registers a parser under the task name which skips the children of the current token and returns fresh params. | ||
@Override | ||
public List<NamedXContentRegistry.Entry> getNamedXContent() { | ||
return List.of( | ||
new NamedXContentRegistry.Entry( | ||
PersistentTaskParams.class, | ||
new ParseField(FailingCreationPersistentTaskExecutor.TASK_NAME), | ||
p -> { | ||
p.skipChildren(); | ||
return new FailingCreationTaskParams(); | ||
} | ||
) | ||
); | ||
} | ||
} | ||
|
||
// Parameters for the failing test task. The class has no fields, so both wire and XContent forms carry no data. | ||
public static class FailingCreationTaskParams implements PersistentTaskParams { | ||
public FailingCreationTaskParams() {} | ||
|
||
// wire constructor: reads nothing from the stream | ||
public FailingCreationTaskParams(StreamInput in) {} | ||
|
||
// the writeable name is the task name, matching the registry entries in FailingCreationPersistentTasksPlugin | ||
@Override | ||
public String getWriteableName() { | ||
return FailingCreationPersistentTaskExecutor.TASK_NAME; | ||
} | ||
|
||
@Override | ||
public TransportVersion getMinimalSupportedVersion() { | ||
return TransportVersion.current(); | ||
} | ||
|
||
// writes nothing, mirroring the StreamInput constructor | ||
@Override | ||
public void writeTo(StreamOutput out) throws IOException {} | ||
|
||
// emits an empty object | ||
@Override | ||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { | ||
builder.startObject(); | ||
builder.endObject(); | ||
return builder; | ||
} | ||
} | ||
|
||
// Executor whose createTask always throws, recording the attempt in a shared flag and asserting that it happens at | ||
// most once between resets of that flag. | ||
static class FailingCreationPersistentTaskExecutor extends PersistentTasksExecutor<FailingCreationTaskParams> { | ||
static final String TASK_NAME = "cluster:admin/persistent/test_creation_failure"; | ||
|
||
// owned by the plugin that created this executor | ||
private final AtomicBoolean hasFailedToCreateTask; | ||
|
||
FailingCreationPersistentTaskExecutor(AtomicBoolean hasFailedToCreateTask) { | ||
// the executor passed to the superclass fails the test if anything is ever submitted to it | ||
super(TASK_NAME, r -> fail("execution is unexpected")); | ||
this.hasFailedToCreateTask = hasFailedToCreateTask; | ||
} | ||
|
||
// Flips the flag from false to true (failing the assertion if it was already true) and then throws | ||
// RuntimeException("simulated"); never returns a task. | ||
@Override | ||
protected AllocatedPersistentTask createTask( | ||
long id, | ||
String type, | ||
String action, | ||
TaskId parentTaskId, | ||
PersistentTasksCustomMetadata.PersistentTask<FailingCreationTaskParams> taskInProgress, | ||
Map<String, String> headers | ||
) { | ||
assertTrue("already failed before", hasFailedToCreateTask.compareAndSet(false, true)); | ||
throw new RuntimeException("simulated"); | ||
} | ||
|
||
// must never run, since task creation always fails first | ||
@Override | ||
protected void nodeOperation(AllocatedPersistentTask task, FailingCreationTaskParams params, PersistentTaskState state) { | ||
fail("execution is unexpected"); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
import org.elasticsearch.common.Strings; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.util.concurrent.EsExecutors; | ||
import org.elasticsearch.gateway.GatewayService; | ||
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; | ||
import org.elasticsearch.tasks.Task; | ||
|
@@ -32,6 +33,7 @@ | |
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
import java.util.concurrent.Executor; | ||
|
||
import static java.util.Objects.requireNonNull; | ||
import static org.elasticsearch.core.Strings.format; | ||
|
@@ -172,41 +174,65 @@ private <Params extends PersistentTaskParams> void startTask(PersistentTask<Para | |
taskInProgress.getTaskName() | ||
); | ||
|
||
TaskAwareRequest request = new TaskAwareRequest() { | ||
TaskId parentTaskId = new TaskId("cluster", taskInProgress.getAllocationId()); | ||
final var request = new PersistentTaskAwareRequest<>(taskInProgress, executor); | ||
try (var ignored = threadPool.getThreadContext().newTraceContext()) { | ||
doStartTask(taskInProgress, executor, request); | ||
} | ||
} | ||
|
||
@Override | ||
public void setParentTask(TaskId taskId) { | ||
throw new UnsupportedOperationException("parent task if for persistent tasks shouldn't change"); | ||
} | ||
/** | ||
* A {@link TaskAwareRequest} which creates the relevant task using a {@link PersistentTasksExecutor}. | ||
*/ | ||
private static class PersistentTaskAwareRequest<Params extends PersistentTaskParams> implements TaskAwareRequest { | ||
private final PersistentTask<Params> taskInProgress; | ||
private final TaskId parentTaskId; | ||
private final PersistentTasksExecutor<Params> executor; | ||
|
||
private PersistentTaskAwareRequest(PersistentTask<Params> taskInProgress, PersistentTasksExecutor<Params> executor) { | ||
this.taskInProgress = taskInProgress; | ||
this.parentTaskId = new TaskId("cluster", taskInProgress.getAllocationId()); | ||
this.executor = executor; | ||
} | ||
|
||
@Override | ||
public void setRequestId(long requestId) { | ||
throw new UnsupportedOperationException("does not have a request ID"); | ||
} | ||
@Override | ||
public void setParentTask(TaskId taskId) { | ||
throw new UnsupportedOperationException("parent task if for persistent tasks shouldn't change"); | ||
} | ||
|
||
@Override | ||
public TaskId getParentTask() { | ||
return parentTaskId; | ||
} | ||
@Override | ||
public void setRequestId(long requestId) { | ||
throw new UnsupportedOperationException("does not have a request ID"); | ||
} | ||
|
||
@Override | ||
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) { | ||
return executor.createTask(id, type, action, parentTaskId, taskInProgress, headers); | ||
} | ||
}; | ||
@Override | ||
public TaskId getParentTask() { | ||
return parentTaskId; | ||
} | ||
|
||
try (var ignored = threadPool.getThreadContext().newTraceContext()) { | ||
doStartTask(taskInProgress, executor, request); | ||
@Override | ||
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) { | ||
return executor.createTask(id, type, action, parentTaskId, taskInProgress, headers); | ||
} | ||
} | ||
|
||
/** | ||
* A no-op {@link PersistentTasksExecutor} to create a placeholder task if creating the real task fails for some reason. | ||
*/ | ||
private static class PersistentTaskStartupFailureExecutor<Params extends PersistentTaskParams> extends PersistentTasksExecutor<Params> { | ||
// passes the task name and executor straight through to the superclass | ||
PersistentTaskStartupFailureExecutor(String taskName, Executor executor) { | ||
super(taskName, executor); | ||
} | ||
|
||
// intentionally empty: the placeholder task performs no work | ||
@Override | ||
protected void nodeOperation(AllocatedPersistentTask task, Params params, PersistentTaskState state) {} | ||
} | ||
|
||
private <Params extends PersistentTaskParams> void doStartTask( | ||
PersistentTask<Params> taskInProgress, | ||
PersistentTasksExecutor<Params> executor, | ||
TaskAwareRequest request | ||
) { | ||
AllocatedPersistentTask task; | ||
final AllocatedPersistentTask task; | ||
try { | ||
task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]", request); | ||
} catch (Exception e) { | ||
|
@@ -220,7 +246,21 @@ private <Params extends PersistentTaskParams> void doStartTask( | |
+ "], removing from persistent tasks", | ||
e | ||
); | ||
notifyMasterOfFailedTask(taskInProgress, e); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why did you choose the placeholder approach? Could it be a bug in how the master handles task failures? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No, it's just that we assume here that the |
||
|
||
// create a no-op placeholder task so that we don't keep trying to start this task while we wait for the cluster state update | ||
// which handles the failure | ||
final var placeholderTask = (AllocatedPersistentTask) taskManager.register( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can this register fail too? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't see how; the creation of this task doesn't do anything interesting enough to fail, AFAICT. |
||
"persistent", | ||
taskInProgress.getTaskName() + "[c]", | ||
new PersistentTaskAwareRequest<>( | ||
taskInProgress, | ||
new PersistentTaskStartupFailureExecutor<>(executor.getTaskName(), EsExecutors.DIRECT_EXECUTOR_SERVICE) | ||
) | ||
); | ||
placeholderTask.init(persistentTasksService, taskManager, taskInProgress.getId(), taskInProgress.getAllocationId()); | ||
taskManager.unregister(placeholderTask); | ||
runningTasks.put(taskInProgress.getAllocationId(), placeholderTask); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you elaborate on how the placeholder will be retried or removed, please? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The |
||
placeholderTask.markAsFailed(e); | ||
return; | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to force a change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To avoid the fast path through `PersistentTasksNodeService` that avoids the retry bug if this bit of the metadata is unchanged:
elasticsearch/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java, line 108 in 2380778