Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Fixes for stop datafeed edge cases #49191

Merged
merged 2 commits into from
Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public static JobState getJobStateModifiedForReassignments(@Nullable PersistentT

public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
PersistentTasksCustomMetaData.PersistentTask<?> task = getDatafeedTask(datafeedId, tasks);
// TODO: report (task != null && task.getState() == null) as STARTING in version 8, and fix side effects
if (task != null && task.getState() != null) {
return (DatafeedState) task.getState();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
} else {
mlController = new DummyController();
autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) ->
new BlackHoleAutodetectProcess(job.getId());
new BlackHoleAutodetectProcess(job.getId(), onProcessCrash);
// factor of 1.0 makes renormalization a no-op
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0);
analyticsProcessFactory = (jobId, analyticsProcessConfig, state, executorService, onProcessCrash) -> null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksClusterService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.Task;
Expand All @@ -34,6 +35,7 @@
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -68,32 +70,46 @@ public TransportStopDatafeedAction(TransportService transportService, ThreadPool
* @param tasks Persistent task meta data
* @param startedDatafeedIds Started datafeed ids are added to this list
* @param stoppingDatafeedIds Stopping datafeed ids are added to this list
* @param notStoppedDatafeedIds Datafeed ids are added to this list for all datafeeds that are not stopped
*/
static void sortDatafeedIdsByTaskState(Set<String> expandedDatafeedIds,
static void sortDatafeedIdsByTaskState(Collection<String> expandedDatafeedIds,
PersistentTasksCustomMetaData tasks,
List<String> startedDatafeedIds,
List<String> stoppingDatafeedIds) {
List<String> stoppingDatafeedIds,
List<String> notStoppedDatafeedIds) {

for (String expandedDatafeedId : expandedDatafeedIds) {
addDatafeedTaskIdAccordingToState(expandedDatafeedId, MlTasks.getDatafeedState(expandedDatafeedId, tasks),
startedDatafeedIds, stoppingDatafeedIds);
startedDatafeedIds, stoppingDatafeedIds, notStoppedDatafeedIds);
}
}

private static void addDatafeedTaskIdAccordingToState(String datafeedId,
DatafeedState datafeedState,
List<String> startedDatafeedIds,
List<String> stoppingDatafeedIds) {
List<String> stoppingDatafeedIds,
List<String> notStoppedDatafeedIds) {
switch (datafeedState) {
case STARTING:
// The STARTING state is not used anywhere at present, so this should never happen.
// At present datafeeds that have a persistent task that hasn't yet been assigned
// a state are reported as STOPPED (which is not great). It could be considered a
// breaking change to introduce the STARTING state though, so let's aim to do it in
// version 8. Also consider treating STARTING like STARTED for stop API behaviour.
notStoppedDatafeedIds.add(datafeedId);
break;
case STARTED:
startedDatafeedIds.add(datafeedId);
notStoppedDatafeedIds.add(datafeedId);
break;
case STOPPED:
break;
case STOPPING:
stoppingDatafeedIds.add(datafeedId);
notStoppedDatafeedIds.add(datafeedId);
break;
default:
assert false : "Unexpected datafeed state " + datafeedState;
break;
}
}
Expand All @@ -118,17 +134,18 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi

List<String> startedDatafeeds = new ArrayList<>();
List<String> stoppingDatafeeds = new ArrayList<>();
sortDatafeedIdsByTaskState(expandedIds, tasks, startedDatafeeds, stoppingDatafeeds);
List<String> notStoppedDatafeeds = new ArrayList<>();
sortDatafeedIdsByTaskState(expandedIds, tasks, startedDatafeeds, stoppingDatafeeds, notStoppedDatafeeds);
if (startedDatafeeds.isEmpty() && stoppingDatafeeds.isEmpty()) {
listener.onResponse(new StopDatafeedAction.Response(true));
return;
}
request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()]));

if (request.isForce()) {
forceStopDatafeed(request, listener, tasks, startedDatafeeds);
forceStopDatafeed(request, listener, tasks, notStoppedDatafeeds);
} else {
normalStopDatafeed(task, request, listener, tasks, startedDatafeeds, stoppingDatafeeds);
normalStopDatafeed(task, request, listener, tasks, nodes, startedDatafeeds, stoppingDatafeeds);
}
},
listener::onFailure
Expand All @@ -137,20 +154,20 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi
}

