diff --git a/SingularityBase/src/main/java/com/hubspot/singularity/TaskCleanupType.java b/SingularityBase/src/main/java/com/hubspot/singularity/TaskCleanupType.java index 7805802234..4a03b56d60 100644 --- a/SingularityBase/src/main/java/com/hubspot/singularity/TaskCleanupType.java +++ b/SingularityBase/src/main/java/com/hubspot/singularity/TaskCleanupType.java @@ -20,6 +20,7 @@ public enum TaskCleanupType { PAUSING(false, false), PRIORITY_KILL(true, true), REBALANCE_CPU_USAGE(false, false), + REBALANCE_MEMORY_USAGE(false, false), REBALANCE_RACKS(false, false), REBALANCE_SLAVE_ATTRIBUTES(false, false), REQUEST_DELETING(true, true), diff --git a/SingularityService/pom.xml b/SingularityService/pom.xml index 12fca5c1c6..8719324e2f 100644 --- a/SingularityService/pom.xml +++ b/SingularityService/pom.xml @@ -121,6 +121,11 @@ dropwizard-guicier + + io.dropwizard + dropwizard-util + + com.hubspot.jackson jackson-datatype-protobuf diff --git a/SingularityService/src/main/java/com/hubspot/singularity/config/SingularityConfiguration.java b/SingularityService/src/main/java/com/hubspot/singularity/config/SingularityConfiguration.java index 227f4cb100..335db94f33 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/config/SingularityConfiguration.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/config/SingularityConfiguration.java @@ -82,7 +82,9 @@ public class SingularityConfiguration extends Configuration { private int maxConcurrentUsageCollections = 15; - private boolean shuffleTasksForOverloadedSlaves = false; // recommended 'true' when oversubscribing cpu for larger clusters + private boolean shuffleTasksForOverloadedSlaves = false; // recommended 'true' when oversubscribing resources for larger clusters + + private double shuffleTasksWhenSlaveMemoryUtilizationPercentageExceeds = 0.82; private int maxTasksToShuffleTotal = 6; // Do not allow more than this many shuffle cleanups at once cluster-wide @@ -1497,6 +1499,14 @@ public void setShuffleTasksForOverloadedSlaves(boolean shuffleTasksForOverloaded this.shuffleTasksForOverloadedSlaves = shuffleTasksForOverloadedSlaves; } + public double getShuffleTasksWhenSlaveMemoryUtilizationPercentageExceeds() { + return shuffleTasksWhenSlaveMemoryUtilizationPercentageExceeds; + } + + public void setShuffleTasksWhenSlaveMemoryUtilizationPercentageExceeds(double shuffleTasksWhenSlaveMemoryUtilizationPercentageExceeds) { + this.shuffleTasksWhenSlaveMemoryUtilizationPercentageExceeds = shuffleTasksWhenSlaveMemoryUtilizationPercentageExceeds; + } + public int getMaxTasksToShuffleTotal() { return maxTasksToShuffleTotal; } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsageHelper.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsageHelper.java index 71dfc7c99c..a611602d84 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsageHelper.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsageHelper.java @@ -156,7 +156,8 @@ public void collectSlaveUsage( break; } - boolean slaveOverloaded = systemCpusTotal > 0 && systemLoad / systemCpusTotal > 1.0; + boolean slaveOverloadedForCpu = systemCpusTotal > 0 && systemLoad / systemCpusTotal > 1.0; + boolean slaveExperiencingHighMemUsage = ((systemMemTotalBytes - systemMemFreeBytes) / systemMemTotalBytes) > configuration.getShuffleTasksWhenSlaveMemoryUtilizationPercentageExceeds(); List possibleTasksToShuffle = new ArrayList<>(); for (MesosTaskMonitorObject taskUsage : allTaskUsage) { @@ -216,11 +217,11 @@ public void collectSlaveUsage( cpusUsedOnSlave += taskCpusUsed; } - if (configuration.isShuffleTasksForOverloadedSlaves() && currentUsage != null && currentUsage.getCpusUsed() > 0) { + if (currentUsage != null && currentUsage.getCpusUsed() > 0) { if (isEligibleForShuffle(task)) { Optional maybeCleanupUpdate = taskManager.getTaskHistoryUpdate(task, ExtendedTaskState.TASK_CLEANING); if (maybeCleanupUpdate.isPresent() && isTaskAlreadyCleanedUpForShuffle(maybeCleanupUpdate.get())) { - LOG.trace("Task {} already being cleaned up to spread cpu usage, skipping", taskId); + LOG.trace("Task {} already being cleaned up to spread cpu or mem usage, skipping", taskId); } else { if (maybeResources.isPresent()) { possibleTasksToShuffle.add(new TaskIdWithUsage(task, maybeResources.get(), currentUsage)); @@ -244,7 +245,7 @@ public void collectSlaveUsage( memoryMbTotal, diskMbUsedOnSlave, diskMbReservedOnSlave, diskMbTotal, allTaskUsage.size(), now, systemMemTotalBytes, systemMemFreeBytes, systemCpusTotal, systemLoad1Min, systemLoad5Min, systemLoad15Min, slaveDiskUsed, slaveDiskTotal); - if (slaveOverloaded) { + if (slaveOverloadedForCpu || slaveExperiencingHighMemUsage) { overLoadedHosts.put(slaveUsage, possibleTasksToShuffle); } @@ -314,11 +315,13 @@ private boolean isLongRunning(SingularityTaskId task) { } private boolean isTaskAlreadyCleanedUpForShuffle(SingularityTaskHistoryUpdate taskHistoryUpdate) { - if (taskHistoryUpdate.getStatusMessage().or("").contains(TaskCleanupType.REBALANCE_CPU_USAGE.name())) { + String statusMessage = taskHistoryUpdate.getStatusMessage().or(""); + if (statusMessage.contains(TaskCleanupType.REBALANCE_CPU_USAGE.name()) || statusMessage.contains(TaskCleanupType.REBALANCE_MEMORY_USAGE.name())) { return true; } for (SingularityTaskHistoryUpdate previous : taskHistoryUpdate.getPrevious()) { - if (previous.getStatusMessage().or("").contains(TaskCleanupType.REBALANCE_CPU_USAGE.name())) { + statusMessage = previous.getStatusMessage().or(""); + if (statusMessage.contains(TaskCleanupType.REBALANCE_CPU_USAGE.name()) || statusMessage.contains(TaskCleanupType.REBALANCE_MEMORY_USAGE.name())) { return true; } } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsagePoller.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsagePoller.java index 44a3c5cade..38ceaf5c77 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsagePoller.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsagePoller.java @@ -35,6 +35,9 @@ import com.hubspot.singularity.data.RequestManager; import com.hubspot.singularity.data.TaskManager; import com.hubspot.singularity.data.usage.UsageManager; +import com.hubspot.singularity.scheduler.SingularityUsagePoller.OverusedResource.Type; + +import io.dropwizard.util.SizeUnit; public class SingularityUsagePoller extends SingularityLeaderOnlyPoller { @@ -111,10 +114,42 @@ public void runActionOnPoll() { } } + static class OverusedResource { + enum Type { MEMORY, CPU }; + + double overusage; + Type resourceType; + + OverusedResource(double overusage, Type resourceType) { + this.overusage = overusage; + this.resourceType = resourceType; + } + } + + private double getTargetMemoryUtilizationForHost(SingularitySlaveUsage usage) { + return configuration.getShuffleTasksWhenSlaveMemoryUtilizationPercentageExceeds() * usage.getSystemMemTotalBytes(); + } + + private OverusedResource getMostOverusedResource(SingularitySlaveUsage overloadedSlave, double currentCpuLoad, double currentMemUsageBytes) { + double cpuOverage = currentCpuLoad - overloadedSlave.getSystemCpusTotal(); + + double cpuOverusage = cpuOverage / overloadedSlave.getSystemCpusTotal(); + + double targetMemUsageBytes = getTargetMemoryUtilizationForHost(overloadedSlave); + double memOverageBytes = currentMemUsageBytes - targetMemUsageBytes; + double memOverusage = memOverageBytes / targetMemUsageBytes; + + if (cpuOverusage > memOverusage) { + return new OverusedResource(cpuOverusage, Type.CPU); + } else { + return new OverusedResource(memOverusage, Type.MEMORY); + } + } + private void shuffleTasksOnOverloadedHosts(Map> overLoadedHosts) { List shuffleCleanups = taskManager.getCleanupTasks() .stream() - .filter((taskCleanup) -> taskCleanup.getCleanupType() == TaskCleanupType.REBALANCE_CPU_USAGE) + .filter((taskCleanup) -> taskCleanup.getCleanupType() == TaskCleanupType.REBALANCE_CPU_USAGE || taskCleanup.getCleanupType() == TaskCleanupType.REBALANCE_MEMORY_USAGE) .collect(Collectors.toList()); long currentShuffleCleanupsTotal = shuffleCleanups.size(); Set requestsWithShuffledTasks = shuffleCleanups @@ -122,57 +157,110 @@ private void shuffleTasksOnOverloadedHosts(Map taskCleanup.getTaskId().getRequestId()) .collect(Collectors.toSet()); - List overLoadedSlavesByUsage = overLoadedHosts.keySet().stream() - .sorted((usage1, usage2) -> Double.compare( - getSystemLoadForShuffle(usage2), - getSystemLoadForShuffle(usage1) - )) + List overloadedSlavesByOverusage = overLoadedHosts.keySet().stream() + .sorted((usage1, usage2) -> { + OverusedResource mostOverusedResource1 = getMostOverusedResource(usage1, getSystemLoadForShuffle(usage1), usage1.getMemoryBytesUsed()); + OverusedResource mostOverusedResource2 = getMostOverusedResource(usage2, getSystemLoadForShuffle(usage2), usage2.getMemoryBytesUsed()); + + return Double.compare(mostOverusedResource2.overusage, mostOverusedResource1.overusage); + }) .collect(Collectors.toList()); - for (SingularitySlaveUsage overloadedSlave : overLoadedSlavesByUsage) { + + for (SingularitySlaveUsage overloadedSlave : overloadedSlavesByOverusage) { if (currentShuffleCleanupsTotal >= configuration.getMaxTasksToShuffleTotal()) { LOG.debug("Not shuffling any more tasks (totalShuffleCleanups: {})", currentShuffleCleanupsTotal); break; } int shuffledTasksOnSlave = 0; - List possibleTasksToShuffle = overLoadedHosts.get(overloadedSlave); - possibleTasksToShuffle.sort((u1, u2) -> - Double.compare( - u2.getUsage().getCpusUsed() / u2.getRequestedResources().getCpus(), - u1.getUsage().getCpusUsed() / u1.getRequestedResources().getCpus() - )); - double systemLoad = getSystemLoadForShuffle(overloadedSlave); - double cpuOverage = systemLoad - overloadedSlave.getSystemCpusTotal(); + double currentCpuLoad = getSystemLoadForShuffle(overloadedSlave); + double currentMemUsageBytes = overloadedSlave.getSystemMemTotalBytes() - overloadedSlave.getSystemMemFreeBytes(); + + OverusedResource mostOverusedResource = getMostOverusedResource(overloadedSlave, currentCpuLoad, currentMemUsageBytes); + + List possibleTasksToShuffle; + boolean shufflingForCpu; + if (mostOverusedResource.resourceType == Type.CPU) { + shufflingForCpu = true; + possibleTasksToShuffle = overLoadedHosts.get(overloadedSlave); + possibleTasksToShuffle.sort((u1, u2) -> + Double.compare( + u2.getUsage().getCpusUsed() / u2.getRequestedResources().getCpus(), + u1.getUsage().getCpusUsed() / u1.getRequestedResources().getCpus() + )); + } else { + shufflingForCpu = false; + possibleTasksToShuffle = overLoadedHosts.get(overloadedSlave); + possibleTasksToShuffle.sort((u1, u2) -> + Double.compare( + u2.getUsage().getMemoryTotalBytes() / u2.getRequestedResources().getMemoryMb(), + u1.getUsage().getMemoryTotalBytes() / u1.getRequestedResources().getMemoryMb() + )); + } for (TaskIdWithUsage taskIdWithUsage : possibleTasksToShuffle) { if (requestsWithShuffledTasks.contains(taskIdWithUsage.getTaskId().getRequestId())) { LOG.debug("Request {} already has a shuffling task, skipping", taskIdWithUsage.getTaskId().getRequestId()); continue; } - if (cpuOverage <= 0 || shuffledTasksOnSlave > configuration.getMaxTasksToShufflePerHost() || currentShuffleCleanupsTotal >= configuration.getMaxTasksToShuffleTotal()) { - LOG.debug("Not shuffling any more tasks (overage: {}, shuffledOnHost: {}, totalShuffleCleanups: {})", cpuOverage, shuffledTasksOnSlave, currentShuffleCleanupsTotal); + + boolean resourceNoLongerOverutilized = (shufflingForCpu && currentCpuLoad <= overloadedSlave.getSystemCpusTotal()) || (!shufflingForCpu && currentMemUsageBytes <= getTargetMemoryUtilizationForHost(overloadedSlave)); + boolean shufflingTooManyTasks = shuffledTasksOnSlave > configuration.getMaxTasksToShufflePerHost() || currentShuffleCleanupsTotal >= configuration.getMaxTasksToShuffleTotal(); + + if (resourceNoLongerOverutilized || shufflingTooManyTasks) { + LOG.debug("Not shuffling any more tasks on slave {} ({} overage : {}%, shuffledOnHost: {}, totalShuffleCleanups: {})", taskIdWithUsage.getTaskId().getSanitizedHost(), mostOverusedResource.resourceType, mostOverusedResource.overusage * 100, shuffledTasksOnSlave, currentShuffleCleanupsTotal); break; } - LOG.debug("Cleaning up task {} to free up cpu on overloaded host (remaining cpu overage: {})", taskIdWithUsage.getTaskId(), cpuOverage); - Optional message = Optional.of(String.format( - "Load on slave is %s / %s, shuffling task using %s / %s to less busy host", - systemLoad, - overloadedSlave.getSystemCpusTotal(), - taskIdWithUsage.getUsage().getCpusUsed(), - taskIdWithUsage.getRequestedResources().getCpus())); - taskManager.createTaskCleanup( - new SingularityTaskCleanup( - Optional.absent(), - TaskCleanupType.REBALANCE_CPU_USAGE, - System.currentTimeMillis(), - taskIdWithUsage.getTaskId(), - message, - Optional.of(UUID.randomUUID().toString()), - Optional.absent(), Optional.absent())); + + Optional message; + + if (shufflingForCpu) { + message = Optional.of(String.format( + "Load on slave is %s / %s, shuffling task using %s / %s to less busy host", + currentCpuLoad, + overloadedSlave.getSystemCpusTotal(), + taskIdWithUsage.getUsage().getCpusUsed(), + taskIdWithUsage.getRequestedResources().getCpus())); + + currentCpuLoad -= taskIdWithUsage.getUsage().getCpusUsed(); + LOG.debug("Cleaning up task {} to free up cpu on overloaded host (remaining cpu overage: {})", taskIdWithUsage.getTaskId(), currentCpuLoad - overloadedSlave.getSystemCpusTotal()); + + taskManager.createTaskCleanup( + new SingularityTaskCleanup( + Optional.absent(), + TaskCleanupType.REBALANCE_CPU_USAGE, + System.currentTimeMillis(), + taskIdWithUsage.getTaskId(), + message, + Optional.of(UUID.randomUUID().toString()), + Optional.absent(), Optional.absent())); + } else { + message = Optional.of(String.format( + "Mem usage on slave is %sMiB / %sMiB, shuffling task using %sMiB / %sMiB to less busy host", + SizeUnit.BYTES.toMegabytes(((long) currentMemUsageBytes)), + SizeUnit.BYTES.toMegabytes(((long) overloadedSlave.getSystemMemTotalBytes())), + SizeUnit.BYTES.toMegabytes(taskIdWithUsage.getUsage().getMemoryTotalBytes()), + ((long) taskIdWithUsage.getRequestedResources().getMemoryMb()))); + + currentMemUsageBytes -= taskIdWithUsage.getUsage().getMemoryTotalBytes(); + + LOG.debug("Cleaning up task {} to free up mem on overloaded host (remaining mem overage: {}MiB)", taskIdWithUsage.getTaskId(), SizeUnit.BYTES.toMegabytes(((long) (currentMemUsageBytes - getTargetMemoryUtilizationForHost(overloadedSlave))))); + + taskManager.createTaskCleanup( + new SingularityTaskCleanup( + Optional.absent(), + TaskCleanupType.REBALANCE_MEMORY_USAGE, + System.currentTimeMillis(), + taskIdWithUsage.getTaskId(), + message, + Optional.of(UUID.randomUUID().toString()), + Optional.absent(), Optional.absent())); + } + requestManager.addToPendingQueue(new SingularityPendingRequest(taskIdWithUsage.getTaskId().getRequestId(), taskIdWithUsage.getTaskId() .getDeployId(), System.currentTimeMillis(), Optional.absent(), PendingType.TASK_BOUNCE, Optional.absent(), Optional.absent(), Optional.absent(), message, Optional.of(UUID.randomUUID().toString()))); - cpuOverage -= taskIdWithUsage.getUsage().getCpusUsed(); + shuffledTasksOnSlave++; currentShuffleCleanupsTotal++; requestsWithShuffledTasks.add(taskIdWithUsage.getTaskId().getRequestId()); diff --git a/SingularityService/src/test/java/com/hubspot/singularity/scheduler/SingularityUsageTest.java b/SingularityService/src/test/java/com/hubspot/singularity/scheduler/SingularityUsageTest.java index 775d82cdab..6659f3d0ee 100644 --- a/SingularityService/src/test/java/com/hubspot/singularity/scheduler/SingularityUsageTest.java +++ b/SingularityService/src/test/java/com/hubspot/singularity/scheduler/SingularityUsageTest.java @@ -450,7 +450,7 @@ public void itDelaysTaskShuffles() { initFirstDeployWithResources(configuration.getMesosConfiguration().getDefaultCpus(), configuration.getMesosConfiguration().getDefaultMemory()); saveAndSchedule(requestManager.getRequest(requestId).get().getRequest().toBuilder().setInstances(Optional.of(3))); resourceOffers(1); - SingularitySlaveUsage highUsage = new SingularitySlaveUsage(15, 10, Optional.of(10.0), 1, 1, Optional.of(30L), 1, 1, Optional.of(1024L), 1, System.currentTimeMillis(), 1, 30000, 10, 15, 15, 15, 0, 107374182); + SingularitySlaveUsage highUsage = new SingularitySlaveUsage(15, 10, Optional.of(10.0), 1, 1, Optional.of(30L), 1, 1, Optional.of(1024L), 1, System.currentTimeMillis(), 200000, 30000, 10, 15, 15, 15, 0, 107374182); usageManager.saveCurrentSlaveUsage(new SingularitySlaveUsageWithId(highUsage, "host1")); SingularityTaskId taskId1 = taskManager.getActiveTaskIds().get(0); @@ -474,7 +474,7 @@ public void itDelaysTaskShuffles() { mesosClient.setSlaveResourceUsage("host1", Arrays.asList(t1u1, t2u1, t3u1)); mesosClient.setSlaveMetricsSnapshot( "host1", - new MesosSlaveMetricsSnapshotObject(0, 0, 0, 10.0, 0, 0, 0, 0, 0, 0, 0, 0, 10.0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0, 0, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0, 0, 0, 0, 0, 0, 15, 0, 0, 0, 0) + new MesosSlaveMetricsSnapshotObject(0, 0, 0, 10.0, 0, 0, 0, 0, 0, 0, 0, 0, 10.0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0, 0, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 200000, 0, 30000, 0, 0, 0, 15, 0, 0, 0, 0) ); usagePoller.runActionOnPoll(); @@ -484,7 +484,7 @@ public void itDelaysTaskShuffles() { Assert.assertFalse(taskManager.getTaskCleanup(taskId3.getId()).isPresent()); // Even though it's not the worst offender, task 2 is cleaned up because it's been running long enough. - Assert.assertEquals(taskManager.getTaskCleanup(taskId2.getId()).get().getCleanupType(), TaskCleanupType.REBALANCE_CPU_USAGE); + Assert.assertEquals(TaskCleanupType.REBALANCE_CPU_USAGE, taskManager.getTaskCleanup(taskId2.getId()).get().getCleanupType()); } finally { configuration.setShuffleTasksForOverloadedSlaves(false); } @@ -492,16 +492,62 @@ public void itDelaysTaskShuffles() { } @Test - public void itCreatesTaskCleanupsWhenAMachineIsOverloaded() { + public void itCreatesTaskCleanupsWhenAMachineIsOverloadedOnMemory() { try { configuration.setShuffleTasksForOverloadedSlaves(true); configuration.setMinutesBeforeNewTaskEligibleForShuffle(0); + configuration.setShuffleTasksWhenSlaveMemoryUtilizationPercentageExceeds(0.90); initRequest(); initFirstDeployWithResources(configuration.getMesosConfiguration().getDefaultCpus(), configuration.getMesosConfiguration().getDefaultMemory()); saveAndSchedule(requestManager.getRequest(requestId).get().getRequest().toBuilder().setInstances(Optional.of(3))); resourceOffers(1); - SingularitySlaveUsage highUsage = new SingularitySlaveUsage(15, 10, Optional.of(10.0), 1, 1, Optional.of(30L), 1, 1, Optional.of(1024L), 1, System.currentTimeMillis(), 1, 30000, 10, 15, 15, 15, 0, 107374182); + SingularitySlaveUsage highUsage = new SingularitySlaveUsage(10, 10, Optional.of(10.0), 1, 1, Optional.of(30L), 1, 1, Optional.of(1024L), 1, System.currentTimeMillis(), 200000, 10000, 10, 10, 10, 10, 0, 107374182); + usageManager.saveCurrentSlaveUsage(new SingularitySlaveUsageWithId(highUsage, "host1")); + + SingularityTaskId taskId1 = taskManager.getActiveTaskIds().get(0); + String t1 = taskId1.getId(); + SingularityTaskId taskId2 = taskManager.getActiveTaskIds().get(1); + String t2 = taskId2.getId(); + SingularityTaskId taskId3 = taskManager.getActiveTaskIds().get(2); + String t3 = taskId3.getId(); + statusUpdate(taskManager.getTask(taskId1).get(), TaskState.TASK_STARTING, Optional.of(taskId1.getStartedAt())); + statusUpdate(taskManager.getTask(taskId2).get(), TaskState.TASK_STARTING, Optional.of(taskId2.getStartedAt())); + statusUpdate(taskManager.getTask(taskId3).get(), TaskState.TASK_STARTING, Optional.of(taskId3.getStartedAt())); + // task 1 using 3G mem + MesosTaskMonitorObject t1u1 = getTaskMonitor(t1, 2, TimeUnit.MILLISECONDS.toSeconds(taskId1.getStartedAt()) + 5, 95000); + // task 2 using 2G mem + MesosTaskMonitorObject t2u1 = getTaskMonitor(t2, 5, TimeUnit.MILLISECONDS.toSeconds(taskId2.getStartedAt()) + 5, 63333); + // task 3 using 1G mem + MesosTaskMonitorObject t3u1 = getTaskMonitor(t3, 5, TimeUnit.MILLISECONDS.toSeconds(taskId3.getStartedAt()) + 5, 31667); + mesosClient.setSlaveResourceUsage("host1", Arrays.asList(t1u1, t2u1, t3u1)); + mesosClient.setSlaveMetricsSnapshot( + "host1", + new MesosSlaveMetricsSnapshotObject(0, 0, 0, 10.0, 0, 0, 0, 0, 0, 0, 0, 0, 10.0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 0, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 200000, 0, 10000, 0, 0, 0, 10, 0, 0, 0, 0) + ); + + usagePoller.runActionOnPoll(); + + // First task is cleaned up + Assert.assertEquals(TaskCleanupType.REBALANCE_MEMORY_USAGE, taskManager.getTaskCleanup(taskId1.getId()).get().getCleanupType()); + // Second task is not cleaned up because it is from the same request as task 1 + Assert.assertFalse(taskManager.getTaskCleanup(taskId2.getId()).isPresent()); + } finally { + configuration.setShuffleTasksForOverloadedSlaves(false); + } + } + + @Test + public void itCreatesTaskCleanupsWhenAMachineIsOverloadedOnCpu() { + try { + configuration.setShuffleTasksForOverloadedSlaves(true); + configuration.setMinutesBeforeNewTaskEligibleForShuffle(0); + + initRequest(); + initFirstDeployWithResources(configuration.getMesosConfiguration().getDefaultCpus(), configuration.getMesosConfiguration().getDefaultMemory()); + saveAndSchedule(requestManager.getRequest(requestId).get().getRequest().toBuilder().setInstances(Optional.of(3))); + resourceOffers(1); + SingularitySlaveUsage highUsage = new SingularitySlaveUsage(15, 10, Optional.of(10.0), 1, 1, Optional.of(30L), 1, 1, Optional.of(1024L), 1, System.currentTimeMillis(), 200000, 30000, 10, 15, 15, 15, 0, 107374182); usageManager.saveCurrentSlaveUsage(new SingularitySlaveUsageWithId(highUsage, "host1")); SingularityTaskId taskId1 = taskManager.getActiveTaskIds().get(0); @@ -522,13 +568,13 @@ public void itCreatesTaskCleanupsWhenAMachineIsOverloaded() { mesosClient.setSlaveResourceUsage("host1", Arrays.asList(t1u1, t2u1, t3u1)); mesosClient.setSlaveMetricsSnapshot( "host1", - new MesosSlaveMetricsSnapshotObject(0, 0, 0, 10.0, 0, 0, 0, 0, 0, 0, 0, 0, 10.0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0, 0, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0, 0, 0, 0, 0, 0, 15, 0, 0, 0, 0) + new MesosSlaveMetricsSnapshotObject(0, 0, 0, 10.0, 0, 0, 0, 0, 0, 0, 0, 0, 10.0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0, 0, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 200000, 0, 30000, 0, 0, 0, 15, 0, 0, 0, 0) ); usagePoller.runActionOnPoll(); // First task is cleaned up - Assert.assertEquals(taskManager.getTaskCleanup(taskId1.getId()).get().getCleanupType(), TaskCleanupType.REBALANCE_CPU_USAGE); + Assert.assertEquals(TaskCleanupType.REBALANCE_CPU_USAGE, taskManager.getTaskCleanup(taskId1.getId()).get().getCleanupType()); // Second task is not cleaned up because it is from the same request as task 1 Assert.assertFalse(taskManager.getTaskCleanup(taskId2.getId()).isPresent()); } finally { @@ -547,7 +593,7 @@ public void itLimitsTheNumberOfTaskCleanupsToCreate() { initFirstDeployWithResources(configuration.getMesosConfiguration().getDefaultCpus(), configuration.getMesosConfiguration().getDefaultMemory()); saveAndSchedule(requestManager.getRequest(requestId).get().getRequest().toBuilder().setInstances(Optional.of(3))); resourceOffers(1); - SingularitySlaveUsage highUsage = new SingularitySlaveUsage(15, 10, Optional.of(10.0), 1, 1, Optional.of(30L), 1, 1, Optional.of(1024L), 1, System.currentTimeMillis(), 1, 30000, 10, 15, 15, 15, 0, 107374182); + SingularitySlaveUsage highUsage = new SingularitySlaveUsage(15, 10, Optional.of(10.0), 1, 1, Optional.of(30L), 1, 1, Optional.of(1024L), 1, System.currentTimeMillis(), 200000, 30000, 10, 15, 15, 15, 0, 107374182); usageManager.saveCurrentSlaveUsage(new SingularitySlaveUsageWithId(highUsage, "host1")); SingularityTaskId taskId1 = taskManager.getActiveTaskIds().get(0); @@ -563,13 +609,13 @@ public void itLimitsTheNumberOfTaskCleanupsToCreate() { mesosClient.setSlaveResourceUsage("host1", Arrays.asList(t1u1, t2u1)); mesosClient.setSlaveMetricsSnapshot( "host1", - new MesosSlaveMetricsSnapshotObject(0, 0, 0, 10.0, 0, 0, 0, 0, 0, 0, 0, 0, 10.0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0, 0, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0, 0, 0, 0, 0, 0, 15, 0, 0, 0, 0) + new MesosSlaveMetricsSnapshotObject(0, 0, 0, 10.0, 0, 0, 0, 0, 0, 0, 0, 0, 10.0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0, 0, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 200000, 0, 30000, 0, 0, 0, 15, 0, 0, 0, 0) ); usagePoller.runActionOnPoll(); // First task is cleaned up - Assert.assertEquals(taskManager.getTaskCleanup(taskId1.getId()).get().getCleanupType(), TaskCleanupType.REBALANCE_CPU_USAGE); + Assert.assertEquals(TaskCleanupType.REBALANCE_CPU_USAGE, taskManager.getTaskCleanup(taskId1.getId()).get().getCleanupType()); // Second task doesn't get cleaned up dur to cluster wide limit Assert.assertFalse(taskManager.getTaskCleanup(taskId2.getId()).isPresent()); } finally {