From 9ae0140c82bd1736102ef7396c6a172e41fc4f9e Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 10 Oct 2024 18:14:45 +0100 Subject: [PATCH] Improve handling of failure to create persistent task (#114386) Today, if creating a persistent task fails with an exception, we submit a cluster state update to fail the task. However, until that update executes, we retry the failing task creation and the cluster state submission on every other cluster state update that changes the persistent tasks metadata. With this commit, we register a placeholder task on the executing node to block further attempts to create it until the cluster state update is processed. --- docs/changelog/114386.yaml | 5 + .../PersistentTaskCreationFailureIT.java | 228 ++++++++++++++++++ .../PersistentTasksNodeService.java | 86 +++++-- 3 files changed, 296 insertions(+), 23 deletions(-) create mode 100644 docs/changelog/114386.yaml create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java diff --git a/docs/changelog/114386.yaml b/docs/changelog/114386.yaml new file mode 100644 index 0000000000000..cf9edda9de21e --- /dev/null +++ b/docs/changelog/114386.yaml @@ -0,0 +1,5 @@ +pr: 114386 +summary: Improve handling of failure to create persistent task +area: Task Management +type: bug +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java b/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java new file mode 100644 index 0000000000000..8a4d1ceda784b --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java @@ -0,0 +1,228 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements.
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.persistent; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.SettingsModule; +import org.elasticsearch.plugins.PersistentTaskPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.StreamSupport; + +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class PersistentTaskCreationFailureIT extends ESIntegTestCase { + @Override + protected Collection> nodePlugins() { + return List.of(FailingCreationPersistentTasksPlugin.class); + } + + private 
static boolean hasPersistentTask(ClusterState clusterState) { + return findTasks(clusterState, FailingCreationPersistentTaskExecutor.TASK_NAME).isEmpty() == false; + } + + public void testPersistentTasksThatFailDuringCreationAreRemovedFromClusterState() { + + final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final var plugins = StreamSupport.stream(internalCluster().getInstances(PluginsService.class).spliterator(), false) + .flatMap(ps -> ps.filterPlugins(FailingCreationPersistentTasksPlugin.class)) + .toList(); + plugins.forEach(plugin -> plugin.hasFailedToCreateTask.set(false)); + + final var taskCreatedListener = ClusterServiceUtils.addTemporaryStateListener( + masterClusterService, + PersistentTaskCreationFailureIT::hasPersistentTask + ); + + taskCreatedListener.andThenAccept(v -> { + // enqueue some higher-priority cluster state updates to check that they do not cause retries of the failing task creation step + for (int i = 0; i < 5; i++) { + masterClusterService.submitUnbatchedStateUpdateTask("test", new ClusterStateUpdateTask(Priority.IMMEDIATE) { + @Override + public ClusterState execute(ClusterState currentState) { + assertTrue(hasPersistentTask(currentState)); + + assertTrue(waitUntil(() -> { + final var completePersistentTaskPendingTasksCount = masterClusterService.getMasterService() + .pendingTasks() + .stream() + .filter( + pendingClusterTask -> pendingClusterTask.getSource().string().equals("finish persistent task (failed)") + ) + .count(); + assertThat(completePersistentTaskPendingTasksCount, lessThanOrEqualTo(1L)); + return completePersistentTaskPendingTasksCount == 1L; + })); + + return currentState.copyAndUpdateMetadata( + mdb -> mdb.putCustom( + PersistentTasksCustomMetadata.TYPE, + PersistentTasksCustomMetadata.builder( + PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(currentState) + ) + // create and remove a fake task just to force a change in lastAllocationId so that + // 
PersistentTasksNodeService checks for changes and potentially retries + .addTask("test", "test", null, PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT) + .removeTask("test") + .build() + ) + ); + } + + @Override + public void onFailure(Exception e) { + fail(e); + } + }); + } + }); + + safeAwait( + l -> internalCluster().getInstance(PersistentTasksService.class) + .sendStartRequest( + UUIDs.base64UUID(), + FailingCreationPersistentTaskExecutor.TASK_NAME, + new FailingCreationTaskParams(), + null, + l.map(ignored -> null) + ) + ); + + safeAwait( + taskCreatedListener.andThen( + (l, v) -> ClusterServiceUtils.addTemporaryStateListener( + masterClusterService, + clusterState -> hasPersistentTask(clusterState) == false + ).addListener(l) + ) + ); + + assertEquals(1L, plugins.stream().filter(plugin -> plugin.hasFailedToCreateTask.get()).count()); + } + + public static class FailingCreationPersistentTasksPlugin extends Plugin implements PersistentTaskPlugin { + + private final AtomicBoolean hasFailedToCreateTask = new AtomicBoolean(); + + @Override + public List> getPersistentTasksExecutor( + ClusterService clusterService, + ThreadPool threadPool, + Client client, + SettingsModule settingsModule, + IndexNameExpressionResolver expressionResolver + ) { + return List.of(new FailingCreationPersistentTaskExecutor(hasFailedToCreateTask)); + } + + @Override + public List getNamedWriteables() { + return List.of( + new NamedWriteableRegistry.Entry( + PersistentTaskParams.class, + FailingCreationPersistentTaskExecutor.TASK_NAME, + FailingCreationTaskParams::new + ) + ); + } + + @Override + public List getNamedXContent() { + return List.of( + new NamedXContentRegistry.Entry( + PersistentTaskParams.class, + new ParseField(FailingCreationPersistentTaskExecutor.TASK_NAME), + p -> { + p.skipChildren(); + return new FailingCreationTaskParams(); + } + ) + ); + } + } + + public static class FailingCreationTaskParams implements PersistentTaskParams { + public FailingCreationTaskParams() {} 
+ + public FailingCreationTaskParams(StreamInput in) {} + + @Override + public String getWriteableName() { + return FailingCreationPersistentTaskExecutor.TASK_NAME; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersion.current(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException {} + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.endObject(); + return builder; + } + } + + static class FailingCreationPersistentTaskExecutor extends PersistentTasksExecutor { + static final String TASK_NAME = "cluster:admin/persistent/test_creation_failure"; + + private final AtomicBoolean hasFailedToCreateTask; + + FailingCreationPersistentTaskExecutor(AtomicBoolean hasFailedToCreateTask) { + super(TASK_NAME, r -> fail("execution is unexpected")); + this.hasFailedToCreateTask = hasFailedToCreateTask; + } + + @Override + protected AllocatedPersistentTask createTask( + long id, + String type, + String action, + TaskId parentTaskId, + PersistentTasksCustomMetadata.PersistentTask taskInProgress, + Map headers + ) { + assertTrue("already failed before", hasFailedToCreateTask.compareAndSet(false, true)); + throw new RuntimeException("simulated"); + } + + @Override + protected void nodeOperation(AllocatedPersistentTask task, FailingCreationTaskParams params, PersistentTaskState state) { + fail("execution is unexpected"); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java index b86292be8e9ee..ff6a0b9018704 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.Strings; import 
org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; import org.elasticsearch.tasks.Task; @@ -32,6 +33,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.Executor; import static java.util.Objects.requireNonNull; import static org.elasticsearch.core.Strings.format; @@ -172,33 +174,57 @@ private void startTask(PersistentTask(taskInProgress, executor); + try (var ignored = threadPool.getThreadContext().newTraceContext()) { + doStartTask(taskInProgress, executor, request); + } + } - @Override - public void setParentTask(TaskId taskId) { - throw new UnsupportedOperationException("parent task if for persistent tasks shouldn't change"); - } + /** + * A {@link TaskAwareRequest} which creates the relevant task using a {@link PersistentTasksExecutor}. 
+ */ + private static class PersistentTaskAwareRequest implements TaskAwareRequest { + private final PersistentTask taskInProgress; + private final TaskId parentTaskId; + private final PersistentTasksExecutor executor; + + private PersistentTaskAwareRequest(PersistentTask taskInProgress, PersistentTasksExecutor executor) { + this.taskInProgress = taskInProgress; + this.parentTaskId = new TaskId("cluster", taskInProgress.getAllocationId()); + this.executor = executor; + } - @Override - public void setRequestId(long requestId) { - throw new UnsupportedOperationException("does not have a request ID"); - } + @Override + public void setParentTask(TaskId taskId) { + throw new UnsupportedOperationException("parent task id for persistent tasks shouldn't change"); + } - @Override - public TaskId getParentTask() { - return parentTaskId; - } + @Override + public void setRequestId(long requestId) { + throw new UnsupportedOperationException("does not have a request ID"); + } - @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return executor.createTask(id, type, action, parentTaskId, taskInProgress, headers); - } - }; + @Override + public TaskId getParentTask() { + return parentTaskId; + } - try (var ignored = threadPool.getThreadContext().newTraceContext()) { - doStartTask(taskInProgress, executor, request); + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return executor.createTask(id, type, action, parentTaskId, taskInProgress, headers); + } + } + + /** + * A no-op {@link PersistentTasksExecutor} to create a placeholder task if creating the real task fails for some reason.
+ */ + private static class PersistentTaskStartupFailureExecutor extends PersistentTasksExecutor { + PersistentTaskStartupFailureExecutor(String taskName, Executor executor) { + super(taskName, executor); } + + @Override + protected void nodeOperation(AllocatedPersistentTask task, Params params, PersistentTaskState state) {} } private void doStartTask( @@ -206,7 +232,7 @@ private void doStartTask( PersistentTasksExecutor executor, TaskAwareRequest request ) { - AllocatedPersistentTask task; + final AllocatedPersistentTask task; try { task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]", request); } catch (Exception e) { @@ -220,7 +246,21 @@ private void doStartTask( + "], removing from persistent tasks", e ); - notifyMasterOfFailedTask(taskInProgress, e); + + // create a no-op placeholder task so that we don't keep trying to start this task while we wait for the cluster state update + // which handles the failure + final var placeholderTask = (AllocatedPersistentTask) taskManager.register( + "persistent", + taskInProgress.getTaskName() + "[c]", + new PersistentTaskAwareRequest<>( + taskInProgress, + new PersistentTaskStartupFailureExecutor<>(executor.getTaskName(), EsExecutors.DIRECT_EXECUTOR_SERVICE) + ) + ); + placeholderTask.init(persistentTasksService, taskManager, taskInProgress.getId(), taskInProgress.getAllocationId()); + taskManager.unregister(placeholderTask); + runningTasks.put(taskInProgress.getAllocationId(), placeholderTask); + placeholderTask.markAsFailed(e); return; }