private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, ActionListener<StopDatafeedAction.Response> listener,
PersistentTasksCustomMetaData tasks,
PersistentTasksCustomMetaData tasks, DiscoveryNodes nodes,
List<String> startedDatafeeds, List<String> stoppingDatafeeds) {
Set<String> executorNodes = new HashSet<>();
final Set<String> executorNodes = new HashSet<>();
for (String datafeedId : startedDatafeeds) {
PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
if (datafeedTask == null) {
// This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method
String msg = "Requested datafeed [" + datafeedId + "] be stopped, but datafeed's task could not be found.";
assert datafeedTask != null : msg;
logger.error(msg);
} else if (datafeedTask.isAssigned()) {
} else if (PersistentTasksClusterService.needsReassignment(datafeedTask.getAssignment(), nodes) == false) {
executorNodes.add(datafeedTask.getExecutorNode());
} else {
// This is the easy case - the datafeed is not currently assigned to a node,
// This is the easy case - the datafeed is not currently assigned to a valid node,
// so can be gracefully stopped simply by removing its persistent task. (Usually
// a graceful stop cannot be achieved by simply removing the persistent task, but
// if the datafeed has no running code then graceful/forceful are the same.)
Expand All @@ -171,48 +188,62 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A

ActionListener<StopDatafeedAction.Response> finalListener = ActionListener.wrap(
r -> waitForDatafeedStopped(allDataFeedsToWaitFor, request, r, listener),
listener::onFailure);
e -> {
if (ExceptionsHelper.unwrapCause(e) instanceof FailedNodeException) {
// A node has dropped out of the cluster since we started executing the requests.
// Since stopping an already stopped datafeed is not an error we can try again.
// The datafeeds that were running on the node that dropped out of the cluster
// will just have their persistent tasks cancelled. Datafeeds that were stopped
// by the previous attempt will be noops in the subsequent attempt.
doExecute(task, request, listener);
} else {
listener.onFailure(e);
}
});

super.doExecute(task, request, finalListener);
}

private void forceStopDatafeed(final StopDatafeedAction.Request request, final ActionListener<StopDatafeedAction.Response> listener,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method seems like it no longer needs to distinguish started and stopping datafeeds. Should we pass it a single list of datafeeds and do the concatenation a level up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that's done in the second commit.

While I was doing this I noticed that although we have a starting state we never use it. So I also added a TODO about using that for 8.0. (Looking back through the history, starting was added in 5.5, but couldn't be used until 6.x because 5.4 wouldn't understand it. Now we're beyond that we can use it, but we should only do this in a major version just in case somebody is relying on stopped, started and stopping being the only 3 states that exist.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we need to use it. I recall adding it so we had it available without BWC in case we needed it. But surely, if it improves things we can definitely use it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think not using it creates a potential race condition where if you start and stop a datafeed in very quick succession then the stop will be ignored. It seems that none of our current tests do this.

PersistentTasksCustomMetaData tasks, final List<String> startedDatafeeds) {
PersistentTasksCustomMetaData tasks, final List<String> notStoppedDatafeeds) {
final AtomicInteger counter = new AtomicInteger();
final AtomicArray<Exception> failures = new AtomicArray<>(startedDatafeeds.size());
final AtomicArray<Exception> failures = new AtomicArray<>(notStoppedDatafeeds.size());

for (String datafeedId : startedDatafeeds) {
for (String datafeedId : notStoppedDatafeeds) {
PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
if (datafeedTask != null) {
persistentTasksService.sendRemoveRequest(datafeedTask.getId(),
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
if (counter.incrementAndGet() == startedDatafeeds.size()) {
if (counter.incrementAndGet() == notStoppedDatafeeds.size()) {
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
}
}

@Override
public void onFailure(Exception e) {
final int slot = counter.incrementAndGet();
if ((ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException &&
Strings.isAllOrWildcard(new String[]{request.getDatafeedId()})) == false) {
// We validated that the datafeed names supplied in the request existed when we started processing the action.
// If the related tasks don't exist at this point then they must have been stopped by a simultaneous stop request.
// This is not an error.
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
failures.set(slot - 1, e);
}
if (slot == startedDatafeeds.size()) {
if (slot == notStoppedDatafeeds.size()) {
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
}
}
});
} else {
// This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method
// This should not happen, because startedDatafeeds and stoppingDatafeeds
// were derived from the same tasks that were passed to this method
String msg = "Requested datafeed [" + datafeedId + "] be force-stopped, but datafeed's task could not be found.";
assert datafeedTask != null : msg;
logger.error(msg);
final int slot = counter.incrementAndGet();
failures.set(slot - 1, new RuntimeException(msg));
if (slot == startedDatafeeds.size()) {
if (slot == notStoppedDatafeeds.size()) {
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
}
}
Expand Down Expand Up @@ -313,7 +344,7 @@ protected StopDatafeedAction.Response newResponse(StopDatafeedAction.Request req
.convertToElastic(failedNodeExceptions.get(0));
} else {
// This can happen we the actual task in the node no longer exists,
// which means the datafeed(s) have already been closed.
// which means the datafeed(s) have already been stopped.
return new StopDatafeedAction.Response(true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@

import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
* A placeholder class simulating the actions of the native Autodetect process.
Expand All @@ -37,16 +40,21 @@
*/
public class BlackHoleAutodetectProcess implements AutodetectProcess {

public static final String MAGIC_FAILURE_VALUE = "253402300799";
public static final String MAGIC_FAILURE_VALUE_AS_DATE = "9999-12-31 23:59:59";

private static final String FLUSH_ID = "flush-1";

private final String jobId;
private final ZonedDateTime startTime;
private final BlockingQueue<AutodetectResult> results = new LinkedBlockingDeque<>();
private final Consumer<String> onProcessCrash;
private volatile boolean open = true;

public BlackHoleAutodetectProcess(String jobId) {
public BlackHoleAutodetectProcess(String jobId, Consumer<String> onProcessCrash) {
this.jobId = jobId;
startTime = ZonedDateTime.now();
this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
}

@Override
Expand All @@ -59,7 +67,13 @@ public boolean isReady() {
}

@Override
public void writeRecord(String[] record) throws IOException {
public void writeRecord(String[] record) {
if (Arrays.asList(record).contains(MAGIC_FAILURE_VALUE)) {
open = false;
onProcessCrash.accept("simulated failure");
AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null);
results.add(result);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

public class TransportStopDatafeedActionTests extends ESTestCase {
Expand All @@ -27,17 +26,21 @@ public void testSortDatafeedIdsByTaskState_GivenDatafeedId() {

List<String> startedDatafeeds = new ArrayList<>();
List<String> stoppingDatafeeds = new ArrayList<>();
List<String> notStoppedDatafeeds = new ArrayList<>();
TransportStopDatafeedAction.sortDatafeedIdsByTaskState(
Collections.singleton("datafeed_1"), tasks, startedDatafeeds, stoppingDatafeeds);
Collections.singleton("datafeed_1"), tasks, startedDatafeeds, stoppingDatafeeds, notStoppedDatafeeds);
assertEquals(Collections.singletonList("datafeed_1"), startedDatafeeds);
assertEquals(Collections.emptyList(), stoppingDatafeeds);
assertEquals(Collections.singletonList("datafeed_1"), notStoppedDatafeeds);

startedDatafeeds.clear();
stoppingDatafeeds.clear();
notStoppedDatafeeds.clear();
TransportStopDatafeedAction.sortDatafeedIdsByTaskState(
Collections.singleton("datafeed_2"), tasks, startedDatafeeds, stoppingDatafeeds);
Collections.singleton("datafeed_2"), tasks, startedDatafeeds, stoppingDatafeeds, notStoppedDatafeeds);
assertEquals(Collections.emptyList(), startedDatafeeds);
assertEquals(Collections.emptyList(), stoppingDatafeeds);
assertEquals(Collections.emptyList(), notStoppedDatafeeds);
}

public void testSortDatafeedIdsByTaskState_GivenAll() {
Expand All @@ -50,15 +53,17 @@ public void testSortDatafeedIdsByTaskState_GivenAll() {

List<String> startedDatafeeds = new ArrayList<>();
List<String> stoppingDatafeeds = new ArrayList<>();
List<String> notStoppedDatafeeds = new ArrayList<>();
TransportStopDatafeedAction.sortDatafeedIdsByTaskState(
new HashSet<>(Arrays.asList("datafeed_1", "datafeed_2", "datafeed_3")), tasks, startedDatafeeds, stoppingDatafeeds);
Arrays.asList("datafeed_1", "datafeed_2", "datafeed_3"), tasks, startedDatafeeds, stoppingDatafeeds, notStoppedDatafeeds);
assertEquals(Collections.singletonList("datafeed_1"), startedDatafeeds);
assertEquals(Collections.singletonList("datafeed_3"), stoppingDatafeeds);
assertEquals(Arrays.asList("datafeed_1", "datafeed_3"), notStoppedDatafeeds);

startedDatafeeds.clear();
stoppingDatafeeds.clear();
TransportStopDatafeedAction.sortDatafeedIdsByTaskState(Collections.singleton("datafeed_2"), tasks, startedDatafeeds,
stoppingDatafeeds);
stoppingDatafeeds, notStoppedDatafeeds);
assertEquals(Collections.emptyList(), startedDatafeeds);
assertEquals(Collections.emptyList(), stoppingDatafeeds);
}
Expand Down
Loading