|
@@ -97,7 +97,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
Set<FiCaSchedulerApp> pendingApplications;
|
|
|
|
|
|
- private final float minimumAllocationFactor;
|
|
|
+ private float minimumAllocationFactor;
|
|
|
|
|
|
private Map<String, User> users = new HashMap<String, User>();
|
|
|
|
|
@@ -123,53 +123,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
super(cs, queueName, parent, old);
|
|
|
this.scheduler = cs;
|
|
|
|
|
|
- this.activeUsersManager = new ActiveUsersManager(metrics);
|
|
|
- this.minimumAllocationFactor =
|
|
|
- Resources.ratio(resourceCalculator,
|
|
|
- Resources.subtract(maximumAllocation, minimumAllocation),
|
|
|
- maximumAllocation);
|
|
|
-
|
|
|
- float capacity = getCapacityFromConf();
|
|
|
- float absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
|
|
|
-
|
|
|
- float maximumCapacity =
|
|
|
- (float)cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100;
|
|
|
- float absoluteMaxCapacity =
|
|
|
- CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
|
|
|
-
|
|
|
- // Initially set to absoluteMax, will be updated to more accurate
|
|
|
- // max avail value during assignContainers
|
|
|
- absoluteMaxAvailCapacity = absoluteMaxCapacity;
|
|
|
-
|
|
|
- int userLimit = cs.getConfiguration().getUserLimit(getQueuePath());
|
|
|
- float userLimitFactor =
|
|
|
- cs.getConfiguration().getUserLimitFactor(getQueuePath());
|
|
|
-
|
|
|
- int maxApplications =
|
|
|
- cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath());
|
|
|
- if (maxApplications < 0) {
|
|
|
- int maxSystemApps = cs.getConfiguration().getMaximumSystemApplications();
|
|
|
- maxApplications = (int)(maxSystemApps * absoluteCapacity);
|
|
|
- }
|
|
|
- maxApplicationsPerUser =
|
|
|
- (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor);
|
|
|
-
|
|
|
- float maxAMResourcePerQueuePercent = cs.getConfiguration()
|
|
|
- .getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
|
|
|
-
|
|
|
- QueueState state = cs.getConfiguration().getState(getQueuePath());
|
|
|
-
|
|
|
- Map<QueueACL, AccessControlList> acls =
|
|
|
- cs.getConfiguration().getAcls(getQueuePath());
|
|
|
-
|
|
|
- setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
|
|
|
- maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor,
|
|
|
- maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
|
|
|
- state, acls, cs.getConfiguration().getNodeLocalityDelay(), accessibleLabels,
|
|
|
- defaultLabelExpression, this.capacitiyByNodeLabels,
|
|
|
- this.maxCapacityByNodeLabels,
|
|
|
- cs.getConfiguration().getReservationContinueLook());
|
|
|
-
|
|
|
+ this.activeUsersManager = new ActiveUsersManager(metrics);
|
|
|
if(LOG.isDebugEnabled()) {
|
|
|
LOG.debug("LeafQueue:" + " name=" + queueName
|
|
|
+ ", fullname=" + getQueuePath());
|
|
@@ -180,34 +134,13 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
this.pendingApplications =
|
|
|
new TreeSet<FiCaSchedulerApp>(applicationComparator);
|
|
|
this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
|
|
|
+
|
|
|
+ setupQueueConfigs(cs.getClusterResource());
|
|
|
}
|
|
|
-
|
|
|
- // externalizing in method, to allow overriding
|
|
|
- protected float getCapacityFromConf() {
|
|
|
- return (float)scheduler.getConfiguration().getCapacity(getQueuePath()) / 100;
|
|
|
- }
|
|
|
-
|
|
|
- protected synchronized void setupQueueConfigs(
|
|
|
- Resource clusterResource,
|
|
|
- float capacity, float absoluteCapacity,
|
|
|
- float maximumCapacity, float absoluteMaxCapacity,
|
|
|
- int userLimit, float userLimitFactor,
|
|
|
- int maxApplications, float maxAMResourcePerQueuePercent,
|
|
|
- int maxApplicationsPerUser, QueueState state,
|
|
|
- Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay,
|
|
|
- Set<String> labels, String defaultLabelExpression,
|
|
|
- Map<String, Float> capacitieByLabel,
|
|
|
- Map<String, Float> maximumCapacitiesByLabel,
|
|
|
- boolean revervationContinueLooking) throws IOException {
|
|
|
- super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
|
|
|
- maximumCapacity, absoluteMaxCapacity, state, acls, labels,
|
|
|
- defaultLabelExpression, capacitieByLabel, maximumCapacitiesByLabel,
|
|
|
- revervationContinueLooking);
|
|
|
- // Sanity check
|
|
|
- CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
|
|
- float absCapacity = getParent().getAbsoluteCapacity() * capacity;
|
|
|
- CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absCapacity,
|
|
|
- absoluteMaxCapacity);
|
|
|
+
|
|
|
+ protected synchronized void setupQueueConfigs(Resource clusterResource)
|
|
|
+ throws IOException {
|
|
|
+ super.setupQueueConfigs(clusterResource);
|
|
|
|
|
|
this.lastClusterResource = clusterResource;
|
|
|
updateAbsoluteCapacityResource(clusterResource);
|
|
@@ -217,16 +150,28 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
// and all queues may not be realized yet, we'll use (optimistic)
|
|
|
// absoluteMaxCapacity (it will be replaced with the more accurate
|
|
|
// absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
|
|
|
- updateHeadroomInfo(clusterResource, absoluteMaxCapacity);
|
|
|
+ updateHeadroomInfo(clusterResource,
|
|
|
+ queueCapacities.getAbsoluteMaximumCapacity());
|
|
|
|
|
|
- this.absoluteCapacity = absCapacity;
|
|
|
+ CapacitySchedulerConfiguration conf = csContext.getConfiguration();
|
|
|
+ userLimit = conf.getUserLimit(getQueuePath());
|
|
|
+ userLimitFactor = conf.getUserLimitFactor(getQueuePath());
|
|
|
|
|
|
- this.userLimit = userLimit;
|
|
|
- this.userLimitFactor = userLimitFactor;
|
|
|
+ // Initially set to absoluteMax, will be updated to more accurate
|
|
|
+ // max avail value during assignContainers
|
|
|
+ absoluteMaxAvailCapacity = queueCapacities.getAbsoluteMaximumCapacity();
|
|
|
|
|
|
- this.maxApplications = maxApplications;
|
|
|
- this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent;
|
|
|
- this.maxApplicationsPerUser = maxApplicationsPerUser;
|
|
|
+ maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
|
|
|
+ if (maxApplications < 0) {
|
|
|
+ int maxSystemApps = conf.getMaximumSystemApplications();
|
|
|
+ maxApplications =
|
|
|
+ (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity());
|
|
|
+ }
|
|
|
+ maxApplicationsPerUser =
|
|
|
+ (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor);
|
|
|
+
|
|
|
+ maxAMResourcePerQueuePercent =
|
|
|
+ conf.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
|
|
|
|
|
|
if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
|
|
|
this.defaultLabelExpression)) {
|
|
@@ -242,7 +187,12 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
getAccessibleNodeLabels().iterator(), ',')));
|
|
|
}
|
|
|
|
|
|
- this.nodeLocalityDelay = nodeLocalityDelay;
|
|
|
+ nodeLocalityDelay = conf.getNodeLocalityDelay();
|
|
|
+
|
|
|
+ this.minimumAllocationFactor =
|
|
|
+ Resources.ratio(resourceCalculator,
|
|
|
+ Resources.subtract(maximumAllocation, minimumAllocation),
|
|
|
+ maximumAllocation);
|
|
|
|
|
|
StringBuilder aclsString = new StringBuilder();
|
|
|
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
|
|
@@ -250,21 +200,21 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
}
|
|
|
|
|
|
StringBuilder labelStrBuilder = new StringBuilder();
|
|
|
- if (labels != null) {
|
|
|
- for (String s : labels) {
|
|
|
+ if (accessibleLabels != null) {
|
|
|
+ for (String s : accessibleLabels) {
|
|
|
labelStrBuilder.append(s);
|
|
|
labelStrBuilder.append(",");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
LOG.info("Initializing " + queueName + "\n" +
|
|
|
- "capacity = " + capacity +
|
|
|
+ "capacity = " + queueCapacities.getCapacity() +
|
|
|
" [= (float) configuredCapacity / 100 ]" + "\n" +
|
|
|
- "asboluteCapacity = " + absoluteCapacity +
|
|
|
+ "asboluteCapacity = " + queueCapacities.getAbsoluteCapacity() +
|
|
|
" [= parentAbsoluteCapacity * capacity ]" + "\n" +
|
|
|
- "maxCapacity = " + maximumCapacity +
|
|
|
+ "maxCapacity = " + queueCapacities.getMaximumCapacity() +
|
|
|
" [= configuredMaxCapacity ]" + "\n" +
|
|
|
- "absoluteMaxCapacity = " + absoluteMaxCapacity +
|
|
|
+ "absoluteMaxCapacity = " + queueCapacities.getAbsoluteMaximumCapacity() +
|
|
|
" [= 1.0 maximumCapacity undefined, " +
|
|
|
"(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" +
|
|
|
"\n" +
|
|
@@ -279,7 +229,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
"maxApplicationsPerUser = " + maxApplicationsPerUser +
|
|
|
" [= (int)(maxApplications * (userLimit / 100.0f) * " +
|
|
|
"userLimitFactor) ]" + "\n" +
|
|
|
- "usedCapacity = " + usedCapacity +
|
|
|
+ "usedCapacity = " + queueCapacities.getUsedCapacity() +
|
|
|
" [= usedResourcesMemory / " +
|
|
|
"(clusterResourceMemory * absoluteCapacity)]" + "\n" +
|
|
|
"absoluteUsedCapacity = " + absoluteUsedCapacity +
|
|
@@ -435,8 +385,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
public String toString() {
|
|
|
return queueName + ": " +
|
|
|
- "capacity=" + capacity + ", " +
|
|
|
- "absoluteCapacity=" + absoluteCapacity + ", " +
|
|
|
+ "capacity=" + queueCapacities.getCapacity() + ", " +
|
|
|
+ "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " +
|
|
|
"usedResources=" + queueUsage.getUsed() + ", " +
|
|
|
"usedCapacity=" + getUsedCapacity() + ", " +
|
|
|
"absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " +
|
|
@@ -483,23 +433,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
" from " + newlyParsedQueue.getQueuePath());
|
|
|
}
|
|
|
|
|
|
- LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue;
|
|
|
- setupQueueConfigs(
|
|
|
- clusterResource,
|
|
|
- newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity,
|
|
|
- newlyParsedLeafQueue.maximumCapacity,
|
|
|
- newlyParsedLeafQueue.absoluteMaxCapacity,
|
|
|
- newlyParsedLeafQueue.userLimit, newlyParsedLeafQueue.userLimitFactor,
|
|
|
- newlyParsedLeafQueue.maxApplications,
|
|
|
- newlyParsedLeafQueue.maxAMResourcePerQueuePercent,
|
|
|
- newlyParsedLeafQueue.getMaxApplicationsPerUser(),
|
|
|
- newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
|
|
|
- newlyParsedLeafQueue.getNodeLocalityDelay(),
|
|
|
- newlyParsedLeafQueue.accessibleLabels,
|
|
|
- newlyParsedLeafQueue.defaultLabelExpression,
|
|
|
- newlyParsedLeafQueue.capacitiyByNodeLabels,
|
|
|
- newlyParsedLeafQueue.maxCapacityByNodeLabels,
|
|
|
- newlyParsedLeafQueue.reservationsContinueLooking);
|
|
|
+ setupQueueConfigs(clusterResource);
|
|
|
|
|
|
// queue metrics are updated, more resource may be available
|
|
|
// activate the pending applications if possible
|
|
@@ -1022,7 +956,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
application.getCurrentReservation()),
|
|
|
labelManager.getResourceByLabel(label, clusterResource));
|
|
|
|
|
|
- if (potentialNewWithoutReservedCapacity <= absoluteMaxCapacity) {
|
|
|
+ if (potentialNewWithoutReservedCapacity <= queueCapacities
|
|
|
+ .getAbsoluteMaximumCapacity()) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("try to use reserved: "
|
|
|
+ getQueueName()
|
|
@@ -1036,8 +971,9 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
+ Resources.divide(resourceCalculator, clusterResource,
|
|
|
queueUsage.getUsed(), clusterResource) + " required " + required
|
|
|
+ " potentialNewWithoutReservedCapacity: "
|
|
|
- + potentialNewWithoutReservedCapacity + " ( " + " max-capacity: "
|
|
|
- + absoluteMaxCapacity + ")");
|
|
|
+ + potentialNewWithoutReservedCapacity + " ( "
|
|
|
+ + " max-capacity: "
|
|
|
+ + queueCapacities.getAbsoluteMaximumCapacity() + ")");
|
|
|
}
|
|
|
// we could potentially use this node instead of reserved node
|
|
|
return true;
|
|
@@ -1046,7 +982,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
// Otherwise, if any of the label of this node beyond queue limit, we
|
|
|
// cannot allocate on this node. Consider a small epsilon here.
|
|
|
- if (potentialNewCapacity > getAbsoluteMaximumCapacityByNodeLabel(label) + 1e-4) {
|
|
|
+ if (potentialNewCapacity > queueCapacities
|
|
|
+ .getAbsoluteMaximumCapacity(label) + 1e-4) {
|
|
|
canAssign = false;
|
|
|
break;
|
|
|
}
|
|
@@ -1061,7 +998,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
queueUsage.getUsed(label),
|
|
|
labelManager.getResourceByLabel(label, clusterResource))
|
|
|
+ " potentialNewCapacity: " + potentialNewCapacity + " ( "
|
|
|
- + " max-capacity: " + absoluteMaxCapacity + ")");
|
|
|
+ + " max-capacity: " + queueCapacities.getAbsoluteMaximumCapacity()
|
|
|
+ + ")");
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1144,7 +1082,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
Resources.multiplyAndNormalizeUp(resourceCalculator,
|
|
|
labelManager.getResourceByLabel(firstLabel,
|
|
|
clusterResource),
|
|
|
- getAbsoluteCapacityByNodeLabel(firstLabel),
|
|
|
+ queueCapacities.getAbsoluteCapacity(firstLabel),
|
|
|
minimumAllocation));
|
|
|
} else {
|
|
|
// else there's no label on request, just to use absolute capacity as
|
|
@@ -1152,7 +1090,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
queueCapacity =
|
|
|
Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager
|
|
|
.getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, clusterResource),
|
|
|
- absoluteCapacity, minimumAllocation);
|
|
|
+ queueCapacities.getAbsoluteCapacity(), minimumAllocation);
|
|
|
}
|
|
|
|
|
|
// Allow progress for queues with miniscule capacity
|
|
@@ -1797,12 +1735,9 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
}
|
|
|
|
|
|
private void updateAbsoluteCapacityResource(Resource clusterResource) {
|
|
|
-
|
|
|
- absoluteCapacityResource = Resources.multiplyAndNormalizeUp(
|
|
|
- resourceCalculator,
|
|
|
- clusterResource,
|
|
|
- absoluteCapacity, minimumAllocation);
|
|
|
-
|
|
|
+ absoluteCapacityResource =
|
|
|
+ Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource,
|
|
|
+ queueCapacities.getAbsoluteCapacity(), minimumAllocation);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -1813,7 +1748,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
// Update headroom info based on new cluster resource value
|
|
|
// absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
|
|
|
// during allocation
|
|
|
- updateHeadroomInfo(clusterResource, absoluteMaxCapacity);
|
|
|
+ updateHeadroomInfo(clusterResource,
|
|
|
+ queueCapacities.getAbsoluteMaximumCapacity());
|
|
|
|
|
|
// Update metrics
|
|
|
CSQueueUtils.updateQueueStatistics(
|
|
@@ -1986,35 +1922,13 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
getParent().detachContainer(clusterResource, application, rmContainer);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- @Override
|
|
|
- public float getAbsActualCapacity() {
|
|
|
- //? Is this actually used by anything at present?
|
|
|
- // There is a findbugs warning -re lastClusterResource (now excluded),
|
|
|
- // when this is used, verify that the access is mt correct and remove
|
|
|
- // the findbugs exclusion if possible
|
|
|
- if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
|
|
|
- lastClusterResource, Resources.none())) {
|
|
|
- return absoluteCapacity;
|
|
|
- }
|
|
|
-
|
|
|
- Resource resourceRespectLabels =
|
|
|
- labelManager == null ? lastClusterResource : labelManager
|
|
|
- .getQueueResource(queueName, accessibleLabels, lastClusterResource);
|
|
|
- float absActualCapacity =
|
|
|
- Resources.divide(resourceCalculator, lastClusterResource,
|
|
|
- resourceRespectLabels, lastClusterResource);
|
|
|
-
|
|
|
- return absActualCapacity > absoluteCapacity ? absoluteCapacity
|
|
|
- : absActualCapacity;
|
|
|
- }
|
|
|
|
|
|
public void setCapacity(float capacity) {
|
|
|
- this.capacity = capacity;
|
|
|
+ queueCapacities.setCapacity(capacity);
|
|
|
}
|
|
|
|
|
|
public void setAbsoluteCapacity(float absoluteCapacity) {
|
|
|
- this.absoluteCapacity = absoluteCapacity;
|
|
|
+ queueCapacities.setAbsoluteCapacity(absoluteCapacity);
|
|
|
}
|
|
|
|
|
|
public void setMaxApplications(int maxApplications) {
|