|
@@ -180,7 +180,9 @@ public class LeafQueue implements CSQueue {
|
|
|
Map<QueueACL, AccessControlList> acls =
|
|
|
cs.getConfiguration().getAcls(getQueuePath());
|
|
|
|
|
|
- setupQueueConfigs(capacity, absoluteCapacity,
|
|
|
+ setupQueueConfigs(
|
|
|
+ cs.getClusterResources(),
|
|
|
+ capacity, absoluteCapacity,
|
|
|
maximumCapacity, absoluteMaxCapacity,
|
|
|
userLimit, userLimitFactor,
|
|
|
maxApplications, maxApplicationsPerUser,
|
|
@@ -198,6 +200,7 @@ public class LeafQueue implements CSQueue {
|
|
|
}
|
|
|
|
|
|
private synchronized void setupQueueConfigs(
|
|
|
+ Resource clusterResource,
|
|
|
float capacity, float absoluteCapacity,
|
|
|
float maximumCapacity, float absoluteMaxCapacity,
|
|
|
int userLimit, float userLimitFactor,
|
|
@@ -235,6 +238,10 @@ public class LeafQueue implements CSQueue {
|
|
|
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
|
|
|
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
|
|
|
}
|
|
|
+
|
|
|
+ // Update metrics
|
|
|
+ CSQueueUtils.updateQueueStatistics(
|
|
|
+ this, parent, clusterResource, minimumAllocation);
|
|
|
|
|
|
LOG.info("Initializing " + queueName + "\n" +
|
|
|
"capacity = " + capacity +
|
|
@@ -386,11 +393,11 @@ public class LeafQueue implements CSQueue {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- synchronized void setUtilization(float utilization) {
|
|
|
+ public synchronized void setUtilization(float utilization) {
|
|
|
this.utilization = utilization;
|
|
|
}
|
|
|
|
|
|
- synchronized void setUsedCapacity(float usedCapacity) {
|
|
|
+ public synchronized void setUsedCapacity(float usedCapacity) {
|
|
|
this.usedCapacity = usedCapacity;
|
|
|
}
|
|
|
|
|
@@ -534,7 +541,9 @@ public class LeafQueue implements CSQueue {
|
|
|
}
|
|
|
|
|
|
LeafQueue leafQueue = (LeafQueue)queue;
|
|
|
- setupQueueConfigs(leafQueue.capacity, leafQueue.absoluteCapacity,
|
|
|
+ setupQueueConfigs(
|
|
|
+ clusterResource,
|
|
|
+ leafQueue.capacity, leafQueue.absoluteCapacity,
|
|
|
leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity,
|
|
|
leafQueue.userLimit, leafQueue.userLimitFactor,
|
|
|
leafQueue.maxApplications,
|
|
@@ -542,8 +551,6 @@ public class LeafQueue implements CSQueue {
|
|
|
leafQueue.getMaximumActiveApplications(),
|
|
|
leafQueue.getMaximumActiveApplicationsPerUser(),
|
|
|
leafQueue.state, leafQueue.acls);
|
|
|
-
|
|
|
- updateResource(clusterResource);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -883,7 +890,8 @@ public class LeafQueue implements CSQueue {
|
|
|
|
|
|
Resource queueMaxCap = // Queue Max-Capacity
|
|
|
Resources.createResource(
|
|
|
- roundDown((int)(absoluteMaxCapacity * clusterResource.getMemory()))
|
|
|
+ CSQueueUtils.roundDown(minimumAllocation,
|
|
|
+ (int)(absoluteMaxCapacity * clusterResource.getMemory()))
|
|
|
);
|
|
|
|
|
|
Resource userConsumed = getUser(user).getConsumedResources();
|
|
@@ -904,16 +912,6 @@ public class LeafQueue implements CSQueue {
|
|
|
return userLimit;
|
|
|
}
|
|
|
|
|
|
- private int roundUp(int memory) {
|
|
|
- int minMemory = minimumAllocation.getMemory();
|
|
|
- return divideAndCeil(memory, minMemory) * minMemory;
|
|
|
- }
|
|
|
-
|
|
|
- private int roundDown(int memory) {
|
|
|
- int minMemory = minimumAllocation.getMemory();
|
|
|
- return (memory / minMemory) * minMemory;
|
|
|
- }
|
|
|
-
|
|
|
@Lock(NoLock.class)
|
|
|
private Resource computeUserLimit(SchedulerApp application,
|
|
|
Resource clusterResource, Resource required) {
|
|
@@ -927,8 +925,11 @@ public class LeafQueue implements CSQueue {
|
|
|
// Allow progress for queues with miniscule capacity
|
|
|
final int queueCapacity =
|
|
|
Math.max(
|
|
|
- roundUp((int)(absoluteCapacity * clusterResource.getMemory())),
|
|
|
- required.getMemory());
|
|
|
+ CSQueueUtils.roundUp(
|
|
|
+ minimumAllocation,
|
|
|
+ (int)(absoluteCapacity * clusterResource.getMemory())),
|
|
|
+ required.getMemory()
|
|
|
+ );
|
|
|
|
|
|
final int consumed = usedResources.getMemory();
|
|
|
final int currentCapacity =
|
|
@@ -943,7 +944,8 @@ public class LeafQueue implements CSQueue {
|
|
|
final int activeUsers = activeUsersManager.getNumActiveUsers();
|
|
|
|
|
|
int limit =
|
|
|
- roundUp(
|
|
|
+ CSQueueUtils.roundUp(
|
|
|
+ minimumAllocation,
|
|
|
Math.min(
|
|
|
Math.max(divideAndCeil(currentCapacity, activeUsers),
|
|
|
divideAndCeil((int)userLimit*currentCapacity, 100)),
|
|
@@ -991,7 +993,7 @@ public class LeafQueue implements CSQueue {
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- private static int divideAndCeil(int a, int b) {
|
|
|
+ static int divideAndCeil(int a, int b) {
|
|
|
if (b == 0) {
|
|
|
LOG.info("divideAndCeil called with a=" + a + " b=" + b);
|
|
|
return 0;
|
|
@@ -1325,7 +1327,8 @@ public class LeafQueue implements CSQueue {
|
|
|
SchedulerApp application, Resource resource) {
|
|
|
// Update queue metrics
|
|
|
Resources.addTo(usedResources, resource);
|
|
|
- updateResource(clusterResource);
|
|
|
+ CSQueueUtils.updateQueueStatistics(
|
|
|
+ this, parent, clusterResource, minimumAllocation);
|
|
|
++numContainers;
|
|
|
|
|
|
// Update user metrics
|
|
@@ -1349,7 +1352,8 @@ public class LeafQueue implements CSQueue {
|
|
|
SchedulerApp application, Resource resource) {
|
|
|
// Update queue metrics
|
|
|
Resources.subtractFrom(usedResources, resource);
|
|
|
- updateResource(clusterResource);
|
|
|
+ CSQueueUtils.updateQueueStatistics(
|
|
|
+ this, parent, clusterResource, minimumAllocation);
|
|
|
--numContainers;
|
|
|
|
|
|
// Update user metrics
|
|
@@ -1374,6 +1378,10 @@ public class LeafQueue implements CSQueue {
|
|
|
CSQueueUtils.computeMaxActiveApplicationsPerUser(
|
|
|
maxActiveApplications, userLimit, userLimitFactor);
|
|
|
|
|
|
+ // Update metrics
|
|
|
+ CSQueueUtils.updateQueueStatistics(
|
|
|
+ this, parent, clusterResource, minimumAllocation);
|
|
|
+
|
|
|
// Update application properties
|
|
|
for (SchedulerApp application : activeApplications) {
|
|
|
synchronized (application) {
|
|
@@ -1383,18 +1391,6 @@ public class LeafQueue implements CSQueue {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private synchronized void updateResource(Resource clusterResource) {
|
|
|
- float queueLimit = clusterResource.getMemory() * absoluteCapacity;
|
|
|
- setUtilization(usedResources.getMemory() / queueLimit);
|
|
|
- setUsedCapacity(usedResources.getMemory()
|
|
|
- / (clusterResource.getMemory() * parent.getAbsoluteCapacity()));
|
|
|
-
|
|
|
- Resource resourceLimit =
|
|
|
- Resources.createResource(roundUp((int)queueLimit));
|
|
|
- metrics.setAvailableResourcesToQueue(
|
|
|
- Resources.subtractFrom(resourceLimit, usedResources));
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
public QueueMetrics getMetrics() {
|
|
|
return metrics;
|