|
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
|
@@ -115,10 +116,6 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo();
|
|
|
|
|
|
- // sum of resources used by application masters for applications
|
|
|
- // running in this queue
|
|
|
- private final Resource usedAMResources = Resource.newInstance(0, 0);
|
|
|
-
|
|
|
public LeafQueue(CapacitySchedulerContext cs,
|
|
|
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
|
|
super(cs, queueName, parent, old);
|
|
@@ -435,7 +432,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
return queueName + ": " +
|
|
|
"capacity=" + capacity + ", " +
|
|
|
"absoluteCapacity=" + absoluteCapacity + ", " +
|
|
|
- "usedResources=" + usedResources + ", " +
|
|
|
+ "usedResources=" + queueUsage.getUsed() + ", " +
|
|
|
"usedCapacity=" + getUsedCapacity() + ", " +
|
|
|
"absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " +
|
|
|
"numApps=" + getNumApplications() + ", " +
|
|
@@ -464,7 +461,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
|
|
|
for (Map.Entry<String, User> entry: users.entrySet()) {
|
|
|
usersToReturn.add(new UserInfo(entry.getKey(), Resources.clone(
|
|
|
- entry.getValue().consumed), entry.getValue().getActiveApplications(),
|
|
|
+ entry.getValue().getUsed()), entry.getValue().getActiveApplications(),
|
|
|
entry.getValue().getPendingApplications()));
|
|
|
}
|
|
|
return usersToReturn;
|
|
@@ -633,7 +630,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
// Check am resource limit
|
|
|
Resource amIfStarted =
|
|
|
- Resources.add(application.getAMResource(), usedAMResources);
|
|
|
+ Resources.add(application.getAMResource(), queueUsage.getAMUsed());
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("application AMResource " + application.getAMResource() +
|
|
@@ -678,9 +675,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
}
|
|
|
user.activateApplication();
|
|
|
activeApplications.add(application);
|
|
|
- Resources.addTo(usedAMResources, application.getAMResource());
|
|
|
- Resources.addTo(user.getConsumedAMResources(),
|
|
|
- application.getAMResource());
|
|
|
+ queueUsage.incAMUsed(application.getAMResource());
|
|
|
+ user.getResourceUsage().incAMUsed(application.getAMResource());
|
|
|
i.remove();
|
|
|
LOG.info("Application " + application.getApplicationId() +
|
|
|
" from user: " + application.getUser() +
|
|
@@ -731,9 +727,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
if (!wasActive) {
|
|
|
pendingApplications.remove(application);
|
|
|
} else {
|
|
|
- Resources.subtractFrom(usedAMResources, application.getAMResource());
|
|
|
- Resources.subtractFrom(user.getConsumedAMResources(),
|
|
|
- application.getAMResource());
|
|
|
+ queueUsage.decAMUsed(application.getAMResource());
|
|
|
+ user.getResourceUsage().decAMUsed(application.getAMResource());
|
|
|
}
|
|
|
applicationAttemptMap.remove(application.getApplicationAttemptId());
|
|
|
|
|
@@ -972,8 +967,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
*/
|
|
|
Resource headroom =
|
|
|
Resources.min(resourceCalculator, clusterResource,
|
|
|
- Resources.subtract(userLimit, user.getTotalConsumedResources()),
|
|
|
- Resources.subtract(queueMaxCap, usedResources)
|
|
|
+ Resources.subtract(userLimit, user.getUsed()),
|
|
|
+ Resources.subtract(queueMaxCap, queueUsage.getUsed())
|
|
|
);
|
|
|
return headroom;
|
|
|
}
|
|
@@ -993,12 +988,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
boolean canAssign = true;
|
|
|
for (String label : labelCanAccess) {
|
|
|
- if (!usedResourcesByNodeLabels.containsKey(label)) {
|
|
|
- usedResourcesByNodeLabels.put(label, Resources.createResource(0));
|
|
|
- }
|
|
|
-
|
|
|
Resource potentialTotalCapacity =
|
|
|
- Resources.add(usedResourcesByNodeLabels.get(label), required);
|
|
|
+ Resources.add(queueUsage.getUsed(label), required);
|
|
|
|
|
|
float potentialNewCapacity =
|
|
|
Resources.divide(resourceCalculator, clusterResource,
|
|
@@ -1021,14 +1012,14 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
LOG.debug("try to use reserved: "
|
|
|
+ getQueueName()
|
|
|
+ " usedResources: "
|
|
|
- + usedResources
|
|
|
+ + queueUsage.getUsed()
|
|
|
+ " clusterResources: "
|
|
|
+ clusterResource
|
|
|
+ " reservedResources: "
|
|
|
+ application.getCurrentReservation()
|
|
|
+ " currentCapacity "
|
|
|
+ Resources.divide(resourceCalculator, clusterResource,
|
|
|
- usedResources, clusterResource) + " required " + required
|
|
|
+ queueUsage.getUsed(), clusterResource) + " required " + required
|
|
|
+ " potentialNewWithoutReservedCapacity: "
|
|
|
+ potentialNewWithoutReservedCapacity + " ( " + " max-capacity: "
|
|
|
+ absoluteMaxCapacity + ")");
|
|
@@ -1048,11 +1039,11 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug(getQueueName()
|
|
|
+ "Check assign to queue, label=" + label
|
|
|
- + " usedResources: " + usedResourcesByNodeLabels.get(label)
|
|
|
+ + " usedResources: " + queueUsage.getUsed(label)
|
|
|
+ " clusterResources: " + clusterResource
|
|
|
+ " currentCapacity "
|
|
|
+ Resources.divide(resourceCalculator, clusterResource,
|
|
|
- usedResourcesByNodeLabels.get(label),
|
|
|
+ queueUsage.getUsed(label),
|
|
|
labelManager.getResourceByLabel(label, clusterResource))
|
|
|
+ " potentialNewCapacity: " + potentialNewCapacity + " ( "
|
|
|
+ " max-capacity: " + absoluteMaxCapacity + ")");
|
|
@@ -1109,7 +1100,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
LOG.debug("Headroom calculation for user " + user + ": " +
|
|
|
" userLimit=" + userLimit +
|
|
|
" queueMaxCap=" + queueMaxCap +
|
|
|
- " consumed=" + queueUser.getTotalConsumedResources() +
|
|
|
+ " consumed=" + queueUser.getUsed() +
|
|
|
" headroom=" + headroom);
|
|
|
}
|
|
|
|
|
@@ -1164,8 +1155,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
Resource currentCapacity =
|
|
|
Resources.lessThan(resourceCalculator, clusterResource,
|
|
|
- usedResources, queueCapacity) ?
|
|
|
- queueCapacity : Resources.add(usedResources, required);
|
|
|
+ queueUsage.getUsed(), queueCapacity) ?
|
|
|
+ queueCapacity : Resources.add(queueUsage.getUsed(), required);
|
|
|
|
|
|
// Never allow a single user to take more than the
|
|
|
// queue's configured capacity * user-limit-factor.
|
|
@@ -1200,10 +1191,10 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
" userLimit=" + userLimit +
|
|
|
" userLimitFactor=" + userLimitFactor +
|
|
|
" required: " + required +
|
|
|
- " consumed: " + user.getTotalConsumedResources() +
|
|
|
+ " consumed: " + user.getUsed() +
|
|
|
" limit: " + limit +
|
|
|
" queueCapacity: " + queueCapacity +
|
|
|
- " qconsumed: " + usedResources +
|
|
|
+ " qconsumed: " + queueUsage.getUsed() +
|
|
|
" currentCapacity: " + currentCapacity +
|
|
|
" activeUsers: " + activeUsers +
|
|
|
" clusterCapacity: " + clusterResource
|
|
@@ -1228,7 +1219,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
// overhead of the AM, but it's a > check, not a >= check, so...
|
|
|
if (Resources
|
|
|
.greaterThan(resourceCalculator, clusterResource,
|
|
|
- user.getConsumedResourceByLabel(label),
|
|
|
+ user.getUsed(label),
|
|
|
limit)) {
|
|
|
// if enabled, check to see if could we potentially use this node instead
|
|
|
// of a reserved node if the application has reserved containers
|
|
@@ -1236,13 +1227,13 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
if (Resources.lessThanOrEqual(
|
|
|
resourceCalculator,
|
|
|
clusterResource,
|
|
|
- Resources.subtract(user.getTotalConsumedResources(),
|
|
|
+ Resources.subtract(user.getUsed(),
|
|
|
application.getCurrentReservation()), limit)) {
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("User " + userName + " in queue " + getQueueName()
|
|
|
+ " will exceed limit based on reservations - " + " consumed: "
|
|
|
- + user.getTotalConsumedResources() + " reserved: "
|
|
|
+ + user.getUsed() + " reserved: "
|
|
|
+ application.getCurrentReservation() + " limit: " + limit);
|
|
|
}
|
|
|
return true;
|
|
@@ -1251,7 +1242,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("User " + userName + " in queue " + getQueueName()
|
|
|
+ " will exceed limit - " + " consumed: "
|
|
|
- + user.getTotalConsumedResources() + " limit: " + limit);
|
|
|
+ + user.getUsed() + " limit: " + limit);
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
@@ -1673,7 +1664,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
" queue=" + this.toString() +
|
|
|
" usedCapacity=" + getUsedCapacity() +
|
|
|
" absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
|
|
|
- " used=" + usedResources +
|
|
|
+ " used=" + queueUsage.getUsed() +
|
|
|
" cluster=" + clusterResource);
|
|
|
|
|
|
return request.getCapability();
|
|
@@ -1774,9 +1765,9 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.info(getQueueName() +
|
|
|
" user=" + userName +
|
|
|
- " used=" + usedResources + " numContainers=" + numContainers +
|
|
|
+ " used=" + queueUsage.getUsed() + " numContainers=" + numContainers +
|
|
|
" headroom = " + application.getHeadroom() +
|
|
|
- " user-resources=" + user.getTotalConsumedResources()
|
|
|
+ " user-resources=" + user.getUsed()
|
|
|
);
|
|
|
}
|
|
|
}
|
|
@@ -1792,8 +1783,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
|
|
|
|
|
|
LOG.info(getQueueName() +
|
|
|
- " used=" + usedResources + " numContainers=" + numContainers +
|
|
|
- " user=" + userName + " user-resources=" + user.getTotalConsumedResources());
|
|
|
+ " used=" + queueUsage.getUsed() + " numContainers=" + numContainers +
|
|
|
+ " user=" + userName + " user-resources=" + user.getUsed());
|
|
|
}
|
|
|
|
|
|
private void updateAbsoluteCapacityResource(Resource clusterResource) {
|
|
@@ -1835,22 +1826,20 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
@VisibleForTesting
|
|
|
public static class User {
|
|
|
- Resource consumed = Resources.createResource(0, 0);
|
|
|
- Resource consumedAMResources = Resources.createResource(0, 0);
|
|
|
- Map<String, Resource> consumedByLabel = new HashMap<String, Resource>();
|
|
|
+ ResourceUsage userResourceUsage = new ResourceUsage();
|
|
|
int pendingApplications = 0;
|
|
|
int activeApplications = 0;
|
|
|
|
|
|
- public Resource getTotalConsumedResources() {
|
|
|
- return consumed;
|
|
|
+ public ResourceUsage getResourceUsage() {
|
|
|
+ return userResourceUsage;
|
|
|
}
|
|
|
|
|
|
- public Resource getConsumedResourceByLabel(String label) {
|
|
|
- Resource r = consumedByLabel.get(label);
|
|
|
- if (null != r) {
|
|
|
- return r;
|
|
|
- }
|
|
|
- return Resources.none();
|
|
|
+ public Resource getUsed() {
|
|
|
+ return userResourceUsage.getUsed();
|
|
|
+ }
|
|
|
+
|
|
|
+ public Resource getUsed(String label) {
|
|
|
+ return userResourceUsage.getUsed(label);
|
|
|
}
|
|
|
|
|
|
public int getPendingApplications() {
|
|
@@ -1862,7 +1851,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
}
|
|
|
|
|
|
public Resource getConsumedAMResources() {
|
|
|
- return consumedAMResources;
|
|
|
+ return userResourceUsage.getAMUsed();
|
|
|
}
|
|
|
|
|
|
public int getTotalApplications() {
|
|
@@ -1887,47 +1876,26 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public synchronized void assignContainer(Resource resource,
|
|
|
+ public void assignContainer(Resource resource,
|
|
|
Set<String> nodeLabels) {
|
|
|
- Resources.addTo(consumed, resource);
|
|
|
-
|
|
|
if (nodeLabels == null || nodeLabels.isEmpty()) {
|
|
|
- if (!consumedByLabel.containsKey(RMNodeLabelsManager.NO_LABEL)) {
|
|
|
- consumedByLabel.put(RMNodeLabelsManager.NO_LABEL,
|
|
|
- Resources.createResource(0));
|
|
|
- }
|
|
|
- Resources.addTo(consumedByLabel.get(RMNodeLabelsManager.NO_LABEL),
|
|
|
- resource);
|
|
|
+ userResourceUsage.incUsed(resource);
|
|
|
} else {
|
|
|
for (String label : nodeLabels) {
|
|
|
- if (!consumedByLabel.containsKey(label)) {
|
|
|
- consumedByLabel.put(label, Resources.createResource(0));
|
|
|
- }
|
|
|
- Resources.addTo(consumedByLabel.get(label), resource);
|
|
|
+ userResourceUsage.incUsed(label, resource);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public synchronized void releaseContainer(Resource resource, Set<String> nodeLabels) {
|
|
|
- Resources.subtractFrom(consumed, resource);
|
|
|
-
|
|
|
- // Update usedResources by labels
|
|
|
+ public void releaseContainer(Resource resource, Set<String> nodeLabels) {
|
|
|
if (nodeLabels == null || nodeLabels.isEmpty()) {
|
|
|
- if (!consumedByLabel.containsKey(RMNodeLabelsManager.NO_LABEL)) {
|
|
|
- consumedByLabel.put(RMNodeLabelsManager.NO_LABEL,
|
|
|
- Resources.createResource(0));
|
|
|
- }
|
|
|
- Resources.subtractFrom(
|
|
|
- consumedByLabel.get(RMNodeLabelsManager.NO_LABEL), resource);
|
|
|
+ userResourceUsage.decUsed(resource);
|
|
|
} else {
|
|
|
for (String label : nodeLabels) {
|
|
|
- if (!consumedByLabel.containsKey(label)) {
|
|
|
- consumedByLabel.put(label, Resources.createResource(0));
|
|
|
- }
|
|
|
- Resources.subtractFrom(consumedByLabel.get(label), resource);
|
|
|
+ userResourceUsage.decUsed(label, resource);
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -1986,7 +1954,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
+ " resource=" + rmContainer.getContainer().getResource()
|
|
|
+ " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
|
|
|
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
|
|
|
- + usedResources + " cluster=" + clusterResource);
|
|
|
+ + queueUsage.getUsed() + " cluster=" + clusterResource);
|
|
|
// Inform the parent queue
|
|
|
getParent().attachContainer(clusterResource, application, rmContainer);
|
|
|
}
|
|
@@ -2004,7 +1972,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
+ " resource=" + rmContainer.getContainer().getResource()
|
|
|
+ " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
|
|
|
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
|
|
|
- + usedResources + " cluster=" + clusterResource);
|
|
|
+ + queueUsage.getUsed() + " cluster=" + clusterResource);
|
|
|
// Inform the parent queue
|
|
|
getParent().detachContainer(clusterResource, application, rmContainer);
|
|
|
}
|