|
@@ -21,12 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
|
|
|
import org.apache.hadoop.classification.VisibleForTesting;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
|
|
|
- .QueueManagementDynamicEditPolicy;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
|
|
.SchedulerDynamicEditException;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
|
|
@@ -84,6 +81,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
|
|
public class GuaranteedOrZeroCapacityOverTimePolicy
|
|
|
implements AutoCreatedQueueManagementPolicy {
|
|
|
|
|
|
+ private static final int DEFAULT_QUEUE_PRINT_SIZE_LIMIT = 25;
|
|
|
private CapacitySchedulerContext scheduler;
|
|
|
private ManagedParentQueue managedParentQueue;
|
|
|
|
|
@@ -345,12 +343,11 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Compute/Adjust child queue capacities
|
|
|
- * for auto created leaf queues
|
|
|
- * This computes queue entitlements but does not update LeafQueueState or
|
|
|
- * queue capacities. Scheduler calls commitQueueManagemetChanges after
|
|
|
- * validation after applying queue changes and commits to LeafQueueState
|
|
|
- * are done in commitQueueManagementChanges.
|
|
|
+ * Computes / adjusts child queue capacities for auto created leaf queues.
|
|
|
+ * This method computes queue entitlements but does not update LeafQueueState or
|
|
|
+ * queue capacities.
|
|
|
+ * After validating and applying the queue changes, the scheduler calls
|
|
|
+ * commitQueueManagementChanges, which is where the commits to LeafQueueState are done.
|
|
|
*
|
|
|
* @return List of Queue Management change suggestions which could potentially
|
|
|
* be committed/rejected by the scheduler due to validation failures
|
|
@@ -367,117 +364,98 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|
|
managedParentQueue.getAutoCreatedQueueManagementPolicy());
|
|
|
|
|
|
//TODO : Add support for node labels on leaf queue template configurations
|
|
|
- //synch/add missing leaf queue(s) if any to state
|
|
|
+ // sync / add missing leaf queue(s), if any, to state
|
|
|
updateLeafQueueState();
|
|
|
|
|
|
readLock.lock();
|
|
|
try {
|
|
|
- List<QueueManagementChange> queueManagementChanges = new ArrayList<>();
|
|
|
- List<FiCaSchedulerApp> pendingApps = getSortedPendingApplications();
|
|
|
-
|
|
|
- //Map of LeafQueue->QueueCapacities - keep adding the computed
|
|
|
- // entitlements to this map and finally
|
|
|
- // build the leaf queue configuration Template for all identified leaf
|
|
|
- // queues
|
|
|
- Map<String, QueueCapacities> leafQueueEntitlements = new HashMap<>();
|
|
|
+ LeafQueueEntitlements leafQueueEntitlements = new LeafQueueEntitlements();
|
|
|
for (String nodeLabel : leafQueueTemplateNodeLabels) {
|
|
|
- // check if any leaf queues need to be deactivated based on pending
|
|
|
- // applications
|
|
|
- float parentAbsoluteCapacity =
|
|
|
- managedParentQueue.getQueueCapacities().getAbsoluteCapacity(
|
|
|
- nodeLabel);
|
|
|
- float leafQueueTemplateAbsoluteCapacity =
|
|
|
- leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel);
|
|
|
- Map<String, QueueCapacities> deactivatedLeafQueues =
|
|
|
- deactivateLeafQueuesIfInActive(managedParentQueue, nodeLabel,
|
|
|
- leafQueueEntitlements);
|
|
|
-
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- if ( deactivatedLeafQueues.size() > 0) {
|
|
|
- LOG.debug("Parent queue = {}, " +
|
|
|
- ", nodeLabel = {}, deactivated leaf queues = [{}] ",
|
|
|
- managedParentQueue.getQueuePath(), nodeLabel,
|
|
|
- deactivatedLeafQueues.size() > 25 ? deactivatedLeafQueues
|
|
|
- .size() : deactivatedLeafQueues);
|
|
|
-
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- float deactivatedCapacity = getTotalDeactivatedCapacity(
|
|
|
- deactivatedLeafQueues, nodeLabel);
|
|
|
-
|
|
|
- float sumOfChildQueueActivatedCapacity = parentQueueState.
|
|
|
- getAbsoluteActivatedChildQueueCapacity(nodeLabel);
|
|
|
+ DeactivatedLeafQueuesByLabel deactivatedLeafQueues =
|
|
|
+ deactivateLeafQueues(nodeLabel, leafQueueEntitlements);
|
|
|
+ deactivatedLeafQueues.printToDebug(LOG);
|
|
|
|
|
|
//Check if we need to activate anything at all?
|
|
|
- float availableCapacity =
|
|
|
- parentAbsoluteCapacity - sumOfChildQueueActivatedCapacity
|
|
|
- + deactivatedCapacity + EPSILON;
|
|
|
-
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Parent queue = " + managedParentQueue.getQueuePath()
|
|
|
- + ", nodeLabel = " + nodeLabel + ", absCapacity = "
|
|
|
- + parentAbsoluteCapacity + ", leafQueueAbsoluteCapacity = "
|
|
|
- + leafQueueTemplateAbsoluteCapacity + ", deactivatedCapacity = "
|
|
|
- + deactivatedCapacity + " , absChildActivatedCapacity = "
|
|
|
- + sumOfChildQueueActivatedCapacity + ", availableCapacity = "
|
|
|
- + availableCapacity);
|
|
|
- }
|
|
|
-
|
|
|
- if (availableCapacity >= leafQueueTemplateAbsoluteCapacity) {
|
|
|
- //sort applications across leaf queues by submit time
|
|
|
- if (pendingApps.size() > 0) {
|
|
|
- int maxLeafQueuesTobeActivated = getMaxLeavesToBeActivated(
|
|
|
- availableCapacity, leafQueueTemplateAbsoluteCapacity,
|
|
|
- pendingApps.size());
|
|
|
-
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Parent queue = " + managedParentQueue.getQueuePath()
|
|
|
- + " : Found " + maxLeafQueuesTobeActivated + " leaf queues"
|
|
|
- + " to be activated with " + pendingApps.size() + " apps ");
|
|
|
- }
|
|
|
-
|
|
|
- LinkedHashSet<String> leafQueuesToBeActivated = getSortedLeafQueues(
|
|
|
- nodeLabel, pendingApps, maxLeafQueuesTobeActivated,
|
|
|
- deactivatedLeafQueues.keySet());
|
|
|
-
|
|
|
- //Compute entitlement changes for the identified leaf queues
|
|
|
- // which is appended to the List of computedEntitlements
|
|
|
- updateLeafQueueCapacitiesByLabel(nodeLabel, leafQueuesToBeActivated,
|
|
|
- leafQueueEntitlements);
|
|
|
-
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- if (leafQueuesToBeActivated.size() > 0) {
|
|
|
- LOG.debug("Activated leaf queues : [{}]",
|
|
|
- leafQueuesToBeActivated.size() < 25 ?
|
|
|
- leafQueuesToBeActivated : leafQueuesToBeActivated.size());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ if (deactivatedLeafQueues.canActivateLeafQueues()) {
|
|
|
+ activateLeafQueues(leafQueueEntitlements, nodeLabel, deactivatedLeafQueues);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
//Populate new entitlements
|
|
|
-
|
|
|
- for (final Iterator<Map.Entry<String, QueueCapacities>> iterator =
|
|
|
- leafQueueEntitlements.entrySet().iterator(); iterator.hasNext(); ) {
|
|
|
- Map.Entry<String, QueueCapacities> queueCapacities = iterator.next();
|
|
|
- String leafQueueName = queueCapacities.getKey();
|
|
|
+ return leafQueueEntitlements.mapToQueueManagementChanges((leafQueueName, capacities) -> {
|
|
|
AutoCreatedLeafQueue leafQueue =
|
|
|
(AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager()
|
|
|
.getQueue(leafQueueName);
|
|
|
- AutoCreatedLeafQueueConfig newTemplate = buildTemplate(
|
|
|
- queueCapacities.getValue());
|
|
|
- queueManagementChanges.add(
|
|
|
- new QueueManagementChange.UpdateQueue(leafQueue, newTemplate));
|
|
|
-
|
|
|
- }
|
|
|
- return queueManagementChanges;
|
|
|
+ AutoCreatedLeafQueueConfig newTemplate = buildTemplate(capacities);
|
|
|
+ return new QueueManagementChange.UpdateQueue(leafQueue, newTemplate);
|
|
|
+ });
|
|
|
} finally {
|
|
|
readLock.unlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
+  * Activates leaf queues for the given node label when there are pending applications.
+  * The number of queues to activate is obtained from {@code deactivatedLeafQueues}, the
+  * concrete queues are picked by {@code getSortedLeafQueues}, and the entitlements of the
+  * picked queues are updated from the leaf queue template. Does nothing when no application
+  * is pending.
+  *
+  * @param leafQueueEntitlements entitlements computed so far; updated in place for every
+  *                              leaf queue selected for activation
+  * @param nodeLabel node label the activation is computed for
+  * @param deactivatedLeafQueues leaf queues deactivated for this node label
+  * @throws SchedulerDynamicEditException propagated from the helper computations
+  */
+ private void activateLeafQueues(LeafQueueEntitlements leafQueueEntitlements, String nodeLabel,
+     DeactivatedLeafQueuesByLabel deactivatedLeafQueues) throws SchedulerDynamicEditException {
+   // sort applications across leaf queues by submit time
+   List<FiCaSchedulerApp> pendingApps = getSortedPendingApplications();
+   if (pendingApps.isEmpty()) {
+     // Nothing is waiting, so there is no reason to activate any leaf queue.
+     return;
+   }
+
+   int maxLeafQueuesTobeActivated =
+       deactivatedLeafQueues.getMaxLeavesToBeActivated(pendingApps.size());
+
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Parent queue = {}, Found {} leaf queues to be activated with {} apps",
+         managedParentQueue.getQueuePath(), maxLeafQueuesTobeActivated, pendingApps.size());
+   }
+
+   Set<String> leafQueuesToBeActivated = getSortedLeafQueues(
+       nodeLabel, pendingApps, maxLeafQueuesTobeActivated,
+       deactivatedLeafQueues.getQueues());
+
+   // Compute entitlement changes for the identified leaf queues
+   // which is appended to the List of computedEntitlements
+   updateLeafQueueCapacitiesByLabel(nodeLabel, leafQueuesToBeActivated, leafQueueEntitlements);
+
+   if (LOG.isDebugEnabled() && !leafQueuesToBeActivated.isEmpty()) {
+     LOG.debug("Activated leaf queues : [{}]",
+         getListContentsUpToLimit(leafQueuesToBeActivated));
+   }
+ }
|
|
|
+
|
|
|
+ /**
+  * Picks what to log for a set of queue names: the set itself while it holds at most
+  * DEFAULT_QUEUE_PRINT_SIZE_LIMIT entries, otherwise only its size, so that debug output
+  * stays short. The boundary is inclusive, matching getMapUpToLimit, so a collection of
+  * exactly the limit size is printed in full by both helpers.
+  *
+  * @param leafQueuesToBeActivated queue names to be logged
+  * @return the set itself, or its size as an Integer when it exceeds the limit
+  */
+ private Object getListContentsUpToLimit(Set<String> leafQueuesToBeActivated) {
+   return leafQueuesToBeActivated.size() <= DEFAULT_QUEUE_PRINT_SIZE_LIMIT ?
+       leafQueuesToBeActivated : leafQueuesToBeActivated.size();
+ }
|
|
|
+
|
|
|
+ /**
+  * Picks what to log for a map of deactivated leaf queues: the map itself while it holds at
+  * most DEFAULT_QUEUE_PRINT_SIZE_LIMIT entries, otherwise only its size.
+  *
+  * @param deactivatedLeafQueues deactivated leaf queues keyed by queue path
+  * @return the map itself, or its size as an Integer when it exceeds the limit
+  */
+ private Object getMapUpToLimit(Map<String, QueueCapacities> deactivatedLeafQueues) {
+   final int queueCount = deactivatedLeafQueues.size();
+   if (queueCount <= DEFAULT_QUEUE_PRINT_SIZE_LIMIT) {
+     return deactivatedLeafQueues;
+   }
+   return queueCount;
+ }
|
|
|
+
|
|
|
+ /**
+  * Deactivates inactive leaf queues for the given node label and bundles the outcome with
+  * the capacity figures read from the managed parent queue, the parent queue state and the
+  * leaf queue template.
+  *
+  * @param nodeLabel node label the deactivation is computed for
+  * @param leafQueueEntitlements entitlements computed so far; passed on to
+  *                              deactivateLeafQueuesIfInActive
+  * @return the deactivated leaf queues together with the capacities for this node label
+  * @throws SchedulerDynamicEditException propagated from deactivateLeafQueuesIfInActive
+  */
+ private DeactivatedLeafQueuesByLabel deactivateLeafQueues(String nodeLabel,
+     LeafQueueEntitlements leafQueueEntitlements) throws SchedulerDynamicEditException {
+   // Capacities are read before any leaf queue is deactivated, as in the original flow.
+   final float parentAbsCapacity =
+       managedParentQueue.getQueueCapacities().getAbsoluteCapacity(nodeLabel);
+   final float templateAbsCapacity = leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel);
+
+   // check if any leaf queues need to be deactivated based on pending applications
+   final Map<String, QueueCapacities> deactivated =
+       deactivateLeafQueuesIfInActive(managedParentQueue, nodeLabel, leafQueueEntitlements);
+
+   if (!deactivated.isEmpty() && LOG.isDebugEnabled()) {
+     LOG.debug("Parent queue = {}, nodeLabel = {}, deactivated leaf queues = [{}] ",
+         managedParentQueue.getQueuePath(), nodeLabel, getMapUpToLimit(deactivated));
+   }
+
+   final String parentQueuePath = managedParentQueue.getQueuePath();
+   final float activatedChildCapacity =
+       parentQueueState.getAbsoluteActivatedChildQueueCapacity(nodeLabel);
+   return new DeactivatedLeafQueuesByLabel(deactivated, parentQueuePath, nodeLabel,
+       activatedChildCapacity, parentAbsCapacity, templateAbsCapacity);
+ }
|
|
|
+
|
|
|
private void updateTemplateAbsoluteCapacities(QueueCapacities parentQueueCapacities,
|
|
|
GuaranteedOrZeroCapacityOverTimePolicy policy) {
|
|
|
writeLock.lock();
|
|
@@ -496,19 +474,6 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|
|
updateTemplateAbsoluteCapacities(queueCapacities, this);
|
|
|
}
|
|
|
|
|
|
- private float getTotalDeactivatedCapacity(
|
|
|
- Map<String, QueueCapacities> deactivatedLeafQueues, String nodeLabel) {
|
|
|
- float deactivatedCapacity = 0;
|
|
|
- for (Iterator<Map.Entry<String, QueueCapacities>> iterator =
|
|
|
- deactivatedLeafQueues.entrySet().iterator(); iterator.hasNext(); ) {
|
|
|
- Map.Entry<String, QueueCapacities> deactivatedQueueCapacity =
|
|
|
- iterator.next();
|
|
|
- deactivatedCapacity +=
|
|
|
- deactivatedQueueCapacity.getValue().getAbsoluteCapacity(nodeLabel);
|
|
|
- }
|
|
|
- return deactivatedCapacity;
|
|
|
- }
|
|
|
-
|
|
|
@VisibleForTesting
|
|
|
void updateLeafQueueState() {
|
|
|
writeLock.lock();
|
|
@@ -608,9 +573,15 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|
|
}
|
|
|
}
|
|
|
|
|
|
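+ /**
|
|
|
+ * Deactivates, for the given node label, the active leaf queues that have no
|
|
|
+ * pending applications.
|
|
|
+ *
|
|
|
+ * @param leafQueueEntitlements LeafQueue {@literal ->} QueueCapacities entitlements - keep
|
|
|
+ * adding the computed entitlements to it and finally
|
|
|
+ * build the leaf queue configuration Template for all identified leaf
|
|
|
+ * queues
|
|
|
+ */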
|
|
|
private Map<String, QueueCapacities> deactivateLeafQueuesIfInActive(
|
|
|
ParentQueue parentQueue, String nodeLabel,
|
|
|
- Map<String, QueueCapacities> leafQueueEntitlements)
|
|
|
+ LeafQueueEntitlements leafQueueEntitlements)
|
|
|
throws SchedulerDynamicEditException {
|
|
|
Map<String, QueueCapacities> deactivatedQueues = new HashMap<>();
|
|
|
|
|
@@ -618,18 +589,11 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|
|
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;
|
|
|
if (leafQueue != null) {
|
|
|
if (isActive(leafQueue, nodeLabel) && !hasPendingApps(leafQueue)) {
|
|
|
- if (!leafQueueEntitlements.containsKey(leafQueue.getQueuePath())) {
|
|
|
- leafQueueEntitlements.put(leafQueue.getQueuePath(),
|
|
|
- new QueueCapacities(false));
|
|
|
- }
|
|
|
-
|
|
|
- QueueCapacities capacities = leafQueueEntitlements.get(
|
|
|
- leafQueue.getQueuePath());
|
|
|
+ QueueCapacities capacities = leafQueueEntitlements.getCapacityOfQueue(leafQueue);
|
|
|
updateToZeroCapacity(capacities, nodeLabel, (LeafQueue)childQueue);
|
|
|
- deactivatedQueues.put(leafQueue.getQueuePath(),
|
|
|
- leafQueueTemplateCapacities);
|
|
|
+ deactivatedQueues.put(leafQueue.getQueuePath(), leafQueueTemplateCapacities);
|
|
|
}
|
|
|
- } else{
|
|
|
+ } else {
|
|
|
LOG.warn("Could not find queue in scheduler while trying" + " to "
|
|
|
+ "deactivate for " + parentQueue);
|
|
|
}
|
|
@@ -640,35 +604,16 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|
|
|
|
|
private void updateLeafQueueCapacitiesByLabel(String nodeLabel,
|
|
|
Set<String> leafQueuesToBeActivated,
|
|
|
- Map<String, QueueCapacities> leafQueueEntitlements) {
|
|
|
- for (String curLeafQueue : leafQueuesToBeActivated) {
|
|
|
- if (!leafQueueEntitlements.containsKey(curLeafQueue)) {
|
|
|
- leafQueueEntitlements.put(curLeafQueue, new QueueCapacities(false));
|
|
|
- // Activate queues if capacity is available
|
|
|
- }
|
|
|
-
|
|
|
- QueueCapacities capacities = leafQueueEntitlements.get(curLeafQueue);
|
|
|
+ LeafQueueEntitlements leafQueueEntitlements) {
|
|
|
+ for (String leafQueue : leafQueuesToBeActivated) {
|
|
|
+ QueueCapacities capacities = leafQueueEntitlements.getCapacityOfQueueByPath(leafQueue);
|
|
|
updateCapacityFromTemplate(capacities, nodeLabel);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @VisibleForTesting
|
|
|
- public int getMaxLeavesToBeActivated(float availableCapacity,
|
|
|
- float childQueueAbsoluteCapacity, int numPendingApps)
|
|
|
- throws SchedulerDynamicEditException {
|
|
|
-
|
|
|
- if (childQueueAbsoluteCapacity > 0) {
|
|
|
- int numLeafQueuesNeeded = (int) Math.floor(
|
|
|
- availableCapacity / childQueueAbsoluteCapacity);
|
|
|
-
|
|
|
- return Math.min(numLeafQueuesNeeded, numPendingApps);
|
|
|
- }
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Commit queue management changes - which involves updating required state
|
|
|
- * on parent/underlying leaf queues
|
|
|
+ * on parent/underlying leaf queues.
|
|
|
*
|
|
|
* @param queueManagementChanges Queue Management changes to commit
|
|
|
* @throws SchedulerDynamicEditException when validation fails
|