|
@@ -1039,6 +1039,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|
return CSAssignment.NULL_ASSIGNMENT;
|
|
return CSAssignment.NULL_ASSIGNMENT;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ Map<String, CachedUserLimit> userLimits = new HashMap<>();
|
|
|
|
+ boolean needAssignToQueueCheck = true;
|
|
for (Iterator<FiCaSchedulerApp> assignmentIterator =
|
|
for (Iterator<FiCaSchedulerApp> assignmentIterator =
|
|
orderingPolicy.getAssignmentIterator();
|
|
orderingPolicy.getAssignmentIterator();
|
|
assignmentIterator.hasNext(); ) {
|
|
assignmentIterator.hasNext(); ) {
|
|
@@ -1048,24 +1050,50 @@ public class LeafQueue extends AbstractCSQueue {
|
|
node.getNodeID(), SystemClock.getInstance().getTime(), application);
|
|
node.getNodeID(), SystemClock.getInstance().getTime(), application);
|
|
|
|
|
|
// Check queue max-capacity limit
|
|
// Check queue max-capacity limit
|
|
- if (!super.canAssignToThisQueue(clusterResource, ps.getPartition(),
|
|
|
|
- currentResourceLimits, application.getCurrentReservation(),
|
|
|
|
- schedulingMode)) {
|
|
|
|
- ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
|
|
|
|
- activitiesManager, node, application, application.getPriority(),
|
|
|
|
- ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
|
|
|
|
- ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
|
|
|
- getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
|
|
|
|
- ActivityDiagnosticConstant.EMPTY);
|
|
|
|
- return CSAssignment.NULL_ASSIGNMENT;
|
|
|
|
|
|
+ Resource appReserved = application.getCurrentReservation();
|
|
|
|
+ if (needAssignToQueueCheck) {
|
|
|
|
+ if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
|
|
|
|
+ currentResourceLimits, appReserved, schedulingMode)) {
|
|
|
|
+ ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
|
|
|
|
+ activitiesManager, node, application, application.getPriority(),
|
|
|
|
+ ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
|
|
|
|
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
|
|
|
+ getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
|
|
|
|
+ ActivityDiagnosticConstant.EMPTY);
|
|
|
|
+ return CSAssignment.NULL_ASSIGNMENT;
|
|
|
|
+ }
|
|
|
|
+ // If there was no reservation and canAssignToThisQueue returned
|
|
|
|
+ // true, there is no reason to check further.
|
|
|
|
+ if (!this.reservationsContinueLooking
|
|
|
|
+ || appReserved.equals(Resources.none())) {
|
|
|
|
+ needAssignToQueueCheck = false;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ CachedUserLimit cul = userLimits.get(application.getUser());
|
|
|
|
+ Resource cachedUserLimit = null;
|
|
|
|
+ if (cul != null) {
|
|
|
|
+ cachedUserLimit = cul.userLimit;
|
|
|
|
+ }
|
|
Resource userLimit = computeUserLimitAndSetHeadroom(application,
|
|
Resource userLimit = computeUserLimitAndSetHeadroom(application,
|
|
- clusterResource, ps.getPartition(), schedulingMode);
|
|
|
|
-
|
|
|
|
|
|
+ clusterResource, ps.getPartition(), schedulingMode, cachedUserLimit);
|
|
|
|
+ if (cul == null) {
|
|
|
|
+ cul = new CachedUserLimit(userLimit);
|
|
|
|
+ userLimits.put(application.getUser(), cul);
|
|
|
|
+ }
|
|
// Check user limit
|
|
// Check user limit
|
|
- if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
|
|
|
|
- application, ps.getPartition(), currentResourceLimits)) {
|
|
|
|
|
|
+ boolean userAssignable = true;
|
|
|
|
+ if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) {
|
|
|
|
+ userAssignable = false;
|
|
|
|
+ } else {
|
|
|
|
+ userAssignable = canAssignToUser(clusterResource, application.getUser(),
|
|
|
|
+ userLimit, application, node.getPartition(), currentResourceLimits);
|
|
|
|
+ if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
|
|
|
|
+ cul.canAssign = false;
|
|
|
|
+ cul.reservation = appReserved;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (!userAssignable) {
|
|
application.updateAMContainerDiagnostics(AMState.ACTIVATED,
|
|
application.updateAMContainerDiagnostics(AMState.ACTIVATED,
|
|
"User capacity has reached its maximum limit.");
|
|
"User capacity has reached its maximum limit.");
|
|
ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
|
|
ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
|
|
@@ -1140,7 +1168,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
// check user-limit
|
|
// check user-limit
|
|
Resource userLimit = computeUserLimitAndSetHeadroom(app, cluster, p,
|
|
Resource userLimit = computeUserLimitAndSetHeadroom(app, cluster, p,
|
|
- allocation.getSchedulingMode());
|
|
|
|
|
|
+ allocation.getSchedulingMode(), null);
|
|
|
|
|
|
// Deduct resources that we can release
|
|
// Deduct resources that we can release
|
|
Resource usedResource = Resources.clone(getUser(username).getUsed(p));
|
|
Resource usedResource = Resources.clone(getUser(username).getUsed(p));
|
|
@@ -1345,19 +1373,20 @@ public class LeafQueue extends AbstractCSQueue {
|
|
@Lock({LeafQueue.class})
|
|
@Lock({LeafQueue.class})
|
|
Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
|
|
Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
|
|
Resource clusterResource, String nodePartition,
|
|
Resource clusterResource, String nodePartition,
|
|
- SchedulingMode schedulingMode) {
|
|
|
|
|
|
+ SchedulingMode schedulingMode, Resource userLimit) {
|
|
String user = application.getUser();
|
|
String user = application.getUser();
|
|
User queueUser = getUser(user);
|
|
User queueUser = getUser(user);
|
|
|
|
|
|
// Compute user limit respect requested labels,
|
|
// Compute user limit respect requested labels,
|
|
// TODO, need consider headroom respect labels also
|
|
// TODO, need consider headroom respect labels also
|
|
- Resource userLimit =
|
|
|
|
- getResourceLimitForActiveUsers(application.getUser(), clusterResource,
|
|
|
|
- nodePartition, schedulingMode);
|
|
|
|
-
|
|
|
|
|
|
+ if (userLimit == null) {
|
|
|
|
+ userLimit = getResourceLimitForActiveUsers(application.getUser(),
|
|
|
|
+ clusterResource, nodePartition, schedulingMode);
|
|
|
|
+ }
|
|
setQueueResourceLimitsInfo(clusterResource);
|
|
setQueueResourceLimitsInfo(clusterResource);
|
|
|
|
|
|
Resource headroom =
|
|
Resource headroom =
|
|
|
|
+ metrics.getUserMetrics(user) == null ? Resources.none() :
|
|
getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
|
|
getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
|
|
clusterResource, userLimit, nodePartition);
|
|
clusterResource, userLimit, nodePartition);
|
|
|
|
|
|
@@ -1365,7 +1394,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
LOG.debug("Headroom calculation for user " + user + ": " + " userLimit="
|
|
LOG.debug("Headroom calculation for user " + user + ": " + " userLimit="
|
|
+ userLimit + " queueMaxAvailRes="
|
|
+ userLimit + " queueMaxAvailRes="
|
|
+ cachedResourceLimitsForHeadroom.getLimit() + " consumed="
|
|
+ cachedResourceLimitsForHeadroom.getLimit() + " consumed="
|
|
- + queueUser.getUsed() + " headroom=" + headroom + " partition="
|
|
|
|
|
|
+ + queueUser.getUsed() + " partition="
|
|
+ nodePartition);
|
|
+ nodePartition);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1726,7 +1755,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
.getSchedulableEntities()) {
|
|
.getSchedulableEntities()) {
|
|
computeUserLimitAndSetHeadroom(application, clusterResource,
|
|
computeUserLimitAndSetHeadroom(application, clusterResource,
|
|
RMNodeLabelsManager.NO_LABEL,
|
|
RMNodeLabelsManager.NO_LABEL,
|
|
- SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
writeLock.unlock();
|
|
writeLock.unlock();
|
|
@@ -2065,4 +2094,14 @@ public class LeafQueue extends AbstractCSQueue {
|
|
public Set<String> getAllUsers() {
|
|
public Set<String> getAllUsers() {
|
|
return this.getUsersManager().getUsers().keySet();
|
|
return this.getUsersManager().getUsers().keySet();
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ static class CachedUserLimit {
|
|
|
|
+ final Resource userLimit;
|
|
|
|
+ boolean canAssign = true;
|
|
|
|
+ Resource reservation = Resources.none();
|
|
|
|
+
|
|
|
|
+ CachedUserLimit(Resource userLimit) {
|
|
|
|
+ this.userLimit = userLimit;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|