Refactor calling of offer evaluation #1932

Merged
merged 9 commits into from Jun 7, 2019
Changes from all commits
com.hubspot.singularity.helpers.MesosUtils:

@@ -9,6 +9,7 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.mesos.v1.Protos.MasterInfo;
import org.apache.mesos.v1.Protos.Offer;
@@ -469,4 +470,8 @@ public static String getSafeTaskIdForDirectory(String taskId) {
public static String formatForLogging(Object object) {
return object.toString().replace("\n", "").replaceAll("( )+", " ");
}

public static String formatOfferIdsForLog(List<Offer> offers) {
return offers.stream().map((o) -> o.getId().getValue()).collect(Collectors.joining(", "));
}
}
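
A quick, hypothetical usage sketch of the new helper (the offer IDs are invented, and buildPartial() is used so the example does not have to fill in every required field of the Offer proto):

import java.util.Arrays;
import java.util.List;

import org.apache.mesos.v1.Protos.Offer;
import org.apache.mesos.v1.Protos.OfferID;

import com.hubspot.singularity.helpers.MesosUtils;

public class FormatOfferIdsExample {
  public static void main(String[] args) {
    // Two skeletal offers with made-up IDs.
    Offer first = Offer.newBuilder().setId(OfferID.newBuilder().setValue("offer-1")).buildPartial();
    Offer second = Offer.newBuilder().setId(OfferID.newBuilder().setValue("offer-2")).buildPartial();
    List<Offer> offers = Arrays.asList(first, second);

    // Joins the ID values with ", " and prints "offer-1, offer-2".
    System.out.println(MesosUtils.formatOfferIdsForLog(offers));
  }
}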

Large diffs are not rendered by default.

com.hubspot.singularity.mesos.SingularityMesosSchedulerImpl:
@@ -1,15 +1,9 @@
package com.hubspot.singularity.mesos;

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -38,14 +32,12 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.RequestCleanupType;
import com.hubspot.singularity.SingularityAbort;
import com.hubspot.singularity.SingularityAbort.AbortReason;
import com.hubspot.singularity.SingularityAction;
import com.hubspot.singularity.SingularityKilledTaskIdRecord;
import com.hubspot.singularity.SingularityMainModule;
import com.hubspot.singularity.SingularityTask;
@@ -58,9 +50,6 @@
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.data.transcoders.Transcoder;
import com.hubspot.singularity.helpers.MesosProtosUtils;
import com.hubspot.singularity.helpers.MesosUtils;
import com.hubspot.singularity.mesos.SingularityOfferCache.CachedOffer;
import com.hubspot.singularity.mesos.SingularitySlaveAndRackManager.CheckResult;
import com.hubspot.singularity.scheduler.SingularityLeaderCacheCoordinator;
import com.hubspot.singularity.sentry.SingularityExceptionNotifier;

@@ -78,15 +67,10 @@ public class SingularityMesosSchedulerImpl extends SingularityMesosScheduler {
private final SingularityLeaderCacheCoordinator leaderCacheCoordinator;
private final SingularityMesosFrameworkMessageHandler messageHandler;
private final SingularitySlaveAndRackManager slaveAndRackManager;
private final DisasterManager disasterManager;
private final OfferCache offerCache;
private final SingularityMesosOfferScheduler offerScheduler;
private final SingularityMesosStatusUpdateHandler statusUpdateHandler;
private final SingularityMesosSchedulerClient mesosSchedulerClient;
private final boolean offerCacheEnabled;
private final boolean delayWhenStatusUpdateDeltaTooLarge;
private final long delayWhenDeltaOverMs;
private final AtomicLong statusUpdateDeltaAvg;
private final AtomicLong lastHeartbeatTime;
private final SingularityConfiguration configuration;
private final TaskManager taskManager;
@@ -123,16 +107,10 @@ public class SingularityMesosSchedulerImpl extends SingularityMesosScheduler {
this.abort = abort;
this.messageHandler = messageHandler;
this.slaveAndRackManager = slaveAndRackManager;
this.disasterManager = disasterManager;
this.offerCache = offerCache;
this.offerScheduler = offerScheduler;
this.statusUpdateHandler = statusUpdateHandler;
this.mesosSchedulerClient = mesosSchedulerClient;
this.offerCacheEnabled = configuration.isCacheOffers();
this.delayWhenStatusUpdateDeltaTooLarge = configuration.isDelayOfferProcessingForLargeStatusUpdateDelta();
this.delayWhenDeltaOverMs = configuration.getDelayPollersWhenDeltaOverMs();
this.statusUpdateDeltaAvg = statusUpdateDeltaAvg;

this.lastHeartbeatTime = lastHeartbeatTime;
this.taskManager = taskManager;
this.transcoder = transcoder;
@@ -173,148 +151,15 @@ public void resourceOffers(List<Offer> offers) {
mesosSchedulerClient.decline(offers.stream().map(Offer::getId).collect(Collectors.toList()));
return;
}
callWithOffersLock(() -> {
final long start = System.currentTimeMillis();
lastOfferTimestamp = Optional.of(start);
LOG.info("Received {} offer(s)", offers.size());
boolean delclineImmediately = false;
if (disasterManager.isDisabled(SingularityAction.PROCESS_OFFERS)) {
LOG.info("Processing offers is currently disabled, declining {} offers", offers.size());
delclineImmediately = true;
}
if (delayWhenStatusUpdateDeltaTooLarge && statusUpdateDeltaAvg.get() > delayWhenDeltaOverMs) {
LOG.info("Status update delta is too large ({}), declining offers while status updates catch up", statusUpdateDeltaAvg.get());
delclineImmediately = true;
}

if (delclineImmediately) {
mesosSchedulerClient.decline(offers.stream().map(Offer::getId).collect(Collectors.toList()));
return;
}

if (offerCacheEnabled) {
if (disasterManager.isDisabled(SingularityAction.CACHE_OFFERS)) {
offerCache.disableOfferCache();
} else {
offerCache.enableOfferCache();
}
}

List<Offer> offersToCheck = new ArrayList<>(offers);

List<CachedOffer> cachedOfferList = offerCache.checkoutOffers();
Map<String, CachedOffer> cachedOffers = new HashMap<>();
for (CachedOffer cachedOffer : cachedOfferList) {
if (isValidOffer(cachedOffer.getOffer())) {
cachedOffers.put(cachedOffer.getOfferId(), cachedOffer);
offersToCheck.add(cachedOffer.getOffer());
} else if (cachedOffer.getOffer().getId() != null && cachedOffer.getOffer().getId().getValue() != null) {
mesosSchedulerClient.decline(Collections.singletonList(cachedOffer.getOffer().getId()));
offerCache.rescindOffer(cachedOffer.getOffer().getId());
} else {
LOG.warn("Offer {} was not valid, but we can't decline it because we have no offer ID!");
}
}

offers.parallelStream().forEach((offer) -> {
if (!isValidOffer(offer)) {
offersToCheck.remove(offer);
if (offer.getId() != null && offer.getId().getValue() != null) {
mesosSchedulerClient.decline(Collections.singletonList(offer.getId()));
} else {
LOG.warn("Offer {} was not valid, but we can't decline it because we have no offer ID!");
}
return;
}
String rolesInfo = MesosUtils.getRoles(offer).toString();
LOG.debug("Received offer ID {} with roles {} from {} ({}) for {} cpu(s), {} memory, {} ports, and {} disk", offer.getId().getValue(), rolesInfo, offer.getHostname(), offer.getAgentId().getValue(), MesosUtils.getNumCpus(offer), MesosUtils.getMemory(offer),
MesosUtils.getNumPorts(offer), MesosUtils.getDisk(offer));

CheckResult checkResult = slaveAndRackManager.checkOffer(offer);
if (checkResult == CheckResult.NOT_ACCEPTING_TASKS) {
mesosSchedulerClient.decline(Collections.singletonList(offer.getId()));
offersToCheck.remove(offer);
LOG.debug("Will decline offer {}, slave {} is not currently in a state to launch tasks", offer.getId().getValue(), offer.getHostname());
}
});

final Set<OfferID> acceptedOffers = Sets.newHashSetWithExpectedSize(offersToCheck.size());

try {
Collection<SingularityOfferHolder> offerHolders = offerScheduler.checkOffers(offersToCheck);

for (SingularityOfferHolder offerHolder : offerHolders) {
if (!offerHolder.getAcceptedTasks().isEmpty()) {
List<Offer> leftoverOffers = offerHolder.launchTasksAndGetUnusedOffers(mesosSchedulerClient);

leftoverOffers.forEach((o) -> {
if (cachedOffers.containsKey(o.getId().getValue())) {
offerCache.returnOffer(cachedOffers.remove(o.getId().getValue()));
} else {
offerCache.cacheOffer(start, o);
}
});

List<Offer> offersAcceptedFromSlave = offerHolder.getOffers();
offersAcceptedFromSlave.removeAll(leftoverOffers);
offersAcceptedFromSlave.stream()
.filter((offer) -> cachedOffers.containsKey(offer.getId().getValue()))
.map((o) -> cachedOffers.remove(o.getId().getValue()))
.forEach(offerCache::useOffer);
acceptedOffers.addAll(offersAcceptedFromSlave.stream().map(Offer::getId).collect(Collectors.toList()));
} else {
offerHolder.getOffers().forEach((o) -> {
if (cachedOffers.containsKey(o.getId().getValue())) {
offerCache.returnOffer(cachedOffers.remove(o.getId().getValue()));
} else {
offerCache.cacheOffer(start, o);
}
});
}
}

LOG.info("{} remaining offers not accounted for in offer check", cachedOffers.size());
cachedOffers.values().forEach(offerCache::returnOffer);
} catch (Throwable t) {
LOG.error("Received fatal error while handling offers - will decline all available offers", t);

mesosSchedulerClient.decline(offersToCheck.stream()
.filter((o) -> {
if (o == null || o.getId() == null || o.getId().getValue() == null) {
LOG.warn("Got bad offer {} while trying to decline offers!", o);
return false;
}

return true;
})
.filter((o) -> !acceptedOffers.contains(o.getId()) && !cachedOffers.containsKey(o.getId().getValue()))
.map(Offer::getId)
.collect(Collectors.toList()));

offersToCheck.forEach((o) -> {
if (cachedOffers.containsKey(o.getId().getValue())) {
offerCache.returnOffer(cachedOffers.get(o.getId().getValue()));
}
});

throw t;
}

LOG.info("Finished handling {} new offer(s) ({}), {} accepted, {} declined/cached", offers.size(), JavaUtils.duration(start), acceptedOffers.size(),
offers.size() - acceptedOffers.size());
}, "resourceOffers");
}

private boolean isValidOffer(Offer offer) {
if (offer.getId() == null || offer.getId().getValue() == null) {
LOG.warn("Received offer with null ID, skipping ({})", offer);
return false;
}
if (offer.getAgentId() == null || offer.getAgentId().getValue() == null) {
LOG.warn("Received offer with null agent ID, skipping ({})", offer);
return false;
}
return true;
}

lastOfferTimestamp = Optional.of(System.currentTimeMillis());
try {
lock.runWithOffersLock(() -> offerScheduler.resourceOffers(offers), "SingularityMesosScheduler");
} catch (Throwable t) {
LOG.error("Scheduler threw an uncaught exception - exiting", t);
exceptionNotifier.notify(String.format("Scheduler threw an uncaught exception (%s)", t.getMessage()), t);
notifyStopping();
abort.abort(AbortReason.UNRECOVERABLE_ERROR, Optional.of(t));
}
}

@Override
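Both the slimmed-down resourceOffers above and the poller further below funnel every offer-evaluation pass through lock.runWithOffersLock. The SingularitySchedulerLock implementation is not part of this diff, so the following is only a sketch of the locking pattern under stated assumptions (the per-request lock map in particular is a guess):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; the real SingularitySchedulerLock is not shown in this PR.
public class SchedulerLockSketch {
  private final ReentrantLock offersLock = new ReentrantLock();
  private final ConcurrentHashMap<String, ReentrantLock> requestLocks = new ConcurrentHashMap<>();

  // Serializes offer evaluation, whether triggered by the Mesos
  // resourceOffers callback or by SingularitySchedulerPoller.
  public void runWithOffersLock(Runnable runnable, String name) {
    offersLock.lock();
    try {
      runnable.run();
    } finally {
      offersLock.unlock();
    }
  }

  // Serializes work per request, e.g. the pending-task deletes in the poller.
  public void runWithRequestLock(Runnable runnable, String requestId, String name) {
    ReentrantLock requestLock = requestLocks.computeIfAbsent(requestId, (id) -> new ReentrantLock());
    requestLock.lock();
    try {
      runnable.run();
    } finally {
      requestLock.unlock();
    }
  }
}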
com.hubspot.singularity.mesos.SingularityOfferHolder:
@@ -116,7 +116,7 @@ public List<Offer> launchTasksAndGetUnusedOffers(SingularityMesosSchedulerClient
for (SingularityMesosTaskHolder taskHolder : acceptedTasks) {
taskIds.add(taskHolder.getTask().getTaskId());
toLaunch.add(taskHolder.getMesosTask());
LOG.debug("Launching {} with offer {}", taskHolder.getTask().getTaskId(), offers.get(0).getId());
LOG.debug("Launching {} with possible offers {}", taskHolder.getTask().getTaskId(), MesosUtils.formatOfferIdsForLog(offers));
LOG.trace("Launching {} mesos task: {}", taskHolder.getTask().getTaskId(), MesosUtils.formatForLogging(taskHolder.getMesosTask()));
}

@@ -167,6 +167,8 @@ public List<Offer> launchTasksAndGetUnusedOffers(SingularityMesosSchedulerClient
Collections.singletonList(Operation.newBuilder().setType(Type.LAUNCH).setLaunch(Launch.newBuilder().addAllTaskInfos(toLaunch).build()).build())
);

LOG.debug("Launched tasks with offers {}, unused: {}", MesosUtils.formatOfferIdsForLog(neededOffers), MesosUtils.formatOfferIdsForLog(leftoverOffers));

LOG.info("{} tasks ({}) launched", taskIds.size(), taskIds);
return leftoverOffers;
}
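With the helper in place, the launch logging names every offer rather than just the first. For a holder that held offers offer-1, offer-2, and offer-3, launched against the first two, and left the third unused, the debug lines would come out roughly as (IDs illustrative):

Launching <taskId> with possible offers offer-1, offer-2, offer-3
Launched tasks with offers offer-1, offer-2, unused: offer-3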
com.hubspot.singularity.scheduler.SingularitySchedulerPoller:
@@ -1,49 +1,41 @@
package com.hubspot.singularity.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import javax.inject.Singleton;

import org.apache.mesos.v1.Protos.Offer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.inject.Inject;
import com.hubspot.singularity.SingularityAction;
import com.hubspot.singularity.SingularityPendingTaskId;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DisasterManager;
import com.hubspot.singularity.mesos.OfferCache;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.mesos.SingularityMesosOfferScheduler;
import com.hubspot.singularity.mesos.SingularityMesosSchedulerClient;
import com.hubspot.singularity.mesos.SingularityOfferCache.CachedOffer;
import com.hubspot.singularity.mesos.SingularityOfferHolder;
import com.hubspot.singularity.mesos.SingularitySchedulerLock;

@Singleton
public class SingularitySchedulerPoller extends SingularityLeaderOnlyPoller {

private static final Logger LOG = LoggerFactory.getLogger(SingularitySchedulerPoller.class);

private final OfferCache offerCache;
private final SingularityMesosSchedulerClient schedulerClient;
private final SingularityMesosOfferScheduler offerScheduler;
private final TaskManager taskManager;
private final SingularityScheduler scheduler;
private final DisasterManager disasterManager;
private final SingularitySchedulerLock lock;

@Inject
SingularitySchedulerPoller(SingularityMesosOfferScheduler offerScheduler, OfferCache offerCache, SingularityMesosSchedulerClient schedulerClient,
SingularitySchedulerPoller(SingularityMesosOfferScheduler offerScheduler, TaskManager taskManager, SingularityScheduler scheduler,
SingularityConfiguration configuration, SingularitySchedulerLock lock, DisasterManager disasterManager) {
super(configuration.getCheckSchedulerEverySeconds(), TimeUnit.SECONDS, true);

this.offerCache = offerCache;
this.offerScheduler = offerScheduler;
this.schedulerClient = schedulerClient;
this.taskManager = taskManager;
this.scheduler = scheduler;
this.disasterManager = disasterManager;
this.lock = lock;
}
@@ -55,59 +47,15 @@ public void runActionOnPoll() {
return;
}


lock.runWithOffersLock(() -> {
List<CachedOffer> cachedOffers = offerCache.checkoutOffers();
Map<String, CachedOffer> offerIdToCachedOffer = new HashMap<>(cachedOffers.size());
List<Offer> offers = new ArrayList<>(cachedOffers.size());

for (CachedOffer cachedOffer : cachedOffers) {
offerIdToCachedOffer.put(cachedOffer.getOfferId(), cachedOffer);
offers.add(cachedOffer.getOffer());
}

Collection<SingularityOfferHolder> offerHolders = offerScheduler.checkOffers(offers);

if (offerHolders.isEmpty()) {
return;
}

int acceptedOffers = 0;
int launchedTasks = 0;

for (SingularityOfferHolder offerHolder : offerHolders) {
List<CachedOffer> cachedOffersFromHolder = offerHolder.getOffers().stream().map((o) -> offerIdToCachedOffer.get(o.getId().getValue())).collect(Collectors.toList());

if (!offerHolder.getAcceptedTasks().isEmpty()) {
List<Offer> unusedOffers = offerHolder.launchTasksAndGetUnusedOffers(schedulerClient);
launchedTasks += offerHolder.getAcceptedTasks().size();
acceptedOffers += cachedOffersFromHolder.size() - unusedOffers.size();

// Return to the cache those offers which we checked out of the cache, but didn't end up using.
List<CachedOffer> unusedCachedOffers = unusedOffers.stream().map((o) -> offerIdToCachedOffer.get(o.getId().getValue())).collect(Collectors.toList());
unusedCachedOffers.forEach((cachedOffer) -> {
offerIdToCachedOffer.remove(cachedOffer.getOfferId());
offerCache.returnOffer(cachedOffer);
});

// Notify the cache of the cached offers that we did use.
cachedOffersFromHolder.removeAll(unusedCachedOffers);
cachedOffersFromHolder.forEach((cachedOffer) -> {
offerIdToCachedOffer.remove(cachedOffer.getOfferId());
offerCache.useOffer(cachedOffer);
});
} else {
cachedOffersFromHolder.forEach((cachedOffer) -> {
offerIdToCachedOffer.remove(cachedOffer.getOfferId());
offerCache.returnOffer(cachedOffer);
});
}
for (SingularityPendingTaskId taskId : taskManager.getPendingTasksMarkedForDeletion()) {
lock.runWithRequestLock(() -> taskManager.deletePendingTask(taskId), taskId.getRequestId(), String.format("%s#%s", getClass().getSimpleName(), "checkOffers -> pendingTaskDeletes"));
}

LOG.info("{} remaining offers not accounted for in offer check", offerIdToCachedOffer.size());
offerIdToCachedOffer.values().forEach(offerCache::returnOffer);
scheduler.drainPendingQueue();

LOG.info("Launched {} tasks on {} cached offers (returned {})", launchedTasks, acceptedOffers, offerHolders.size() - acceptedOffers);
}, getClass().getSimpleName());
// Check against only cached offers
offerScheduler.resourceOffers(Collections.emptyList());
}, "SingularitySchedulerPoller");
}
}
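
The notable trick in the new poller is its final call: handing the offer scheduler an empty list makes it evaluate only the offers it already holds in its cache, so the poller and the Mesos callback now share one scheduling code path. A toy model of that pattern (all names and internals here are illustrative, not the real SingularityMesosOfferScheduler, whose diff is the large one not rendered above):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CachedOfferPassExample {
  // Stand-in for the offer cache; the real cache stores Offer protos.
  private final List<String> cachedOfferIds = new ArrayList<>(Arrays.asList("cached-1", "cached-2"));

  public void resourceOffers(List<String> liveOfferIds) {
    // Cached offers always join the pass, so an empty live list still
    // produces a full scheduling run against the cache.
    List<String> toCheck = new ArrayList<>(liveOfferIds);
    toCheck.addAll(cachedOfferIds);
    System.out.println("Evaluating offers: " + String.join(", ", toCheck));
  }

  public static void main(String[] args) {
    CachedOfferPassExample scheduler = new CachedOfferPassExample();
    scheduler.resourceOffers(Arrays.asList("live-1")); // Mesos callback path
    scheduler.resourceOffers(new ArrayList<>());       // poller path: cache only
  }
}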