From 08dfeeb83bf00c88a384409f4e8f3357f07ba1b2 Mon Sep 17 00:00:00 2001 From: Peter Teixeira Date: Thu, 8 Jun 2017 15:02:06 -0400 Subject: [PATCH 1/3] Retry failed client requests In situations when the client fails with certain error codes, it should be able to retry the request. Ideally, it would retry the request to a different host so that in the event that an instance malfunctioned or was removed, the request wasn't consistently failing. --- SingularityClient/pom.xml | 17 + .../singularity/client/SingularityClient.java | 410 ++++++++---------- .../client/SingularityClientTest.java | 72 +++ pom.xml | 6 + 4 files changed, 278 insertions(+), 227 deletions(-) create mode 100644 SingularityClient/src/test/java/com/hubspot/singularity/client/SingularityClientTest.java diff --git a/SingularityClient/pom.xml b/SingularityClient/pom.xml index 1bad3e101e..fef6f7b2e4 100644 --- a/SingularityClient/pom.xml +++ b/SingularityClient/pom.xml @@ -21,6 +21,11 @@ guava + + com.github.rholder + guava-retrying + + org.slf4j slf4j-api @@ -77,6 +82,18 @@ test + + org.assertj + assertj-core + test + + + + org.mockito + mockito-core + test + + org.slf4j slf4j-simple diff --git a/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClient.java b/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClient.java index 8db975806e..d35bb6d3b8 100644 --- a/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClient.java +++ b/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClient.java @@ -4,11 +4,14 @@ import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; import javax.inject.Provider; @@ -16,6 +19,11 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.type.TypeReference; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -209,11 +217,16 @@ public class SingularityClient { private final HttpClient httpClient; private final Optional credentials; + private final Retryer httpResponseRetryer = RetryerBuilder.newBuilder() + .withStopStrategy(StopStrategies.stopAfterAttempt(3)) + .withWaitStrategy(WaitStrategies.exponentialWait()) + .retryIfResult(response -> response == null || response.getStatusCode() == 503) + .build(); @Inject @Deprecated public SingularityClient(@Named(SingularityClientModule.CONTEXT_PATH) String contextPath, @Named(SingularityClientModule.HTTP_CLIENT_NAME) HttpClient httpClient, @Named(SingularityClientModule.HOSTS_PROPERTY_NAME) String hosts) { - this(contextPath, httpClient, Arrays.asList(hosts.split(",")), Optional.absent()); + this(contextPath, httpClient, Arrays.asList(hosts.split(",")), Optional.absent()); } public SingularityClient(String contextPath, HttpClient httpClient, List hosts, Optional credentials) { @@ -239,8 +252,8 @@ public SingularityClient(String contextPath, HttpClient httpClient, Provider Optional getSingle(String uri, String type, String id, Class clazz) { - return getSingleWithParams(uri, type, id, Optional.absent(), clazz); + private Optional getSingle(Function hostToUrl, String type, String id, Class clazz) { + return getSingleWithParams(hostToUrl, type, id, Optional.absent(), clazz); } - private Optional getSingleWithParams(String uri, String type, String id, Optional> queryParams, Class clazz) { + private Optional getSingleWithParams(Function hostToUrl, String type, String id, Optional> queryParams, Class clazz) { final long start = System.currentTimeMillis(); - HttpResponse response = executeGetSingleWithParams(uri, type, id, queryParams); + HttpResponse response = executeGetSingleWithParams(hostToUrl, type, id, queryParams); if (response.getStatusCode() == 404) { return Optional.absent(); @@ -296,9 +309,9 @@ private Optional getSingleWithParams(String uri, String type, String id, return Optional.fromNullable(response.getAs(clazz)); } - private Optional getSingleWithParams(String uri, String type, String id, Optional> queryParams, TypeReference typeReference) { + private Optional getSingleWithParams(Function hostToUrl, String type, String id, Optional> queryParams, TypeReference typeReference) { final long start = System.currentTimeMillis(); - HttpResponse response = executeGetSingleWithParams(uri, type, id, queryParams); + HttpResponse response = executeGetSingleWithParams(hostToUrl, type, id, queryParams); if (response.getStatusCode() == 404) { return Optional.absent(); @@ -311,42 +324,22 @@ private Optional getSingleWithParams(String uri, String type, String id, } - private HttpResponse executeGetSingleWithParams(String uri, String type, String id, Optional> queryParams) { + private HttpResponse executeGetSingleWithParams(Function hostToUrl, String type, String id, Optional> queryParams) { checkNotNull(id, String.format("Provide a %s id", type)); - LOG.info("Getting {} {} from {}", type, id, uri); + LOG.info("Getting {} {} from Singularity host", type, id); - HttpRequest.Builder requestBuilder = HttpRequest.newBuilder() - .setUrl(uri); - - if (queryParams.isPresent()) { - addQueryParams(requestBuilder, queryParams.get()); - } - - addCredentials(requestBuilder); - - return httpClient.execute(requestBuilder.build()); + return executeRequest(hostToUrl, Method.GET, Optional.absent(), queryParams.or(Collections.emptyMap())); } - private Collection getCollection(String uri, String type, TypeReference> typeReference) { - return getCollectionWithParams(uri, type, Optional.absent(), typeReference); + private Collection getCollection(Function hostToUrl, String type, TypeReference> typeReference) { + return getCollectionWithParams(hostToUrl, type, Optional.absent(), typeReference); } - private Collection getCollectionWithParams(String uri, String type, Optional> queryParams, TypeReference> typeReference) { - LOG.info("Getting all {} from {}", type, uri); - + private Collection getCollectionWithParams(Function hostToUrl, String type, Optional> queryParams, TypeReference> typeReference) { final long start = System.currentTimeMillis(); - HttpRequest.Builder requestBuilder = HttpRequest.newBuilder() - .setUrl(uri); - - if (queryParams.isPresent()) { - addQueryParams(requestBuilder, queryParams.get()); - } - - addCredentials(requestBuilder); - - HttpResponse response = httpClient.execute(requestBuilder.build()); + HttpResponse response = executeRequest(hostToUrl, Method.GET, Optional.absent(), queryParams.or(Collections.emptyMap())); if (response.getStatusCode() == 404) { return ImmutableList.of(); @@ -359,8 +352,8 @@ private Collection getCollectionWithParams(String uri, String type, Optio return response.getAs(typeReference); } - private void addQueryParams(HttpRequest.Builder requestBuilder, Map queryParams) { - for (Entry queryParamEntry : queryParams.entrySet()) { + private void addQueryParams(HttpRequest.Builder requestBuilder, Map queryParams) { + for (Entry queryParamEntry : queryParams.entrySet()) { if (queryParamEntry.getValue() instanceof String) { requestBuilder.setQueryParam(queryParamEntry.getKey()).to((String) queryParamEntry.getValue()); } else if (queryParamEntry.getValue() instanceof Integer) { @@ -382,63 +375,24 @@ private void addCredentials(HttpRequest.Builder requestBuilder) { } } - private void delete(String uri, String type, String id) { - delete(uri, type, id, Optional.absent()); + private void delete(Function hostToUrl, String type, String id) { + delete(hostToUrl, type, id, Optional.absent()); } - private void delete(String uri, String type, String id, Optional body) { - delete(uri, type, id, body, Optional.>absent()); + private void delete(Function hostToUrl, String type, String id, Optional body) { + delete(hostToUrl, type, id, body, Optional.>absent()); } - private Optional delete(String uri, String type, String id, Optional body, Optional> clazz) { - LOG.info("Deleting {} {} from {}", type, id, uri); - - final long start = System.currentTimeMillis(); - - HttpRequest.Builder request = HttpRequest.newBuilder().setUrl(uri).setMethod(Method.DELETE); - - if (body.isPresent()) { - request.setBody(body.get()); - } - - addCredentials(request); - - HttpResponse response = httpClient.execute(request.build()); - - if (response.getStatusCode() == 404) { - LOG.info("{} ({}) was not found", type, id); - return Optional.absent(); - } - - checkResponse(type, response); - - LOG.info("Deleted {} ({}) from Singularity in %sms", type, id, System.currentTimeMillis() - start); - - if (clazz.isPresent()) { - return Optional.of(response.getAs(clazz.get())); - } - - return Optional.absent(); + private Optional delete(Function hostToUrl, String type, String id, Optional body, Optional> clazz) { + return deleteWithParams(hostToUrl, type, id, body, Optional.absent(), clazz); } - private Optional deleteWithParams(String uri, String type, String id, Optional body, Optional> queryParams, Optional> clazz) { - LOG.info("Deleting {} {} from {}", type, id, uri); + private Optional deleteWithParams(Function hostToUrl, String type, String id, Optional body, Optional> queryParams, Optional> clazz) { + LOG.info("Deleting {} {} from Singularity", type, id); final long start = System.currentTimeMillis(); - HttpRequest.Builder request = HttpRequest.newBuilder().setUrl(uri).setMethod(Method.DELETE); - - if (body.isPresent()) { - request.setBody(body.get()); - } - - if (queryParams.isPresent()) { - addQueryParams(request, queryParams.get()); - } - - addCredentials(request); - - HttpResponse response = httpClient.execute(request.build()); + HttpResponse response = executeRequest(hostToUrl, Method.DELETE, body, queryParams.or(Collections.emptyMap())); if (response.getStatusCode() == 404) { LOG.info("{} ({}) was not found", type, id); @@ -456,13 +410,13 @@ private Optional deleteWithParams(String uri, String type, String id, Opt return Optional.absent(); } - private HttpResponse put(String uri, String type, Optional body) { - return executeRequest(uri, type, body, Method.PUT, Optional.absent()); + private HttpResponse put(Function hostToUri, String type, Optional body) { + return executeRequest(hostToUri, type, body, Method.PUT, Optional.absent()); } - private Optional post(String uri, String type, Optional body, Optional> clazz) { + private Optional post(Function hostToUri, String type, Optional body, Optional> clazz) { try { - HttpResponse response = executeRequest(uri, type, body, Method.POST, Optional.absent()); + HttpResponse response = executeRequest(hostToUri, type, body, Method.POST, Optional.absent()); if (clazz.isPresent()) { return Optional.of(response.getAs(clazz.get())); @@ -471,40 +425,47 @@ private Optional post(String uri, String type, Optional body, Optional LOG.warn("Http post failed", e); } - return Optional.absent(); + return Optional.absent(); } - private HttpResponse postWithParams(String uri, String type, Optional body, Optional> queryParams) { - return executeRequest(uri, type, body, Method.POST, queryParams); + private HttpResponse postWithParams(Function hostToUri, String type, Optional body, Optional> queryParams) { + return executeRequest(hostToUri, type, body, Method.POST, queryParams); } - private HttpResponse post(String uri, String type, Optional body) { - return executeRequest(uri, type, body, Method.POST, Optional.absent()); + private HttpResponse post(Function hostToUri, String type, Optional body) { + return executeRequest(hostToUri, type, body, Method.POST, Optional.absent()); } - private HttpResponse executeRequest(String uri, String type, Optional body, Method method, Optional> queryParams) { - + private HttpResponse executeRequest(Function hostToUri, String type, Optional body, Method method, Optional> queryParams) { final long start = System.currentTimeMillis(); - HttpRequest.Builder request = HttpRequest.newBuilder().setUrl(uri).setMethod(method); + HttpResponse response = executeRequest(hostToUri, method, body, queryParams.or(Collections.emptyMap())); + checkResponse(type, response); + LOG.info("Successfully {}ed {} in {}ms", method, type, System.currentTimeMillis() - start); + + return response; + } + + private HttpResponse executeRequest(Function hostToUri, Method method, Optional body, Map queryParams) { + HttpRequest.Builder request = HttpRequest.newBuilder().setMethod(method); if (body.isPresent()) { request.setBody(body.get()); } - if (queryParams.isPresent()) { - addQueryParams(request, queryParams.get()); - } - + addQueryParams(request, queryParams); addCredentials(request); - HttpResponse response = httpClient.execute(request.build()); - - checkResponse(type, response); - - LOG.info("Successfully {}ed {} in {}ms", method, type, System.currentTimeMillis() - start); - - return response; + try { + return httpResponseRetryer.call(() -> { + String url = hostToUri.apply(getHost()); + LOG.info("Making {} request to {}", method, url); + request.setUrl(hostToUri.apply(url)); + return httpClient.execute(request.build()); + }); + } catch (ExecutionException | RetryException exn) { + throw new SingularityClientException("Failed request to Singularity", exn); + } } // @@ -512,24 +473,21 @@ private HttpResponse executeRequest(String uri, String type, Optional body, M // public SingularityState getState(Optional skipCache, Optional includeRequestIds) { - final String uri = String.format(STATE_FORMAT, getApiBase()); + final Function uri = (host) -> String.format(STATE_FORMAT, getApiBase(host)); LOG.info("Fetching state from {}", uri); final long start = System.currentTimeMillis(); - HttpRequest.Builder request = HttpRequest.newBuilder().setUrl(uri); - + Map queryParams = new HashMap<>(); if (skipCache.isPresent()) { - request.setQueryParam("skipCache").to(skipCache.get()); + queryParams.put("skipCache", skipCache.get()); } if (includeRequestIds.isPresent()) { - request.setQueryParam("includeRequestIds").to(includeRequestIds.get()); + queryParams.put("includeRequestIds", includeRequestIds.get()); } - addCredentials(request); - - HttpResponse response = httpClient.execute(request.build()); + HttpResponse response = executeRequest(uri, Method.GET, Optional.absent(), queryParams); checkResponse("state", response); @@ -539,17 +497,13 @@ public SingularityState getState(Optional skipCache, Optional } public Optional getTaskReconciliationStatistics() { - final String uri = String.format(TASK_RECONCILIATION_FORMAT, getApiBase()); + final Function uri = (host) -> String.format(TASK_RECONCILIATION_FORMAT, getApiBase(host)); LOG.info("Fetch task reconciliation statistics from {}", uri); final long start = System.currentTimeMillis(); - HttpRequest.Builder request = HttpRequest.newBuilder().setUrl(uri); - - addCredentials(request); - - HttpResponse response = httpClient.execute(request.build()); + HttpResponse response = executeRequest(uri, Method.GET, Optional.absent(), Collections.emptyMap()); if (response.getStatusCode() == 404) { return Optional.absent(); @@ -567,13 +521,13 @@ public Optional getTaskReconciliationSt // public Optional getSingularityRequest(String requestId) { - final String singularityApiRequestUri = String.format(REQUEST_GET_FORMAT, getApiBase(), requestId); + final Function singularityApiRequestUri = (host) -> String.format(REQUEST_GET_FORMAT, getApiBase(host), requestId); return getSingle(singularityApiRequestUri, "request", requestId, SingularityRequestParent.class); } public Optional getTaskByRunIdForRequest(String requestId, String runId) { - final String singularityApiRequestUri = String.format(REQUEST_BY_RUN_ID_FORMAT, getApiBase(), requestId, runId); + final Function singularityApiRequestUri = (host) -> String.format(REQUEST_BY_RUN_ID_FORMAT, getApiBase(host), requestId, runId); return getSingle(singularityApiRequestUri, "requestByRunId", runId, SingularityTaskId.class); } @@ -581,7 +535,7 @@ public Optional getTaskByRunIdForRequest(String requestId, St public void createOrUpdateSingularityRequest(SingularityRequest request) { checkNotNull(request.getId(), "A posted Singularity Request must have an id"); - final String requestUri = String.format(REQUEST_CREATE_OR_UPDATE_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUEST_CREATE_OR_UPDATE_FORMAT, getApiBase(host)); post(requestUri, String.format("request %s", request.getId()), Optional.of(request)); } @@ -598,30 +552,30 @@ public void createOrUpdateSingularityRequest(SingularityRequest request) { * the singularity request that has been moved to deleting */ public Optional deleteSingularityRequest(String requestId, Optional deleteRequest) { - final String requestUri = String.format(REQUEST_DELETE_ACTIVE_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_DELETE_ACTIVE_FORMAT, getApiBase(host), requestId); return delete(requestUri, "active request", requestId, deleteRequest, Optional.of(SingularityRequest.class)); } public void pauseSingularityRequest(String requestId, Optional pauseRequest) { - final String requestUri = String.format(REQUEST_PAUSE_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_PAUSE_FORMAT, getApiBase(host), requestId); post(requestUri, String.format("pause of request %s", requestId), pauseRequest); } public void unpauseSingularityRequest(String requestId, Optional unpauseRequest) { - final String requestUri = String.format(REQUEST_UNPAUSE_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_UNPAUSE_FORMAT, getApiBase(host), requestId); post(requestUri, String.format("unpause of request %s", requestId), unpauseRequest); } public void scaleSingularityRequest(String requestId, SingularityScaleRequest scaleRequest) { - final String requestUri = String.format(REQUEST_SCALE_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_SCALE_FORMAT, getApiBase(host), requestId); put(requestUri, String.format("Scale of Request %s", requestId), Optional.of(scaleRequest)); } public SingularityPendingRequestParent runSingularityRequest(String requestId, Optional runNowRequest) { - final String requestUri = String.format(REQUEST_RUN_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_RUN_FORMAT, getApiBase(host), requestId); final HttpResponse response = post(requestUri, String.format("run of request %s", requestId), runNowRequest); @@ -629,13 +583,13 @@ public SingularityPendingRequestParent runSingularityRequest(String requestId, O } public void bounceSingularityRequest(String requestId, Optional bounceOptions) { - final String requestUri = String.format(REQUEST_BOUNCE_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_BOUNCE_FORMAT, getApiBase(host), requestId); post(requestUri, String.format("bounce of request %s", requestId), bounceOptions); } public void exitCooldown(String requestId, Optional exitCooldownRequest) { - final String requestUri = String.format(REQUEST_EXIT_COOLDOWN_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_EXIT_COOLDOWN_FORMAT, getApiBase(host), requestId); post(requestUri, String.format("exit cooldown of request %s", requestId), exitCooldownRequest); } @@ -645,11 +599,11 @@ public void exitCooldown(String requestId, Optional deployUnpause, Optional message) { - return createDeployForSingularityRequest(requestId, pendingDeploy, deployUnpause, message, Optional.absent()); + return createDeployForSingularityRequest(requestId, pendingDeploy, deployUnpause, message, Optional.absent()); } public SingularityRequestParent createDeployForSingularityRequest(String requestId, SingularityDeploy pendingDeploy, Optional deployUnpause, Optional message, Optional updatedRequest) { - final String requestUri = String.format(DEPLOYS_FORMAT, getApiBase()); + final Function requestUri = (String host) -> String.format(DEPLOYS_FORMAT, getApiBase(host)); HttpResponse response = post(requestUri, String.format("new deploy %s", new SingularityDeployKey(requestId, pendingDeploy.getId())), Optional.of(new SingularityDeployRequest(pendingDeploy, deployUnpause, message, updatedRequest))); @@ -666,7 +620,7 @@ private SingularityRequestParent getAndLogRequestAndDeployStatus(SingularityRequ } public SingularityRequestParent cancelPendingDeployForSingularityRequest(String requestId, String deployId) { - final String requestUri = String.format(DELETE_DEPLOY_FORMAT, getApiBase(), deployId, requestId); + final Function requestUri = (host) -> String.format(DELETE_DEPLOY_FORMAT, getApiBase(host), deployId, requestId); SingularityRequestParent singularityRequestParent = delete(requestUri, "pending deploy", new SingularityDeployKey(requestId, deployId).getId(), Optional.absent(), Optional.of(SingularityRequestParent.class)).get(); @@ -675,7 +629,7 @@ public SingularityRequestParent cancelPendingDeployForSingularityRequest(String } public SingularityRequestParent updateIncrementalDeployInstanceCount(SingularityUpdatePendingDeployRequest updateRequest) { - final String requestUri = String.format(UPDATE_DEPLOY_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(UPDATE_DEPLOY_FORMAT, getApiBase(host)); HttpResponse response = post(requestUri, String.format("update deploy %s", new SingularityDeployKey(updateRequest.getRequestId(), updateRequest.getDeployId())), Optional.of(updateRequest)); @@ -705,7 +659,7 @@ public SingularityRequestParent updateIncrementalDeployInstanceCount(Singularity * */ public Collection getSingularityRequests() { - final String requestUri = String.format(REQUESTS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUESTS_FORMAT, getApiBase(host)); return getCollection(requestUri, "[ACTIVE, PAUSED, COOLDOWN] requests", REQUESTS_COLLECTION); } @@ -717,7 +671,7 @@ public Collection getSingularityRequests() { * All ACTIVE {@link SingularityRequestParent} instances */ public Collection getActiveSingularityRequests() { - final String requestUri = String.format(REQUESTS_GET_ACTIVE_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUESTS_GET_ACTIVE_FORMAT, getApiBase(host)); return getCollection(requestUri, "ACTIVE requests", REQUESTS_COLLECTION); } @@ -730,7 +684,7 @@ public Collection getActiveSingularityRequests() { * All PAUSED {@link SingularityRequestParent} instances */ public Collection getPausedSingularityRequests() { - final String requestUri = String.format(REQUESTS_GET_PAUSED_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUESTS_GET_PAUSED_FORMAT, getApiBase(host)); return getCollection(requestUri, "PAUSED requests", REQUESTS_COLLECTION); } @@ -742,7 +696,7 @@ public Collection getPausedSingularityRequests() { * All {@link SingularityRequestParent} instances that their state is COOLDOWN */ public Collection getCoolDownSingularityRequests() { - final String requestUri = String.format(REQUESTS_GET_COOLDOWN_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUESTS_GET_COOLDOWN_FORMAT, getApiBase(host)); return getCollection(requestUri, "COOLDOWN requests", REQUESTS_COLLECTION); } @@ -754,7 +708,7 @@ public Collection getCoolDownSingularityRequests() { * A collection of {@link SingularityPendingRequest} instances that hold information about the singularity requests that are pending to become ACTIVE */ public Collection getPendingSingularityRequests() { - final String requestUri = String.format(REQUESTS_GET_PENDING_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUESTS_GET_PENDING_FORMAT, getApiBase(host)); return getCollection(requestUri, "pending requests", PENDING_REQUESTS_COLLECTION); } @@ -769,7 +723,7 @@ public Collection getPendingSingularityRequests() { * that are marked for deletion and are currently cleaning up. */ public Collection getCleanupSingularityRequests() { - final String requestUri = String.format(REQUESTS_GET_CLEANUP_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUESTS_GET_CLEANUP_FORMAT, getApiBase(host)); return getCollection(requestUri, "cleaning requests", CLEANUP_REQUESTS_COLLECTION); } @@ -783,19 +737,19 @@ public Collection getCleanupSingularityRequests() { // public Collection getActiveTasks() { - final String requestUri = String.format(TASKS_GET_ACTIVE_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(TASKS_GET_ACTIVE_FORMAT, getApiBase(host)); return getCollection(requestUri, "active tasks", TASKS_COLLECTION); } public Collection getActiveTasksOnSlave(final String slaveId) { - final String requestUri = String.format(TASKS_GET_ACTIVE_ON_SLAVE_FORMAT, getApiBase(), slaveId); + final Function requestUri = (host) -> String.format(TASKS_GET_ACTIVE_ON_SLAVE_FORMAT, getApiBase(host), slaveId); return getCollection(requestUri, String.format("active tasks on slave %s", slaveId), TASKS_COLLECTION); } public Optional killTask(String taskId, Optional killTaskRequest) { - final String requestUri = String.format(TASKS_KILL_TASK_FORMAT, getApiBase(), taskId); + final Function requestUri = (host) -> String.format(TASKS_KILL_TASK_FORMAT, getApiBase(host), taskId); return delete(requestUri, "task", taskId, killTaskRequest, Optional.of(SingularityTaskCleanupResult.class)); } @@ -805,13 +759,13 @@ public Optional killTask(String taskId, Optional getScheduledTasks() { - final String requestUri = String.format(TASKS_GET_SCHEDULED_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(TASKS_GET_SCHEDULED_FORMAT, getApiBase(host)); return getCollection(requestUri, "scheduled tasks", TASKS_REQUEST_COLLECTION); } public Collection getScheduledTaskIds() { - final String requestUri = String.format(TASKS_GET_SCHEDULED_IDS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(TASKS_GET_SCHEDULED_IDS_FORMAT, getApiBase(host)); return getCollection(requestUri, "scheduled task ids", PENDING_TASK_ID_COLLECTION); } @@ -820,13 +774,13 @@ public Collection getScheduledTaskIds() { // RACKS // private Collection getRacks(Optional rackState) { - final String requestUri = String.format(RACKS_FORMAT, getApiBase()); - Optional> maybeQueryParams = Optional.>absent(); + final Function requestUri = (host) -> String.format(RACKS_FORMAT, getApiBase(host)); + Optional> maybeQueryParams = Optional.absent(); String type = "racks"; if (rackState.isPresent()) { - maybeQueryParams = Optional.>of(ImmutableMap.of("state", rackState.get().toString())); + maybeQueryParams = Optional.of(ImmutableMap.of("state", rackState.get().toString())); type = String.format("%s racks", rackState.get().toString()); } @@ -836,29 +790,29 @@ private Collection getRacks(Optional rackState) { @Deprecated public void decomissionRack(String rackId) { - decommissionRack(rackId, Optional.absent()); + decommissionRack(rackId, Optional.absent()); } public void decommissionRack(String rackId, Optional machineChangeRequest) { - final String requestUri = String.format(RACKS_DECOMISSION_FORMAT, getApiBase(), rackId); + final Function requestUri = (host) -> String.format(RACKS_DECOMISSION_FORMAT, getApiBase(host), rackId); - post(requestUri, String.format("decomission rack %s", rackId), machineChangeRequest.or(Optional.of(new SingularityMachineChangeRequest(Optional.absent())))); + post(requestUri, String.format("decomission rack %s", rackId), machineChangeRequest.or(Optional.of(SingularityMachineChangeRequest.empty()))); } public void freezeRack(String rackId, Optional machineChangeRequest) { - final String requestUri = String.format(RACKS_FREEZE_FORMAT, getApiBase(), rackId); + final Function requestUri = (host) -> String.format(RACKS_FREEZE_FORMAT, getApiBase(host), rackId); - post(requestUri, String.format("freeze rack %s", rackId), machineChangeRequest.or(Optional.of(new SingularityMachineChangeRequest(Optional.absent())))); + post(requestUri, String.format("freeze rack %s", rackId), machineChangeRequest.or(Optional.of(SingularityMachineChangeRequest.empty()))); } public void activateRack(String rackId, Optional machineChangeRequest) { - final String requestUri = String.format(RACKS_ACTIVATE_FORMAT, getApiBase(), rackId); + final Function requestUri = (host) -> String.format(RACKS_ACTIVATE_FORMAT, getApiBase(host), rackId); - post(requestUri, String.format("decommission rack %s", rackId), machineChangeRequest.or(Optional.of(new SingularityMachineChangeRequest(Optional.absent())))); + post(requestUri, String.format("decommission rack %s", rackId), machineChangeRequest.or(Optional.of(SingularityMachineChangeRequest.empty()))); } public void deleteRack(String rackId) { - final String requestUri = String.format(RACKS_DELETE_FORMAT, getApiBase(), rackId); + final Function requestUri = (host) -> String.format(RACKS_DELETE_FORMAT, getApiBase(host), rackId); delete(requestUri, "dead rack", rackId); } @@ -876,14 +830,14 @@ public void deleteRack(String rackId) { * A collection of {@link SingularitySlave} */ public Collection getSlaves(Optional slaveState) { - final String requestUri = String.format(SLAVES_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(SLAVES_FORMAT, getApiBase(host)); - Optional> maybeQueryParams = Optional.>absent(); + Optional> maybeQueryParams = Optional.absent(); String type = "slaves"; if (slaveState.isPresent()) { - maybeQueryParams = Optional.>of(ImmutableMap.of("state", slaveState.get().toString())); + maybeQueryParams = Optional.of(ImmutableMap.of("state", slaveState.get().toString())); type = String.format("%s slaves", slaveState.get().toString()); } @@ -900,31 +854,36 @@ public Collection getSlaves(Optional slaveState) * A {@link SingularitySlave} */ public Optional getSlave(String slaveId) { - final String requestUri = String.format(SLAVE_DETAIL_FORMAT, getApiBase(), slaveId); + final Function requestUri = (host) -> String.format(SLAVE_DETAIL_FORMAT, getApiBase(host), slaveId); return getSingle(requestUri, "slave", slaveId, SingularitySlave.class); } + @Deprecated + public void decomissionSlave(String slaveId) { + decommissionSlave(slaveId, Optional.absent()); + } + public void decommissionSlave(String slaveId, Optional machineChangeRequest) { - final String requestUri = String.format(SLAVES_DECOMISSION_FORMAT, getApiBase(), slaveId); + final Function requestUri = (host) -> String.format(SLAVES_DECOMISSION_FORMAT, getApiBase(host), slaveId); post(requestUri, String.format("decommission slave %s", slaveId), machineChangeRequest.or(Optional.of(SingularityMachineChangeRequest.empty()))); } public void freezeSlave(String slaveId, Optional machineChangeRequest) { - final String requestUri = String.format(SLAVES_FREEZE_FORMAT, getApiBase(), slaveId); + final Function requestUri = (host) -> String.format(SLAVES_FREEZE_FORMAT, getApiBase(host), slaveId); post(requestUri, String.format("freeze slave %s", slaveId), machineChangeRequest.or(Optional.of(SingularityMachineChangeRequest.empty()))); } public void activateSlave(String slaveId, Optional machineChangeRequest) { - final String requestUri = String.format(SLAVES_ACTIVATE_FORMAT, getApiBase(), slaveId); + final Function requestUri = (host) -> String.format(SLAVES_ACTIVATE_FORMAT, getApiBase(host), slaveId); post(requestUri, String.format("activate slave %s", slaveId), machineChangeRequest.or(Optional.of(SingularityMachineChangeRequest.empty()))); } public void deleteSlave(String slaveId) { - final String requestUri = String.format(SLAVES_DELETE_FORMAT, getApiBase(), slaveId); + final Function requestUri = (host) -> String.format(SLAVES_DELETE_FORMAT, getApiBase(host), slaveId); delete(requestUri, "deleting slave", slaveId); } @@ -946,11 +905,11 @@ public void deleteSlave(String slaveId) { * A list of {@link SingularityRequestHistory} */ public Collection getHistoryForRequest(String requestId, Optional count, Optional page) { - final String requestUri = String.format(REQUEST_HISTORY_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_HISTORY_FORMAT, getApiBase(host), requestId); - Optional> maybeQueryParams = Optional.>absent(); + Optional> maybeQueryParams = Optional.absent(); - ImmutableMap.Builder queryParamsBuilder = ImmutableMap.builder(); + ImmutableMap.Builder queryParamsBuilder = ImmutableMap.builder(); if (count.isPresent() ) { queryParamsBuilder.put("count", count.get()); @@ -973,21 +932,19 @@ public Collection getHistoryForRequest(String request // public Collection getInactiveSlaves() { - final String requestUri = String.format(INACTIVE_SLAVES_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(INACTIVE_SLAVES_FORMAT, getApiBase(host)); return getCollection(requestUri, "inactiveSlaves", STRING_COLLECTION); } public void markSlaveAsInactive(String host) { - final String requestUri = String.format(INACTIVE_SLAVES_FORMAT, getApiBase()); - Map params = new HashMap<>(); - params.put("host", host); + final Function requestUri = (singularityHost) -> String.format(INACTIVE_SLAVES_FORMAT, getApiBase(singularityHost)); + Map params = Collections.singletonMap("host", host); postWithParams(requestUri, "deactivateSlave", Optional.absent(), Optional.of(params)); } public void clearInactiveSlave(String host) { - final String requestUri = String.format(INACTIVE_SLAVES_FORMAT, getApiBase()); - Map params = new HashMap<>(); - params.put("host", host); + final Function requestUri = (singularityHost) -> String.format(INACTIVE_SLAVES_FORMAT, getApiBase(host)); + Map params = Collections.singletonMap("host", host); deleteWithParams(requestUri, "clearInactiveSlave", host, Optional.absent(), Optional.of(params), Optional.absent()); } @@ -1004,13 +961,13 @@ public void clearInactiveSlave(String host) { * A {@link SingularityTaskIdHistory} object if the task exists */ public Optional getHistoryForTask(String taskId) { - final String requestUri = String.format(TASK_HISTORY_FORMAT, getApiBase(), taskId); + final Function requestUri = (host) -> String.format(TASK_HISTORY_FORMAT, getApiBase(host), taskId); return getSingle(requestUri, "task history", taskId, SingularityTaskHistory.class); } public Collection getActiveTaskHistoryForRequest(String requestId) { - final String requestUri = String.format(REQUEST_ACTIVE_TASKS_HISTORY_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_ACTIVE_TASKS_HISTORY_FORMAT, getApiBase(host), requestId); final String type = String.format("active task history for %s", requestId); @@ -1022,23 +979,23 @@ public Collection getInactiveTaskHistoryForRequest(Str } public Collection getInactiveTaskHistoryForRequest(String requestId, int count, int page) { - return getInactiveTaskHistoryForRequest(requestId, count, page, Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent()); + return getInactiveTaskHistoryForRequest(requestId, count, page, Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent()); } public Collection getInactiveTaskHistoryForRequest(String requestId, int count, int page, Optional host, Optional runId, Optional lastTaskStatus, Optional startedBefore, Optional startedAfter, Optional updatedBefore, Optional updatedAfter, Optional orderDirection) { - final String requestUri = String.format(REQUEST_INACTIVE_TASKS_HISTORY_FORMAT, getApiBase(), requestId); + final Function requestUri = (singularityHost) -> String.format(REQUEST_INACTIVE_TASKS_HISTORY_FORMAT, getApiBase(singularityHost), requestId); final String type = String.format("inactive (failed, killed, lost) task history for request %s", requestId); - Map params = taskSearchParams(Optional.of(requestId), Optional.absent(), runId, host, lastTaskStatus, startedBefore, startedAfter, updatedBefore, updatedAfter, orderDirection, count, page); + Map params = taskSearchParams(Optional.of(requestId), Optional.absent(), runId, host, lastTaskStatus, startedBefore, startedAfter, updatedBefore, updatedAfter, orderDirection, count, page); return getCollectionWithParams(requestUri, type, Optional.of(params), TASKID_HISTORY_COLLECTION); } public Optional getHistoryForRequestDeploy(String requestId, String deployId) { - final String requestUri = String.format(REQUEST_DEPLOY_HISTORY_FORMAT, getApiBase(), requestId, deployId); + final Function requestUri = (host) -> String.format(REQUEST_DEPLOY_HISTORY_FORMAT, getApiBase(host), requestId, deployId); return getSingle(requestUri, "deploy history", new SingularityDeployKey(requestId, deployId).getId(), SingularityDeployHistory.class); } @@ -1056,19 +1013,19 @@ public Optional getHistoryForRequestDeploy(String requ */ @Deprecated public Optional getHistoryForTask(String requestId, String runId) { - final String requestUri = String.format(TASK_HISTORY_BY_RUN_ID_FORMAT, getApiBase(), requestId, runId); - - return getSingle(requestUri, "task history", requestId, SingularityTaskIdHistory.class); + return getTaskIdHistoryByRunId(requestId, runId); } public Optional getTaskIdHistoryByRunId(String requestId, String runId) { - return getHistoryForTask(requestId, runId); + final Function requestUri = (host) -> String.format(TASK_HISTORY_BY_RUN_ID_FORMAT, getApiBase(host), requestId, runId); + + return getSingle(requestUri, "task history", requestId, SingularityTaskIdHistory.class); } public Collection getTaskHistory(Optional requestId, Optional deployId, Optional runId, Optional host, Optional lastTaskStatus, Optional startedBefore, Optional startedAfter, Optional updatedBefore, Optional updatedAfter, Optional orderDirection, Integer count, Integer page) { - final String requestUri = String.format(TASKS_HISTORY_FORMAT, getApiBase()); + final Function requestUri = (singularityHost) -> String.format(TASKS_HISTORY_FORMAT, getApiBase(singularityHost)); Map params = taskSearchParams(requestId, deployId, runId, host, lastTaskStatus, startedBefore, startedAfter, updatedBefore, updatedAfter, orderDirection, count, page); @@ -1078,7 +1035,7 @@ public Collection getTaskHistory(Optional requ public Optional> getTaskHistoryWithMetadata(Optional requestId, Optional deployId, Optional runId, Optional host, Optional lastTaskStatus, Optional startedBefore, Optional startedAfter, Optional updatedBefore, Optional updatedAfter, Optional orderDirection, Integer count, Integer page) { - final String requestUri = String.format(TASKS_HISTORY_WITHMETADATA_FORMAT, getApiBase()); + final Function requestUri = (singularityHost) -> String.format(TASKS_HISTORY_WITHMETADATA_FORMAT, getApiBase(singularityHost)); Map params = taskSearchParams(requestId, deployId, runId, host, lastTaskStatus, startedBefore, startedAfter, updatedBefore, updatedAfter, orderDirection, count, page); @@ -1129,47 +1086,47 @@ private Map taskSearchParams(Optional requestId, Optiona // public Optional addWebhook(SingularityWebhook webhook) { - final String requestUri = String.format(WEBHOOKS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(WEBHOOKS_FORMAT, getApiBase(host)); return post(requestUri, String.format("webhook %s", webhook.getUri()), Optional.of(webhook), Optional.of(SingularityCreateResult.class)); } public Optional deleteWebhook(String webhookId) { - final String requestUri = String.format(WEBHOOKS_DELETE_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(WEBHOOKS_DELETE_FORMAT, getApiBase(host)); Builder queryParamBuider = ImmutableMap.builder().put("webhookId", webhookId); - return deleteWithParams(requestUri, String.format("webhook with id %s", webhookId), webhookId, Optional.absent(), Optional.>of(queryParamBuider.build()), Optional.of(SingularityDeleteResult.class)); + return deleteWithParams(requestUri, String.format("webhook with id %s", webhookId), webhookId, Optional.absent(), Optional.of(queryParamBuider.build()), Optional.of(SingularityDeleteResult.class)); } public Collection getActiveWebhook() { - final String requestUri = String.format(WEBHOOKS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(WEBHOOKS_FORMAT, getApiBase(host)); return getCollection(requestUri, "active webhooks", WEBHOOKS_COLLECTION); } public Collection getQueuedDeployUpdates(String webhookId) { - final String requestUri = String.format(WEBHOOKS_GET_QUEUED_DEPLOY_UPDATES_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(WEBHOOKS_GET_QUEUED_DEPLOY_UPDATES_FORMAT, getApiBase(host)); Builder queryParamBuider = ImmutableMap.builder().put("webhookId", webhookId); - return getCollectionWithParams(requestUri, "deploy updates", Optional.>of(queryParamBuider.build()), DEPLOY_UPDATES_COLLECTION); + return getCollectionWithParams(requestUri, "deploy updates", Optional.of(queryParamBuider.build()), DEPLOY_UPDATES_COLLECTION); } public Collection getQueuedRequestUpdates(String webhookId) { - final String requestUri = String.format(WEBHOOKS_GET_QUEUED_REQUEST_UPDATES_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(WEBHOOKS_GET_QUEUED_REQUEST_UPDATES_FORMAT, getApiBase(host)); Builder queryParamBuider = ImmutableMap.builder().put("webhookId", webhookId); - return getCollectionWithParams(requestUri, "request updates", Optional.>of(queryParamBuider.build()), REQUEST_UPDATES_COLLECTION); + return getCollectionWithParams(requestUri, "request updates", Optional.of(queryParamBuider.build()), REQUEST_UPDATES_COLLECTION); } public Collection getQueuedTaskUpdates(String webhookId) { - final String requestUri = String.format(WEBHOOKS_GET_QUEUED_TASK_UPDATES_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(WEBHOOKS_GET_QUEUED_TASK_UPDATES_FORMAT, getApiBase(host)); Builder queryParamBuider = ImmutableMap.builder().put("webhookId", webhookId); - return getCollectionWithParams(requestUri, "request updates", Optional.>of(queryParamBuider.build()), TASK_UPDATES_COLLECTION); + return getCollectionWithParams(requestUri, "request updates", Optional.of(queryParamBuider.build()), TASK_UPDATES_COLLECTION); } // @@ -1188,9 +1145,9 @@ public Collection getQueuedTaskUpdates(String webh * A {@link SingularitySandbox} object that captures the information for the path to a specific task's Mesos sandbox */ public Optional browseTaskSandBox(String taskId, String path) { - final String requestUrl = String.format(SANDBOX_BROWSE_FORMAT, getApiBase(), taskId); + final Function requestUrl = (host) -> String.format(SANDBOX_BROWSE_FORMAT, getApiBase(host), taskId); - return getSingleWithParams(requestUrl, "browse sandbox for task", taskId, Optional.>of(ImmutableMap.of("path", path)), SingularitySandbox.class); + return getSingleWithParams(requestUrl, "browse sandbox for task", taskId, Optional.of(ImmutableMap.of("path", path)), SingularitySandbox.class); } @@ -1211,7 +1168,7 @@ public Optional browseTaskSandBox(String taskId, String path * A {@link MesosFileChunkObject} that contains the requested partial file contents */ public Optional readSandBoxFile(String taskId, String path, Optional grep, Optional offset, Optional length) { - final String requestUrl = String.format(SANDBOX_READ_FILE_FORMAT, getApiBase(), taskId); + final Function requestUrl = (host) -> String.format(SANDBOX_READ_FILE_FORMAT, getApiBase(host), taskId); Builder queryParamBuider = ImmutableMap.builder().put("path", path); @@ -1225,7 +1182,7 @@ public Optional readSandBoxFile(String taskId, String path queryParamBuider.put("length", length.get()); } - return getSingleWithParams(requestUrl, "Read sandbox file for task", taskId, Optional.>of(queryParamBuider.build()), MesosFileChunkObject.class); + return getSingleWithParams(requestUrl, "Read sandbox file for task", taskId, Optional.of(queryParamBuider.build()), MesosFileChunkObject.class); } // @@ -1242,7 +1199,7 @@ public Optional readSandBoxFile(String taskId, String path * A collection of {@link SingularityS3Log} */ public Collection getTaskLogs(String taskId) { - final String requestUri = String.format(S3_LOG_GET_TASK_LOGS, getApiBase(), taskId); + final Function requestUri = (host) -> String.format(S3_LOG_GET_TASK_LOGS, getApiBase(host), taskId); final String type = String.format("S3 logs for task %s", taskId); @@ -1259,7 +1216,7 @@ public Collection getTaskLogs(String taskId) { * A collection of {@link SingularityS3Log} */ public Collection getRequestLogs(String requestId) { - final String requestUri = String.format(S3_LOG_GET_REQUEST_LOGS, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(S3_LOG_GET_REQUEST_LOGS, getApiBase(host), requestId); final String type = String.format("S3 logs for request %s", requestId); @@ -1278,7 +1235,7 @@ public Collection getRequestLogs(String requestId) { * A collection of {@link SingularityS3Log} */ public Collection getDeployLogs(String requestId, String deployId) { - final String requestUri = String.format(S3_LOG_GET_DEPLOY_LOGS, getApiBase(), requestId, deployId); + final Function requestUri = (host) -> String.format(S3_LOG_GET_DEPLOY_LOGS, getApiBase(host), requestId, deployId); final String type = String.format("S3 logs for deploy %s of request %s", deployId, requestId); @@ -1292,7 +1249,7 @@ public Collection getDeployLogs(String requestId, String deplo * A collection of {@link SingularityRequestGroup} */ public Collection getRequestGroups() { - final String requestUri = String.format(REQUEST_GROUPS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUEST_GROUPS_FORMAT, getApiBase(host)); return getCollection(requestUri, "request groups", REQUEST_GROUP_COLLECTION); } @@ -1307,7 +1264,7 @@ public Collection getRequestGroups() { * A {@link SingularityRequestGroup} */ public Optional getRequestGroup(String requestGroupId) { - final String requestUri = String.format(REQUEST_GROUP_FORMAT, getApiBase(), requestGroupId); + final Function requestUri = (host) -> String.format(REQUEST_GROUP_FORMAT, getApiBase(host), requestGroupId); return getSingle(requestUri, "request group", requestGroupId, SingularityRequestGroup.class); } @@ -1322,7 +1279,7 @@ public Optional getRequestGroup(String requestGroupId) * A {@link SingularityRequestGroup} if the update was successful */ public Optional saveRequestGroup(SingularityRequestGroup requestGroup) { - final String requestUri = String.format(REQUEST_GROUPS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUEST_GROUPS_FORMAT, getApiBase(host)); return post(requestUri, "request group", Optional.of(requestGroup), Optional.of(SingularityRequestGroup.class)); } @@ -1334,7 +1291,7 @@ public Optional saveRequestGroup(SingularityRequestGrou * The request group ID to search for */ public void deleteRequestGroup(String requestGroupId) { - final String requestUri = String.format(REQUEST_GROUP_FORMAT, getApiBase(), requestGroupId); + final Function requestUri = (host) -> String.format(REQUEST_GROUP_FORMAT, getApiBase(host), requestGroupId); delete(requestUri, "request group", requestGroupId); } @@ -1344,47 +1301,47 @@ public void deleteRequestGroup(String requestGroupId) { // public Optional getDisasterStats() { - final String requestUri = String.format(DISASTER_STATS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(DISASTER_STATS_FORMAT, getApiBase(host)); return getSingle(requestUri, "disaster stats", "", SingularityDisastersData.class); } public Collection getActiveDisasters() { - final String requestUri = String.format(ACTIVE_DISASTERS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(ACTIVE_DISASTERS_FORMAT, getApiBase(host)); return getCollection(requestUri, "active disasters", DISASTERS_COLLECTION); } public void disableAutomatedDisasterCreation() { - final String requestUri = String.format(DISABLE_AUTOMATED_ACTIONS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(DISABLE_AUTOMATED_ACTIONS_FORMAT, getApiBase(host)); post(requestUri, "disable automated disasters", Optional.absent()); } public void enableAutomatedDisasterCreation() { - final String requestUri = String.format(ENABLE_AUTOMATED_ACTIONS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(ENABLE_AUTOMATED_ACTIONS_FORMAT, getApiBase(host)); post(requestUri, "enable automated disasters", Optional.absent()); } public void removeDisaster(SingularityDisasterType disasterType) { - final String requestUri = String.format(DISASTER_FORMAT, getApiBase(), disasterType); + final Function requestUri = (host) -> String.format(DISASTER_FORMAT, getApiBase(host), disasterType); delete(requestUri, "remove disaster", disasterType.toString()); } public void activateDisaster(SingularityDisasterType disasterType) { - final String requestUri = String.format(DISASTER_FORMAT, getApiBase(), disasterType); + final Function requestUri = (host) -> String.format(DISASTER_FORMAT, getApiBase(host), disasterType); post(requestUri, "activate disaster", Optional.absent()); } public Collection getDisabledActions() { - final String requestUri = String.format(DISABLED_ACTIONS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(DISABLED_ACTIONS_FORMAT, getApiBase(host)); return getCollection(requestUri, "disabled actions", DISABLED_ACTIONS_COLLECTION); } public void disableAction(SingularityAction action, Optional request) { - final String requestUri = String.format(DISABLED_ACTION_FORMAT, getApiBase(), action); + final Function requestUri = (host) -> String.format(DISABLED_ACTION_FORMAT, getApiBase(host), action); post(requestUri, "disable action", request); } public void enableAction(SingularityAction action) { - final String requestUri = String.format(DISABLED_ACTION_FORMAT, getApiBase(), action); + final Function requestUri = (host) -> String.format(DISABLED_ACTION_FORMAT, getApiBase(host), action); delete(requestUri, "disable action", action.toString()); } @@ -1393,17 +1350,17 @@ public void enableAction(SingularityAction action) { // public Optional getActivePriorityFreeze() { - final String requestUri = String.format(PRIORITY_FREEZE_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(PRIORITY_FREEZE_FORMAT, getApiBase(host)); return getSingle(requestUri, "priority freeze", "", SingularityPriorityFreezeParent.class); } public Optional createPriorityFreeze(SingularityPriorityFreeze priorityFreezeRequest) { - final String requestUri = String.format(PRIORITY_FREEZE_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(PRIORITY_FREEZE_FORMAT, getApiBase(host)); return post(requestUri, "priority freeze", Optional.of(priorityFreezeRequest), Optional.of(SingularityPriorityFreezeParent.class)); } public void deletePriorityFreeze() { - final String requestUri = String.format(PRIORITY_FREEZE_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(PRIORITY_FREEZE_FORMAT, getApiBase(host)); delete(requestUri, "priority freeze", ""); } @@ -1425,9 +1382,8 @@ public void deletePriorityFreeze() { * true if the user is authorized for scope, false otherwise */ public boolean isUserAuthorized(String requestId, String userId, SingularityAuthorizationScope scope) { - final String requestUri = String.format(AUTH_CHECK_FORMAT, getApiBase(), requestId, userId); - Map params = new HashMap<>(); - params.put("scope", scope.name()); + final Function requestUri = (host) -> String.format(AUTH_CHECK_FORMAT, getApiBase(host), requestId, userId); + Map params = Collections.singletonMap("scope", scope.name()); HttpResponse response = executeGetSingleWithParams(requestUri, "auth check", "", Optional.of(params)); return response.isSuccess(); } @@ -1446,7 +1402,7 @@ public boolean isUserAuthorized(String requestId, String userId, SingularityAuth * A {@link SingularityTaskState} if the task was found among active or inactive tasks */ public Optional getTaskState(String taskId) { - final String requestUri = String.format(TRACK_BY_TASK_ID_FORMAT, getApiBase(), taskId); + final Function requestUri = (host) -> String.format(TRACK_BY_TASK_ID_FORMAT, getApiBase(host), taskId); return getSingle(requestUri, "track by task id", taskId, SingularityTaskState.class); } @@ -1462,7 +1418,7 @@ public Optional getTaskState(String taskId) { * A {@link SingularityTaskState} if the task was found among pending, active or inactive tasks */ public Optional getTaskState(String requestId, String runId) { - final String requestUri = String.format(TRACK_BY_RUN_ID_FORMAT, getApiBase(), requestId, runId); + final Function requestUri = (host) -> String.format(TRACK_BY_RUN_ID_FORMAT, getApiBase(host), requestId, runId); return getSingle(requestUri, "track by task id", String.format("%s-%s", requestId, runId), SingularityTaskState.class); } diff --git a/SingularityClient/src/test/java/com/hubspot/singularity/client/SingularityClientTest.java b/SingularityClient/src/test/java/com/hubspot/singularity/client/SingularityClientTest.java new file mode 100644 index 0000000000..bd74486ae6 --- /dev/null +++ b/SingularityClient/src/test/java/com/hubspot/singularity/client/SingularityClientTest.java @@ -0,0 +1,72 @@ +package com.hubspot.singularity.client; + +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +import java.net.URI; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.hubspot.horizon.HttpClient; +import com.hubspot.horizon.HttpRequest; +import com.hubspot.horizon.HttpResponse; + +public class SingularityClientTest { + @Mock + private HttpClient httpClient; + @Mock + private HttpResponse response; + @Mock + private HttpRequest request; + private SingularityClient singularityClient; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + singularityClient = buildClient(); + + when(response.getRequest()) + .thenReturn(request); + when(request.getUrl()) + .thenReturn(new URI("test-url")); + } + + @Test + public void itRetriesRequestsThatErrorDueToDeadHost() { + when(httpClient.execute(any())) + .thenReturn(response); + when(response.getStatusCode()) + .thenReturn(503) + .thenReturn(200); + + singularityClient.pauseSingularityRequest("requestId", Optional.absent()); + + verify(httpClient, times(2)) + .execute(any()); + } + + @Test + public void itThrowsAnExceptionOnServerErrors() { + when(httpClient.execute(any())) + .thenReturn(response); + when(response.getStatusCode()) + .thenReturn(500); + when(response.isError()) + .thenReturn(true); + + assertThatExceptionOfType(SingularityClientException.class) + .isThrownBy(() -> singularityClient.pauseSingularityRequest("requestId", Optional.absent())); + } + + private SingularityClient buildClient() { + return new SingularityClient("singularity/v2/api", httpClient, ImmutableList.of("host1", "host2"), Optional.absent()); + } +} diff --git a/pom.xml b/pom.xml index 9506abf302..0f71e34349 100644 --- a/pom.xml +++ b/pom.xml @@ -262,6 +262,12 @@ + + + org.jukito + jukito + 1.5 + From 57fe03ba877f7d9b5be24c40210909696d3e08b1 Mon Sep 17 00:00:00 2001 From: Peter Teixeira Date: Fri, 9 Jun 2017 09:26:16 -0400 Subject: [PATCH 2/3] Respond to PR comments In particular, make retry more configurable by allowing the user to set the number of attempts and the strategy to determine whether a response should be retried. Also explicitly disabled retries on the request, so that it would be entirely handled within the Guava retryer. The reason that retries within the HttpClient can't be re-used is that doing so would just retry the given HttpRequest, and this needs to be able to alter the HttpRequest between attempts. Also is a little more careful about retrying to different hosts on requests. --- .../singularity/client/SingularityClient.java | 41 ++++++++++++++----- .../client/SingularityClientModule.java | 6 +++ .../client/SingularityClientProvider.java | 18 ++++++++ .../client/SingularityClientTest.java | 3 ++ 4 files changed, 57 insertions(+), 11 deletions(-) diff --git a/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClient.java b/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClient.java index d35bb6d3b8..5ef84f4d3f 100644 --- a/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClient.java +++ b/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClient.java @@ -2,6 +2,7 @@ import static com.google.common.base.Preconditions.checkNotNull; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -12,6 +13,7 @@ import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.function.Function; +import java.util.function.Predicate; import javax.inject.Provider; @@ -34,6 +36,7 @@ import com.hubspot.horizon.HttpRequest; import com.hubspot.horizon.HttpRequest.Method; import com.hubspot.horizon.HttpResponse; +import com.hubspot.horizon.RetryStrategy; import com.hubspot.mesos.json.MesosFileChunkObject; import com.hubspot.singularity.ExtendedTaskState; import com.hubspot.singularity.MachineState; @@ -217,11 +220,8 @@ public class SingularityClient { private final HttpClient httpClient; private final Optional credentials; - private final Retryer httpResponseRetryer = RetryerBuilder.newBuilder() - .withStopStrategy(StopStrategies.stopAfterAttempt(3)) - .withWaitStrategy(WaitStrategies.exponentialWait()) - .retryIfResult(response -> response == null || response.getStatusCode() == 503) - .build(); + + private final Retryer httpResponseRetryer; @Inject @Deprecated @@ -242,6 +242,10 @@ public SingularityClient(String contextPath, HttpClient httpClient, List } public SingularityClient(String contextPath, HttpClient httpClient, Provider> hostsProvider, Optional credentials, boolean ssl) { + this(contextPath, httpClient, hostsProvider, credentials, ssl, 3, HttpResponse::isServerError); + } + + public SingularityClient(String contextPath, HttpClient httpClient, Provider> hostsProvider, Optional credentials, boolean ssl, int retryAttempts, Predicate retryStrategy) { this.httpClient = httpClient; this.contextPath = contextPath; @@ -250,17 +254,19 @@ public SingularityClient(String contextPath, HttpClient httpClient, ProvidernewBuilder() + .withStopStrategy(StopStrategies.stopAfterAttempt(retryAttempts)) + .withWaitStrategy(WaitStrategies.exponentialWait()) + .retryIfResult(retryStrategy::test) + .retryIfException() + .build(); } private String getApiBase(String host) { return String.format(BASE_API_FORMAT, ssl ? "https" : "http", host, contextPath); } - private String getHost() { - final List hosts = hostsProvider.get(); - return hosts.get(random.nextInt(hosts.size())); - } - // // HttpClient Methods // @@ -456,9 +462,22 @@ private HttpResponse executeRequest(Function hostToUri, Method m addQueryParams(request, queryParams); addCredentials(request); + List hosts = new ArrayList<>(hostsProvider.get()); + request + .setRetryStrategy(RetryStrategy.NEVER_RETRY) + .setMaxRetries(1); + try { return httpResponseRetryer.call(() -> { - String url = hostToUri.apply(getHost()); + if (hosts.isEmpty()) { + // We've tried everything we started with. Look again. + hosts.addAll(hostsProvider.get()); + } + + int selection = random.nextInt(hosts.size()); + String host = hosts.get(selection); + String url = hostToUri.apply(host); + hosts.remove(selection); LOG.info("Making {} request to {}", method, url); request.setUrl(hostToUri.apply(url)); return httpClient.execute(request.build()); diff --git a/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClientModule.java b/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClientModule.java index 7a54e01c23..ae063f3296 100644 --- a/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClientModule.java +++ b/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClientModule.java @@ -33,6 +33,12 @@ public class SingularityClientModule extends AbstractModule { public static final String CREDENTIALS_PROPERTY_NAME = "singularity.client.credentials"; + // bind this to an int for the number of retry attempts on the request + public static final String RETRY_ATTEMPTS = "singularity.client.retry.attempts"; + + // bind this to a Predicate to say whether a request should be retried + public static final String RETRY_STRATEGY = "singularity.client.retry.strategy"; + private final List hosts; private final Optional httpConfig; diff --git a/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClientProvider.java b/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClientProvider.java index b718a8b9ea..cdd3b43fdb 100644 --- a/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClientProvider.java +++ b/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClientProvider.java @@ -5,6 +5,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.function.Predicate; import javax.inject.Named; import javax.inject.Provider; @@ -21,6 +22,7 @@ import com.google.common.collect.Lists; import com.google.inject.Inject; import com.hubspot.horizon.HttpClient; +import com.hubspot.horizon.HttpResponse; import com.hubspot.singularity.SingularityClientCredentials; @Singleton @@ -34,6 +36,9 @@ public class SingularityClientProvider implements Provider { private Optional credentials = Optional.absent(); private boolean ssl = false; + private int retryAttempts = 3; + private Predicate retryStrategy = HttpResponse::isServerError; + @Inject public SingularityClientProvider(@Named(SingularityClientModule.HTTP_CLIENT_NAME) HttpClient httpClient) { this.httpClient = httpClient; @@ -67,6 +72,19 @@ public SingularityClientProvider setCredentials(@Named(SingularityClientModule.C return this; } + @Inject(optional = true) + public SingularityClientProvider setRetryAttempts(@Named(SingularityClientModule.RETRY_ATTEMPTS) int retryAttempts) { + this.retryAttempts = retryAttempts; + return this; + } + + @Inject(optional = true) + public SingularityClientProvider setRetryStrategy(@Named(SingularityClientModule.RETRY_STRATEGY) Predicate retryStrategy) { + this.retryStrategy = retryStrategy; + return this; + } + + public SingularityClientProvider setHosts(String... hosts) { this.hosts = Arrays.asList(hosts); return this; diff --git a/SingularityClient/src/test/java/com/hubspot/singularity/client/SingularityClientTest.java b/SingularityClient/src/test/java/com/hubspot/singularity/client/SingularityClientTest.java index bd74486ae6..6823ab546a 100644 --- a/SingularityClient/src/test/java/com/hubspot/singularity/client/SingularityClientTest.java +++ b/SingularityClient/src/test/java/com/hubspot/singularity/client/SingularityClientTest.java @@ -46,6 +46,9 @@ public void itRetriesRequestsThatErrorDueToDeadHost() { when(response.getStatusCode()) .thenReturn(503) .thenReturn(200); + when(response.isServerError()) + .thenReturn(true) + .thenReturn(false); singularityClient.pauseSingularityRequest("requestId", Optional.absent()); From bdd1008e3a3cb7ee9c61abf0f0869472e245e022 Mon Sep 17 00:00:00 2001 From: Peter Teixeira Date: Tue, 20 Jun 2017 11:52:31 -0400 Subject: [PATCH 3/3] Don't re-run url builder I was doing something that looked like ``` url = hostToUrl(host) actualUrl = hostToUrl(url) ``` which was very obviously wrong. This fixes it so that it only builds the URL once, which is the correct write way to do this. --- .../hubspot/singularity/client/SingularityClient.java | 2 +- .../singularity/client/SingularityClientTest.java | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClient.java b/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClient.java index 5ef84f4d3f..cf511b9f52 100644 --- a/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClient.java +++ b/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClient.java @@ -479,7 +479,7 @@ private HttpResponse executeRequest(Function hostToUri, Method m String url = hostToUri.apply(host); hosts.remove(selection); LOG.info("Making {} request to {}", method, url); - request.setUrl(hostToUri.apply(url)); + request.setUrl(url); return httpClient.execute(request.build()); }); } catch (ExecutionException | RetryException exn) { diff --git a/SingularityClient/src/test/java/com/hubspot/singularity/client/SingularityClientTest.java b/SingularityClient/src/test/java/com/hubspot/singularity/client/SingularityClientTest.java index 6823ab546a..e6008f285d 100644 --- a/SingularityClient/src/test/java/com/hubspot/singularity/client/SingularityClientTest.java +++ b/SingularityClient/src/test/java/com/hubspot/singularity/client/SingularityClientTest.java @@ -1,5 +1,6 @@ package com.hubspot.singularity.client; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; @@ -10,6 +11,8 @@ import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -26,6 +29,9 @@ public class SingularityClientTest { private HttpResponse response; @Mock private HttpRequest request; + @Captor + private ArgumentCaptor requestCaptor; + private SingularityClient singularityClient; @Before @@ -53,7 +59,10 @@ public void itRetriesRequestsThatErrorDueToDeadHost() { singularityClient.pauseSingularityRequest("requestId", Optional.absent()); verify(httpClient, times(2)) - .execute(any()); + .execute(requestCaptor.capture()); + HttpRequest sentRequest = requestCaptor.getValue(); + assertThat(sentRequest.getUrl().toString()) + .matches("http://host(1|2)/singularity/v2/api/requests/request/requestId/pause"); } @Test