Skip to content

Commit

Permalink
track resources reserved in requestUtilization
Browse files Browse the repository at this point in the history
  • Loading branch information
matush-v committed Jun 27, 2017
1 parent 98d5d38 commit 63b8dd8
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
public class RequestUtilization {
private final String requestId;
private final String deployId;
private long memBytesTotal = 0;
private double cpuTotal = 0;
private long memBytesUsed = 0;
private long memBytesReserved = 0;
private double cpuUsed = 0;
private double cpuReserved = 0;
private int numTasks = 0;

@JsonCreator
Expand All @@ -18,13 +20,23 @@ public RequestUtilization(@JsonProperty("requestId") String requestId,
this.deployId = deployId;
}

public RequestUtilization addMemBytes(long memBytes) {
this.memBytesTotal += memBytes;
public RequestUtilization addMemBytesUsed(long memBytes) {
this.memBytesUsed += memBytes;
return this;
}

public RequestUtilization addCpu(double cpu) {
this.cpuTotal += cpu;
public RequestUtilization addMemBytesReserved(long memBytes) {
this.memBytesReserved += memBytes;
return this;
}

public RequestUtilization addCpuUsed(double cpu) {
this.cpuUsed += cpu;
return this;
}

public RequestUtilization addCpuReserved(double cpu) {
this.cpuReserved += cpu;
return this;
}

Expand All @@ -33,12 +45,20 @@ public RequestUtilization incrementTaskCount() {
return this;
}

public long getMemBytesTotal() {
return memBytesTotal;
public long getMemBytesUsed() {
return memBytesUsed;
}

public long getMemBytesReserved() {
return memBytesReserved;
}

public double getCpuUsed() {
return cpuUsed;
}

public double getCpuTotal() {
return cpuTotal;
public double getCpuReserved() {
return cpuReserved;
}

public int getNumTasks() {
Expand All @@ -47,12 +67,12 @@ public int getNumTasks() {

@JsonIgnore
public double getAvgMemBytesUsed() {
return memBytesTotal / (double) numTasks;
return memBytesUsed / (double) numTasks;
}

@JsonIgnore
public double getAvgCpuUsed() {
return cpuTotal / (double) numTasks;
return cpuUsed / (double) numTasks;
}

public String getDeployId() {
Expand All @@ -68,8 +88,10 @@ public String toString() {
return "RequestUtilization{" +
"requestId=" + requestId +
", deployId=" + deployId +
", memBytesTotal=" + memBytesTotal +
", cpuTotal=" + cpuTotal +
", memBytesUsed=" + memBytesUsed +
", memBytesReserved=" + memBytesReserved +
", cpuUsed=" + cpuUsed +
", cpuReserved=" + cpuReserved +
", numTasks=" + numTasks +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ public void runActionOnPoll() {
longRunningTasksUsage.put(ResourceUsageType.CPU_USED, 0);
Optional<Long> memoryMbTotal = Optional.absent();
Optional<Double> cpusTotal = Optional.absent();
long memoryMbReserved = 0;
long cpuReserved = 0;
long memoryBytesUsed = 0;
double cpusUsed = 0;
long memoryMbReservedOnSlave = 0;
long cpuReservedOnSlave = 0;
long memoryBytesUsedOnSlave = 0;
double cpusUsedOnSlave = 0;

try {
List<MesosTaskMonitorObject> allTaskUsage = mesosClient.getSlaveResourceUsage(slave.getHost());
Expand All @@ -105,17 +105,19 @@ public void runActionOnPoll() {
SingularityTaskUsage latestUsage = getUsage(taskUsage);
List<SingularityTaskUsage> pastTaskUsages = usageManager.getTaskUsage(taskId);

updateRequestUtilization(utilizationPerRequestId, pastTaskUsages, latestUsage, task);

clearOldUsage(pastTaskUsages, taskId);
usageManager.saveSpecificTaskUsage(taskId, latestUsage);

Optional<SingularityTask> maybeTask = taskManager.getTask(task);
if (maybeTask.isPresent() && maybeTask.get().getTaskRequest().getDeploy().getResources().isPresent()) {
memoryMbReserved += maybeTask.get().getTaskRequest().getDeploy().getResources().get().getMemoryMb();
cpuReserved += maybeTask.get().getTaskRequest().getDeploy().getResources().get().getCpus();
double memoryMbReservedForTask = maybeTask.get().getTaskRequest().getDeploy().getResources().get().getMemoryMb();
double cpuReservedForTask = maybeTask.get().getTaskRequest().getDeploy().getResources().get().getCpus();
memoryMbReservedOnSlave += memoryMbReservedForTask;
cpuReservedOnSlave += cpuReservedForTask;
updateRequestUtilization(utilizationPerRequestId, pastTaskUsages, latestUsage, task, memoryMbReservedForTask, cpuReservedForTask);
}
memoryBytesUsed += latestUsage.getMemoryTotalBytes();
memoryBytesUsedOnSlave += latestUsage.getMemoryTotalBytes();

if (!pastTaskUsages.isEmpty()) {
SingularityTaskUsage lastUsage = pastTaskUsages.get(pastTaskUsages.size() - 1);
Expand All @@ -129,7 +131,7 @@ public void runActionOnPoll() {

usageManager.saveCurrentTaskUsage(taskId, currentUsage);

cpusUsed += taskCpusUsed;
cpusUsedOnSlave += taskCpusUsed;
}
}

Expand All @@ -142,7 +144,7 @@ public void runActionOnPoll() {
cpusTotal = Optional.of(slave.getResources().get().getNumCpus().get().doubleValue());
}

SingularitySlaveUsage slaveUsage = new SingularitySlaveUsage(memoryBytesUsed, memoryMbReserved, now, cpusUsed, cpuReserved, allTaskUsage.size(), memoryMbTotal, cpusTotal, longRunningTasksUsage);
SingularitySlaveUsage slaveUsage = new SingularitySlaveUsage(memoryBytesUsedOnSlave, memoryMbReservedOnSlave, now, cpusUsedOnSlave, cpuReservedOnSlave, allTaskUsage.size(), memoryMbTotal, cpusTotal, longRunningTasksUsage);
List<Long> slaveTimestamps = usageManager.getSlaveUsageTimestamps(slave.getId());
if (slaveTimestamps.size() + 1 > configuration.getNumUsageToKeep()) {
usageManager.deleteSpecificSlaveUsage(slave.getId(), slaveTimestamps.get(0));
Expand Down Expand Up @@ -195,7 +197,7 @@ private void updateLongRunningTasksUsage(Map<ResourceUsageType, Number> longRunn
longRunningTasksUsage.compute(ResourceUsageType.CPU_USED, (k, v) -> (v == null) ? cpuUsed : v.doubleValue() + cpuUsed);
}

private void updateRequestUtilization(Map<String, RequestUtilization> utilizationPerRequestId, List<SingularityTaskUsage> pastTaskUsages, SingularityTaskUsage latestUsage, SingularityTaskId task) {
private void updateRequestUtilization(Map<String, RequestUtilization> utilizationPerRequestId, List<SingularityTaskUsage> pastTaskUsages, SingularityTaskUsage latestUsage, SingularityTaskId task, double memoryMbReservedForTask, double cpuReservedForTask) {
List<SingularityTaskUsage> pastTaskUsagesCopy = new ArrayList<>();
pastTaskUsagesCopy.add(new SingularityTaskUsage(0, task.getStartedAt(), 0)); // to calculate oldest cpu usage
pastTaskUsagesCopy.addAll(pastTaskUsages);
Expand All @@ -208,11 +210,16 @@ private void updateRequestUtilization(Map<String, RequestUtilization> utilizatio
SingularityTaskUsage newerUsage = pastTaskUsagesCopy.get(i + 1);
double cpusUsed = (newerUsage.getCpuSeconds() - olderUsage.getCpuSeconds()) / (newerUsage.getTimestamp() - olderUsage.getTimestamp());

requestUtilization.addCpu(cpusUsed);
requestUtilization.addMemBytes(newerUsage.getMemoryTotalBytes());
requestUtilization.incrementTaskCount();
requestUtilization
.addCpuUsed(cpusUsed)
.addMemBytesUsed(newerUsage.getMemoryTotalBytes())
.incrementTaskCount();
}

requestUtilization
.addMemBytesReserved((long) (memoryMbReservedForTask * SingularitySlaveUsage.BYTES_PER_MEGABYTE * requestUtilization.getNumTasks()))
.addCpuReserved(cpuReservedForTask * requestUtilization.getNumTasks());

utilizationPerRequestId.put(task.getRequestId(), requestUtilization);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.hubspot.mesos.json.MesosTaskStatisticsObject;
import com.hubspot.singularity.MachineState;
import com.hubspot.singularity.SingularityClusterUtilization;
import com.hubspot.singularity.SingularitySlaveUsage;
import com.hubspot.singularity.SingularityTask;
import com.hubspot.singularity.SingularityTaskCurrentUsageWithId;
import com.hubspot.singularity.SingularityTaskId;
Expand Down Expand Up @@ -207,7 +208,9 @@ public void testUsagePollerComplex() throws InterruptedException {
@Test
public void itTracksClusterUtilizationSimple() {
initRequest();
initFirstDeployWithResources(10, .001);
double cpuReserved = 10;
double memMbReserved = .001;
initFirstDeployWithResources(cpuReserved, memMbReserved);
saveAndSchedule(request.toBuilder().setInstances(Optional.of(1)));
resourceOffers(1);

Expand All @@ -229,7 +232,12 @@ public void itTracksClusterUtilizationSimple() {

SingularityClusterUtilization utilization = usageManager.getClusterUtilization().get();

Assert.assertEquals(2, usageManager.getTaskUsage(t1).size());
int taskUsages = usageManager.getTaskUsage(t1).size();
Assert.assertEquals(2, taskUsages);

Assert.assertEquals(1, utilization.getRequestUtilizations().size());
Assert.assertEquals(cpuReserved * taskUsages, utilization.getRequestUtilizations().get(0).getCpuReserved(), 0);
Assert.assertEquals(memMbReserved * SingularitySlaveUsage.BYTES_PER_MEGABYTE * taskUsages, utilization.getRequestUtilizations().get(0).getMemBytesReserved(), 0);

Assert.assertEquals(0, utilization.getNumRequestsWithOverUtilizedCpu());
Assert.assertEquals(1, utilization.getNumRequestsWithUnderUtilizedCpu());
Expand All @@ -253,7 +261,9 @@ public void itTracksClusterUtilizationSimple() {
@Test
public void itDoesntIncludePerfectlyUtilizedRequestsInClusterUtilization() {
initRequest();
initFirstDeployWithResources(2, .001);
double cpuReserved = 2;
double memMbReserved = .001;
initFirstDeployWithResources(cpuReserved, memMbReserved);
saveAndSchedule(request.toBuilder().setInstances(Optional.of(1)));
resourceOffers(1);

Expand All @@ -275,7 +285,12 @@ public void itDoesntIncludePerfectlyUtilizedRequestsInClusterUtilization() {

SingularityClusterUtilization utilization = usageManager.getClusterUtilization().get();

Assert.assertEquals(2, usageManager.getTaskUsage(t1).size());
int taskUsages = usageManager.getTaskUsage(t1).size();
Assert.assertEquals(2, taskUsages);

Assert.assertEquals(1, utilization.getRequestUtilizations().size());
Assert.assertEquals(cpuReserved * taskUsages, utilization.getRequestUtilizations().get(0).getCpuReserved(), 0);
Assert.assertEquals(memMbReserved * SingularitySlaveUsage.BYTES_PER_MEGABYTE * taskUsages, utilization.getRequestUtilizations().get(0).getMemBytesReserved(), 0);

Assert.assertEquals(0, utilization.getNumRequestsWithOverUtilizedCpu());
Assert.assertEquals(0, utilization.getNumRequestsWithUnderUtilizedCpu());
Expand All @@ -298,7 +313,9 @@ public void itDoesntIncludePerfectlyUtilizedRequestsInClusterUtilization() {
@Test
public void itTracksOverusedCpuInClusterUtilization() {
initRequest();
initFirstDeployWithResources(2, .001);
double cpuReserved = 2;
double memMbReserved = .001;
initFirstDeployWithResources(cpuReserved, memMbReserved);
saveAndSchedule(request.toBuilder().setInstances(Optional.of(1)));
resourceOffers(1);

Expand All @@ -320,7 +337,12 @@ public void itTracksOverusedCpuInClusterUtilization() {

SingularityClusterUtilization utilization = usageManager.getClusterUtilization().get();

Assert.assertEquals(2, usageManager.getTaskUsage(t1).size());
int taskUsages = usageManager.getTaskUsage(t1).size();
Assert.assertEquals(2, taskUsages);

Assert.assertEquals(1, utilization.getRequestUtilizations().size());
Assert.assertEquals(cpuReserved * taskUsages, utilization.getRequestUtilizations().get(0).getCpuReserved(), 0);
Assert.assertEquals(memMbReserved * SingularitySlaveUsage.BYTES_PER_MEGABYTE * taskUsages, utilization.getRequestUtilizations().get(0).getMemBytesReserved(), 0);

Assert.assertEquals(1, utilization.getNumRequestsWithOverUtilizedCpu());
Assert.assertEquals(0, utilization.getNumRequestsWithUnderUtilizedCpu());
Expand Down

0 comments on commit 63b8dd8

Please sign in to comment.