|
@@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.records.QueueState;
|
|
|
import org.apache.hadoop.yarn.api.records.QueueStatistics;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
|
|
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
@@ -106,10 +105,6 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|
|
Map<AccessType, AccessControlList> acls =
|
|
|
new HashMap<AccessType, AccessControlList>();
|
|
|
volatile boolean reservationsContinueLooking;
|
|
|
- private volatile boolean preemptionDisabled;
|
|
|
- // Indicates if the in-queue preemption setting is ever disabled within the
|
|
|
- // hierarchy of this queue.
|
|
|
- private boolean intraQueuePreemptionDisabledInHierarchy;
|
|
|
|
|
|
// Track resource usage-by-label like used-resource/pending-resource, etc.
|
|
|
volatile ResourceUsage queueUsage;
|
|
@@ -129,6 +124,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|
|
// Indicates if this queue's default lifetime was set by a config property,
|
|
|
// either at this level or anywhere in the queue's hierarchy.
|
|
|
private volatile boolean defaultAppLifetimeWasSpecifiedInConfig = false;
|
|
|
+ private CSQueuePreemption preemptionSettings;
|
|
|
|
|
|
public enum CapacityConfigType {
|
|
|
// FIXME, from what I can see, Percentage mode can almost apply to weighted
|
|
@@ -401,10 +397,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|
|
this, labelManager, null);
|
|
|
|
|
|
// Store preemption settings
|
|
|
- this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this,
|
|
|
- configuration);
|
|
|
- this.intraQueuePreemptionDisabledInHierarchy =
|
|
|
- isIntraQueueHierarchyPreemptionDisabled(this, configuration);
|
|
|
+ this.preemptionSettings = new CSQueuePreemption(this, csContext, configuration);
|
|
|
this.priority = configuration.getQueuePriority(
|
|
|
getQueuePath());
|
|
|
|
|
@@ -777,7 +770,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|
|
queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression);
|
|
|
queueInfo.setCurrentCapacity(getUsedCapacity());
|
|
|
queueInfo.setQueueStatistics(getQueueStatistics());
|
|
|
- queueInfo.setPreemptionDisabled(preemptionDisabled);
|
|
|
+ queueInfo.setPreemptionDisabled(getPreemptionDisabled());
|
|
|
queueInfo.setIntraQueuePreemptionDisabled(
|
|
|
getIntraQueuePreemptionDisabled());
|
|
|
queueInfo.setQueueConfigurations(getQueueConfigurations());
|
|
@@ -902,17 +895,18 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|
|
|
|
|
@Private
|
|
|
public boolean getPreemptionDisabled() {
|
|
|
- return preemptionDisabled;
|
|
|
+ return preemptionSettings.isPreemptionDisabled();
|
|
|
}
|
|
|
|
|
|
@Private
|
|
|
public boolean getIntraQueuePreemptionDisabled() {
|
|
|
- return intraQueuePreemptionDisabledInHierarchy || preemptionDisabled;
|
|
|
+ return preemptionSettings.isIntraQueuePreemptionDisabledInHierarchy() ||
|
|
|
+ preemptionSettings.isPreemptionDisabled();
|
|
|
}
|
|
|
|
|
|
@Private
|
|
|
public boolean getIntraQueuePreemptionDisabledInHierarchy() {
|
|
|
- return intraQueuePreemptionDisabledInHierarchy;
|
|
|
+ return preemptionSettings.isIntraQueuePreemptionDisabledInHierarchy();
|
|
|
}
|
|
|
|
|
|
@Private
|
|
@@ -935,43 +929,6 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|
|
return readLock;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * The specified queue is cross-queue preemptable if system-wide cross-queue
|
|
|
- * preemption is turned on unless any queue in the <em>qPath</em> hierarchy
|
|
|
- * has explicitly turned cross-queue preemption off.
|
|
|
- * NOTE: Cross-queue preemptability is inherited from a queue's parent.
|
|
|
- *
|
|
|
- * @param q queue to check preemption state
|
|
|
- * @param configuration capacity scheduler config
|
|
|
- * @return true if queue has cross-queue preemption disabled, false otherwise
|
|
|
- */
|
|
|
- private boolean isQueueHierarchyPreemptionDisabled(CSQueue q,
|
|
|
- CapacitySchedulerConfiguration configuration) {
|
|
|
- boolean systemWidePreemption =
|
|
|
- csContext.getConfiguration()
|
|
|
- .getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
|
|
|
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
|
|
|
- CSQueue parentQ = q.getParent();
|
|
|
-
|
|
|
- // If the system-wide preemption switch is turned off, all of the queues in
|
|
|
- // the qPath hierarchy have preemption disabled, so return true.
|
|
|
- if (!systemWidePreemption) return true;
|
|
|
-
|
|
|
- // If q is the root queue and the system-wide preemption switch is turned
|
|
|
- // on, then q does not have preemption disabled (default=false, below)
|
|
|
- // unless the preemption_disabled property is explicitly set.
|
|
|
- if (parentQ == null) {
|
|
|
- return configuration.getPreemptionDisabled(q.getQueuePath(), false);
|
|
|
- }
|
|
|
-
|
|
|
- // If this is not the root queue, inherit the default value for the
|
|
|
- // preemption_disabled property from the parent. Preemptability will be
|
|
|
- // inherited from the parent's hierarchy unless explicitly overridden at
|
|
|
- // this level.
|
|
|
- return configuration.getPreemptionDisabled(q.getQueuePath(),
|
|
|
- parentQ.getPreemptionDisabled());
|
|
|
- }
|
|
|
-
|
|
|
private long getInheritedMaxAppLifetime(CSQueue q,
|
|
|
CapacitySchedulerConfiguration conf) {
|
|
|
CSQueue parentQ = q.getParent();
|
|
@@ -1042,43 +999,6 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|
|
return defaultAppLifetime;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * The specified queue is intra-queue preemptable if
|
|
|
- * 1) system-wide intra-queue preemption is turned on
|
|
|
- * 2) no queue in the <em>qPath</em> hierarchy has explicitly turned off intra
|
|
|
- * queue preemption.
|
|
|
- * NOTE: Intra-queue preemptability is inherited from a queue's parent.
|
|
|
- *
|
|
|
- * @param q queue to check intra-queue preemption state
|
|
|
- * @param configuration capacity scheduler config
|
|
|
- * @return true if queue has intra-queue preemption disabled, false otherwise
|
|
|
- */
|
|
|
- private boolean isIntraQueueHierarchyPreemptionDisabled(CSQueue q,
|
|
|
- CapacitySchedulerConfiguration configuration) {
|
|
|
- boolean systemWideIntraQueuePreemption =
|
|
|
- csContext.getConfiguration().getBoolean(
|
|
|
- CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED,
|
|
|
- CapacitySchedulerConfiguration
|
|
|
- .DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED);
|
|
|
- // Intra-queue preemption is disabled for this queue if the system-wide
|
|
|
- // intra-queue preemption flag is false
|
|
|
- if (!systemWideIntraQueuePreemption) return true;
|
|
|
-
|
|
|
- // Check if this is the root queue and the root queue's intra-queue
|
|
|
- // preemption disable switch is set
|
|
|
- CSQueue parentQ = q.getParent();
|
|
|
- if (parentQ == null) {
|
|
|
- return configuration
|
|
|
- .getIntraQueuePreemptionDisabled(q.getQueuePath(), false);
|
|
|
- }
|
|
|
-
|
|
|
- // At this point, the master preemption switch is enabled down to this
|
|
|
- // queue's level. Determine whether or not intra-queue preemption is enabled
|
|
|
- // down to this queue's level and return that value.
|
|
|
- return configuration.getIntraQueuePreemptionDisabled(q.getQueuePath(),
|
|
|
- parentQ.getIntraQueuePreemptionDisabledInHierarchy());
|
|
|
- }
|
|
|
-
|
|
|
private Resource getCurrentLimitResource(String nodePartition,
|
|
|
Resource clusterResource, ResourceLimits currentResourceLimits,
|
|
|
SchedulingMode schedulingMode) {
|
|
@@ -1423,11 +1343,11 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|
|
LOG.info("The specified queue:" + getQueuePath()
|
|
|
+ " is already in the RUNNING state.");
|
|
|
} else {
|
|
|
- CSQueue parent = getParent();
|
|
|
- if (parent == null || parent.getState() == QueueState.RUNNING) {
|
|
|
+ CSQueue parentQueue = getParent();
|
|
|
+ if (parentQueue == null || parentQueue.getState() == QueueState.RUNNING) {
|
|
|
updateQueueState(QueueState.RUNNING);
|
|
|
} else {
|
|
|
- throw new YarnException("The parent Queue:" + parent.getQueuePath()
|
|
|
+ throw new YarnException("The parent Queue:" + parentQueue.getQueuePath()
|
|
|
+ " is not running. Please activate the parent queue first");
|
|
|
}
|
|
|
}
|