Skip to content

Commit

Permalink
Merge pull request #1478 from HubSpot/cache_pending_path
Browse files Browse the repository at this point in the history
add leader cache
  • Loading branch information
ssalinas authored Apr 28, 2017
2 parents c4afcf0 + df36882 commit 49f99c9
Show file tree
Hide file tree
Showing 52 changed files with 1,652 additions and 292 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class SingularityState {

private final Optional<Double> minimumPriorityLevel;

private final long avgStatusUpdateDelayMs;

@JsonCreator
public SingularityState(@JsonProperty("activeTasks") int activeTasks, @JsonProperty("launchingTasks") int launchingTasks, @JsonProperty("activeRequests") int activeRequests, @JsonProperty("cooldownRequests") int cooldownRequests,
@JsonProperty("pausedRequests") int pausedRequests, @JsonProperty("scheduledTasks") int scheduledTasks, @JsonProperty("pendingRequests") int pendingRequests, @JsonProperty("lbCleanupTasks") int lbCleanupTasks,
Expand All @@ -62,7 +64,8 @@ public SingularityState(@JsonProperty("activeTasks") int activeTasks, @JsonPrope
@JsonProperty("lateTasks") int lateTasks, @JsonProperty("futureTasks") int futureTasks, @JsonProperty("maxTaskLag") long maxTaskLag, @JsonProperty("generatedAt") long generatedAt,
@JsonProperty("overProvisionedRequestIds") List<String> overProvisionedRequestIds, @JsonProperty("underProvisionedRequestIds") List<String> underProvisionedRequestIds,
@JsonProperty("overProvisionedRequests") int overProvisionedRequests, @JsonProperty("underProvisionedRequests") int underProvisionedRequests, @JsonProperty("finishedRequests") int finishedRequests,
@JsonProperty("unknownRacks") int unknownRacks, @JsonProperty("unknownSlaves") int unknownSlaves, @JsonProperty("authDatastoreHealthy") Optional<Boolean> authDatastoreHealthy, @JsonProperty("minimumPriorityLevel") Optional<Double> minimumPriorityLevel) {
@JsonProperty("unknownRacks") int unknownRacks, @JsonProperty("unknownSlaves") int unknownSlaves, @JsonProperty("authDatastoreHealthy") Optional<Boolean> authDatastoreHealthy, @JsonProperty("minimumPriorityLevel") Optional<Double> minimumPriorityLevel,
@JsonProperty("avgStatusUpdateDelayMs") long avgStatusUpdateDelayMs) {
this.activeTasks = activeTasks;
this.launchingTasks = launchingTasks;
this.activeRequests = activeRequests;
Expand Down Expand Up @@ -96,6 +99,7 @@ public SingularityState(@JsonProperty("activeTasks") int activeTasks, @JsonPrope
this.underProvisionedRequestIds = underProvisionedRequestIds;
this.authDatastoreHealthy = authDatastoreHealthy;
this.minimumPriorityLevel = minimumPriorityLevel;
this.avgStatusUpdateDelayMs = avgStatusUpdateDelayMs;
}

public int getFinishedRequests() {
Expand Down Expand Up @@ -244,6 +248,10 @@ public Optional<Double> getMinimumPriorityLevel() {
return minimumPriorityLevel;
}

public long getAvgStatusUpdateDelayMs() {
return avgStatusUpdateDelayMs;
}

@Override
public String toString() {
return "SingularityState{" +
Expand Down Expand Up @@ -280,6 +288,7 @@ public String toString() {
", underProvisionedRequests=" + underProvisionedRequests +
", authDatastoreHealthy=" + authDatastoreHealthy +
", minimumPriorityLevel=" + minimumPriorityLevel +
", avgStatusUpdateDelayMs=" + avgStatusUpdateDelayMs +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
package com.hubspot.singularity;

import java.util.Collection;
import java.util.Comparator;
import java.util.List;

import javax.annotation.Nonnull;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.hubspot.mesos.JavaUtils;

public class SingularityTaskId extends SingularityId implements SingularityHistoryItem {
Expand All @@ -25,36 +16,6 @@ public class SingularityTaskId extends SingularityId implements SingularityHisto
private final String sanitizedHost;
private final String sanitizedRackId;

public static Predicate<SingularityTaskId> matchingRequest(final String requestId) {
return new Predicate<SingularityTaskId>() {

@Override
public boolean apply(@Nonnull SingularityTaskId input) {
return input.getRequestId().equals(requestId);
}

};
}

public static Predicate<SingularityTaskId> matchingDeploy(final String deployId) {
return new Predicate<SingularityTaskId>() {

@Override
public boolean apply(@Nonnull SingularityTaskId input) {
return input.getDeployId().equals(deployId);
}

};
}

public static Function<SingularityTaskId, String> TASK_ID_TO_REQUEST_ID = new Function<SingularityTaskId, String>() {

@Override
public String apply(@Nonnull SingularityTaskId input) {
return input.getRequestId();
}
};

public static Comparator<SingularityTaskId> INSTANCE_NO_COMPARATOR = new Comparator<SingularityTaskId>() {

@Override
Expand All @@ -73,19 +34,6 @@ public int compare(SingularityTaskId o1, SingularityTaskId o2) {

};

public static Predicate<SingularityTaskId> notIn(Collection<SingularityTaskId> exclude) {
return Predicates.not(Predicates.in(exclude));
}

@SuppressWarnings("unchecked")
public static List<SingularityTaskId> matchingAndNotIn(Collection<SingularityTaskId> taskIds, String requestId, String deployId, Collection<SingularityTaskId> exclude) {
return Lists.newArrayList(Iterables.filter(taskIds, Predicates.and(matchingRequest(requestId), matchingDeploy(deployId), notIn(exclude))));
}

public static List<SingularityTaskId> matchingAndNotIn(Collection<SingularityTaskId> taskIds, String requestId, Collection<SingularityTaskId> exclude) {
return Lists.newArrayList(Iterables.filter(taskIds, Predicates.and(matchingRequest(requestId), notIn(exclude))));
}

public SingularityTaskId(String requestId, String deployId, long startedAt, int instanceNo, String sanitizedHost, String sanitizedRackId) {
super(String.format("%s-%s-%s-%s-%s-%s", requestId, deployId, startedAt, instanceNo, sanitizedHost, sanitizedRackId));
this.requestId = requestId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

import io.dropwizard.lifecycle.Managed;

public class SingularityHttpClient extends AsyncHttpClient implements Managed {
public class SingularityAsyncHttpClient extends AsyncHttpClient implements Managed {

@Inject
public SingularityHttpClient() {}
public SingularityAsyncHttpClient() {}

@Override
public void start() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ public class SingularityLeaderController implements Managed, LeaderLatchListener
private volatile boolean master;

@Inject
public SingularityLeaderController(StateManager stateManager, SingularityConfiguration configuration, SingularityDriverManager driverManager, SingularityAbort abort, SingularityExceptionNotifier exceptionNotifier,
@Named(SingularityMainModule.HTTP_HOST_AND_PORT) HostAndPort hostAndPort, SingularityMesosScheduler scheduler, OfferCache offerCache) {
public SingularityLeaderController(StateManager stateManager, SingularityConfiguration configuration, SingularityDriverManager driverManager,
SingularityAbort abort, SingularityExceptionNotifier exceptionNotifier, @Named(SingularityMainModule.HTTP_HOST_AND_PORT) HostAndPort hostAndPort, SingularityMesosScheduler scheduler,
OfferCache offerCache) {
this.driverManager = driverManager;
this.stateManager = stateManager;
this.abort = abort;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import java.net.UnknownHostException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;

import javax.inject.Inject;
import javax.inject.Provider;
Expand All @@ -23,7 +25,6 @@
import com.amazonaws.services.s3.AmazonS3Client;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -106,7 +107,8 @@ public class SingularityMainModule implements Module {

public static final String LOST_TASKS_METER = "singularity.lost.tasks.meter";

public static final String STATUS_UPDATE_DELTA_TIMER = "singularity.status.update.delta.timer";
public static final String STATUS_UPDATE_DELTA_30S_AVERAGE = "singularity.status.update.delta.minute.average";
public static final String STATUS_UPDATE_DELTAS = "singularity.status.update.deltas";

private final SingularityConfiguration configuration;

Expand Down Expand Up @@ -153,7 +155,7 @@ public void configure(Binder binder) {
binder.bind(ObjectMapper.class).toProvider(DropwizardObjectMapperProvider.class).in(Scopes.SINGLETON);
binder.bind(MetricRegistry.class).toProvider(DropwizardMetricRegistryProvider.class).in(Scopes.SINGLETON);

binder.bind(AsyncHttpClient.class).to(SingularityHttpClient.class).in(Scopes.SINGLETON);
binder.bind(AsyncHttpClient.class).to(SingularityAsyncHttpClient.class).in(Scopes.SINGLETON);
binder.bind(ServerProvider.class).in(Scopes.SINGLETON);

binder.bind(SingularityDropwizardHealthcheck.class).in(Scopes.SINGLETON);
Expand Down Expand Up @@ -365,8 +367,15 @@ public Meter providesLostTasksMeter(MetricRegistry registry) {

@Provides
@Singleton
@Named(STATUS_UPDATE_DELTA_TIMER)
public Timer providesStatusUpdateDeltaMeter(MetricRegistry registry) {
return registry.timer("com.hubspot.singularity.statusUpdateDelta");
@Named(STATUS_UPDATE_DELTA_30S_AVERAGE)
public AtomicLong provideDeltasMap() {
return new AtomicLong(0);
}

@Provides
@Singleton
@Named(STATUS_UPDATE_DELTAS)
public ConcurrentHashMap<Long, Long> provideUpdateDeltasMap() {
return new ConcurrentHashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,12 @@ public boolean apply(@Nonnull T input) {
});
}

public Iterable<String> filterAuthorizedRequestIds(final Optional<SingularityUser> user, List<String> requestIds, final SingularityAuthorizationScope scope) {
public Iterable<String> filterAuthorizedRequestIds(final Optional<SingularityUser> user, List<String> requestIds, final SingularityAuthorizationScope scope, boolean useWebCache) {
if (hasAdminAuthorization(user)) {
return requestIds;
}

final Map<String, SingularityRequestWithState> requestMap = Maps.uniqueIndex(requestManager.getRequests(requestIds), new Function<SingularityRequestWithState, String>() {
final Map<String, SingularityRequestWithState> requestMap = Maps.uniqueIndex(requestManager.getRequests(requestIds, useWebCache), new Function<SingularityRequestWithState, String>() {
@Override
public String apply(@Nonnull SingularityRequestWithState input) {
return input.getRequest().getId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,20 @@ public class SingularityConfiguration extends Configuration {

private boolean cacheOffers = true;

private long cacheForWebForMillis = TimeUnit.SECONDS.toMillis(30);

private int cacheTasksMaxSize = 5000;

private int cacheTasksInitialSize = 100;

private long cacheTasksForMillis = TimeUnit.DAYS.toMillis(1);

private int cacheDeploysMaxSize = 2000;

private int cacheDeploysInitialSize = 100;

private long cacheDeploysForMillis = TimeUnit.DAYS.toMillis(5);

private long cacheStateForMillis = TimeUnit.SECONDS.toMillis(30);

private long checkDeploysEverySeconds = 5;
Expand Down Expand Up @@ -320,6 +328,10 @@ public class SingularityConfiguration extends Configuration {

private int maxDecommissioningSlaves = 2;

private long delayPollersWhenDeltaOverMs = 15000;

private boolean delayOfferProcessingForLargeStatusUpdateDelta = true;

public long getAskDriverToKillTasksAgainAfterMillis() {
return askDriverToKillTasksAgainAfterMillis;
}
Expand Down Expand Up @@ -392,6 +404,14 @@ public long getPendingDeployHoldTaskDuringDecommissionMillis() {
return pendingDeployHoldTaskDuringDecommissionMillis;
}

public long getCacheForWebForMillis() {
return cacheForWebForMillis;
}

public void setCacheForWebForMillis(long cacheForWebForMillis) {
this.cacheForWebForMillis = cacheForWebForMillis;
}

public void setPendingDeployHoldTaskDuringDecommissionMillis(long pendingDeployHoldTaskDuringDecommissionMillis) {
this.pendingDeployHoldTaskDuringDecommissionMillis = pendingDeployHoldTaskDuringDecommissionMillis;
}
Expand Down Expand Up @@ -432,6 +452,30 @@ public void setCacheTasksInitialSize(int cacheTasksInitialSize) {
this.cacheTasksInitialSize = cacheTasksInitialSize;
}

public int getCacheDeploysMaxSize() {
return cacheDeploysMaxSize;
}

public void setCacheDeploysMaxSize(int cacheDeploysMaxSize) {
this.cacheDeploysMaxSize = cacheDeploysMaxSize;
}

public int getCacheDeploysInitialSize() {
return cacheDeploysInitialSize;
}

public void setCacheDeploysInitialSize(int cacheDeploysInitialSize) {
this.cacheDeploysInitialSize = cacheDeploysInitialSize;
}

public long getCacheDeploysForMillis() {
return cacheDeploysForMillis;
}

public void setCacheDeploysForMillis(long cacheDeploysForMillis) {
this.cacheDeploysForMillis = cacheDeploysForMillis;
}

public int getCoreThreadpoolSize() {
return coreThreadpoolSize;
}
Expand Down Expand Up @@ -1334,4 +1378,20 @@ public int getMaxDecommissioningSlaves() {
public void setMaxDecommissioningSlaves(int maxDecommissioningSlaves) {
this.maxDecommissioningSlaves = maxDecommissioningSlaves;
}

public long getDelayPollersWhenDeltaOverMs() {
return delayPollersWhenDeltaOverMs;
}

public void setDelayPollersWhenDeltaOverMs(long delayPollersWhenDeltaOverMs) {
this.delayPollersWhenDeltaOverMs = delayPollersWhenDeltaOverMs;
}

public boolean isDelayOfferProcessingForLargeStatusUpdateDelta() {
return delayOfferProcessingForLargeStatusUpdateDelta;
}

public void setDelayOfferProcessingForLargeStatusUpdateDelta(boolean delayOfferProcessingForLargeStatusUpdateDelta) {
this.delayOfferProcessingForLargeStatusUpdateDelta = delayOfferProcessingForLargeStatusUpdateDelta;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,21 @@ private CuratorQueryMethod(OperationType operationType) {
}

private <T> List<T> getAsyncChildrenThrows(final String parent, final Transcoder<T> transcoder) throws Exception {
final List<String> children = getChildren(parent);
final List<String> paths = Lists.newArrayListWithCapacity(children.size());
try {
List<String> children = getChildren(parent);
final List<String> paths = Lists.newArrayListWithCapacity(children.size());

for (String child : children) {
paths.add(ZKPaths.makePath(parent, child));
}
for (String child : children) {
paths.add(ZKPaths.makePath(parent, child));
}

List<T> result = new ArrayList<>(getAsyncThrows(parent, paths, transcoder, Optional.<ZkCache<T>> absent()).values());

return new ArrayList<>(getAsyncThrows(parent, paths, transcoder, Optional.<ZkCache<T>> absent()).values());

return result;
} catch (Throwable t) {
throw t;
}
}

private <T> Map<String, T> getAsyncThrows(final String pathNameForLogs, final Collection<String> paths, final Transcoder<T> transcoder, final Optional<ZkCache<T>> cache) throws Exception {
Expand Down
Loading

0 comments on commit 49f99c9

Please sign in to comment.