|
@@ -87,6 +87,8 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|
protected CapacitySchedulerContext csContext;
|
|
protected CapacitySchedulerContext csContext;
|
|
protected YarnAuthorizationProvider authorizer = null;
|
|
protected YarnAuthorizationProvider authorizer = null;
|
|
|
|
|
|
|
|
+ private Resource clusterResource;
|
|
|
|
+
|
|
public AbstractCSQueue(CapacitySchedulerContext cs,
|
|
public AbstractCSQueue(CapacitySchedulerContext cs,
|
|
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
|
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
|
this.labelManager = cs.getRMContext().getNodeLabelManager();
|
|
this.labelManager = cs.getRMContext().getNodeLabelManager();
|
|
@@ -291,6 +293,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|
.getReservationContinueLook();
|
|
.getReservationContinueLook();
|
|
|
|
|
|
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
|
|
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
|
|
|
|
+ this.clusterResource = clusterResource;
|
|
}
|
|
}
|
|
|
|
|
|
protected QueueInfo getQueueInfo() {
|
|
protected QueueInfo getQueueInfo() {
|
|
@@ -322,7 +325,9 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|
if (nodeLabels == null || nodeLabels.isEmpty()) {
|
|
if (nodeLabels == null || nodeLabels.isEmpty()) {
|
|
queueUsage.incUsed(resource);
|
|
queueUsage.incUsed(resource);
|
|
} else {
|
|
} else {
|
|
- for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
|
|
|
|
|
|
+ Set<String> anls = (accessibleLabels.contains(RMNodeLabelsManager.ANY))
|
|
|
|
+ ? labelManager.getClusterNodeLabels() : accessibleLabels;
|
|
|
|
+ for (String label : Sets.intersection(anls, nodeLabels)) {
|
|
queueUsage.incUsed(label, resource);
|
|
queueUsage.incUsed(label, resource);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -338,7 +343,9 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|
if (null == nodeLabels || nodeLabels.isEmpty()) {
|
|
if (null == nodeLabels || nodeLabels.isEmpty()) {
|
|
queueUsage.decUsed(resource);
|
|
queueUsage.decUsed(resource);
|
|
} else {
|
|
} else {
|
|
- for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
|
|
|
|
|
|
+ Set<String> anls = (accessibleLabels.contains(RMNodeLabelsManager.ANY))
|
|
|
|
+ ? labelManager.getClusterNodeLabels() : accessibleLabels;
|
|
|
|
+ for (String label : Sets.intersection(anls, nodeLabels)) {
|
|
queueUsage.decUsed(label, resource);
|
|
queueUsage.decUsed(label, resource);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -527,4 +534,43 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|
// sorry, you cannot access
|
|
// sorry, you cannot access
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Retrieve used resources by this queue for the specified node label.
|
|
|
|
+ * Used capacity of label =
|
|
|
|
+ * (queue's used resources labeled by nodeLabel)
|
|
|
|
+ * / ( (all resources labeled by nodLabel)
|
|
|
|
+ * X (percent of labeled resources allocated to this queue) )
|
|
|
|
+ * @param nodeLabel label for which to get used resources
|
|
|
|
+ * @return used resources by this queue for specified label
|
|
|
|
+ */
|
|
|
|
+ public final synchronized float getUsedCapacity(final String nodeLabel) {
|
|
|
|
+ Resource availableToQueue =
|
|
|
|
+ Resources.multiply(
|
|
|
|
+ labelManager.getResourceByLabel(nodeLabel, this.clusterResource),
|
|
|
|
+ queueCapacities.getAbsoluteCapacity(nodeLabel));
|
|
|
|
+ if (!Resources.greaterThan(resourceCalculator, this.clusterResource,
|
|
|
|
+ availableToQueue, Resources.none())) {
|
|
|
|
+ return 0.0f;
|
|
|
|
+ }
|
|
|
|
+ return
|
|
|
|
+ Resources.divide(resourceCalculator, this.clusterResource,
|
|
|
|
+ queueUsage.getUsed(nodeLabel), availableToQueue);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Retrieve absolute used resources by this queue for the specified node label.
|
|
|
|
+ * @param nodeLabel label for which to get absolute used resources
|
|
|
|
+ * @return absolute used resources by this queue for specified label
|
|
|
|
+ */
|
|
|
|
+ public final synchronized float getAbsoluteUsedCapacity(final String nodeLabel) {
|
|
|
|
+ Resource labeledResources =
|
|
|
|
+ labelManager.getResourceByLabel(nodeLabel, this.clusterResource);
|
|
|
|
+ if (!Resources.greaterThan(resourceCalculator, this.clusterResource,
|
|
|
|
+ labeledResources, Resources.none())) {
|
|
|
|
+ return 0.0f;
|
|
|
|
+ }
|
|
|
|
+ return Resources.divide(resourceCalculator, this.clusterResource,
|
|
|
|
+ queueUsage.getUsed(nodeLabel), labeledResources);
|
|
|
|
+ }
|
|
}
|
|
}
|