Skip to content

Commit

Permalink
Merge pull request #1932 from HubSpot/revisit_offer_cache
Browse files Browse the repository at this point in the history
Refactor calling of offer evaluation
  • Loading branch information
ssalinas authored Jun 7, 2019
2 parents 095816f + fafded1 commit f1b2960
Show file tree
Hide file tree
Showing 13 changed files with 302 additions and 283 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.mesos.v1.Protos.MasterInfo;
import org.apache.mesos.v1.Protos.Offer;
Expand Down Expand Up @@ -469,4 +470,8 @@ public static String getSafeTaskIdForDirectory(String taskId) {
public static String formatForLogging(Object object) {
return object.toString().replace("\n", "").replaceAll("( )+", " ");
}

public static String formatOfferIdsForLog(List<Offer> offers) {
return offers.stream().map((o) -> o.getId().getValue()).collect(Collectors.joining(", "));
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
package com.hubspot.singularity.mesos;

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -41,14 +35,12 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.RequestCleanupType;
import com.hubspot.singularity.SingularityAbort;
import com.hubspot.singularity.SingularityAbort.AbortReason;
import com.hubspot.singularity.SingularityAction;
import com.hubspot.singularity.SingularityKilledTaskIdRecord;
import com.hubspot.singularity.SingularityMainModule;
import com.hubspot.singularity.SingularityTask;
Expand All @@ -61,9 +53,6 @@
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.data.transcoders.Transcoder;
import com.hubspot.singularity.helpers.MesosProtosUtils;
import com.hubspot.singularity.helpers.MesosUtils;
import com.hubspot.singularity.mesos.SingularityOfferCache.CachedOffer;
import com.hubspot.singularity.mesos.SingularitySlaveAndRackManager.CheckResult;
import com.hubspot.singularity.scheduler.SingularityLeaderCacheCoordinator;
import com.hubspot.singularity.sentry.SingularityExceptionNotifier;

Expand All @@ -81,15 +70,10 @@ public class SingularityMesosSchedulerImpl extends SingularityMesosScheduler {
private final SingularityLeaderCacheCoordinator leaderCacheCoordinator;
private final SingularityMesosFrameworkMessageHandler messageHandler;
private final SingularitySlaveAndRackManager slaveAndRackManager;
private final DisasterManager disasterManager;
private final OfferCache offerCache;
private final SingularityMesosOfferScheduler offerScheduler;
private final SingularityMesosStatusUpdateHandler statusUpdateHandler;
private final SingularityMesosSchedulerClient mesosSchedulerClient;
private final boolean offerCacheEnabled;
private final boolean delayWhenStatusUpdateDeltaTooLarge;
private final long delayWhenDeltaOverMs;
private final AtomicLong statusUpdateDeltaAvg;
private final AtomicLong lastHeartbeatTime;
private final SingularityConfiguration configuration;
private final TaskManager taskManager;
Expand Down Expand Up @@ -126,16 +110,10 @@ public class SingularityMesosSchedulerImpl extends SingularityMesosScheduler {
this.abort = abort;
this.messageHandler = messageHandler;
this.slaveAndRackManager = slaveAndRackManager;
this.disasterManager = disasterManager;
this.offerCache = offerCache;
this.offerScheduler = offerScheduler;
this.statusUpdateHandler = statusUpdateHandler;
this.mesosSchedulerClient = mesosSchedulerClient;
this.offerCacheEnabled = configuration.isCacheOffers();
this.delayWhenStatusUpdateDeltaTooLarge = configuration.isDelayOfferProcessingForLargeStatusUpdateDelta();
this.delayWhenDeltaOverMs = configuration.getDelayPollersWhenDeltaOverMs();
this.statusUpdateDeltaAvg = statusUpdateDeltaAvg;

this.lastHeartbeatTime = lastHeartbeatTime;
this.taskManager = taskManager;
this.transcoder = transcoder;
Expand Down Expand Up @@ -176,148 +154,15 @@ public void resourceOffers(List<Offer> offers) {
mesosSchedulerClient.decline(offers.stream().map(Offer::getId).collect(Collectors.toList()));
return;
}
callWithOffersLock(() -> {
final long start = System.currentTimeMillis();
lastOfferTimestamp = Optional.of(start);
LOG.info("Received {} offer(s)", offers.size());
boolean delclineImmediately = false;
if (disasterManager.isDisabled(SingularityAction.PROCESS_OFFERS)) {
LOG.info("Processing offers is currently disabled, declining {} offers", offers.size());
delclineImmediately = true;
}
if (delayWhenStatusUpdateDeltaTooLarge && statusUpdateDeltaAvg.get() > delayWhenDeltaOverMs) {
LOG.info("Status update delta is too large ({}), declining offers while status updates catch up", statusUpdateDeltaAvg.get());
delclineImmediately = true;
}

if (delclineImmediately) {
mesosSchedulerClient.decline(offers.stream().map(Offer::getId).collect(Collectors.toList()));
return;
}

if (offerCacheEnabled) {
if (disasterManager.isDisabled(SingularityAction.CACHE_OFFERS)) {
offerCache.disableOfferCache();
} else {
offerCache.enableOfferCache();
}
}

List<Offer> offersToCheck = new ArrayList<>(offers);

List<CachedOffer> cachedOfferList = offerCache.checkoutOffers();
Map<String, CachedOffer> cachedOffers = new HashMap<>();
for (CachedOffer cachedOffer : cachedOfferList) {
if (isValidOffer(cachedOffer.getOffer())) {
cachedOffers.put(cachedOffer.getOfferId(), cachedOffer);
offersToCheck.add(cachedOffer.getOffer());
} else if (cachedOffer.getOffer().getId() != null && cachedOffer.getOffer().getId().getValue() != null) {
mesosSchedulerClient.decline(Collections.singletonList(cachedOffer.getOffer().getId()));
offerCache.rescindOffer(cachedOffer.getOffer().getId());
} else {
LOG.warn("Offer {} was not valid, but we can't decline it because we have no offer ID!");
}
}

offers.parallelStream().forEach((offer) -> {
if (!isValidOffer(offer)) {
offersToCheck.remove(offer);
if (offer.getId() != null && offer.getId().getValue() != null) {
mesosSchedulerClient.decline(Collections.singletonList(offer.getId()));
} else {
LOG.warn("Offer {} was not valid, but we can't decline it because we have no offer ID!");
}
return;
}
String rolesInfo = MesosUtils.getRoles(offer).toString();
LOG.debug("Received offer ID {} with roles {} from {} ({}) for {} cpu(s), {} memory, {} ports, and {} disk", offer.getId().getValue(), rolesInfo, offer.getHostname(), offer.getAgentId().getValue(), MesosUtils.getNumCpus(offer), MesosUtils.getMemory(offer),
MesosUtils.getNumPorts(offer), MesosUtils.getDisk(offer));

CheckResult checkResult = slaveAndRackManager.checkOffer(offer);
if (checkResult == CheckResult.NOT_ACCEPTING_TASKS) {
mesosSchedulerClient.decline(Collections.singletonList(offer.getId()));
offersToCheck.remove(offer);
LOG.debug("Will decline offer {}, slave {} is not currently in a state to launch tasks", offer.getId().getValue(), offer.getHostname());
}
});

final Set<OfferID> acceptedOffers = Sets.newHashSetWithExpectedSize(offersToCheck.size());

try {
Collection<SingularityOfferHolder> offerHolders = offerScheduler.checkOffers(offersToCheck);

for (SingularityOfferHolder offerHolder : offerHolders) {
if (!offerHolder.getAcceptedTasks().isEmpty()) {
List<Offer> leftoverOffers = offerHolder.launchTasksAndGetUnusedOffers(mesosSchedulerClient);

leftoverOffers.forEach((o) -> {
if (cachedOffers.containsKey(o.getId().getValue())) {
offerCache.returnOffer(cachedOffers.remove(o.getId().getValue()));
} else {
offerCache.cacheOffer(start, o);
}
});

List<Offer> offersAcceptedFromSlave = offerHolder.getOffers();
offersAcceptedFromSlave.removeAll(leftoverOffers);
offersAcceptedFromSlave.stream()
.filter((offer) -> cachedOffers.containsKey(offer.getId().getValue()))
.map((o) -> cachedOffers.remove(o.getId().getValue()))
.forEach(offerCache::useOffer);
acceptedOffers.addAll(offersAcceptedFromSlave.stream().map(Offer::getId).collect(Collectors.toList()));
} else {
offerHolder.getOffers().forEach((o) -> {
if (cachedOffers.containsKey(o.getId().getValue())) {
offerCache.returnOffer(cachedOffers.remove(o.getId().getValue()));
} else {
offerCache.cacheOffer(start, o);
}
});
}
}

LOG.info("{} remaining offers not accounted for in offer check", cachedOffers.size());
cachedOffers.values().forEach(offerCache::returnOffer);
} catch (Throwable t) {
LOG.error("Received fatal error while handling offers - will decline all available offers", t);

mesosSchedulerClient.decline(offersToCheck.stream()
.filter((o) -> {
if (o == null || o.getId() == null || o.getId().getValue() == null) {
LOG.warn("Got bad offer {} while trying to decline offers!", o);
return false;
}

return true;
})
.filter((o) -> !acceptedOffers.contains(o.getId()) && !cachedOffers.containsKey(o.getId().getValue()))
.map(Offer::getId)
.collect(Collectors.toList()));

offersToCheck.forEach((o) -> {
if (cachedOffers.containsKey(o.getId().getValue())) {
offerCache.returnOffer(cachedOffers.get(o.getId().getValue()));
}
});

throw t;
}

LOG.info("Finished handling {} new offer(s) ({}), {} accepted, {} declined/cached", offers.size(), JavaUtils.duration(start), acceptedOffers.size(),
offers.size() - acceptedOffers.size());
}, "resourceOffers");
}

private boolean isValidOffer(Offer offer) {
if (offer.getId() == null || offer.getId().getValue() == null) {
LOG.warn("Received offer with null ID, skipping ({})", offer);
return false;
}
if (offer.getAgentId() == null || offer.getAgentId().getValue() == null) {
LOG.warn("Received offer with null agent ID, skipping ({})", offer);
return false;
lastOfferTimestamp = Optional.of(System.currentTimeMillis());
try {
lock.runWithOffersLock(() -> offerScheduler.resourceOffers(offers), "SingularityMesosScheduler");
} catch (Throwable t) {
LOG.error("Scheduler threw an uncaught exception - exiting", t);
exceptionNotifier.notify(String.format("Scheduler threw an uncaught exception (%s)", t.getMessage()), t);
notifyStopping();
abort.abort(AbortReason.UNRECOVERABLE_ERROR, Optional.of(t));
}
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public List<Offer> launchTasksAndGetUnusedOffers(SingularityMesosSchedulerClient
for (SingularityMesosTaskHolder taskHolder : acceptedTasks) {
taskIds.add(taskHolder.getTask().getTaskId());
toLaunch.add(taskHolder.getMesosTask());
LOG.debug("Launching {} with offer {}", taskHolder.getTask().getTaskId(), offers.get(0).getId());
LOG.debug("Launching {} with possible offers {}", taskHolder.getTask().getTaskId(), MesosUtils.formatOfferIdsForLog(offers));
LOG.trace("Launching {} mesos task: {}", taskHolder.getTask().getTaskId(), MesosUtils.formatForLogging(taskHolder.getMesosTask()));
}

Expand Down Expand Up @@ -167,6 +167,8 @@ public List<Offer> launchTasksAndGetUnusedOffers(SingularityMesosSchedulerClient
Collections.singletonList(Operation.newBuilder().setType(Type.LAUNCH).setLaunch(Launch.newBuilder().addAllTaskInfos(toLaunch).build()).build())
);

LOG.debug("Launched tasks with offers {}, unused: {}", MesosUtils.formatOfferIdsForLog(neededOffers), MesosUtils.formatOfferIdsForLog(leftoverOffers));

LOG.info("{} tasks ({}) launched", taskIds.size(), taskIds);
return leftoverOffers;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,49 +1,41 @@
package com.hubspot.singularity.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import javax.inject.Singleton;

import org.apache.mesos.v1.Protos.Offer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.inject.Inject;
import com.hubspot.singularity.SingularityAction;
import com.hubspot.singularity.SingularityPendingTaskId;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DisasterManager;
import com.hubspot.singularity.mesos.OfferCache;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.mesos.SingularityMesosOfferScheduler;
import com.hubspot.singularity.mesos.SingularityMesosSchedulerClient;
import com.hubspot.singularity.mesos.SingularityOfferCache.CachedOffer;
import com.hubspot.singularity.mesos.SingularityOfferHolder;
import com.hubspot.singularity.mesos.SingularitySchedulerLock;

@Singleton
public class SingularitySchedulerPoller extends SingularityLeaderOnlyPoller {

private static final Logger LOG = LoggerFactory.getLogger(SingularitySchedulerPoller.class);

private final OfferCache offerCache;
private final SingularityMesosSchedulerClient schedulerClient;
private final SingularityMesosOfferScheduler offerScheduler;
private final TaskManager taskManager;
private final SingularityScheduler scheduler;
private final DisasterManager disasterManager;
private final SingularitySchedulerLock lock;

@Inject
SingularitySchedulerPoller(SingularityMesosOfferScheduler offerScheduler, OfferCache offerCache, SingularityMesosSchedulerClient schedulerClient,
SingularitySchedulerPoller(SingularityMesosOfferScheduler offerScheduler, TaskManager taskManager, SingularityScheduler scheduler,
SingularityConfiguration configuration, SingularitySchedulerLock lock, DisasterManager disasterManager) {
super(configuration.getCheckSchedulerEverySeconds(), TimeUnit.SECONDS, true);

this.offerCache = offerCache;
this.offerScheduler = offerScheduler;
this.schedulerClient = schedulerClient;
this.taskManager = taskManager;
this.scheduler = scheduler;
this.disasterManager = disasterManager;
this.lock = lock;
}
Expand All @@ -55,59 +47,15 @@ public void runActionOnPoll() {
return;
}


lock.runWithOffersLock(() -> {
List<CachedOffer> cachedOffers = offerCache.checkoutOffers();
Map<String, CachedOffer> offerIdToCachedOffer = new HashMap<>(cachedOffers.size());
List<Offer> offers = new ArrayList<>(cachedOffers.size());

for (CachedOffer cachedOffer : cachedOffers) {
offerIdToCachedOffer.put(cachedOffer.getOfferId(), cachedOffer);
offers.add(cachedOffer.getOffer());
}

Collection<SingularityOfferHolder> offerHolders = offerScheduler.checkOffers(offers);

if (offerHolders.isEmpty()) {
return;
}

int acceptedOffers = 0;
int launchedTasks = 0;

for (SingularityOfferHolder offerHolder : offerHolders) {
List<CachedOffer> cachedOffersFromHolder = offerHolder.getOffers().stream().map((o) -> offerIdToCachedOffer.get(o.getId().getValue())).collect(Collectors.toList());

if (!offerHolder.getAcceptedTasks().isEmpty()) {
List<Offer> unusedOffers = offerHolder.launchTasksAndGetUnusedOffers(schedulerClient);
launchedTasks += offerHolder.getAcceptedTasks().size();
acceptedOffers += cachedOffersFromHolder.size() - unusedOffers.size();

// Return to the cache those offers which we checked out of the cache, but didn't end up using.
List<CachedOffer> unusedCachedOffers = unusedOffers.stream().map((o) -> offerIdToCachedOffer.get(o.getId().getValue())).collect(Collectors.toList());
unusedCachedOffers.forEach((cachedOffer) -> {
offerIdToCachedOffer.remove(cachedOffer.getOfferId());
offerCache.returnOffer(cachedOffer);
});

// Notify the cache of the cached offers that we did use.
cachedOffersFromHolder.removeAll(unusedCachedOffers);
cachedOffersFromHolder.forEach((cachedOffer) -> {
offerIdToCachedOffer.remove(cachedOffer.getOfferId());
offerCache.useOffer(cachedOffer);
});
} else {
cachedOffersFromHolder.forEach((cachedOffer) -> {
offerIdToCachedOffer.remove(cachedOffer.getOfferId());
offerCache.returnOffer(cachedOffer);
});
}
for (SingularityPendingTaskId taskId : taskManager.getPendingTasksMarkedForDeletion()) {
lock.runWithRequestLock(() -> taskManager.deletePendingTask(taskId), taskId.getRequestId(), String.format("%s#%s", getClass().getSimpleName(), "checkOffers -> pendingTaskDeletes"));
}

LOG.info("{} remaining offers not accounted for in offer check", offerIdToCachedOffer.size());
offerIdToCachedOffer.values().forEach(offerCache::returnOffer);
scheduler.drainPendingQueue();

LOG.info("Launched {} tasks on {} cached offers (returned {})", launchedTasks, acceptedOffers, offerHolders.size() - acceptedOffers);
}, getClass().getSimpleName());
// Check against only cached offers
offerScheduler.resourceOffers(Collections.emptyList());
}, "SingularitySchedulerPoller");
}
}
Loading

0 comments on commit f1b2960

Please sign in to comment.