diff --git a/SingularityClient/pom.xml b/SingularityClient/pom.xml index 1bad3e101e..fef6f7b2e4 100644 --- a/SingularityClient/pom.xml +++ b/SingularityClient/pom.xml @@ -21,6 +21,11 @@ guava + + com.github.rholder + guava-retrying + + org.slf4j slf4j-api @@ -77,6 +82,18 @@ test + + org.assertj + assertj-core + test + + + + org.mockito + mockito-core + test + + org.slf4j slf4j-simple diff --git a/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClient.java b/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClient.java index 8db975806e..cf511b9f52 100644 --- a/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClient.java +++ b/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClient.java @@ -2,13 +2,18 @@ import static com.google.common.base.Preconditions.checkNotNull; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.function.Predicate; import javax.inject.Provider; @@ -16,6 +21,11 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.type.TypeReference; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -26,6 +36,7 @@ import com.hubspot.horizon.HttpRequest; import com.hubspot.horizon.HttpRequest.Method; import com.hubspot.horizon.HttpResponse; +import com.hubspot.horizon.RetryStrategy; import com.hubspot.mesos.json.MesosFileChunkObject; import com.hubspot.singularity.ExtendedTaskState; import com.hubspot.singularity.MachineState; @@ -210,10 +221,12 @@ public class SingularityClient { private final HttpClient httpClient; private final Optional credentials; + private final Retryer httpResponseRetryer; + @Inject @Deprecated public SingularityClient(@Named(SingularityClientModule.CONTEXT_PATH) String contextPath, @Named(SingularityClientModule.HTTP_CLIENT_NAME) HttpClient httpClient, @Named(SingularityClientModule.HOSTS_PROPERTY_NAME) String hosts) { - this(contextPath, httpClient, Arrays.asList(hosts.split(",")), Optional.absent()); + this(contextPath, httpClient, Arrays.asList(hosts.split(",")), Optional.absent()); } public SingularityClient(String contextPath, HttpClient httpClient, List hosts, Optional credentials) { @@ -229,6 +242,10 @@ public SingularityClient(String contextPath, HttpClient httpClient, List } public SingularityClient(String contextPath, HttpClient httpClient, Provider> hostsProvider, Optional credentials, boolean ssl) { + this(contextPath, httpClient, hostsProvider, credentials, ssl, 3, HttpResponse::isServerError); + } + + public SingularityClient(String contextPath, HttpClient httpClient, Provider> hostsProvider, Optional credentials, boolean ssl, int retryAttempts, Predicate retryStrategy) { this.httpClient = httpClient; this.contextPath = contextPath; @@ -237,15 +254,17 @@ public SingularityClient(String contextPath, HttpClient httpClient, ProvidernewBuilder() + .withStopStrategy(StopStrategies.stopAfterAttempt(retryAttempts)) + .withWaitStrategy(WaitStrategies.exponentialWait()) + .retryIfResult(retryStrategy::test) + .retryIfException() + .build(); } - private String getHost() { - final List hosts = hostsProvider.get(); - return hosts.get(random.nextInt(hosts.size())); + private String getApiBase(String host) { + return String.format(BASE_API_FORMAT, ssl ? "https" : "http", host, contextPath); } // @@ -278,13 +297,13 @@ private SingularityClientException fail(String type, HttpResponse response) { throw new SingularityClientException(String.format("Failed '%s' action on Singularity (%s) - code: %s, %s", type, uri, response.getStatusCode(), body), response.getStatusCode()); } - private Optional getSingle(String uri, String type, String id, Class clazz) { - return getSingleWithParams(uri, type, id, Optional.absent(), clazz); + private Optional getSingle(Function hostToUrl, String type, String id, Class clazz) { + return getSingleWithParams(hostToUrl, type, id, Optional.absent(), clazz); } - private Optional getSingleWithParams(String uri, String type, String id, Optional> queryParams, Class clazz) { + private Optional getSingleWithParams(Function hostToUrl, String type, String id, Optional> queryParams, Class clazz) { final long start = System.currentTimeMillis(); - HttpResponse response = executeGetSingleWithParams(uri, type, id, queryParams); + HttpResponse response = executeGetSingleWithParams(hostToUrl, type, id, queryParams); if (response.getStatusCode() == 404) { return Optional.absent(); @@ -296,9 +315,9 @@ private Optional getSingleWithParams(String uri, String type, String id, return Optional.fromNullable(response.getAs(clazz)); } - private Optional getSingleWithParams(String uri, String type, String id, Optional> queryParams, TypeReference typeReference) { + private Optional getSingleWithParams(Function hostToUrl, String type, String id, Optional> queryParams, TypeReference typeReference) { final long start = System.currentTimeMillis(); - HttpResponse response = executeGetSingleWithParams(uri, type, id, queryParams); + HttpResponse response = executeGetSingleWithParams(hostToUrl, type, id, queryParams); if (response.getStatusCode() == 404) { return Optional.absent(); @@ -311,42 +330,22 @@ private Optional getSingleWithParams(String uri, String type, String id, } - private HttpResponse executeGetSingleWithParams(String uri, String type, String id, Optional> queryParams) { + private HttpResponse executeGetSingleWithParams(Function hostToUrl, String type, String id, Optional> queryParams) { checkNotNull(id, String.format("Provide a %s id", type)); - LOG.info("Getting {} {} from {}", type, id, uri); - - HttpRequest.Builder requestBuilder = HttpRequest.newBuilder() - .setUrl(uri); + LOG.info("Getting {} {} from Singularity host", type, id); - if (queryParams.isPresent()) { - addQueryParams(requestBuilder, queryParams.get()); - } - - addCredentials(requestBuilder); - - return httpClient.execute(requestBuilder.build()); + return executeRequest(hostToUrl, Method.GET, Optional.absent(), queryParams.or(Collections.emptyMap())); } - private Collection getCollection(String uri, String type, TypeReference> typeReference) { - return getCollectionWithParams(uri, type, Optional.absent(), typeReference); + private Collection getCollection(Function hostToUrl, String type, TypeReference> typeReference) { + return getCollectionWithParams(hostToUrl, type, Optional.absent(), typeReference); } - private Collection getCollectionWithParams(String uri, String type, Optional> queryParams, TypeReference> typeReference) { - LOG.info("Getting all {} from {}", type, uri); - + private Collection getCollectionWithParams(Function hostToUrl, String type, Optional> queryParams, TypeReference> typeReference) { final long start = System.currentTimeMillis(); - HttpRequest.Builder requestBuilder = HttpRequest.newBuilder() - .setUrl(uri); - - if (queryParams.isPresent()) { - addQueryParams(requestBuilder, queryParams.get()); - } - - addCredentials(requestBuilder); - - HttpResponse response = httpClient.execute(requestBuilder.build()); + HttpResponse response = executeRequest(hostToUrl, Method.GET, Optional.absent(), queryParams.or(Collections.emptyMap())); if (response.getStatusCode() == 404) { return ImmutableList.of(); @@ -359,8 +358,8 @@ private Collection getCollectionWithParams(String uri, String type, Optio return response.getAs(typeReference); } - private void addQueryParams(HttpRequest.Builder requestBuilder, Map queryParams) { - for (Entry queryParamEntry : queryParams.entrySet()) { + private void addQueryParams(HttpRequest.Builder requestBuilder, Map queryParams) { + for (Entry queryParamEntry : queryParams.entrySet()) { if (queryParamEntry.getValue() instanceof String) { requestBuilder.setQueryParam(queryParamEntry.getKey()).to((String) queryParamEntry.getValue()); } else if (queryParamEntry.getValue() instanceof Integer) { @@ -382,63 +381,24 @@ private void addCredentials(HttpRequest.Builder requestBuilder) { } } - private void delete(String uri, String type, String id) { - delete(uri, type, id, Optional.absent()); + private void delete(Function hostToUrl, String type, String id) { + delete(hostToUrl, type, id, Optional.absent()); } - private void delete(String uri, String type, String id, Optional body) { - delete(uri, type, id, body, Optional.>absent()); + private void delete(Function hostToUrl, String type, String id, Optional body) { + delete(hostToUrl, type, id, body, Optional.>absent()); } - private Optional delete(String uri, String type, String id, Optional body, Optional> clazz) { - LOG.info("Deleting {} {} from {}", type, id, uri); - - final long start = System.currentTimeMillis(); - - HttpRequest.Builder request = HttpRequest.newBuilder().setUrl(uri).setMethod(Method.DELETE); - - if (body.isPresent()) { - request.setBody(body.get()); - } - - addCredentials(request); - - HttpResponse response = httpClient.execute(request.build()); - - if (response.getStatusCode() == 404) { - LOG.info("{} ({}) was not found", type, id); - return Optional.absent(); - } - - checkResponse(type, response); - - LOG.info("Deleted {} ({}) from Singularity in %sms", type, id, System.currentTimeMillis() - start); - - if (clazz.isPresent()) { - return Optional.of(response.getAs(clazz.get())); - } - - return Optional.absent(); + private Optional delete(Function hostToUrl, String type, String id, Optional body, Optional> clazz) { + return deleteWithParams(hostToUrl, type, id, body, Optional.absent(), clazz); } - private Optional deleteWithParams(String uri, String type, String id, Optional body, Optional> queryParams, Optional> clazz) { - LOG.info("Deleting {} {} from {}", type, id, uri); + private Optional deleteWithParams(Function hostToUrl, String type, String id, Optional body, Optional> queryParams, Optional> clazz) { + LOG.info("Deleting {} {} from Singularity", type, id); final long start = System.currentTimeMillis(); - HttpRequest.Builder request = HttpRequest.newBuilder().setUrl(uri).setMethod(Method.DELETE); - - if (body.isPresent()) { - request.setBody(body.get()); - } - - if (queryParams.isPresent()) { - addQueryParams(request, queryParams.get()); - } - - addCredentials(request); - - HttpResponse response = httpClient.execute(request.build()); + HttpResponse response = executeRequest(hostToUrl, Method.DELETE, body, queryParams.or(Collections.emptyMap())); if (response.getStatusCode() == 404) { LOG.info("{} ({}) was not found", type, id); @@ -456,13 +416,13 @@ private Optional deleteWithParams(String uri, String type, String id, Opt return Optional.absent(); } - private HttpResponse put(String uri, String type, Optional body) { - return executeRequest(uri, type, body, Method.PUT, Optional.absent()); + private HttpResponse put(Function hostToUri, String type, Optional body) { + return executeRequest(hostToUri, type, body, Method.PUT, Optional.absent()); } - private Optional post(String uri, String type, Optional body, Optional> clazz) { + private Optional post(Function hostToUri, String type, Optional body, Optional> clazz) { try { - HttpResponse response = executeRequest(uri, type, body, Method.POST, Optional.absent()); + HttpResponse response = executeRequest(hostToUri, type, body, Method.POST, Optional.absent()); if (clazz.isPresent()) { return Optional.of(response.getAs(clazz.get())); @@ -471,40 +431,60 @@ private Optional post(String uri, String type, Optional body, Optional LOG.warn("Http post failed", e); } - return Optional.absent(); + return Optional.absent(); } - private HttpResponse postWithParams(String uri, String type, Optional body, Optional> queryParams) { - return executeRequest(uri, type, body, Method.POST, queryParams); + private HttpResponse postWithParams(Function hostToUri, String type, Optional body, Optional> queryParams) { + return executeRequest(hostToUri, type, body, Method.POST, queryParams); } - private HttpResponse post(String uri, String type, Optional body) { - return executeRequest(uri, type, body, Method.POST, Optional.absent()); + private HttpResponse post(Function hostToUri, String type, Optional body) { + return executeRequest(hostToUri, type, body, Method.POST, Optional.absent()); } - private HttpResponse executeRequest(String uri, String type, Optional body, Method method, Optional> queryParams) { - + private HttpResponse executeRequest(Function hostToUri, String type, Optional body, Method method, Optional> queryParams) { final long start = System.currentTimeMillis(); - HttpRequest.Builder request = HttpRequest.newBuilder().setUrl(uri).setMethod(method); + HttpResponse response = executeRequest(hostToUri, method, body, queryParams.or(Collections.emptyMap())); + checkResponse(type, response); + LOG.info("Successfully {}ed {} in {}ms", method, type, System.currentTimeMillis() - start); + + return response; + } + + private HttpResponse executeRequest(Function hostToUri, Method method, Optional body, Map queryParams) { + HttpRequest.Builder request = HttpRequest.newBuilder().setMethod(method); if (body.isPresent()) { request.setBody(body.get()); } - if (queryParams.isPresent()) { - addQueryParams(request, queryParams.get()); - } - + addQueryParams(request, queryParams); addCredentials(request); - HttpResponse response = httpClient.execute(request.build()); - - checkResponse(type, response); - - LOG.info("Successfully {}ed {} in {}ms", method, type, System.currentTimeMillis() - start); + List hosts = new ArrayList<>(hostsProvider.get()); + request + .setRetryStrategy(RetryStrategy.NEVER_RETRY) + .setMaxRetries(1); - return response; + try { + return httpResponseRetryer.call(() -> { + if (hosts.isEmpty()) { + // We've tried everything we started with. Look again. + hosts.addAll(hostsProvider.get()); + } + + int selection = random.nextInt(hosts.size()); + String host = hosts.get(selection); + String url = hostToUri.apply(host); + hosts.remove(selection); + LOG.info("Making {} request to {}", method, url); + request.setUrl(url); + return httpClient.execute(request.build()); + }); + } catch (ExecutionException | RetryException exn) { + throw new SingularityClientException("Failed request to Singularity", exn); + } } // @@ -512,24 +492,21 @@ private HttpResponse executeRequest(String uri, String type, Optional body, M // public SingularityState getState(Optional skipCache, Optional includeRequestIds) { - final String uri = String.format(STATE_FORMAT, getApiBase()); + final Function uri = (host) -> String.format(STATE_FORMAT, getApiBase(host)); LOG.info("Fetching state from {}", uri); final long start = System.currentTimeMillis(); - HttpRequest.Builder request = HttpRequest.newBuilder().setUrl(uri); - + Map queryParams = new HashMap<>(); if (skipCache.isPresent()) { - request.setQueryParam("skipCache").to(skipCache.get()); + queryParams.put("skipCache", skipCache.get()); } if (includeRequestIds.isPresent()) { - request.setQueryParam("includeRequestIds").to(includeRequestIds.get()); + queryParams.put("includeRequestIds", includeRequestIds.get()); } - addCredentials(request); - - HttpResponse response = httpClient.execute(request.build()); + HttpResponse response = executeRequest(uri, Method.GET, Optional.absent(), queryParams); checkResponse("state", response); @@ -539,17 +516,13 @@ public SingularityState getState(Optional skipCache, Optional } public Optional getTaskReconciliationStatistics() { - final String uri = String.format(TASK_RECONCILIATION_FORMAT, getApiBase()); + final Function uri = (host) -> String.format(TASK_RECONCILIATION_FORMAT, getApiBase(host)); LOG.info("Fetch task reconciliation statistics from {}", uri); final long start = System.currentTimeMillis(); - HttpRequest.Builder request = HttpRequest.newBuilder().setUrl(uri); - - addCredentials(request); - - HttpResponse response = httpClient.execute(request.build()); + HttpResponse response = executeRequest(uri, Method.GET, Optional.absent(), Collections.emptyMap()); if (response.getStatusCode() == 404) { return Optional.absent(); @@ -567,13 +540,13 @@ public Optional getTaskReconciliationSt // public Optional getSingularityRequest(String requestId) { - final String singularityApiRequestUri = String.format(REQUEST_GET_FORMAT, getApiBase(), requestId); + final Function singularityApiRequestUri = (host) -> String.format(REQUEST_GET_FORMAT, getApiBase(host), requestId); return getSingle(singularityApiRequestUri, "request", requestId, SingularityRequestParent.class); } public Optional getTaskByRunIdForRequest(String requestId, String runId) { - final String singularityApiRequestUri = String.format(REQUEST_BY_RUN_ID_FORMAT, getApiBase(), requestId, runId); + final Function singularityApiRequestUri = (host) -> String.format(REQUEST_BY_RUN_ID_FORMAT, getApiBase(host), requestId, runId); return getSingle(singularityApiRequestUri, "requestByRunId", runId, SingularityTaskId.class); } @@ -581,7 +554,7 @@ public Optional getTaskByRunIdForRequest(String requestId, St public void createOrUpdateSingularityRequest(SingularityRequest request) { checkNotNull(request.getId(), "A posted Singularity Request must have an id"); - final String requestUri = String.format(REQUEST_CREATE_OR_UPDATE_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUEST_CREATE_OR_UPDATE_FORMAT, getApiBase(host)); post(requestUri, String.format("request %s", request.getId()), Optional.of(request)); } @@ -598,30 +571,30 @@ public void createOrUpdateSingularityRequest(SingularityRequest request) { * the singularity request that has been moved to deleting */ public Optional deleteSingularityRequest(String requestId, Optional deleteRequest) { - final String requestUri = String.format(REQUEST_DELETE_ACTIVE_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_DELETE_ACTIVE_FORMAT, getApiBase(host), requestId); return delete(requestUri, "active request", requestId, deleteRequest, Optional.of(SingularityRequest.class)); } public void pauseSingularityRequest(String requestId, Optional pauseRequest) { - final String requestUri = String.format(REQUEST_PAUSE_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_PAUSE_FORMAT, getApiBase(host), requestId); post(requestUri, String.format("pause of request %s", requestId), pauseRequest); } public void unpauseSingularityRequest(String requestId, Optional unpauseRequest) { - final String requestUri = String.format(REQUEST_UNPAUSE_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_UNPAUSE_FORMAT, getApiBase(host), requestId); post(requestUri, String.format("unpause of request %s", requestId), unpauseRequest); } public void scaleSingularityRequest(String requestId, SingularityScaleRequest scaleRequest) { - final String requestUri = String.format(REQUEST_SCALE_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_SCALE_FORMAT, getApiBase(host), requestId); put(requestUri, String.format("Scale of Request %s", requestId), Optional.of(scaleRequest)); } public SingularityPendingRequestParent runSingularityRequest(String requestId, Optional runNowRequest) { - final String requestUri = String.format(REQUEST_RUN_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_RUN_FORMAT, getApiBase(host), requestId); final HttpResponse response = post(requestUri, String.format("run of request %s", requestId), runNowRequest); @@ -629,13 +602,13 @@ public SingularityPendingRequestParent runSingularityRequest(String requestId, O } public void bounceSingularityRequest(String requestId, Optional bounceOptions) { - final String requestUri = String.format(REQUEST_BOUNCE_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_BOUNCE_FORMAT, getApiBase(host), requestId); post(requestUri, String.format("bounce of request %s", requestId), bounceOptions); } public void exitCooldown(String requestId, Optional exitCooldownRequest) { - final String requestUri = String.format(REQUEST_EXIT_COOLDOWN_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_EXIT_COOLDOWN_FORMAT, getApiBase(host), requestId); post(requestUri, String.format("exit cooldown of request %s", requestId), exitCooldownRequest); } @@ -645,11 +618,11 @@ public void exitCooldown(String requestId, Optional deployUnpause, Optional message) { - return createDeployForSingularityRequest(requestId, pendingDeploy, deployUnpause, message, Optional.absent()); + return createDeployForSingularityRequest(requestId, pendingDeploy, deployUnpause, message, Optional.absent()); } public SingularityRequestParent createDeployForSingularityRequest(String requestId, SingularityDeploy pendingDeploy, Optional deployUnpause, Optional message, Optional updatedRequest) { - final String requestUri = String.format(DEPLOYS_FORMAT, getApiBase()); + final Function requestUri = (String host) -> String.format(DEPLOYS_FORMAT, getApiBase(host)); HttpResponse response = post(requestUri, String.format("new deploy %s", new SingularityDeployKey(requestId, pendingDeploy.getId())), Optional.of(new SingularityDeployRequest(pendingDeploy, deployUnpause, message, updatedRequest))); @@ -666,7 +639,7 @@ private SingularityRequestParent getAndLogRequestAndDeployStatus(SingularityRequ } public SingularityRequestParent cancelPendingDeployForSingularityRequest(String requestId, String deployId) { - final String requestUri = String.format(DELETE_DEPLOY_FORMAT, getApiBase(), deployId, requestId); + final Function requestUri = (host) -> String.format(DELETE_DEPLOY_FORMAT, getApiBase(host), deployId, requestId); SingularityRequestParent singularityRequestParent = delete(requestUri, "pending deploy", new SingularityDeployKey(requestId, deployId).getId(), Optional.absent(), Optional.of(SingularityRequestParent.class)).get(); @@ -675,7 +648,7 @@ public SingularityRequestParent cancelPendingDeployForSingularityRequest(String } public SingularityRequestParent updateIncrementalDeployInstanceCount(SingularityUpdatePendingDeployRequest updateRequest) { - final String requestUri = String.format(UPDATE_DEPLOY_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(UPDATE_DEPLOY_FORMAT, getApiBase(host)); HttpResponse response = post(requestUri, String.format("update deploy %s", new SingularityDeployKey(updateRequest.getRequestId(), updateRequest.getDeployId())), Optional.of(updateRequest)); @@ -705,7 +678,7 @@ public SingularityRequestParent updateIncrementalDeployInstanceCount(Singularity * */ public Collection getSingularityRequests() { - final String requestUri = String.format(REQUESTS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUESTS_FORMAT, getApiBase(host)); return getCollection(requestUri, "[ACTIVE, PAUSED, COOLDOWN] requests", REQUESTS_COLLECTION); } @@ -717,7 +690,7 @@ public Collection getSingularityRequests() { * All ACTIVE {@link SingularityRequestParent} instances */ public Collection getActiveSingularityRequests() { - final String requestUri = String.format(REQUESTS_GET_ACTIVE_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUESTS_GET_ACTIVE_FORMAT, getApiBase(host)); return getCollection(requestUri, "ACTIVE requests", REQUESTS_COLLECTION); } @@ -730,7 +703,7 @@ public Collection getActiveSingularityRequests() { * All PAUSED {@link SingularityRequestParent} instances */ public Collection getPausedSingularityRequests() { - final String requestUri = String.format(REQUESTS_GET_PAUSED_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUESTS_GET_PAUSED_FORMAT, getApiBase(host)); return getCollection(requestUri, "PAUSED requests", REQUESTS_COLLECTION); } @@ -742,7 +715,7 @@ public Collection getPausedSingularityRequests() { * All {@link SingularityRequestParent} instances that their state is COOLDOWN */ public Collection getCoolDownSingularityRequests() { - final String requestUri = String.format(REQUESTS_GET_COOLDOWN_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUESTS_GET_COOLDOWN_FORMAT, getApiBase(host)); return getCollection(requestUri, "COOLDOWN requests", REQUESTS_COLLECTION); } @@ -754,7 +727,7 @@ public Collection getCoolDownSingularityRequests() { * A collection of {@link SingularityPendingRequest} instances that hold information about the singularity requests that are pending to become ACTIVE */ public Collection getPendingSingularityRequests() { - final String requestUri = String.format(REQUESTS_GET_PENDING_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUESTS_GET_PENDING_FORMAT, getApiBase(host)); return getCollection(requestUri, "pending requests", PENDING_REQUESTS_COLLECTION); } @@ -769,7 +742,7 @@ public Collection getPendingSingularityRequests() { * that are marked for deletion and are currently cleaning up. */ public Collection getCleanupSingularityRequests() { - final String requestUri = String.format(REQUESTS_GET_CLEANUP_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUESTS_GET_CLEANUP_FORMAT, getApiBase(host)); return getCollection(requestUri, "cleaning requests", CLEANUP_REQUESTS_COLLECTION); } @@ -783,19 +756,19 @@ public Collection getCleanupSingularityRequests() { // public Collection getActiveTasks() { - final String requestUri = String.format(TASKS_GET_ACTIVE_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(TASKS_GET_ACTIVE_FORMAT, getApiBase(host)); return getCollection(requestUri, "active tasks", TASKS_COLLECTION); } public Collection getActiveTasksOnSlave(final String slaveId) { - final String requestUri = String.format(TASKS_GET_ACTIVE_ON_SLAVE_FORMAT, getApiBase(), slaveId); + final Function requestUri = (host) -> String.format(TASKS_GET_ACTIVE_ON_SLAVE_FORMAT, getApiBase(host), slaveId); return getCollection(requestUri, String.format("active tasks on slave %s", slaveId), TASKS_COLLECTION); } public Optional killTask(String taskId, Optional killTaskRequest) { - final String requestUri = String.format(TASKS_KILL_TASK_FORMAT, getApiBase(), taskId); + final Function requestUri = (host) -> String.format(TASKS_KILL_TASK_FORMAT, getApiBase(host), taskId); return delete(requestUri, "task", taskId, killTaskRequest, Optional.of(SingularityTaskCleanupResult.class)); } @@ -805,13 +778,13 @@ public Optional killTask(String taskId, Optional getScheduledTasks() { - final String requestUri = String.format(TASKS_GET_SCHEDULED_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(TASKS_GET_SCHEDULED_FORMAT, getApiBase(host)); return getCollection(requestUri, "scheduled tasks", TASKS_REQUEST_COLLECTION); } public Collection getScheduledTaskIds() { - final String requestUri = String.format(TASKS_GET_SCHEDULED_IDS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(TASKS_GET_SCHEDULED_IDS_FORMAT, getApiBase(host)); return getCollection(requestUri, "scheduled task ids", PENDING_TASK_ID_COLLECTION); } @@ -820,13 +793,13 @@ public Collection getScheduledTaskIds() { // RACKS // private Collection getRacks(Optional rackState) { - final String requestUri = String.format(RACKS_FORMAT, getApiBase()); - Optional> maybeQueryParams = Optional.>absent(); + final Function requestUri = (host) -> String.format(RACKS_FORMAT, getApiBase(host)); + Optional> maybeQueryParams = Optional.absent(); String type = "racks"; if (rackState.isPresent()) { - maybeQueryParams = Optional.>of(ImmutableMap.of("state", rackState.get().toString())); + maybeQueryParams = Optional.of(ImmutableMap.of("state", rackState.get().toString())); type = String.format("%s racks", rackState.get().toString()); } @@ -836,29 +809,29 @@ private Collection getRacks(Optional rackState) { @Deprecated public void decomissionRack(String rackId) { - decommissionRack(rackId, Optional.absent()); + decommissionRack(rackId, Optional.absent()); } public void decommissionRack(String rackId, Optional machineChangeRequest) { - final String requestUri = String.format(RACKS_DECOMISSION_FORMAT, getApiBase(), rackId); + final Function requestUri = (host) -> String.format(RACKS_DECOMISSION_FORMAT, getApiBase(host), rackId); - post(requestUri, String.format("decomission rack %s", rackId), machineChangeRequest.or(Optional.of(new SingularityMachineChangeRequest(Optional.absent())))); + post(requestUri, String.format("decomission rack %s", rackId), machineChangeRequest.or(Optional.of(SingularityMachineChangeRequest.empty()))); } public void freezeRack(String rackId, Optional machineChangeRequest) { - final String requestUri = String.format(RACKS_FREEZE_FORMAT, getApiBase(), rackId); + final Function requestUri = (host) -> String.format(RACKS_FREEZE_FORMAT, getApiBase(host), rackId); - post(requestUri, String.format("freeze rack %s", rackId), machineChangeRequest.or(Optional.of(new SingularityMachineChangeRequest(Optional.absent())))); + post(requestUri, String.format("freeze rack %s", rackId), machineChangeRequest.or(Optional.of(SingularityMachineChangeRequest.empty()))); } public void activateRack(String rackId, Optional machineChangeRequest) { - final String requestUri = String.format(RACKS_ACTIVATE_FORMAT, getApiBase(), rackId); + final Function requestUri = (host) -> String.format(RACKS_ACTIVATE_FORMAT, getApiBase(host), rackId); - post(requestUri, String.format("decommission rack %s", rackId), machineChangeRequest.or(Optional.of(new SingularityMachineChangeRequest(Optional.absent())))); + post(requestUri, String.format("decommission rack %s", rackId), machineChangeRequest.or(Optional.of(SingularityMachineChangeRequest.empty()))); } public void deleteRack(String rackId) { - final String requestUri = String.format(RACKS_DELETE_FORMAT, getApiBase(), rackId); + final Function requestUri = (host) -> String.format(RACKS_DELETE_FORMAT, getApiBase(host), rackId); delete(requestUri, "dead rack", rackId); } @@ -876,14 +849,14 @@ public void deleteRack(String rackId) { * A collection of {@link SingularitySlave} */ public Collection getSlaves(Optional slaveState) { - final String requestUri = String.format(SLAVES_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(SLAVES_FORMAT, getApiBase(host)); - Optional> maybeQueryParams = Optional.>absent(); + Optional> maybeQueryParams = Optional.absent(); String type = "slaves"; if (slaveState.isPresent()) { - maybeQueryParams = Optional.>of(ImmutableMap.of("state", slaveState.get().toString())); + maybeQueryParams = Optional.of(ImmutableMap.of("state", slaveState.get().toString())); type = String.format("%s slaves", slaveState.get().toString()); } @@ -900,31 +873,36 @@ public Collection getSlaves(Optional slaveState) * A {@link SingularitySlave} */ public Optional getSlave(String slaveId) { - final String requestUri = String.format(SLAVE_DETAIL_FORMAT, getApiBase(), slaveId); + final Function requestUri = (host) -> String.format(SLAVE_DETAIL_FORMAT, getApiBase(host), slaveId); return getSingle(requestUri, "slave", slaveId, SingularitySlave.class); } + @Deprecated + public void decomissionSlave(String slaveId) { + decommissionSlave(slaveId, Optional.absent()); + } + public void decommissionSlave(String slaveId, Optional machineChangeRequest) { - final String requestUri = String.format(SLAVES_DECOMISSION_FORMAT, getApiBase(), slaveId); + final Function requestUri = (host) -> String.format(SLAVES_DECOMISSION_FORMAT, getApiBase(host), slaveId); post(requestUri, String.format("decommission slave %s", slaveId), machineChangeRequest.or(Optional.of(SingularityMachineChangeRequest.empty()))); } public void freezeSlave(String slaveId, Optional machineChangeRequest) { - final String requestUri = String.format(SLAVES_FREEZE_FORMAT, getApiBase(), slaveId); + final Function requestUri = (host) -> String.format(SLAVES_FREEZE_FORMAT, getApiBase(host), slaveId); post(requestUri, String.format("freeze slave %s", slaveId), machineChangeRequest.or(Optional.of(SingularityMachineChangeRequest.empty()))); } public void activateSlave(String slaveId, Optional machineChangeRequest) { - final String requestUri = String.format(SLAVES_ACTIVATE_FORMAT, getApiBase(), slaveId); + final Function requestUri = (host) -> String.format(SLAVES_ACTIVATE_FORMAT, getApiBase(host), slaveId); post(requestUri, String.format("activate slave %s", slaveId), machineChangeRequest.or(Optional.of(SingularityMachineChangeRequest.empty()))); } public void deleteSlave(String slaveId) { - final String requestUri = String.format(SLAVES_DELETE_FORMAT, getApiBase(), slaveId); + final Function requestUri = (host) -> String.format(SLAVES_DELETE_FORMAT, getApiBase(host), slaveId); delete(requestUri, "deleting slave", slaveId); } @@ -946,11 +924,11 @@ public void deleteSlave(String slaveId) { * A list of {@link SingularityRequestHistory} */ public Collection getHistoryForRequest(String requestId, Optional count, Optional page) { - final String requestUri = String.format(REQUEST_HISTORY_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_HISTORY_FORMAT, getApiBase(host), requestId); - Optional> maybeQueryParams = Optional.>absent(); + Optional> maybeQueryParams = Optional.absent(); - ImmutableMap.Builder queryParamsBuilder = ImmutableMap.builder(); + ImmutableMap.Builder queryParamsBuilder = ImmutableMap.builder(); if (count.isPresent() ) { queryParamsBuilder.put("count", count.get()); @@ -973,21 +951,19 @@ public Collection getHistoryForRequest(String request // public Collection getInactiveSlaves() { - final String requestUri = String.format(INACTIVE_SLAVES_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(INACTIVE_SLAVES_FORMAT, getApiBase(host)); return getCollection(requestUri, "inactiveSlaves", STRING_COLLECTION); } public void markSlaveAsInactive(String host) { - final String requestUri = String.format(INACTIVE_SLAVES_FORMAT, getApiBase()); - Map params = new HashMap<>(); - params.put("host", host); + final Function requestUri = (singularityHost) -> String.format(INACTIVE_SLAVES_FORMAT, getApiBase(singularityHost)); + Map params = Collections.singletonMap("host", host); postWithParams(requestUri, "deactivateSlave", Optional.absent(), Optional.of(params)); } public void clearInactiveSlave(String host) { - final String requestUri = String.format(INACTIVE_SLAVES_FORMAT, getApiBase()); - Map params = new HashMap<>(); - params.put("host", host); + final Function requestUri = (singularityHost) -> String.format(INACTIVE_SLAVES_FORMAT, getApiBase(host)); + Map params = Collections.singletonMap("host", host); deleteWithParams(requestUri, "clearInactiveSlave", host, Optional.absent(), Optional.of(params), Optional.absent()); } @@ -1004,13 +980,13 @@ public void clearInactiveSlave(String host) { * A {@link SingularityTaskIdHistory} object if the task exists */ public Optional getHistoryForTask(String taskId) { - final String requestUri = String.format(TASK_HISTORY_FORMAT, getApiBase(), taskId); + final Function requestUri = (host) -> String.format(TASK_HISTORY_FORMAT, getApiBase(host), taskId); return getSingle(requestUri, "task history", taskId, SingularityTaskHistory.class); } public Collection getActiveTaskHistoryForRequest(String requestId) { - final String requestUri = String.format(REQUEST_ACTIVE_TASKS_HISTORY_FORMAT, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(REQUEST_ACTIVE_TASKS_HISTORY_FORMAT, getApiBase(host), requestId); final String type = String.format("active task history for %s", requestId); @@ -1022,23 +998,23 @@ public Collection getInactiveTaskHistoryForRequest(Str } public Collection getInactiveTaskHistoryForRequest(String requestId, int count, int page) { - return getInactiveTaskHistoryForRequest(requestId, count, page, Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent()); + return getInactiveTaskHistoryForRequest(requestId, count, page, Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent(), Optional.absent()); } public Collection getInactiveTaskHistoryForRequest(String requestId, int count, int page, Optional host, Optional runId, Optional lastTaskStatus, Optional startedBefore, Optional startedAfter, Optional updatedBefore, Optional updatedAfter, Optional orderDirection) { - final String requestUri = String.format(REQUEST_INACTIVE_TASKS_HISTORY_FORMAT, getApiBase(), requestId); + final Function requestUri = (singularityHost) -> String.format(REQUEST_INACTIVE_TASKS_HISTORY_FORMAT, getApiBase(singularityHost), requestId); final String type = String.format("inactive (failed, killed, lost) task history for request %s", requestId); - Map params = taskSearchParams(Optional.of(requestId), Optional.absent(), runId, host, lastTaskStatus, startedBefore, startedAfter, updatedBefore, updatedAfter, orderDirection, count, page); + Map params = taskSearchParams(Optional.of(requestId), Optional.absent(), runId, host, lastTaskStatus, startedBefore, startedAfter, updatedBefore, updatedAfter, orderDirection, count, page); return getCollectionWithParams(requestUri, type, Optional.of(params), TASKID_HISTORY_COLLECTION); } public Optional getHistoryForRequestDeploy(String requestId, String deployId) { - final String requestUri = String.format(REQUEST_DEPLOY_HISTORY_FORMAT, getApiBase(), requestId, deployId); + final Function requestUri = (host) -> String.format(REQUEST_DEPLOY_HISTORY_FORMAT, getApiBase(host), requestId, deployId); return getSingle(requestUri, "deploy history", new SingularityDeployKey(requestId, deployId).getId(), SingularityDeployHistory.class); } @@ -1056,19 +1032,19 @@ public Optional getHistoryForRequestDeploy(String requ */ @Deprecated public Optional getHistoryForTask(String requestId, String runId) { - final String requestUri = String.format(TASK_HISTORY_BY_RUN_ID_FORMAT, getApiBase(), requestId, runId); - - return getSingle(requestUri, "task history", requestId, SingularityTaskIdHistory.class); + return getTaskIdHistoryByRunId(requestId, runId); } public Optional getTaskIdHistoryByRunId(String requestId, String runId) { - return getHistoryForTask(requestId, runId); + final Function requestUri = (host) -> String.format(TASK_HISTORY_BY_RUN_ID_FORMAT, getApiBase(host), requestId, runId); + + return getSingle(requestUri, "task history", requestId, SingularityTaskIdHistory.class); } public Collection getTaskHistory(Optional requestId, Optional deployId, Optional runId, Optional host, Optional lastTaskStatus, Optional startedBefore, Optional startedAfter, Optional updatedBefore, Optional updatedAfter, Optional orderDirection, Integer count, Integer page) { - final String requestUri = String.format(TASKS_HISTORY_FORMAT, getApiBase()); + final Function requestUri = (singularityHost) -> String.format(TASKS_HISTORY_FORMAT, getApiBase(singularityHost)); Map params = taskSearchParams(requestId, deployId, runId, host, lastTaskStatus, startedBefore, startedAfter, updatedBefore, updatedAfter, orderDirection, count, page); @@ -1078,7 +1054,7 @@ public Collection getTaskHistory(Optional requ public Optional> getTaskHistoryWithMetadata(Optional requestId, Optional deployId, Optional runId, Optional host, Optional lastTaskStatus, Optional startedBefore, Optional startedAfter, Optional updatedBefore, Optional updatedAfter, Optional orderDirection, Integer count, Integer page) { - final String requestUri = String.format(TASKS_HISTORY_WITHMETADATA_FORMAT, getApiBase()); + final Function requestUri = (singularityHost) -> String.format(TASKS_HISTORY_WITHMETADATA_FORMAT, getApiBase(singularityHost)); Map params = taskSearchParams(requestId, deployId, runId, host, lastTaskStatus, startedBefore, startedAfter, updatedBefore, updatedAfter, orderDirection, count, page); @@ -1129,47 +1105,47 @@ private Map taskSearchParams(Optional requestId, Optiona // public Optional addWebhook(SingularityWebhook webhook) { - final String requestUri = String.format(WEBHOOKS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(WEBHOOKS_FORMAT, getApiBase(host)); return post(requestUri, String.format("webhook %s", webhook.getUri()), Optional.of(webhook), Optional.of(SingularityCreateResult.class)); } public Optional deleteWebhook(String webhookId) { - final String requestUri = String.format(WEBHOOKS_DELETE_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(WEBHOOKS_DELETE_FORMAT, getApiBase(host)); Builder queryParamBuider = ImmutableMap.builder().put("webhookId", webhookId); - return deleteWithParams(requestUri, String.format("webhook with id %s", webhookId), webhookId, Optional.absent(), Optional.>of(queryParamBuider.build()), Optional.of(SingularityDeleteResult.class)); + return deleteWithParams(requestUri, String.format("webhook with id %s", webhookId), webhookId, Optional.absent(), Optional.of(queryParamBuider.build()), Optional.of(SingularityDeleteResult.class)); } public Collection getActiveWebhook() { - final String requestUri = String.format(WEBHOOKS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(WEBHOOKS_FORMAT, getApiBase(host)); return getCollection(requestUri, "active webhooks", WEBHOOKS_COLLECTION); } public Collection getQueuedDeployUpdates(String webhookId) { - final String requestUri = String.format(WEBHOOKS_GET_QUEUED_DEPLOY_UPDATES_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(WEBHOOKS_GET_QUEUED_DEPLOY_UPDATES_FORMAT, getApiBase(host)); Builder queryParamBuider = ImmutableMap.builder().put("webhookId", webhookId); - return getCollectionWithParams(requestUri, "deploy updates", Optional.>of(queryParamBuider.build()), DEPLOY_UPDATES_COLLECTION); + return getCollectionWithParams(requestUri, "deploy updates", Optional.of(queryParamBuider.build()), DEPLOY_UPDATES_COLLECTION); } public Collection getQueuedRequestUpdates(String webhookId) { - final String requestUri = String.format(WEBHOOKS_GET_QUEUED_REQUEST_UPDATES_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(WEBHOOKS_GET_QUEUED_REQUEST_UPDATES_FORMAT, getApiBase(host)); Builder queryParamBuider = ImmutableMap.builder().put("webhookId", webhookId); - return getCollectionWithParams(requestUri, "request updates", Optional.>of(queryParamBuider.build()), REQUEST_UPDATES_COLLECTION); + return getCollectionWithParams(requestUri, "request updates", Optional.of(queryParamBuider.build()), REQUEST_UPDATES_COLLECTION); } public Collection getQueuedTaskUpdates(String webhookId) { - final String requestUri = String.format(WEBHOOKS_GET_QUEUED_TASK_UPDATES_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(WEBHOOKS_GET_QUEUED_TASK_UPDATES_FORMAT, getApiBase(host)); Builder queryParamBuider = ImmutableMap.builder().put("webhookId", webhookId); - return getCollectionWithParams(requestUri, "request updates", Optional.>of(queryParamBuider.build()), TASK_UPDATES_COLLECTION); + return getCollectionWithParams(requestUri, "request updates", Optional.of(queryParamBuider.build()), TASK_UPDATES_COLLECTION); } // @@ -1188,9 +1164,9 @@ public Collection getQueuedTaskUpdates(String webh * A {@link SingularitySandbox} object that captures the information for the path to a specific task's Mesos sandbox */ public Optional browseTaskSandBox(String taskId, String path) { - final String requestUrl = String.format(SANDBOX_BROWSE_FORMAT, getApiBase(), taskId); + final Function requestUrl = (host) -> String.format(SANDBOX_BROWSE_FORMAT, getApiBase(host), taskId); - return getSingleWithParams(requestUrl, "browse sandbox for task", taskId, Optional.>of(ImmutableMap.of("path", path)), SingularitySandbox.class); + return getSingleWithParams(requestUrl, "browse sandbox for task", taskId, Optional.of(ImmutableMap.of("path", path)), SingularitySandbox.class); } @@ -1211,7 +1187,7 @@ public Optional browseTaskSandBox(String taskId, String path * A {@link MesosFileChunkObject} that contains the requested partial file contents */ public Optional readSandBoxFile(String taskId, String path, Optional grep, Optional offset, Optional length) { - final String requestUrl = String.format(SANDBOX_READ_FILE_FORMAT, getApiBase(), taskId); + final Function requestUrl = (host) -> String.format(SANDBOX_READ_FILE_FORMAT, getApiBase(host), taskId); Builder queryParamBuider = ImmutableMap.builder().put("path", path); @@ -1225,7 +1201,7 @@ public Optional readSandBoxFile(String taskId, String path queryParamBuider.put("length", length.get()); } - return getSingleWithParams(requestUrl, "Read sandbox file for task", taskId, Optional.>of(queryParamBuider.build()), MesosFileChunkObject.class); + return getSingleWithParams(requestUrl, "Read sandbox file for task", taskId, Optional.of(queryParamBuider.build()), MesosFileChunkObject.class); } // @@ -1242,7 +1218,7 @@ public Optional readSandBoxFile(String taskId, String path * A collection of {@link SingularityS3Log} */ public Collection getTaskLogs(String taskId) { - final String requestUri = String.format(S3_LOG_GET_TASK_LOGS, getApiBase(), taskId); + final Function requestUri = (host) -> String.format(S3_LOG_GET_TASK_LOGS, getApiBase(host), taskId); final String type = String.format("S3 logs for task %s", taskId); @@ -1259,7 +1235,7 @@ public Collection getTaskLogs(String taskId) { * A collection of {@link SingularityS3Log} */ public Collection getRequestLogs(String requestId) { - final String requestUri = String.format(S3_LOG_GET_REQUEST_LOGS, getApiBase(), requestId); + final Function requestUri = (host) -> String.format(S3_LOG_GET_REQUEST_LOGS, getApiBase(host), requestId); final String type = String.format("S3 logs for request %s", requestId); @@ -1278,7 +1254,7 @@ public Collection getRequestLogs(String requestId) { * A collection of {@link SingularityS3Log} */ public Collection getDeployLogs(String requestId, String deployId) { - final String requestUri = String.format(S3_LOG_GET_DEPLOY_LOGS, getApiBase(), requestId, deployId); + final Function requestUri = (host) -> String.format(S3_LOG_GET_DEPLOY_LOGS, getApiBase(host), requestId, deployId); final String type = String.format("S3 logs for deploy %s of request %s", deployId, requestId); @@ -1292,7 +1268,7 @@ public Collection getDeployLogs(String requestId, String deplo * A collection of {@link SingularityRequestGroup} */ public Collection getRequestGroups() { - final String requestUri = String.format(REQUEST_GROUPS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUEST_GROUPS_FORMAT, getApiBase(host)); return getCollection(requestUri, "request groups", REQUEST_GROUP_COLLECTION); } @@ -1307,7 +1283,7 @@ public Collection getRequestGroups() { * A {@link SingularityRequestGroup} */ public Optional getRequestGroup(String requestGroupId) { - final String requestUri = String.format(REQUEST_GROUP_FORMAT, getApiBase(), requestGroupId); + final Function requestUri = (host) -> String.format(REQUEST_GROUP_FORMAT, getApiBase(host), requestGroupId); return getSingle(requestUri, "request group", requestGroupId, SingularityRequestGroup.class); } @@ -1322,7 +1298,7 @@ public Optional getRequestGroup(String requestGroupId) * A {@link SingularityRequestGroup} if the update was successful */ public Optional saveRequestGroup(SingularityRequestGroup requestGroup) { - final String requestUri = String.format(REQUEST_GROUPS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(REQUEST_GROUPS_FORMAT, getApiBase(host)); return post(requestUri, "request group", Optional.of(requestGroup), Optional.of(SingularityRequestGroup.class)); } @@ -1334,7 +1310,7 @@ public Optional saveRequestGroup(SingularityRequestGrou * The request group ID to search for */ public void deleteRequestGroup(String requestGroupId) { - final String requestUri = String.format(REQUEST_GROUP_FORMAT, getApiBase(), requestGroupId); + final Function requestUri = (host) -> String.format(REQUEST_GROUP_FORMAT, getApiBase(host), requestGroupId); delete(requestUri, "request group", requestGroupId); } @@ -1344,47 +1320,47 @@ public void deleteRequestGroup(String requestGroupId) { // public Optional getDisasterStats() { - final String requestUri = String.format(DISASTER_STATS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(DISASTER_STATS_FORMAT, getApiBase(host)); return getSingle(requestUri, "disaster stats", "", SingularityDisastersData.class); } public Collection getActiveDisasters() { - final String requestUri = String.format(ACTIVE_DISASTERS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(ACTIVE_DISASTERS_FORMAT, getApiBase(host)); return getCollection(requestUri, "active disasters", DISASTERS_COLLECTION); } public void disableAutomatedDisasterCreation() { - final String requestUri = String.format(DISABLE_AUTOMATED_ACTIONS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(DISABLE_AUTOMATED_ACTIONS_FORMAT, getApiBase(host)); post(requestUri, "disable automated disasters", Optional.absent()); } public void enableAutomatedDisasterCreation() { - final String requestUri = String.format(ENABLE_AUTOMATED_ACTIONS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(ENABLE_AUTOMATED_ACTIONS_FORMAT, getApiBase(host)); post(requestUri, "enable automated disasters", Optional.absent()); } public void removeDisaster(SingularityDisasterType disasterType) { - final String requestUri = String.format(DISASTER_FORMAT, getApiBase(), disasterType); + final Function requestUri = (host) -> String.format(DISASTER_FORMAT, getApiBase(host), disasterType); delete(requestUri, "remove disaster", disasterType.toString()); } public void activateDisaster(SingularityDisasterType disasterType) { - final String requestUri = String.format(DISASTER_FORMAT, getApiBase(), disasterType); + final Function requestUri = (host) -> String.format(DISASTER_FORMAT, getApiBase(host), disasterType); post(requestUri, "activate disaster", Optional.absent()); } public Collection getDisabledActions() { - final String requestUri = String.format(DISABLED_ACTIONS_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(DISABLED_ACTIONS_FORMAT, getApiBase(host)); return getCollection(requestUri, "disabled actions", DISABLED_ACTIONS_COLLECTION); } public void disableAction(SingularityAction action, Optional request) { - final String requestUri = String.format(DISABLED_ACTION_FORMAT, getApiBase(), action); + final Function requestUri = (host) -> String.format(DISABLED_ACTION_FORMAT, getApiBase(host), action); post(requestUri, "disable action", request); } public void enableAction(SingularityAction action) { - final String requestUri = String.format(DISABLED_ACTION_FORMAT, getApiBase(), action); + final Function requestUri = (host) -> String.format(DISABLED_ACTION_FORMAT, getApiBase(host), action); delete(requestUri, "disable action", action.toString()); } @@ -1393,17 +1369,17 @@ public void enableAction(SingularityAction action) { // public Optional getActivePriorityFreeze() { - final String requestUri = String.format(PRIORITY_FREEZE_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(PRIORITY_FREEZE_FORMAT, getApiBase(host)); return getSingle(requestUri, "priority freeze", "", SingularityPriorityFreezeParent.class); } public Optional createPriorityFreeze(SingularityPriorityFreeze priorityFreezeRequest) { - final String requestUri = String.format(PRIORITY_FREEZE_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(PRIORITY_FREEZE_FORMAT, getApiBase(host)); return post(requestUri, "priority freeze", Optional.of(priorityFreezeRequest), Optional.of(SingularityPriorityFreezeParent.class)); } public void deletePriorityFreeze() { - final String requestUri = String.format(PRIORITY_FREEZE_FORMAT, getApiBase()); + final Function requestUri = (host) -> String.format(PRIORITY_FREEZE_FORMAT, getApiBase(host)); delete(requestUri, "priority freeze", ""); } @@ -1425,9 +1401,8 @@ public void deletePriorityFreeze() { * true if the user is authorized for scope, false otherwise */ public boolean isUserAuthorized(String requestId, String userId, SingularityAuthorizationScope scope) { - final String requestUri = String.format(AUTH_CHECK_FORMAT, getApiBase(), requestId, userId); - Map params = new HashMap<>(); - params.put("scope", scope.name()); + final Function requestUri = (host) -> String.format(AUTH_CHECK_FORMAT, getApiBase(host), requestId, userId); + Map params = Collections.singletonMap("scope", scope.name()); HttpResponse response = executeGetSingleWithParams(requestUri, "auth check", "", Optional.of(params)); return response.isSuccess(); } @@ -1446,7 +1421,7 @@ public boolean isUserAuthorized(String requestId, String userId, SingularityAuth * A {@link SingularityTaskState} if the task was found among active or inactive tasks */ public Optional getTaskState(String taskId) { - final String requestUri = String.format(TRACK_BY_TASK_ID_FORMAT, getApiBase(), taskId); + final Function requestUri = (host) -> String.format(TRACK_BY_TASK_ID_FORMAT, getApiBase(host), taskId); return getSingle(requestUri, "track by task id", taskId, SingularityTaskState.class); } @@ -1462,7 +1437,7 @@ public Optional getTaskState(String taskId) { * A {@link SingularityTaskState} if the task was found among pending, active or inactive tasks */ public Optional getTaskState(String requestId, String runId) { - final String requestUri = String.format(TRACK_BY_RUN_ID_FORMAT, getApiBase(), requestId, runId); + final Function requestUri = (host) -> String.format(TRACK_BY_RUN_ID_FORMAT, getApiBase(host), requestId, runId); return getSingle(requestUri, "track by task id", String.format("%s-%s", requestId, runId), SingularityTaskState.class); } diff --git a/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClientModule.java b/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClientModule.java index 7a54e01c23..ae063f3296 100644 --- a/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClientModule.java +++ b/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClientModule.java @@ -33,6 +33,12 @@ public class SingularityClientModule extends AbstractModule { public static final String CREDENTIALS_PROPERTY_NAME = "singularity.client.credentials"; + // bind this to an int for the number of retry attempts on the request + public static final String RETRY_ATTEMPTS = "singularity.client.retry.attempts"; + + // bind this to a Predicate to say whether a request should be retried + public static final String RETRY_STRATEGY = "singularity.client.retry.strategy"; + private final List hosts; private final Optional httpConfig; diff --git a/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClientProvider.java b/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClientProvider.java index b718a8b9ea..cdd3b43fdb 100644 --- a/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClientProvider.java +++ b/SingularityClient/src/main/java/com/hubspot/singularity/client/SingularityClientProvider.java @@ -5,6 +5,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.function.Predicate; import javax.inject.Named; import javax.inject.Provider; @@ -21,6 +22,7 @@ import com.google.common.collect.Lists; import com.google.inject.Inject; import com.hubspot.horizon.HttpClient; +import com.hubspot.horizon.HttpResponse; import com.hubspot.singularity.SingularityClientCredentials; @Singleton @@ -34,6 +36,9 @@ public class SingularityClientProvider implements Provider { private Optional credentials = Optional.absent(); private boolean ssl = false; + private int retryAttempts = 3; + private Predicate retryStrategy = HttpResponse::isServerError; + @Inject public SingularityClientProvider(@Named(SingularityClientModule.HTTP_CLIENT_NAME) HttpClient httpClient) { this.httpClient = httpClient; @@ -67,6 +72,19 @@ public SingularityClientProvider setCredentials(@Named(SingularityClientModule.C return this; } + @Inject(optional = true) + public SingularityClientProvider setRetryAttempts(@Named(SingularityClientModule.RETRY_ATTEMPTS) int retryAttempts) { + this.retryAttempts = retryAttempts; + return this; + } + + @Inject(optional = true) + public SingularityClientProvider setRetryStrategy(@Named(SingularityClientModule.RETRY_STRATEGY) Predicate retryStrategy) { + this.retryStrategy = retryStrategy; + return this; + } + + public SingularityClientProvider setHosts(String... hosts) { this.hosts = Arrays.asList(hosts); return this; diff --git a/SingularityClient/src/test/java/com/hubspot/singularity/client/SingularityClientTest.java b/SingularityClient/src/test/java/com/hubspot/singularity/client/SingularityClientTest.java new file mode 100644 index 0000000000..e6008f285d --- /dev/null +++ b/SingularityClient/src/test/java/com/hubspot/singularity/client/SingularityClientTest.java @@ -0,0 +1,84 @@ +package com.hubspot.singularity.client; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +import java.net.URI; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.hubspot.horizon.HttpClient; +import com.hubspot.horizon.HttpRequest; +import com.hubspot.horizon.HttpResponse; + +public class SingularityClientTest { + @Mock + private HttpClient httpClient; + @Mock + private HttpResponse response; + @Mock + private HttpRequest request; + @Captor + private ArgumentCaptor requestCaptor; + + private SingularityClient singularityClient; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + singularityClient = buildClient(); + + when(response.getRequest()) + .thenReturn(request); + when(request.getUrl()) + .thenReturn(new URI("test-url")); + } + + @Test + public void itRetriesRequestsThatErrorDueToDeadHost() { + when(httpClient.execute(any())) + .thenReturn(response); + when(response.getStatusCode()) + .thenReturn(503) + .thenReturn(200); + when(response.isServerError()) + .thenReturn(true) + .thenReturn(false); + + singularityClient.pauseSingularityRequest("requestId", Optional.absent()); + + verify(httpClient, times(2)) + .execute(requestCaptor.capture()); + HttpRequest sentRequest = requestCaptor.getValue(); + assertThat(sentRequest.getUrl().toString()) + .matches("http://host(1|2)/singularity/v2/api/requests/request/requestId/pause"); + } + + @Test + public void itThrowsAnExceptionOnServerErrors() { + when(httpClient.execute(any())) + .thenReturn(response); + when(response.getStatusCode()) + .thenReturn(500); + when(response.isError()) + .thenReturn(true); + + assertThatExceptionOfType(SingularityClientException.class) + .isThrownBy(() -> singularityClient.pauseSingularityRequest("requestId", Optional.absent())); + } + + private SingularityClient buildClient() { + return new SingularityClient("singularity/v2/api", httpClient, ImmutableList.of("host1", "host2"), Optional.absent()); + } +} diff --git a/pom.xml b/pom.xml index 9506abf302..0f71e34349 100644 --- a/pom.xml +++ b/pom.xml @@ -262,6 +262,12 @@ + + + org.jukito + jukito + 1.5 +