Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,10 @@ public abstract class AbstractCSQueue implements CSQueue {
volatile CSQueue parent;
final String queueName;
private final String queuePath;
volatile int numContainers;

final Resource minimumAllocation;
volatile Resource maximumAllocation;
private volatile QueueState state = null;
final CSQueueMetrics metrics;
protected final PrivilegedEntity queueEntity;

final ResourceCalculator resourceCalculator;
Expand All @@ -107,16 +105,11 @@ public abstract class AbstractCSQueue implements CSQueue {
new HashMap<AccessType, AccessControlList>();
volatile boolean reservationsContinueLooking;

// Track resource usage-by-label like used-resource/pending-resource, etc.
volatile ResourceUsage queueUsage;

// Track capacities like
// used-capacity/abs-used-capacity/capacity/abs-capacity,
// etc.
QueueCapacities queueCapacities;

QueueResourceQuotas queueResourceQuotas;

// -1 indicates lifetime is disabled
private volatile long maxApplicationLifetime = -1;

Expand All @@ -127,6 +120,8 @@ public abstract class AbstractCSQueue implements CSQueue {
private volatile boolean defaultAppLifetimeWasSpecifiedInConfig = false;
private CSQueuePreemption preemptionSettings;

CSQueueUsageTracker usageTracker;

public enum CapacityConfigType {
// FIXME, from what I can see, Percentage mode can almost apply to weighted
// and percentage mode at the same time, there's only small area need to be
Expand All @@ -153,10 +148,6 @@ public enum CapacityConfigType {
// is it a dynamic queue?
private boolean dynamicQueue = false;

// The timestamp of the last submitted application to this queue.
// Only applies to dynamic queues.
private long lastSubmittedTimestamp;

public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this(cs, cs.getConfiguration(), queueName, parent, old);
Expand All @@ -175,24 +166,15 @@ public AbstractCSQueue(CapacitySchedulerContext cs,
this.activitiesManager = cs.getActivitiesManager();

// must be called after parent and queueName is set
this.metrics = old != null ?
CSQueueMetrics metrics = old != null ?
(CSQueueMetrics) old.getMetrics() :
CSQueueMetrics.forQueue(getQueuePath(), parent,
cs.getConfiguration().getEnableUserMetrics(), configuration);

usageTracker = new CSQueueUsageTracker(metrics);
this.csContext = cs;
this.minimumAllocation = csContext.getMinimumResourceCapability();

// initialize ResourceUsage
queueUsage = new ResourceUsage();
queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath());

// initialize QueueCapacities
queueCapacities = new QueueCapacities(parent == null);

// initialize queueResourceQuotas
queueResourceQuotas = new QueueResourceQuotas();

ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
Expand Down Expand Up @@ -246,11 +228,11 @@ public float getUsedCapacity() {

@Override
public Resource getUsedResources() {
return queueUsage.getUsed();
return usageTracker.getQueueUsage().getUsed();
}

public int getNumContainers() {
return numContainers;
return usageTracker.getNumContainers();
}

@Override
Expand All @@ -260,7 +242,7 @@ public QueueState getState() {

@Override
public CSQueueMetrics getMetrics() {
return metrics;
return usageTracker.getMetrics();
}

@Override
Expand Down Expand Up @@ -650,8 +632,8 @@ protected void updateConfigurableResourceLimits(Resource clusterResource) {
+ " minResource={} and maxResource={}", getQueuePath(), minResource,
maxResource);

queueResourceQuotas.setConfiguredMinResource(label, minResource);
queueResourceQuotas.setConfiguredMaxResource(label, maxResource);
usageTracker.getQueueResourceQuotas().setConfiguredMinResource(label, minResource);
usageTracker.getQueueResourceQuotas().setConfiguredMaxResource(label, maxResource);
}
}

Expand Down Expand Up @@ -815,6 +797,7 @@ public QueueStatistics getQueueStatistics() {
public Map<String, QueueConfigurations> getQueueConfigurations() {
Map<String, QueueConfigurations> queueConfigurations = new HashMap<>();
Set<String> nodeLabels = getNodeLabelsForQueue();
QueueResourceQuotas queueResourceQuotas = usageTracker.getQueueResourceQuotas();
for (String nodeLabel : nodeLabels) {
QueueConfigurations queueConfiguration =
recordFactory.newRecordInstance(QueueConfigurations.class);
Expand Down Expand Up @@ -857,10 +840,8 @@ void allocateResource(Resource clusterResource,
Resource resource, String nodePartition) {
writeLock.lock();
try {
queueUsage.incUsed(nodePartition, resource);

++numContainers;

usageTracker.getQueueUsage().incUsed(nodePartition, resource);
usageTracker.increaseNumContainers();
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, nodePartition);
} finally {
Expand All @@ -872,12 +853,12 @@ protected void releaseResource(Resource clusterResource,
Resource resource, String nodePartition) {
writeLock.lock();
try {
queueUsage.decUsed(nodePartition, resource);
usageTracker.getQueueUsage().decUsed(nodePartition, resource);

CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, nodePartition);

--numContainers;
usageTracker.decreaseNumContainers();
} finally {
writeLock.unlock();
}
Expand Down Expand Up @@ -921,12 +902,12 @@ public QueueCapacities getQueueCapacities() {

@Private
public ResourceUsage getQueueResourceUsage() {
return queueUsage;
return usageTracker.getQueueUsage();
}

@Override
public QueueResourceQuotas getQueueResourceQuotas() {
return queueResourceQuotas;
return usageTracker.getQueueResourceQuotas();
}

@Override
Expand Down Expand Up @@ -1056,7 +1037,7 @@ boolean canAssignToThisQueue(Resource clusterResource,
Resource currentLimitResource = getCurrentLimitResource(nodePartition,
clusterResource, currentResourceLimits, schedulingMode);

Resource nowTotalUsed = queueUsage.getUsed(nodePartition);
Resource nowTotalUsed = usageTracker.getQueueUsage().getUsed(nodePartition);

// Set headroom for currentResourceLimits:
// When queue is a parent queue: Headroom = limit - used + killable
Expand Down Expand Up @@ -1088,7 +1069,7 @@ boolean canAssignToThisQueue(Resource clusterResource,
newTotalWithoutReservedResource, currentLimitResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("try to use reserved: " + getQueuePath()
+ " usedResources: " + queueUsage.getUsed()
+ " usedResources: " + usageTracker.getQueueUsage().getUsed()
+ ", clusterResources: " + clusterResource
+ ", reservedResources: " + resourceCouldBeUnreserved
+ ", capacity-without-reserved: "
Expand All @@ -1103,7 +1084,7 @@ boolean canAssignToThisQueue(Resource clusterResource,
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to assign to queue: " + getQueuePath()
+ " nodePartition: " + nodePartition
+ ", usedResources: " + queueUsage.getUsed(nodePartition)
+ ", usedResources: " + usageTracker.getQueueUsage().getUsed(nodePartition)
+ ", clusterResources: " + clusterResource
+ ", reservedResources: " + resourceCouldBeUnreserved
+ ", maxLimitCapacity: " + currentLimitResource
Expand All @@ -1114,11 +1095,11 @@ boolean canAssignToThisQueue(Resource clusterResource,
if (LOG.isDebugEnabled()) {
LOG.debug("Check assign to queue: " + getQueuePath()
+ " nodePartition: " + nodePartition
+ ", usedResources: " + queueUsage.getUsed(nodePartition)
+ ", usedResources: " + usageTracker.getQueueUsage().getUsed(nodePartition)
+ ", clusterResources: " + clusterResource
+ ", currentUsedCapacity: " + Resources
.divide(resourceCalculator, clusterResource,
queueUsage.getUsed(nodePartition), labelManager
usageTracker.getQueueUsage().getUsed(nodePartition), labelManager
.getResourceByLabel(nodePartition, clusterResource))
+ ", max-capacity: " + queueCapacities
.getAbsoluteMaximumCapacity(nodePartition));
Expand Down Expand Up @@ -1162,39 +1143,39 @@ private void countAndUpdate(String partition, Resource resource,

@Override
public void incReservedResource(String partition, Resource reservedRes) {
count(partition, reservedRes, queueUsage::incReserved,
count(partition, reservedRes, usageTracker.getQueueUsage()::incReserved,
parent == null ? null : parent::incReservedResource);
}

@Override
public void decReservedResource(String partition, Resource reservedRes) {
count(partition, reservedRes, queueUsage::decReserved,
count(partition, reservedRes, usageTracker.getQueueUsage()::decReserved,
parent == null ? null : parent::decReservedResource);
}

@Override
public void incPendingResource(String nodeLabel, Resource resourceToInc) {
count(nodeLabel, resourceToInc, queueUsage::incPending,
count(nodeLabel, resourceToInc, usageTracker.getQueueUsage()::incPending,
parent == null ? null : parent::incPendingResource);
}

@Override
public void decPendingResource(String nodeLabel, Resource resourceToDec) {
count(nodeLabel, resourceToDec, queueUsage::decPending,
count(nodeLabel, resourceToDec, usageTracker.getQueueUsage()::decPending,
parent == null ? null : parent::decPendingResource);
}

@Override
public void incUsedResource(String nodeLabel, Resource resourceToInc,
SchedulerApplicationAttempt application) {
countAndUpdate(nodeLabel, resourceToInc, queueUsage::incUsed,
countAndUpdate(nodeLabel, resourceToInc, usageTracker.getQueueUsage()::incUsed,
parent == null ? null : parent::incUsedResource);
}

@Override
public void decUsedResource(String nodeLabel, Resource resourceToDec,
SchedulerApplicationAttempt application) {
countAndUpdate(nodeLabel, resourceToDec, queueUsage::decUsed,
countAndUpdate(nodeLabel, resourceToDec, usageTracker.getQueueUsage()::decUsed,
parent == null ? null : parent::decUsedResource);
}

Expand All @@ -1205,7 +1186,7 @@ public void decUsedResource(String nodeLabel, Resource resourceToDec,
boolean hasPendingResourceRequest(String nodePartition,
Resource cluster, SchedulingMode schedulingMode) {
return SchedulerUtils.hasPendingResourceRequest(resourceCalculator,
queueUsage, nodePartition, cluster, schedulingMode);
usageTracker.getQueueUsage(), nodePartition, cluster, schedulingMode);
}

public boolean accessibleToPartition(String nodePartition) {
Expand Down Expand Up @@ -1304,10 +1285,10 @@ public boolean accept(Resource cluster,
schedulerContainer.getNodePartition(), cluster);
}
if (!Resources.fitsIn(resourceCalculator,
Resources.add(queueUsage.getUsed(partition), netAllocated),
Resources.add(usageTracker.getQueueUsage().getUsed(partition), netAllocated),
maxResourceLimit)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Used resource=" + queueUsage.getUsed(partition)
LOG.debug("Used resource=" + usageTracker.getQueueUsage().getUsed(partition)
+ " exceeded maxResourceLimit of the queue ="
+ maxResourceLimit);
}
Expand Down Expand Up @@ -1534,7 +1515,7 @@ void deriveCapacityFromAbsoluteConfigurations(String label,
// and the recently changed queue minResources.
// capacity = effectiveMinResource / {parent's effectiveMinResource}
float result = resourceCalculator.divide(clusterResource,
queueResourceQuotas.getEffectiveMinResource(label),
usageTracker.getQueueResourceQuotas().getEffectiveMinResource(label),
parent.getQueueResourceQuotas().getEffectiveMinResource(label));
queueCapacities.setCapacity(label,
Float.isInfinite(result) ? 0 : result);
Expand All @@ -1543,7 +1524,7 @@ void deriveCapacityFromAbsoluteConfigurations(String label,
// and the recently changed queue maxResources.
// maxCapacity = effectiveMaxResource / parent's effectiveMaxResource
result = resourceCalculator.divide(clusterResource,
queueResourceQuotas.getEffectiveMaxResource(label),
usageTracker.getQueueResourceQuotas().getEffectiveMaxResource(label),
parent.getQueueResourceQuotas().getEffectiveMaxResource(label));
queueCapacities.setMaximumCapacity(label,
Float.isInfinite(result) ? 0 : result);
Expand Down Expand Up @@ -1577,7 +1558,7 @@ void updateEffectiveResources(Resource clusterResource) {
if (getCapacityConfigType().equals(
CapacityConfigType.ABSOLUTE_RESOURCE)) {
newEffectiveMinResource = createNormalizedMinResource(
queueResourceQuotas.getConfiguredMinResource(label),
usageTracker.getQueueResourceQuotas().getConfiguredMinResource(label),
((ParentQueue) parent).getEffectiveMinRatioPerResource());

// Max resource of a queue should be the minimum of {parent's maxResources,
Expand All @@ -1597,9 +1578,9 @@ void updateEffectiveResources(Resource clusterResource) {
}

// Update the effective min
queueResourceQuotas.setEffectiveMinResource(label,
usageTracker.getQueueResourceQuotas().setEffectiveMinResource(label,
newEffectiveMinResource);
queueResourceQuotas.setEffectiveMaxResource(label,
usageTracker.getQueueResourceQuotas().setEffectiveMaxResource(label,
newEffectiveMaxResource);

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -1667,7 +1648,7 @@ public boolean isInactiveDynamicQueue() {
public void updateLastSubmittedTimeStamp() {
writeLock.lock();
try {
this.lastSubmittedTimestamp = Time.monotonicNow();
usageTracker.setLastSubmittedTimestamp(Time.monotonicNow());
} finally {
writeLock.unlock();
}
Expand All @@ -1677,7 +1658,7 @@ public long getLastSubmittedTimestamp() {
readLock.lock();

try {
return lastSubmittedTimestamp;
return usageTracker.getLastSubmittedTimestamp();
} finally {
readLock.unlock();
}
Expand All @@ -1687,7 +1668,7 @@ public long getLastSubmittedTimestamp() {
public void setLastSubmittedTimestamp(long lastSubmittedTimestamp) {
writeLock.lock();
try {
this.lastSubmittedTimestamp = lastSubmittedTimestamp;
usageTracker.setLastSubmittedTimestamp(lastSubmittedTimestamp);
} finally {
writeLock.unlock();
}
Expand Down
Loading