Skip to content

Commit

Permalink
Delay master task failure notifications until commit (#92693)
Browse files Browse the repository at this point in the history
Today when the master service processes a batch of tasks in which some tasks
fail and others succeed, the failure notifications occur straight away.
However, the tasks may have failed because earlier tasks in the batch
succeeded, and the effects of those successful tasks will not be visible to
clients straight away. This commit delays all notifications until the
publication is complete.

Closes #92677
  • Loading branch information
DaveCTurner committed Jan 5, 2023
1 parent e1d2d35 commit 73863dd
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 19 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/92693.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 92693
summary: Delay master task failure notifications until commit
area: Cluster Coordination
type: bug
issues:
- 92677
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
*/
public interface ClusterStateTaskExecutor<T extends ClusterStateTaskListener> {
/**
* Update the cluster state based on the current state and the given tasks. Return the *same instance* if no update should be published.
* Update the cluster state based on the current state and the given tasks. Return {@code batchExecutionContext.initialState()} to avoid
* publishing any update.
* <p>
* If this method throws an exception then the cluster state is unchanged and every task's {@link ClusterStateTaskListener#onFailure}
* method is called.
Expand All @@ -30,8 +31,15 @@ public interface ClusterStateTaskExecutor<T extends ClusterStateTaskListener> {
* This works ok but beware that constructing a whole new {@link ClusterState} can be somewhat expensive, and there may sometimes be
* surprisingly many tasks to process in the batch. If it's possible to accumulate the effects of the tasks at a lower level then you
* should do that instead.
* <p>
* Returning {@code batchExecutionContext.initialState()} is an important and useful optimisation in most cases, but note that this
* fast-path exposes APIs to the risk of stale reads in the vicinity of a master failover: a node {@code N} that handles such a no-op
* task batch does not verify with its peers that it's still the master, and if it's not the master then another node {@code M} may
* already have become master and updated the state in a way that would be inconsistent with the response that {@code N} sends back to
* clients.
*
* @return The resulting cluster state after executing all the tasks. If {code initialState} is returned then no update is published.
* @return The resulting cluster state after executing all the tasks. If {code batchExecutionContext.initialState()} is returned then no
* update is published.
*/
ClusterState execute(BatchExecutionContext<T> batchExecutionContext) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,6 @@ private void runTasks(
previousClusterState,
executeTasks(previousClusterState, executionResults, executor, summary, threadPool.getThreadContext())
);
// fail all tasks that have failed
for (final var executionResult : executionResults) {
if (executionResult.failure != null) {
executionResult.updateTask.onFailure(executionResult.failure, executionResult::restoreResponseHeaders);
}
}
final TimeValue computationTime = getTimeSince(computationStartTime);
logExecutionTime(computationTime, "compute cluster state update", summary);

Expand Down Expand Up @@ -950,7 +944,7 @@ void onBatchFailure(Exception failure) {

void onPublishSuccess(ClusterState newClusterState) {
if (publishedStateConsumer == null && onPublicationSuccess == null) {
assert failure != null;
notifyFailure();
return;
}
try (ThreadContext.StoredContext ignored = updateTask.threadContextSupplier.get()) {
Expand All @@ -967,7 +961,7 @@ void onPublishSuccess(ClusterState newClusterState) {

void onClusterStateUnchanged(ClusterState clusterState) {
if (publishedStateConsumer == null && onPublicationSuccess == null) {
assert failure != null;
notifyFailure();
return;
}
try (ThreadContext.StoredContext ignored = updateTask.threadContextSupplier.get()) {
Expand All @@ -985,6 +979,10 @@ void onClusterStateUnchanged(ClusterState clusterState) {
void onPublishFailure(FailedToCommitClusterStateException e) {
if (publishedStateConsumer == null && onPublicationSuccess == null) {
assert failure != null;
var taskFailure = failure;
failure = new FailedToCommitClusterStateException(e.getMessage(), e);
failure.addSuppressed(taskFailure);
notifyFailure();
return;
}
try (ThreadContext.StoredContext ignored = updateTask.threadContextSupplier.get()) {
Expand All @@ -996,9 +994,14 @@ void onPublishFailure(FailedToCommitClusterStateException e) {
}
}

void notifyFailure() {
assert failure != null;
this.updateTask.onFailure(this.failure, this::restoreResponseHeaders);
}

ContextPreservingAckListener getContextPreservingAckListener() {
assert incomplete() == false;
return updateTask.wrapInTaskContext(clusterStateAckListener, this::restoreResponseHeaders);
return failure == null ? updateTask.wrapInTaskContext(clusterStateAckListener, this::restoreResponseHeaders) : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -903,8 +904,10 @@ class Task implements ClusterStateTaskListener {

final ActionListener<ClusterState> publishListener;
final String responseHeaderValue;
final boolean expectFailure;

Task(String responseHeaderValue, ActionListener<ClusterState> publishListener) {
Task(boolean expectFailure, String responseHeaderValue, ActionListener<ClusterState> publishListener) {
this.expectFailure = expectFailure;
this.responseHeaderValue = responseHeaderValue;
this.publishListener = publishListener;
}
Expand All @@ -920,6 +923,8 @@ public void onFailure(Exception e) {

final var testResponseHeaderName = "test-response-header";

final var taskFailedExceptionMessage = "simulated task failure";

final var executor = new ClusterStateTaskExecutor<Task>() {
@Override
@SuppressForbidden(reason = "consuming published cluster state for legacy reasons")
Expand All @@ -928,16 +933,22 @@ public ClusterState execute(BatchExecutionContext<Task> batchExecutionContext) {
try (var ignored = taskContext.captureResponseHeaders()) {
threadPool.getThreadContext().addResponseHeader(testResponseHeaderName, taskContext.getTask().responseHeaderValue);
}
taskContext.success(taskContext.getTask().publishListener::onResponse);
if (taskContext.getTask().expectFailure) {
taskContext.onFailure(new ElasticsearchException(taskFailedExceptionMessage));
} else {
taskContext.success(taskContext.getTask().publishListener::onResponse);
}
}
return ClusterState.builder(batchExecutionContext.initialState()).build();
}
};

final var blockedState = new AtomicReference<ClusterState>();
final var executionBarrier = new CyclicBarrier(2);
final ClusterStateUpdateTask blockMasterTask = new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
assertTrue(blockedState.compareAndSet(null, currentState));
executionBarrier.await(10, TimeUnit.SECONDS); // notify test thread that the master service is blocked
executionBarrier.await(10, TimeUnit.SECONDS); // wait for test thread to release us
return currentState;
Expand All @@ -955,9 +966,13 @@ public void onFailure(Exception e) {

masterService.submitUnbatchedStateUpdateTask("block", blockMasterTask);
executionBarrier.await(10, TimeUnit.SECONDS); // wait for the master service to be blocked
final var stateBeforeSuccess = blockedState.get();
assertNotNull(stateBeforeSuccess);

final AtomicReference<ClusterState> publishedState = new AtomicReference<>();
masterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> {
assertSame(stateBeforeSuccess, clusterStatePublicationEvent.getOldState());
assertNotSame(stateBeforeSuccess, clusterStatePublicationEvent.getNewState());
assertTrue(publishedState.compareAndSet(null, clusterStatePublicationEvent.getNewState()));
ClusterServiceUtils.setAllElapsedMillis(clusterStatePublicationEvent);
publishListener.onResponse(null);
Expand All @@ -971,18 +986,30 @@ public void onFailure(Exception e) {
final var testContextHeaderValue = randomAlphaOfLength(10);
final var testResponseHeaderValue = randomAlphaOfLength(10);
threadContext.putHeader(testContextHeaderName, testContextHeaderValue);
final var task = new Task(testResponseHeaderValue, new ActionListener<>() {
final var expectFailure = randomBoolean();
final var taskComplete = new AtomicBoolean();
final var task = new Task(expectFailure, testResponseHeaderValue, new ActionListener<>() {
@Override
public void onResponse(ClusterState clusterState) {
assertFalse(expectFailure);
assertEquals(testContextHeaderValue, threadContext.getHeader(testContextHeaderName));
assertEquals(List.of(testResponseHeaderValue), threadContext.getResponseHeaders().get(testResponseHeaderName));
assertSame(publishedState.get(), clusterState);
assertNotSame(stateBeforeSuccess, publishedState.get());
assertTrue(taskComplete.compareAndSet(false, true));
publishSuccessCountdown.countDown();
}

@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
assertTrue(expectFailure);
assertThat(e, instanceOf(ElasticsearchException.class));
assertThat(e.getMessage(), equalTo(taskFailedExceptionMessage));
assertEquals(List.of(testResponseHeaderValue), threadContext.getResponseHeaders().get(testResponseHeaderName));
assertNotNull(publishedState.get());
assertNotSame(stateBeforeSuccess, publishedState.get());
assertTrue(taskComplete.compareAndSet(false, true));
publishSuccessCountdown.countDown();
}
});

Expand All @@ -996,13 +1023,21 @@ public void onFailure(Exception e) {

// failure case: submit some tasks, possibly in different contexts, and verify that the expected listener is completed

assertNotNull(blockedState.getAndSet(null));
assertNotNull(publishedState.getAndSet(null));
masterService.submitUnbatchedStateUpdateTask("block", blockMasterTask);
executionBarrier.await(10, TimeUnit.SECONDS); // wait for the master service to be blocked
final var stateBeforeFailure = blockedState.get();
assertNotNull(stateBeforeFailure);

final var publicationFailedExceptionMessage = "simulated publication failure";

final String exceptionMessage = "simulated";
masterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> {
assertSame(stateBeforeFailure, clusterStatePublicationEvent.getOldState());
assertNotSame(stateBeforeFailure, clusterStatePublicationEvent.getNewState());
assertTrue(publishedState.compareAndSet(null, clusterStatePublicationEvent.getNewState()));
ClusterServiceUtils.setAllElapsedMillis(clusterStatePublicationEvent);
publishListener.onFailure(new FailedToCommitClusterStateException(exceptionMessage));
publishListener.onFailure(new FailedToCommitClusterStateException(publicationFailedExceptionMessage));
});

toSubmit = between(1, 10);
Expand All @@ -1013,7 +1048,9 @@ public void onFailure(Exception e) {
final String testContextHeaderValue = randomAlphaOfLength(10);
final String testResponseHeaderValue = randomAlphaOfLength(10);
threadContext.putHeader(testContextHeaderName, testContextHeaderValue);
final var task = new Task(testResponseHeaderValue, new ActionListener<>() {
final var expectFailure = randomBoolean();
final var taskComplete = new AtomicBoolean();
final var task = new Task(expectFailure, testResponseHeaderValue, new ActionListener<>() {
@Override
public void onResponse(ClusterState clusterState) {
throw new AssertionError("should not succeed");
Expand All @@ -1024,7 +1061,16 @@ public void onFailure(Exception e) {
assertEquals(testContextHeaderValue, threadContext.getHeader(testContextHeaderName));
assertEquals(List.of(testResponseHeaderValue), threadContext.getResponseHeaders().get(testResponseHeaderName));
assertThat(e, instanceOf(FailedToCommitClusterStateException.class));
assertThat(e.getMessage(), equalTo(exceptionMessage));
assertThat(e.getMessage(), equalTo(publicationFailedExceptionMessage));
if (expectFailure) {
assertThat(e.getSuppressed().length, greaterThan(0));
var suppressed = e.getSuppressed()[0];
assertThat(suppressed, instanceOf(ElasticsearchException.class));
assertThat(suppressed.getMessage(), equalTo(taskFailedExceptionMessage));
}
assertNotNull(publishedState.get());
assertNotSame(stateBeforeFailure, publishedState.get());
assertTrue(taskComplete.compareAndSet(false, true));
publishFailureCountdown.countDown();
}
});
Expand Down

0 comments on commit 73863dd

Please sign in to comment.