diff --git a/docs/changelog/114386.yaml b/docs/changelog/114386.yaml new file mode 100644 index 0000000000000..cf9edda9de21e --- /dev/null +++ b/docs/changelog/114386.yaml @@ -0,0 +1,5 @@ +pr: 114386 +summary: Improve handling of failure to create persistent task +area: Task Management +type: bug +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java b/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java new file mode 100644 index 0000000000000..8a4d1ceda784b --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java @@ -0,0 +1,228 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.persistent; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.SettingsModule; +import org.elasticsearch.plugins.PersistentTaskPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.StreamSupport; + +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class PersistentTaskCreationFailureIT extends ESIntegTestCase { + @Override + protected Collection> nodePlugins() { + return List.of(FailingCreationPersistentTasksPlugin.class); + } + + private static boolean hasPersistentTask(ClusterState clusterState) { + return findTasks(clusterState, FailingCreationPersistentTaskExecutor.TASK_NAME).isEmpty() == false; + } + + public void testPersistentTasksThatFailDuringCreationAreRemovedFromClusterState() { + + final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final var plugins = StreamSupport.stream(internalCluster().getInstances(PluginsService.class).spliterator(), false) + .flatMap(ps -> ps.filterPlugins(FailingCreationPersistentTasksPlugin.class)) + .toList(); + plugins.forEach(plugin -> plugin.hasFailedToCreateTask.set(false)); + + final var taskCreatedListener = ClusterServiceUtils.addTemporaryStateListener( + masterClusterService, + PersistentTaskCreationFailureIT::hasPersistentTask + ); + + taskCreatedListener.andThenAccept(v -> { + // enqueue some higher-priority cluster state updates to check that they do not cause retries of the failing task creation step + for (int i = 0; i < 5; i++) { + masterClusterService.submitUnbatchedStateUpdateTask("test", new ClusterStateUpdateTask(Priority.IMMEDIATE) { + @Override + public ClusterState execute(ClusterState currentState) { + assertTrue(hasPersistentTask(currentState)); + + assertTrue(waitUntil(() -> { + final var completePersistentTaskPendingTasksCount = masterClusterService.getMasterService() + .pendingTasks() + .stream() + .filter( + pendingClusterTask -> pendingClusterTask.getSource().string().equals("finish persistent task (failed)") + ) + .count(); + assertThat(completePersistentTaskPendingTasksCount, lessThanOrEqualTo(1L)); + return completePersistentTaskPendingTasksCount == 1L; + })); + + return currentState.copyAndUpdateMetadata( + mdb -> mdb.putCustom( + PersistentTasksCustomMetadata.TYPE, + PersistentTasksCustomMetadata.builder( + PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(currentState) + ) + // create and remove a fake task just to force a change in lastAllocationId so that + // PersistentTasksNodeService checks for changes and potentially retries + .addTask("test", "test", null, PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT) + .removeTask("test") + .build() + ) + ); + } + + @Override + public void onFailure(Exception e) { + fail(e); + } + }); + } + }); + + safeAwait( + l -> internalCluster().getInstance(PersistentTasksService.class) + .sendStartRequest( + UUIDs.base64UUID(), + FailingCreationPersistentTaskExecutor.TASK_NAME, + new FailingCreationTaskParams(), + null, + l.map(ignored -> null) + ) + ); + + safeAwait( + taskCreatedListener.andThen( + (l, v) -> ClusterServiceUtils.addTemporaryStateListener( + masterClusterService, + clusterState -> hasPersistentTask(clusterState) == false + ).addListener(l) + ) + ); + + assertEquals(1L, plugins.stream().filter(plugin -> plugin.hasFailedToCreateTask.get()).count()); + } + + public static class FailingCreationPersistentTasksPlugin extends Plugin implements PersistentTaskPlugin { + + private final AtomicBoolean hasFailedToCreateTask = new AtomicBoolean(); + + @Override + public List> getPersistentTasksExecutor( + ClusterService clusterService, + ThreadPool threadPool, + Client client, + SettingsModule settingsModule, + IndexNameExpressionResolver expressionResolver + ) { + return List.of(new FailingCreationPersistentTaskExecutor(hasFailedToCreateTask)); + } + + @Override + public List getNamedWriteables() { + return List.of( + new NamedWriteableRegistry.Entry( + PersistentTaskParams.class, + FailingCreationPersistentTaskExecutor.TASK_NAME, + FailingCreationTaskParams::new + ) + ); + } + + @Override + public List getNamedXContent() { + return List.of( + new NamedXContentRegistry.Entry( + PersistentTaskParams.class, + new ParseField(FailingCreationPersistentTaskExecutor.TASK_NAME), + p -> { + p.skipChildren(); + return new FailingCreationTaskParams(); + } + ) + ); + } + } + + public static class FailingCreationTaskParams implements PersistentTaskParams { + public FailingCreationTaskParams() {} + + public FailingCreationTaskParams(StreamInput in) {} + + @Override + public String getWriteableName() { + return FailingCreationPersistentTaskExecutor.TASK_NAME; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersion.current(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException {} + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.endObject(); + return builder; + } + } + + static class FailingCreationPersistentTaskExecutor extends PersistentTasksExecutor { + static final String TASK_NAME = "cluster:admin/persistent/test_creation_failure"; + + private final AtomicBoolean hasFailedToCreateTask; + + FailingCreationPersistentTaskExecutor(AtomicBoolean hasFailedToCreateTask) { + super(TASK_NAME, r -> fail("execution is unexpected")); + this.hasFailedToCreateTask = hasFailedToCreateTask; + } + + @Override + protected AllocatedPersistentTask createTask( + long id, + String type, + String action, + TaskId parentTaskId, + PersistentTasksCustomMetadata.PersistentTask taskInProgress, + Map headers + ) { + assertTrue("already failed before", hasFailedToCreateTask.compareAndSet(false, true)); + throw new RuntimeException("simulated"); + } + + @Override + protected void nodeOperation(AllocatedPersistentTask task, FailingCreationTaskParams params, PersistentTaskState state) { + fail("execution is unexpected"); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java index b86292be8e9ee..ff6a0b9018704 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; import org.elasticsearch.tasks.Task; @@ -32,6 +33,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.Executor; import static java.util.Objects.requireNonNull; import static org.elasticsearch.core.Strings.format; @@ -172,33 +174,57 @@ private void startTask(PersistentTask(taskInProgress, executor); + try (var ignored = threadPool.getThreadContext().newTraceContext()) { + doStartTask(taskInProgress, executor, request); + } + } - @Override - public void setParentTask(TaskId taskId) { - throw new UnsupportedOperationException("parent task if for persistent tasks shouldn't change"); - } + /** + * A {@link TaskAwareRequest} which creates the relevant task using a {@link PersistentTasksExecutor}. + */ + private static class PersistentTaskAwareRequest implements TaskAwareRequest { + private final PersistentTask taskInProgress; + private final TaskId parentTaskId; + private final PersistentTasksExecutor executor; + + private PersistentTaskAwareRequest(PersistentTask taskInProgress, PersistentTasksExecutor executor) { + this.taskInProgress = taskInProgress; + this.parentTaskId = new TaskId("cluster", taskInProgress.getAllocationId()); + this.executor = executor; + } - @Override - public void setRequestId(long requestId) { - throw new UnsupportedOperationException("does not have a request ID"); - } + @Override + public void setParentTask(TaskId taskId) { + throw new UnsupportedOperationException("parent task if for persistent tasks shouldn't change"); + } - @Override - public TaskId getParentTask() { - return parentTaskId; - } + @Override + public void setRequestId(long requestId) { + throw new UnsupportedOperationException("does not have a request ID"); + } - @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return executor.createTask(id, type, action, parentTaskId, taskInProgress, headers); - } - }; + @Override + public TaskId getParentTask() { + return parentTaskId; + } - try (var ignored = threadPool.getThreadContext().newTraceContext()) { - doStartTask(taskInProgress, executor, request); + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return executor.createTask(id, type, action, parentTaskId, taskInProgress, headers); + } + } + + /** + * A no-op {@link PersistentTasksExecutor} to create a placeholder task if creating the real task fails for some reason. + */ + private static class PersistentTaskStartupFailureExecutor extends PersistentTasksExecutor { + PersistentTaskStartupFailureExecutor(String taskName, Executor executor) { + super(taskName, executor); } + + @Override + protected void nodeOperation(AllocatedPersistentTask task, Params params, PersistentTaskState state) {} } private void doStartTask( @@ -206,7 +232,7 @@ private void doStartTask( PersistentTasksExecutor executor, TaskAwareRequest request ) { - AllocatedPersistentTask task; + final AllocatedPersistentTask task; try { task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]", request); } catch (Exception e) { @@ -220,7 +246,21 @@ private void doStartTask( + "], removing from persistent tasks", e ); - notifyMasterOfFailedTask(taskInProgress, e); + + // create a no-op placeholder task so that we don't keep trying to start this task while we wait for the cluster state update + // which handles the failure + final var placeholderTask = (AllocatedPersistentTask) taskManager.register( + "persistent", + taskInProgress.getTaskName() + "[c]", + new PersistentTaskAwareRequest<>( + taskInProgress, + new PersistentTaskStartupFailureExecutor<>(executor.getTaskName(), EsExecutors.DIRECT_EXECUTOR_SERVICE) + ) + ); + placeholderTask.init(persistentTasksService, taskManager, taskInProgress.getId(), taskInProgress.getAllocationId()); + taskManager.unregister(placeholderTask); + runningTasks.put(taskInProgress.getAllocationId(), placeholderTask); + placeholderTask.markAsFailed(e); return; }