|
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
|
@@ -90,7 +91,7 @@ public class LeafQueue implements CSQueue {
|
|
|
private int maxActiveAppsUsingAbsCap; // Based on absolute capacity
|
|
|
private int maxActiveApplicationsPerUser;
|
|
|
|
|
|
- private Resource usedResources = Resources.createResource(0);
|
|
|
+ private Resource usedResources = Resources.createResource(0, 0);
|
|
|
private float usedCapacity = 0.0f;
|
|
|
private volatile int numContainers;
|
|
|
|
|
@@ -126,12 +127,16 @@ public class LeafQueue implements CSQueue {
|
|
|
|
|
|
private final int nodeLocalityDelay;
|
|
|
|
|
|
+ private final ResourceCalculator resourceCalculator;
|
|
|
+
|
|
|
public LeafQueue(CapacitySchedulerContext cs,
|
|
|
- String queueName, CSQueue parent,
|
|
|
- Comparator<FiCaSchedulerApp> applicationComparator, CSQueue old) {
|
|
|
+ String queueName, CSQueue parent, CSQueue old) {
|
|
|
this.scheduler = cs;
|
|
|
this.queueName = queueName;
|
|
|
this.parent = parent;
|
|
|
+
|
|
|
+ this.resourceCalculator = cs.getResourceCalculator();
|
|
|
+
|
|
|
// must be after parent and queueName are initialized
|
|
|
this.metrics = old != null ? old.getMetrics() :
|
|
|
QueueMetrics.forQueue(getQueuePath(), parent,
|
|
@@ -141,8 +146,9 @@ public class LeafQueue implements CSQueue {
|
|
|
this.minimumAllocation = cs.getMinimumResourceCapability();
|
|
|
this.maximumAllocation = cs.getMaximumResourceCapability();
|
|
|
this.minimumAllocationFactor =
|
|
|
- (float)(maximumAllocation.getMemory() - minimumAllocation.getMemory()) /
|
|
|
- maximumAllocation.getMemory();
|
|
|
+ Resources.ratio(resourceCalculator,
|
|
|
+ Resources.subtract(maximumAllocation, minimumAllocation),
|
|
|
+ maximumAllocation);
|
|
|
this.containerTokenSecretManager = cs.getContainerTokenSecretManager();
|
|
|
|
|
|
float capacity =
|
|
@@ -171,10 +177,12 @@ public class LeafQueue implements CSQueue {
|
|
|
getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
|
|
|
int maxActiveApplications =
|
|
|
CSQueueUtils.computeMaxActiveApplications(
|
|
|
+ resourceCalculator,
|
|
|
cs.getClusterResources(), this.minimumAllocation,
|
|
|
maxAMResourcePerQueuePercent, absoluteMaxCapacity);
|
|
|
this.maxActiveAppsUsingAbsCap =
|
|
|
CSQueueUtils.computeMaxActiveApplications(
|
|
|
+ resourceCalculator,
|
|
|
cs.getClusterResources(), this.minimumAllocation,
|
|
|
maxAMResourcePerQueuePercent, absoluteCapacity);
|
|
|
int maxActiveApplicationsPerUser =
|
|
@@ -207,6 +215,8 @@ public class LeafQueue implements CSQueue {
|
|
|
+ ", fullname=" + getQueuePath());
|
|
|
}
|
|
|
|
|
|
+ Comparator<FiCaSchedulerApp> applicationComparator =
|
|
|
+ cs.getApplicationComparator();
|
|
|
this.pendingApplications =
|
|
|
new TreeSet<FiCaSchedulerApp>(applicationComparator);
|
|
|
this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
|
|
@@ -256,7 +266,8 @@ public class LeafQueue implements CSQueue {
|
|
|
|
|
|
// Update metrics
|
|
|
CSQueueUtils.updateQueueStatistics(
|
|
|
- this, getParent(), clusterResource, minimumAllocation);
|
|
|
+ resourceCalculator, this, getParent(), clusterResource,
|
|
|
+ minimumAllocation);
|
|
|
|
|
|
LOG.info("Initializing " + queueName + "\n" +
|
|
|
"capacity = " + capacity +
|
|
@@ -545,7 +556,7 @@ public class LeafQueue implements CSQueue {
|
|
|
return queueName + ": " +
|
|
|
"capacity=" + capacity + ", " +
|
|
|
"absoluteCapacity=" + absoluteCapacity + ", " +
|
|
|
- "usedResources=" + usedResources.getMemory() + "MB, " +
|
|
|
+ "usedResources=" + usedResources + ", " +
|
|
|
"usedCapacity=" + getUsedCapacity() + ", " +
|
|
|
"absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " +
|
|
|
"numApps=" + getNumApplications() + ", " +
|
|
@@ -754,7 +765,7 @@ public class LeafQueue implements CSQueue {
|
|
|
}
|
|
|
|
|
|
private static final CSAssignment NULL_ASSIGNMENT =
|
|
|
- new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
|
|
|
+ new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
|
|
|
|
|
|
@Override
|
|
|
public synchronized CSAssignment
|
|
@@ -813,7 +824,8 @@ public class LeafQueue implements CSQueue {
|
|
|
}
|
|
|
|
|
|
// Check user limit
|
|
|
- if (!assignToUser(application.getUser(), userLimit)) {
|
|
|
+ if (!assignToUser(
|
|
|
+ clusterResource, application.getUser(), userLimit)) {
|
|
|
break;
|
|
|
}
|
|
|
|
|
@@ -827,7 +839,8 @@ public class LeafQueue implements CSQueue {
|
|
|
|
|
|
// Did we schedule or reserve a container?
|
|
|
Resource assigned = assignment.getResource();
|
|
|
- if (Resources.greaterThan(assigned, Resources.none())) {
|
|
|
+ if (Resources.greaterThan(
|
|
|
+ resourceCalculator, clusterResource, assigned, Resources.none())) {
|
|
|
|
|
|
// Book-keeping
|
|
|
// Note: Update headroom to account for current allocation too...
|
|
@@ -882,21 +895,25 @@ public class LeafQueue implements CSQueue {
|
|
|
|
|
|
// Doesn't matter... since it's already charged for at time of reservation
|
|
|
// "re-reservation" is *free*
|
|
|
- return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
|
|
|
+ return Resources.none();
|
|
|
}
|
|
|
|
|
|
private synchronized boolean assignToQueue(Resource clusterResource,
|
|
|
Resource required) {
|
|
|
// Check how of the cluster's absolute capacity we are currently using...
|
|
|
float potentialNewCapacity =
|
|
|
- (float)(usedResources.getMemory() + required.getMemory()) /
|
|
|
- clusterResource.getMemory();
|
|
|
+ Resources.divide(
|
|
|
+ resourceCalculator, clusterResource,
|
|
|
+ Resources.add(usedResources, required),
|
|
|
+ clusterResource);
|
|
|
if (potentialNewCapacity > absoluteMaxCapacity) {
|
|
|
LOG.info(getQueueName() +
|
|
|
- " usedResources: " + usedResources.getMemory() +
|
|
|
- " clusterResources: " + clusterResource.getMemory() +
|
|
|
- " currentCapacity " + ((float)usedResources.getMemory())/clusterResource.getMemory() +
|
|
|
- " required " + required.getMemory() +
|
|
|
+ " usedResources: " + usedResources +
|
|
|
+ " clusterResources: " + clusterResource +
|
|
|
+ " currentCapacity " +
|
|
|
+ Resources.divide(resourceCalculator, clusterResource,
|
|
|
+ usedResources, clusterResource) +
|
|
|
+ " required " + required +
|
|
|
" potentialNewCapacity: " + potentialNewCapacity + " ( " +
|
|
|
" max-capacity: " + absoluteMaxCapacity + ")");
|
|
|
return false;
|
|
@@ -919,14 +936,18 @@ public class LeafQueue implements CSQueue {
|
|
|
|
|
|
|
|
|
Resource queueMaxCap = // Queue Max-Capacity
|
|
|
- Resources.createResource(
|
|
|
- CSQueueUtils.roundDown(minimumAllocation,
|
|
|
- (int)(absoluteMaxCapacity * clusterResource.getMemory()))
|
|
|
- );
|
|
|
+ Resources.multiplyAndNormalizeDown(
|
|
|
+ resourceCalculator,
|
|
|
+ clusterResource,
|
|
|
+ absoluteMaxCapacity,
|
|
|
+ minimumAllocation);
|
|
|
|
|
|
Resource userConsumed = getUser(user).getConsumedResources();
|
|
|
Resource headroom =
|
|
|
- Resources.subtract(Resources.min(userLimit, queueMaxCap), userConsumed);
|
|
|
+ Resources.subtract(
|
|
|
+ Resources.min(resourceCalculator, clusterResource,
|
|
|
+ userLimit, queueMaxCap),
|
|
|
+ userConsumed);
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Headroom calculation for user " + user + ": " +
|
|
@@ -953,35 +974,46 @@ public class LeafQueue implements CSQueue {
|
|
|
// (usedResources + required) (which extra resources we are allocating)
|
|
|
|
|
|
// Allow progress for queues with miniscule capacity
|
|
|
- final int queueCapacity =
|
|
|
- Math.max(
|
|
|
- CSQueueUtils.roundUp(
|
|
|
- minimumAllocation,
|
|
|
- (int)(absoluteCapacity * clusterResource.getMemory())),
|
|
|
- required.getMemory()
|
|
|
- );
|
|
|
-
|
|
|
- final int consumed = usedResources.getMemory();
|
|
|
- final int currentCapacity =
|
|
|
- (consumed < queueCapacity) ?
|
|
|
- queueCapacity : (consumed + required.getMemory());
|
|
|
-
|
|
|
+ final Resource queueCapacity =
|
|
|
+ Resources.max(
|
|
|
+ resourceCalculator, clusterResource,
|
|
|
+ Resources.multiplyAndNormalizeUp(
|
|
|
+ resourceCalculator,
|
|
|
+ clusterResource,
|
|
|
+ absoluteCapacity,
|
|
|
+ minimumAllocation),
|
|
|
+ required);
|
|
|
+
|
|
|
+ Resource currentCapacity =
|
|
|
+ Resources.lessThan(resourceCalculator, clusterResource,
|
|
|
+ usedResources, queueCapacity) ?
|
|
|
+ queueCapacity : Resources.add(usedResources, required);
|
|
|
+
|
|
|
// Never allow a single user to take more than the
|
|
|
// queue's configured capacity * user-limit-factor.
|
|
|
// Also, the queue's configured capacity should be higher than
|
|
|
// queue-hard-limit * ulMin
|
|
|
|
|
|
final int activeUsers = activeUsersManager.getNumActiveUsers();
|
|
|
-
|
|
|
- int limit =
|
|
|
- CSQueueUtils.roundUp(
|
|
|
- minimumAllocation,
|
|
|
- Math.min(
|
|
|
- Math.max(divideAndCeil(currentCapacity, activeUsers),
|
|
|
- divideAndCeil((int)userLimit*currentCapacity, 100)),
|
|
|
- (int)(queueCapacity * userLimitFactor)
|
|
|
- )
|
|
|
- );
|
|
|
+
|
|
|
+ Resource limit =
|
|
|
+ Resources.roundUp(
|
|
|
+ resourceCalculator,
|
|
|
+ Resources.min(
|
|
|
+ resourceCalculator, clusterResource,
|
|
|
+ Resources.max(
|
|
|
+ resourceCalculator, clusterResource,
|
|
|
+ Resources.divideAndCeil(
|
|
|
+ resourceCalculator, currentCapacity, activeUsers),
|
|
|
+ Resources.divideAndCeil(
|
|
|
+ resourceCalculator,
|
|
|
+ Resources.multiplyAndRoundDown(
|
|
|
+ currentCapacity, userLimit),
|
|
|
+ 100)
|
|
|
+ ),
|
|
|
+ Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor)
|
|
|
+ ),
|
|
|
+ minimumAllocation);
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
String userName = application.getUser();
|
|
@@ -993,23 +1025,25 @@ public class LeafQueue implements CSQueue {
|
|
|
" consumed: " + getUser(userName).getConsumedResources() +
|
|
|
" limit: " + limit +
|
|
|
" queueCapacity: " + queueCapacity +
|
|
|
- " qconsumed: " + consumed +
|
|
|
+ " qconsumed: " + usedResources +
|
|
|
" currentCapacity: " + currentCapacity +
|
|
|
" activeUsers: " + activeUsers +
|
|
|
- " clusterCapacity: " + clusterResource.getMemory()
|
|
|
+ " clusterCapacity: " + clusterResource
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- return Resources.createResource(limit);
|
|
|
+ return limit;
|
|
|
}
|
|
|
|
|
|
- private synchronized boolean assignToUser(String userName, Resource limit) {
|
|
|
+ private synchronized boolean assignToUser(Resource clusterResource,
|
|
|
+ String userName, Resource limit) {
|
|
|
|
|
|
User user = getUser(userName);
|
|
|
|
|
|
// Note: We aren't considering the current request since there is a fixed
|
|
|
- // overhead of the AM, but it's a > check, not a >= check, so...
|
|
|
- if ((user.getConsumedResources().getMemory()) > limit.getMemory()) {
|
|
|
+ // overhead of the AM, but it's a > check, not a >= check, so...
|
|
|
+ if (Resources.greaterThan(resourceCalculator, clusterResource,
|
|
|
+ user.getConsumedResources(), limit)) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("User " + userName + " in queue " + getQueueName() +
|
|
|
" will exceed limit - " +
|
|
@@ -1023,21 +1057,15 @@ public class LeafQueue implements CSQueue {
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- static int divideAndCeil(int a, int b) {
|
|
|
- if (b == 0) {
|
|
|
- LOG.info("divideAndCeil called with a=" + a + " b=" + b);
|
|
|
- return 0;
|
|
|
- }
|
|
|
- return (a + (b - 1)) / b;
|
|
|
- }
|
|
|
-
|
|
|
boolean needContainers(FiCaSchedulerApp application, Priority priority, Resource required) {
|
|
|
int requiredContainers = application.getTotalRequiredResources(priority);
|
|
|
int reservedContainers = application.getNumReservedContainers(priority);
|
|
|
int starvation = 0;
|
|
|
if (reservedContainers > 0) {
|
|
|
float nodeFactor =
|
|
|
- ((float)required.getMemory() / getMaximumAllocation().getMemory());
|
|
|
+ Resources.ratio(
|
|
|
+ resourceCalculator, required, getMaximumAllocation()
|
|
|
+ );
|
|
|
|
|
|
// Use percentage of node required to bias against large containers...
|
|
|
// Protect against corner case where you need the whole node with
|
|
@@ -1052,7 +1080,7 @@ public class LeafQueue implements CSQueue {
|
|
|
" app.#re-reserve=" + application.getReReservations(priority) +
|
|
|
" reserved=" + reservedContainers +
|
|
|
" nodeFactor=" + nodeFactor +
|
|
|
- " minAllocFactor=" + minimumAllocationFactor +
|
|
|
+ " minAllocFactor=" + getMinimumAllocationFactor() +
|
|
|
" starvation=" + starvation);
|
|
|
}
|
|
|
}
|
|
@@ -1069,7 +1097,8 @@ public class LeafQueue implements CSQueue {
|
|
|
assigned =
|
|
|
assignNodeLocalContainers(clusterResource, node, application, priority,
|
|
|
reservedContainer);
|
|
|
- if (Resources.greaterThan(assigned, Resources.none())) {
|
|
|
+ if (Resources.greaterThan(resourceCalculator, clusterResource,
|
|
|
+ assigned, Resources.none())) {
|
|
|
return new CSAssignment(assigned, NodeType.NODE_LOCAL);
|
|
|
}
|
|
|
|
|
@@ -1077,7 +1106,8 @@ public class LeafQueue implements CSQueue {
|
|
|
assigned =
|
|
|
assignRackLocalContainers(clusterResource, node, application, priority,
|
|
|
reservedContainer);
|
|
|
- if (Resources.greaterThan(assigned, Resources.none())) {
|
|
|
+ if (Resources.greaterThan(resourceCalculator, clusterResource,
|
|
|
+ assigned, Resources.none())) {
|
|
|
return new CSAssignment(assigned, NodeType.RACK_LOCAL);
|
|
|
}
|
|
|
|
|
@@ -1231,7 +1261,8 @@ public class LeafQueue implements CSQueue {
|
|
|
|
|
|
Resource available = node.getAvailableResource();
|
|
|
|
|
|
- assert (available.getMemory() > 0);
|
|
|
+ assert Resources.greaterThan(
|
|
|
+ resourceCalculator, clusterResource, available, Resources.none());
|
|
|
|
|
|
// Create the container if necessary
|
|
|
Container container =
|
|
@@ -1239,12 +1270,13 @@ public class LeafQueue implements CSQueue {
|
|
|
|
|
|
// something went wrong getting/creating the container
|
|
|
if (container == null) {
|
|
|
+ LOG.warn("Couldn't get container for allocation!");
|
|
|
return Resources.none();
|
|
|
}
|
|
|
|
|
|
// Can we allocate a container on this node?
|
|
|
int availableContainers =
|
|
|
- available.getMemory() / capability.getMemory();
|
|
|
+ resourceCalculator.computeAvailableContainers(available, capability);
|
|
|
if (availableContainers > 0) {
|
|
|
// Allocate...
|
|
|
|
|
@@ -1267,8 +1299,9 @@ public class LeafQueue implements CSQueue {
|
|
|
// Inform the application
|
|
|
RMContainer allocatedContainer =
|
|
|
application.allocate(type, node, priority, request, container);
|
|
|
+
|
|
|
+ // Does the application need this resource?
|
|
|
if (allocatedContainer == null) {
|
|
|
- // Did the application need this resource?
|
|
|
return Resources.none();
|
|
|
}
|
|
|
|
|
@@ -1379,7 +1412,7 @@ public class LeafQueue implements CSQueue {
|
|
|
// Update queue metrics
|
|
|
Resources.addTo(usedResources, resource);
|
|
|
CSQueueUtils.updateQueueStatistics(
|
|
|
- this, getParent(), clusterResource, minimumAllocation);
|
|
|
+ resourceCalculator, this, getParent(), clusterResource, minimumAllocation);
|
|
|
++numContainers;
|
|
|
|
|
|
// Update user metrics
|
|
@@ -1404,7 +1437,8 @@ public class LeafQueue implements CSQueue {
|
|
|
// Update queue metrics
|
|
|
Resources.subtractFrom(usedResources, resource);
|
|
|
CSQueueUtils.updateQueueStatistics(
|
|
|
- this, getParent(), clusterResource, minimumAllocation);
|
|
|
+ resourceCalculator, this, getParent(), clusterResource,
|
|
|
+ minimumAllocation);
|
|
|
--numContainers;
|
|
|
|
|
|
// Update user metrics
|
|
@@ -1423,10 +1457,12 @@ public class LeafQueue implements CSQueue {
|
|
|
// Update queue properties
|
|
|
maxActiveApplications =
|
|
|
CSQueueUtils.computeMaxActiveApplications(
|
|
|
+ resourceCalculator,
|
|
|
clusterResource, minimumAllocation,
|
|
|
maxAMResourcePerQueuePercent, absoluteMaxCapacity);
|
|
|
maxActiveAppsUsingAbsCap =
|
|
|
CSQueueUtils.computeMaxActiveApplications(
|
|
|
+ resourceCalculator,
|
|
|
clusterResource, minimumAllocation,
|
|
|
maxAMResourcePerQueuePercent, absoluteCapacity);
|
|
|
maxActiveApplicationsPerUser =
|
|
@@ -1435,7 +1471,8 @@ public class LeafQueue implements CSQueue {
|
|
|
|
|
|
// Update metrics
|
|
|
CSQueueUtils.updateQueueStatistics(
|
|
|
- this, getParent(), clusterResource, minimumAllocation);
|
|
|
+ resourceCalculator, this, getParent(), clusterResource,
|
|
|
+ minimumAllocation);
|
|
|
|
|
|
// Update application properties
|
|
|
for (FiCaSchedulerApp application : activeApplications) {
|
|
@@ -1452,7 +1489,7 @@ public class LeafQueue implements CSQueue {
|
|
|
}
|
|
|
|
|
|
static class User {
|
|
|
- Resource consumed = Resources.createResource(0);
|
|
|
+ Resource consumed = Resources.createResource(0, 0);
|
|
|
int pendingApplications = 0;
|
|
|
int activeApplications = 0;
|
|
|
|