Skip to content

Commit

Permalink
Merge pull request #2029 from HubSpot/more_race_conditions
Browse files Browse the repository at this point in the history
Timing-related fixes
  • Loading branch information
ssalinas authored Oct 31, 2019
2 parents deeed60 + 0e8b626 commit 91462c9
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class LoadBalancerClientImpl implements LoadBalancerClient {

private static final String CONTENT_TYPE_JSON = "application/json";
private static final String HEADER_CONTENT_TYPE = "Content-Type";
private static final String ALREADY_ENQUEUED_ERROR = "is already enqueued with different parameters";

private final String loadBalancerUri;
private final Optional<Map<String, String>> loadBalancerQueryParams;
Expand Down Expand Up @@ -130,6 +131,14 @@ private BaragonResponse readResponse(Response response) {
/**
 * Sends {@code request} via {@code sendRequest} and converts the outcome into a
 * {@link SingularityLoadBalancerUpdate} built from the result state and message, the start
 * timestamp, the given method and the request URL.
 *
 * <p>If the call comes back FAILED with a message containing {@code ALREADY_ENQUEUED_ERROR} and
 * the method is neither CHECK_STATE nor PRE_ENQUEUE, the wrapper invokes itself again with
 * CHECK_STATE as the method. That second pass cannot recurse further because of the method guard.
 *
 * @param loadBalancerRequestId identifier used in log output and carried on the returned update
 * @param method load balancer method recorded on the returned update
 * @param request HTTP request handed to {@code sendRequest}
 * @param onFailure passed through unchanged to {@code sendRequest}
 * @return the update describing the result of the (possibly repeated) call
 */
private SingularityLoadBalancerUpdate sendRequestWrapper(LoadBalancerRequestId loadBalancerRequestId, LoadBalancerMethod method, Request request, BaragonRequestState onFailure) {
  final long startedAt = System.currentTimeMillis();
  final LoadBalancerUpdateHolder holder = sendRequest(loadBalancerRequestId, request, onFailure);

  // State checks and pre-enqueue calls never fall back; this also bounds the recursion below.
  final boolean mayFallBackToStateCheck = !(method == LoadBalancerMethod.CHECK_STATE || method == LoadBalancerMethod.PRE_ENQUEUE);
  if (mayFallBackToStateCheck && holder.state == BaragonRequestState.FAILED) {
    final String failureMessage = holder.message.orElse("");
    if (failureMessage.contains(ALREADY_ENQUEUED_ERROR)) {
      LOG.info("Baragon request {} already in the queue, will fetch current state instead", loadBalancerRequestId.getId());
      // NOTE(review): the same `request` object is re-sent here; only the method label changes
      // to CHECK_STATE. Confirm this really fetches the current state rather than repeating the
      // call that just failed.
      return sendRequestWrapper(loadBalancerRequestId, LoadBalancerMethod.CHECK_STATE, request, onFailure);
    }
  }

  LOG.debug("LB {} request {} had result {} after {}", request.getMethod(), loadBalancerRequestId, holder, JavaUtils.duration(startedAt));
  return new SingularityLoadBalancerUpdate(holder.state, loadBalancerRequestId, holder.message, startedAt, method, Optional.of(request.getUrl()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import com.hubspot.singularity.SingularityRequestWithState;
import com.hubspot.singularity.SingularityShellCommand;
import com.hubspot.singularity.SingularityTaskCleanup;
import com.hubspot.singularity.SingularityTaskHealthcheckResult;
import com.hubspot.singularity.SingularityTaskId;
import com.hubspot.singularity.SingularityTransformHelpers;
import com.hubspot.singularity.SingularityUser;
Expand Down Expand Up @@ -209,6 +210,24 @@ private void submitRequest(SingularityRequest request, Optional<SingularityReque
}
}

if (oldRequest.isPresent() && !oldRequest.get().getSkipHealthchecks().orElse(false) && request.getSkipHealthchecks().orElse(false)) {
LOG.info("Marking pending tasks as healthy for skipHealthchecks on {}", request.getId());
taskManager.getActiveTaskIdsForRequest(request.getId())
.forEach((t) -> {
// Will only be saved if async healthchecks have not already finished
taskManager.saveHealthcheckResult(
new SingularityTaskHealthcheckResult(
Optional.of(200),
Optional.empty(),
System.currentTimeMillis(),
Optional.of(String.format("Healthchecks skipped by %s", user.getId())),
Optional.empty(),
t,
Optional.empty())
);
});
}

requestHelper.updateRequest(request, oldRequest, requestState, historyType, user.getEmail(), skipHealthchecks, message, maybeBounceRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,15 @@ private SingularityDeployResult getDeployResult(final SingularityRequest request
return new SingularityDeployResult(DeployState.SUCCEEDED, "Request not deployable");
}

if (!deploy.isPresent()) {
// Check for abandoned pending deploy
Optional<SingularityDeployResult> result = deployManager.getDeployResult(request.getId(), pendingDeploy.getDeployMarker().getDeployId());
if (result.isPresent() && result.get().getDeployState().isDeployFinished()) {
LOG.info("Deploy was already finished, running cleanup of pending data for {}", pendingDeploy.getDeployMarker());
return result.get();
}
}

if (!pendingDeploy.getDeployProgress().isPresent()) {
return new SingularityDeployResult(DeployState.FAILED, "No deploy progress data present in Zookeeper. Please reattempt your deploy");
}
Expand Down Expand Up @@ -712,11 +721,6 @@ private SingularityDeployResult checkDeployProgress(final SingularityRequest req
}

final boolean isDeployOverdue = isDeployOverdue(pendingDeploy, deploy);
if (deployActiveTasks.size() < deployProgress.getTargetActiveInstances()) {
maybeUpdatePendingRequest(pendingDeploy, deploy, request, updatePendingDeployRequest);
return checkOverdue(request, deploy, pendingDeploy, deployActiveTasks, isDeployOverdue);
}

if (shouldCheckLbState(pendingDeploy)) {
final SingularityLoadBalancerUpdate lbUpdate = lbClient.getState(getLoadBalancerRequestId(pendingDeploy));
return processLbState(request, deploy, pendingDeploy, updatePendingDeployRequest, deployActiveTasks, otherActiveTasks, tasksToShutDown(deployProgress, otherActiveTasks, request), lbUpdate);
Expand All @@ -726,6 +730,11 @@ private SingularityDeployResult checkDeployProgress(final SingularityRequest req
return cancelLoadBalancer(pendingDeploy, getDeployFailures(request, deploy, pendingDeploy, DeployState.OVERDUE, deployActiveTasks));
}

if (deployActiveTasks.size() < deployProgress.getTargetActiveInstances()) {
maybeUpdatePendingRequest(pendingDeploy, deploy, request, updatePendingDeployRequest);
return checkOverdue(request, deploy, pendingDeploy, deployActiveTasks, isDeployOverdue);
}

if (isWaitingForCurrentLbRequest(pendingDeploy)) {
return new SingularityDeployResult(DeployState.WAITING, Optional.of("Waiting on load balancer API"), pendingDeploy.getLastLoadBalancerUpdate());
}
Expand Down Expand Up @@ -908,8 +917,12 @@ private SingularityDeployResult checkOverdue(SingularityRequest request, Optiona
String.format("Deploy was able to launch %s tasks, but not all of them became healthy within %s", deployActiveTasks.size(), JavaUtils.durationFromMillis(getAllowedMillis(deploy.get())));
}

if (deploy.isPresent() && isOverdue) {
return getDeployResultWithFailures(request, deploy, pendingDeploy, DeployState.OVERDUE, message, deployActiveTasks);
if (isOverdue) {
if (deploy.isPresent()) {
return getDeployResultWithFailures(request, deploy, pendingDeploy, DeployState.OVERDUE, message, deployActiveTasks);
} else {
return new SingularityDeployResult(DeployState.OVERDUE);
}
} else {
return new SingularityDeployResult(DeployState.WAITING);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.junit.jupiter.api.TestMethodOrder;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.hubspot.deploy.HealthcheckOptions;
import com.hubspot.deploy.HealthcheckOptionsBuilder;
import com.hubspot.mesos.Resources;
Expand All @@ -31,6 +32,9 @@
@TestMethodOrder(OrderAnnotation.class)
public class SingularityHealthchecksTest extends SingularitySchedulerTestBase {

@Inject
SingularityDeployHealthHelper deployHealthHelper;

public SingularityHealthchecksTest() {
super(false);
}
Expand Down Expand Up @@ -481,6 +485,24 @@ public void testPortNumber() {
}
}

/**
 * Checks that a started task with no stored healthcheck result is counted as healthy once
 * skipHealthchecks is requested, and is still counted as healthy after the expiring user
 * action poller has run.
 */
@Test
public void testRespectsSkipHealthchecksAfterExpiration() {
initRequest();
initHCDeploy();
scheduler.drainPendingQueue();
startTask(firstDeploy);
// Precondition: exactly one active task, not yet healthy, with no healthcheck result stored.
Assertions.assertEquals(1, taskManager.getActiveTaskIds().size());
Assertions.assertEquals(0, deployHealthHelper.getHealthyTasks(request, Optional.of(firstDeploy), taskManager.getActiveTaskIds(), false).size());
SingularityTaskId taskId = taskManager.getActiveTaskIds().get(0);
Assertions.assertFalse(taskManager.getLastHealthcheck(taskId).isPresent());

// Presumably Optional.of(1L) is a very short expiration for the skip — TODO confirm against SingularitySkipHealthchecksRequest.
requestResource.skipHealthchecks(requestId, new SingularitySkipHealthchecksRequest(Optional.of(true), Optional.of(1L), Optional.empty(), Optional.empty()), singularityUser);
// The skip request is expected to store a healthcheck result, so the task now counts as healthy.
Assertions.assertTrue(taskManager.getLastHealthcheck(taskId).isPresent());
Assertions.assertEquals(1, deployHealthHelper.getHealthyTasks(request, Optional.of(firstDeploy), taskManager.getActiveTaskIds(), false).size());
// Running the poller must not make the task unhealthy again.
expiringUserActionPoller.runActionOnPoll();
Assertions.assertEquals(1, deployHealthHelper.getHealthyTasks(request, Optional.of(firstDeploy), taskManager.getActiveTaskIds(), false).size());
}

private void setConfigurationForNoDelay() {
configuration.setNewTaskCheckerBaseDelaySeconds(0);
configuration.setHealthcheckIntervalSeconds(0);
Expand Down

0 comments on commit 91462c9

Please sign in to comment.