|
@@ -119,7 +119,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
private final QueueResourceLimitsInfo queueResourceLimitsInfo =
|
|
|
new QueueResourceLimitsInfo();
|
|
|
|
|
|
- private volatile ResourceLimits currentResourceLimits = null;
|
|
|
+ private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;
|
|
|
|
|
|
public LeafQueue(CapacitySchedulerContext cs,
|
|
|
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
|
@@ -149,7 +149,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
this.lastClusterResource = clusterResource;
|
|
|
updateAbsoluteCapacityResource(clusterResource);
|
|
|
|
|
|
- this.currentResourceLimits = new ResourceLimits(clusterResource);
|
|
|
+ this.cachedResourceLimitsForHeadroom = new ResourceLimits(clusterResource);
|
|
|
|
|
|
// Initialize headroom info, also used for calculating application
|
|
|
// master resource limits. Since this happens during queue initialization
|
|
@@ -820,13 +820,13 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
// Check queue max-capacity limit
|
|
|
if (!super.canAssignToThisQueue(clusterResource, node.getLabels(),
|
|
|
- this.currentResourceLimits, required, application.getCurrentReservation())) {
|
|
|
+ currentResourceLimits, required, application.getCurrentReservation())) {
|
|
|
return NULL_ASSIGNMENT;
|
|
|
}
|
|
|
|
|
|
// Check user limit
|
|
|
if (!assignToUser(clusterResource, application.getUser(), userLimit,
|
|
|
- application, true, requestedNodeLabels)) {
|
|
|
+ application, requestedNodeLabels, currentResourceLimits)) {
|
|
|
break;
|
|
|
}
|
|
|
|
|
@@ -836,7 +836,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
// Try to schedule
|
|
|
CSAssignment assignment =
|
|
|
assignContainersOnNode(clusterResource, node, application, priority,
|
|
|
- null);
|
|
|
+ null, currentResourceLimits);
|
|
|
|
|
|
// Did the application skip this node?
|
|
|
if (assignment.getSkipped()) {
|
|
@@ -897,7 +897,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
// Try to assign if we have sufficient resources
|
|
|
assignContainersOnNode(clusterResource, node, application, priority,
|
|
|
- rmContainer);
|
|
|
+ rmContainer, new ResourceLimits(Resources.none()));
|
|
|
|
|
|
// Doesn't matter... since it's already charged for at time of reservation
|
|
|
// "re-reservation" is *free*
|
|
@@ -943,7 +943,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
private void setQueueResourceLimitsInfo(
|
|
|
Resource clusterResource) {
|
|
|
synchronized (queueResourceLimitsInfo) {
|
|
|
- queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits
|
|
|
+ queueResourceLimitsInfo.setQueueCurrentLimit(cachedResourceLimitsForHeadroom
|
|
|
.getLimit());
|
|
|
queueResourceLimitsInfo.setClusterResource(clusterResource);
|
|
|
}
|
|
@@ -964,13 +964,13 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
setQueueResourceLimitsInfo(clusterResource);
|
|
|
|
|
|
Resource headroom =
|
|
|
- getHeadroom(queueUser, currentResourceLimits.getLimit(),
|
|
|
+ getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
|
|
|
clusterResource, userLimit);
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Headroom calculation for user " + user + ": " +
|
|
|
" userLimit=" + userLimit +
|
|
|
- " queueMaxAvailRes=" + currentResourceLimits.getLimit() +
|
|
|
+ " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() +
|
|
|
" consumed=" + queueUser.getUsed() +
|
|
|
" headroom=" + headroom);
|
|
|
}
|
|
@@ -1078,7 +1078,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
@Private
|
|
|
protected synchronized boolean assignToUser(Resource clusterResource,
|
|
|
String userName, Resource limit, FiCaSchedulerApp application,
|
|
|
- boolean checkReservations, Set<String> requestLabels) {
|
|
|
+ Set<String> requestLabels, ResourceLimits currentResoureLimits) {
|
|
|
User user = getUser(userName);
|
|
|
|
|
|
String label = CommonNodeLabelsManager.NO_LABEL;
|
|
@@ -1094,12 +1094,12 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
limit)) {
|
|
|
// if enabled, check to see if could we potentially use this node instead
|
|
|
// of a reserved node if the application has reserved containers
|
|
|
- if (this.reservationsContinueLooking && checkReservations) {
|
|
|
+ if (this.reservationsContinueLooking) {
|
|
|
if (Resources.lessThanOrEqual(
|
|
|
resourceCalculator,
|
|
|
clusterResource,
|
|
|
- Resources.subtract(user.getUsed(),
|
|
|
- application.getCurrentReservation()), limit)) {
|
|
|
+ Resources.subtract(user.getUsed(), application.getCurrentReservation()),
|
|
|
+ limit)) {
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("User " + userName + " in queue " + getQueueName()
|
|
@@ -1107,6 +1107,13 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
+ user.getUsed() + " reserved: "
|
|
|
+ application.getCurrentReservation() + " limit: " + limit);
|
|
|
}
|
|
|
+ Resource amountNeededToUnreserve = Resources.subtract(user.getUsed(label), limit);
|
|
|
+ // we can only acquire a new container if we unreserve first since we ignored the
|
|
|
+ // user limit. Choose the max of user limit or what was previously set by max
|
|
|
+ // capacity.
|
|
|
+ currentResoureLimits.setAmountNeededUnreserve(Resources.max(resourceCalculator,
|
|
|
+ clusterResource, currentResoureLimits.getAmountNeededUnreserve(),
|
|
|
+ amountNeededToUnreserve));
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
@@ -1153,7 +1160,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
private CSAssignment assignContainersOnNode(Resource clusterResource,
|
|
|
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
|
|
|
- RMContainer reservedContainer) {
|
|
|
+ RMContainer reservedContainer, ResourceLimits currentResoureLimits) {
|
|
|
Resource assigned = Resources.none();
|
|
|
|
|
|
NodeType requestType = null;
|
|
@@ -1166,7 +1173,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
assigned =
|
|
|
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
|
|
|
node, application, priority, reservedContainer,
|
|
|
- allocatedContainer);
|
|
|
+ allocatedContainer, currentResoureLimits);
|
|
|
if (Resources.greaterThan(resourceCalculator, clusterResource,
|
|
|
assigned, Resources.none())) {
|
|
|
|
|
@@ -1194,7 +1201,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
assigned =
|
|
|
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
|
|
|
node, application, priority, reservedContainer,
|
|
|
- allocatedContainer);
|
|
|
+ allocatedContainer, currentResoureLimits);
|
|
|
if (Resources.greaterThan(resourceCalculator, clusterResource,
|
|
|
assigned, Resources.none())) {
|
|
|
|
|
@@ -1222,7 +1229,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
assigned =
|
|
|
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
|
|
|
node, application, priority, reservedContainer,
|
|
|
- allocatedContainer);
|
|
|
+ allocatedContainer, currentResoureLimits);
|
|
|
|
|
|
// update locality statistics
|
|
|
if (allocatedContainer.getValue() != null) {
|
|
@@ -1233,20 +1240,11 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
return SKIP_ASSIGNMENT;
|
|
|
}
|
|
|
-
|
|
|
- private Resource getMinimumResourceNeedUnreserved(Resource askedResource) {
|
|
|
- // First we need to get minimum resource we need unreserve
|
|
|
- // minimum-resource-need-unreserve = used + asked - limit
|
|
|
- Resource minimumUnreservedResource =
|
|
|
- Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource),
|
|
|
- currentResourceLimits.getLimit());
|
|
|
- return minimumUnreservedResource;
|
|
|
- }
|
|
|
|
|
|
@Private
|
|
|
protected boolean findNodeToUnreserve(Resource clusterResource,
|
|
|
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
|
|
|
- Resource askedResource, Resource minimumUnreservedResource) {
|
|
|
+ Resource minimumUnreservedResource) {
|
|
|
// need to unreserve some other container first
|
|
|
NodeId idToUnreserve =
|
|
|
application.getNodeIdToUnreserve(priority, minimumUnreservedResource,
|
|
@@ -1267,7 +1265,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
LOG.debug("unreserving for app: " + application.getApplicationId()
|
|
|
+ " on nodeId: " + idToUnreserve
|
|
|
+ " in order to replace reserved application and place it on node: "
|
|
|
- + node.getNodeID() + " needing: " + askedResource);
|
|
|
+ + node.getNodeID() + " needing: " + minimumUnreservedResource);
|
|
|
}
|
|
|
|
|
|
// headroom
|
|
@@ -1286,45 +1284,16 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- @Private
|
|
|
- protected boolean checkLimitsToReserve(Resource clusterResource,
|
|
|
- FiCaSchedulerApp application, Resource capability) {
|
|
|
- // we can't reserve if we got here based on the limit
|
|
|
- // checks assuming we could unreserve!!!
|
|
|
- Resource userLimit = computeUserLimitAndSetHeadroom(application,
|
|
|
- clusterResource, capability, null);
|
|
|
-
|
|
|
- // Check queue max-capacity limit,
|
|
|
- // TODO: Consider reservation on labels
|
|
|
- if (!canAssignToThisQueue(clusterResource, null,
|
|
|
- this.currentResourceLimits, capability, Resources.none())) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("was going to reserve but hit queue limit");
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- // Check user limit
|
|
|
- if (!assignToUser(clusterResource, application.getUser(), userLimit,
|
|
|
- application, false, null)) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("was going to reserve but hit user limit");
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
private Resource assignNodeLocalContainers(Resource clusterResource,
|
|
|
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
|
|
|
FiCaSchedulerApp application, Priority priority,
|
|
|
- RMContainer reservedContainer, MutableObject allocatedContainer) {
|
|
|
+ RMContainer reservedContainer, MutableObject allocatedContainer,
|
|
|
+ ResourceLimits currentResoureLimits) {
|
|
|
if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
|
|
|
reservedContainer)) {
|
|
|
return assignContainer(clusterResource, node, application, priority,
|
|
|
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
|
|
|
- allocatedContainer);
|
|
|
+ allocatedContainer, currentResoureLimits);
|
|
|
}
|
|
|
|
|
|
return Resources.none();
|
|
@@ -1333,12 +1302,13 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
private Resource assignRackLocalContainers(Resource clusterResource,
|
|
|
ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
|
|
|
FiCaSchedulerApp application, Priority priority,
|
|
|
- RMContainer reservedContainer, MutableObject allocatedContainer) {
|
|
|
+ RMContainer reservedContainer, MutableObject allocatedContainer,
|
|
|
+ ResourceLimits currentResoureLimits) {
|
|
|
if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
|
|
|
reservedContainer)) {
|
|
|
return assignContainer(clusterResource, node, application, priority,
|
|
|
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
|
|
|
- allocatedContainer);
|
|
|
+ allocatedContainer, currentResoureLimits);
|
|
|
}
|
|
|
|
|
|
return Resources.none();
|
|
@@ -1347,12 +1317,13 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
private Resource assignOffSwitchContainers(Resource clusterResource,
|
|
|
ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
|
|
|
FiCaSchedulerApp application, Priority priority,
|
|
|
- RMContainer reservedContainer, MutableObject allocatedContainer) {
|
|
|
+ RMContainer reservedContainer, MutableObject allocatedContainer,
|
|
|
+ ResourceLimits currentResoureLimits) {
|
|
|
if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
|
|
|
reservedContainer)) {
|
|
|
return assignContainer(clusterResource, node, application, priority,
|
|
|
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
|
|
|
- allocatedContainer);
|
|
|
+ allocatedContainer, currentResoureLimits);
|
|
|
}
|
|
|
|
|
|
return Resources.none();
|
|
@@ -1436,7 +1407,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
|
|
|
FiCaSchedulerApp application, Priority priority,
|
|
|
ResourceRequest request, NodeType type, RMContainer rmContainer,
|
|
|
- MutableObject createdContainer) {
|
|
|
+ MutableObject createdContainer, ResourceLimits currentResoureLimits) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("assignContainers: node=" + node.getNodeName()
|
|
|
+ " application=" + application.getApplicationId()
|
|
@@ -1488,6 +1459,10 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
// Can we allocate a container on this node?
|
|
|
int availableContainers =
|
|
|
resourceCalculator.computeAvailableContainers(available, capability);
|
|
|
+
|
|
|
+ boolean needToUnreserve = Resources.greaterThan(resourceCalculator,clusterResource,
|
|
|
+ currentResoureLimits.getAmountNeededUnreserve(), Resources.none());
|
|
|
+
|
|
|
if (availableContainers > 0) {
|
|
|
// Allocate...
|
|
|
|
|
@@ -1496,20 +1471,24 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
unreserve(application, priority, node, rmContainer);
|
|
|
} else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) {
|
|
|
// when reservationsContinueLooking is set, we may need to unreserve
|
|
|
- // some containers to meet this queue and its parents' resource limits
|
|
|
+ // some containers to meet this queue, its parents', or the users' resource limits.
|
|
|
// TODO, need change here when we want to support continuous reservation
|
|
|
// looking for labeled partitions.
|
|
|
- Resource minimumUnreservedResource =
|
|
|
- getMinimumResourceNeedUnreserved(capability);
|
|
|
- if (!shouldAllocOrReserveNewContainer
|
|
|
- || Resources.greaterThan(resourceCalculator, clusterResource,
|
|
|
- minimumUnreservedResource, Resources.none())) {
|
|
|
+ if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
|
|
|
+ // If we shouldn't allocate/reserve new container then we should unreserve one the same
|
|
|
+ // size we are asking for since the currentResoureLimits.getAmountNeededUnreserve
|
|
|
+ // could be zero. If the limit was hit then use the amount we need to unreserve to be
|
|
|
+ // under the limit.
|
|
|
+ Resource amountToUnreserve = capability;
|
|
|
+ if (needToUnreserve) {
|
|
|
+ amountToUnreserve = currentResoureLimits.getAmountNeededUnreserve();
|
|
|
+ }
|
|
|
boolean containerUnreserved =
|
|
|
findNodeToUnreserve(clusterResource, node, application, priority,
|
|
|
- capability, minimumUnreservedResource);
|
|
|
- // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved
|
|
|
+ amountToUnreserve);
|
|
|
+ // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved
|
|
|
// container (That means we *have to* unreserve some resource to
|
|
|
- // continue)). If we failed to unreserve some resource,
|
|
|
+ // continue)). If we failed to unreserve some resource, we can't continue.
|
|
|
if (!containerUnreserved) {
|
|
|
return Resources.none();
|
|
|
}
|
|
@@ -1541,13 +1520,13 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
if (shouldAllocOrReserveNewContainer || rmContainer != null) {
|
|
|
|
|
|
if (reservationsContinueLooking && rmContainer == null) {
|
|
|
- // we could possibly ignoring parent queue capacity limits when
|
|
|
- // reservationsContinueLooking is set.
|
|
|
- // If we're trying to reserve a container here, not container will be
|
|
|
- // unreserved for reserving the new one. Check limits again before
|
|
|
- // reserve the new container
|
|
|
- if (!checkLimitsToReserve(clusterResource,
|
|
|
- application, capability)) {
|
|
|
+ // we could possibly ignoring queue capacity or user limits when
|
|
|
+ // reservationsContinueLooking is set. Make sure we didn't need to unreserve
|
|
|
+ // one.
|
|
|
+ if (needToUnreserve) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("we needed to unreserve to be able to allocate");
|
|
|
+ }
|
|
|
return Resources.none();
|
|
|
}
|
|
|
}
|
|
@@ -1679,8 +1658,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
user.releaseContainer(resource, nodeLabels);
|
|
|
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
|
|
|
|
|
|
- LOG.info(getQueueName() +
|
|
|
- " used=" + queueUsage.getUsed() + " numContainers=" + numContainers +
|
|
|
+ LOG.info(getQueueName() +
|
|
|
+ " used=" + queueUsage.getUsed() + " numContainers=" + numContainers +
|
|
|
" user=" + userName + " user-resources=" + user.getUsed());
|
|
|
}
|
|
|
|
|
@@ -1697,14 +1676,14 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
// Even if ParentQueue will set limits respect child's max queue capacity,
|
|
|
// but when allocating reserved container, CapacityScheduler doesn't do
|
|
|
// this. So need cap limits by queue's max capacity here.
|
|
|
- this.currentResourceLimits = currentResourceLimits;
|
|
|
+ this.cachedResourceLimitsForHeadroom = new ResourceLimits(currentResourceLimits.getLimit());
|
|
|
Resource queueMaxResource =
|
|
|
Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
|
|
|
.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
|
|
|
queueCapacities
|
|
|
.getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
|
|
|
minimumAllocation);
|
|
|
- this.currentResourceLimits.setLimit(Resources.min(resourceCalculator,
|
|
|
+ this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(resourceCalculator,
|
|
|
clusterResource, queueMaxResource, currentResourceLimits.getLimit()));
|
|
|
}
|
|
|
|