|
@@ -18,12 +18,10 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
|
|
|
|
|
-import org.apache.hadoop.ipc.WeightedTimeCostProvider;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.placement.MappingRule;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.placement.csmappingrule.MappingRule;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.MappingRuleCreator;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -85,41 +83,41 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
LoggerFactory.getLogger(CapacitySchedulerConfiguration.class);
|
|
|
|
|
|
private static final String CS_CONFIGURATION_FILE = "capacity-scheduler.xml";
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final String PREFIX = "yarn.scheduler.capacity.";
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final String DOT = ".";
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final String MAXIMUM_APPLICATIONS_SUFFIX =
|
|
|
"maximum-applications";
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final String MAXIMUM_SYSTEM_APPLICATIONS =
|
|
|
PREFIX + MAXIMUM_APPLICATIONS_SUFFIX;
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final String MAXIMUM_AM_RESOURCE_SUFFIX =
|
|
|
"maximum-am-resource-percent";
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final String MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT =
|
|
|
PREFIX + MAXIMUM_AM_RESOURCE_SUFFIX;
|
|
|
|
|
|
@Private
|
|
|
public static final String QUEUES = "queues";
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final String CAPACITY = "capacity";
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final String MAXIMUM_CAPACITY = "maximum-capacity";
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final String USER_LIMIT = "minimum-user-limit-percent";
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final String USER_LIMIT_FACTOR = "user-limit-factor";
|
|
|
|
|
@@ -134,17 +132,17 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
|
|
|
@Private
|
|
|
public static final String STATE = "state";
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final String ACCESSIBLE_NODE_LABELS = "accessible-node-labels";
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final String DEFAULT_NODE_LABEL_EXPRESSION =
|
|
|
"default-node-label-expression";
|
|
|
|
|
|
public static final String RESERVE_CONT_LOOK_ALL_NODES = PREFIX
|
|
|
+ "reservations-continue-look-all-nodes";
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true;
|
|
|
|
|
@@ -177,29 +175,29 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
|
|
|
public static final String DEFAULT_APP_ORDERING_POLICY =
|
|
|
FIFO_APP_ORDERING_POLICY;
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
- public static final float
|
|
|
+ public static final float
|
|
|
DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT = 0.1f;
|
|
|
|
|
|
@Private
|
|
|
public static final float UNDEFINED = -1;
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final float MINIMUM_CAPACITY_VALUE = 0;
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final float MAXIMUM_CAPACITY_VALUE = 100;
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final float DEFAULT_MAXIMUM_CAPACITY_VALUE = -1.0f;
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final int DEFAULT_USER_LIMIT = 100;
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final float DEFAULT_USER_LIMIT_FACTOR = 1.0f;
|
|
|
|
|
@@ -217,17 +215,17 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
@Private public static final String RESOURCE_CALCULATOR_CLASS =
|
|
|
PREFIX + "resource-calculator";
|
|
|
|
|
|
- @Private public static final Class<? extends ResourceCalculator>
|
|
|
+ @Private public static final Class<? extends ResourceCalculator>
|
|
|
DEFAULT_RESOURCE_CALCULATOR_CLASS = DefaultResourceCalculator.class;
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final String ROOT = "root";
|
|
|
|
|
|
- @Private
|
|
|
- public static final String NODE_LOCALITY_DELAY =
|
|
|
+ @Private
|
|
|
+ public static final String NODE_LOCALITY_DELAY =
|
|
|
PREFIX + "node-locality-delay";
|
|
|
|
|
|
- @Private
|
|
|
+ @Private
|
|
|
public static final int DEFAULT_NODE_LOCALITY_DELAY = 40;
|
|
|
|
|
|
@Private
|
|
@@ -312,7 +310,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
|
|
|
@Private
|
|
|
public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = 0;
|
|
|
-
|
|
|
+
|
|
|
@Private
|
|
|
public static final String AVERAGE_CAPACITY = "average-capacity";
|
|
|
|
|
@@ -421,7 +419,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
public CapacitySchedulerConfiguration() {
|
|
|
this(new Configuration());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public CapacitySchedulerConfiguration(Configuration configuration) {
|
|
|
this(configuration, true);
|
|
|
}
|
|
@@ -454,15 +452,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
}
|
|
|
return getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label + DOT;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public int getMaximumSystemApplications() {
|
|
|
- int maxApplications =
|
|
|
+ int maxApplications =
|
|
|
getInt(MAXIMUM_SYSTEM_APPLICATIONS, DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS);
|
|
|
return maxApplications;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public float getMaximumApplicationMasterResourcePercent() {
|
|
|
- return getFloat(MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
|
|
|
+ return getFloat(MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
|
|
|
DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT);
|
|
|
}
|
|
|
|
|
@@ -473,23 +471,23 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
* @return setting specified or -1 if not set
|
|
|
*/
|
|
|
public int getMaximumApplicationsPerQueue(String queue) {
|
|
|
- int maxApplicationsPerQueue =
|
|
|
- getInt(getQueuePrefix(queue) + MAXIMUM_APPLICATIONS_SUFFIX,
|
|
|
- (int)UNDEFINED);
|
|
|
+ int maxApplicationsPerQueue =
|
|
|
+ getInt(getQueuePrefix(queue) + MAXIMUM_APPLICATIONS_SUFFIX,
|
|
|
+ (int)UNDEFINED);
|
|
|
return maxApplicationsPerQueue;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Get the maximum am resource percent per queue setting.
|
|
|
* @param queue name of the queue
|
|
|
- * @return per queue setting or defaults to the global am-resource-percent
|
|
|
+ * @return per queue setting or defaults to the global am-resource-percent
|
|
|
* setting if per queue setting not present
|
|
|
*/
|
|
|
public float getMaximumApplicationMasterResourcePerQueuePercent(String queue) {
|
|
|
- return getFloat(getQueuePrefix(queue) + MAXIMUM_AM_RESOURCE_SUFFIX,
|
|
|
+ return getFloat(getQueuePrefix(queue) + MAXIMUM_AM_RESOURCE_SUFFIX,
|
|
|
getMaximumApplicationMasterResourcePercent());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void setMaximumApplicationMasterResourcePerQueuePercent(String queue,
|
|
|
float percent) {
|
|
|
setFloat(getQueuePrefix(queue) + MAXIMUM_AM_RESOURCE_SUFFIX, percent);
|
|
@@ -555,7 +553,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
|
|
|
return capacity;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void setCapacity(String queue, float capacity) {
|
|
|
if (queue.equals("root")) {
|
|
|
throw new IllegalArgumentException(
|
|
@@ -599,7 +597,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
: maxCapacity;
|
|
|
return maxCapacity;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void setMaximumCapacity(String queue, float maxCapacity) {
|
|
|
if (maxCapacity > MAXIMUM_CAPACITY_VALUE) {
|
|
|
throw new IllegalArgumentException("Illegal " +
|
|
@@ -609,7 +607,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
LOG.debug("CSConf - setMaxCapacity: queuePrefix={}, maxCapacity={}",
|
|
|
getQueuePrefix(queue), maxCapacity);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void setCapacityByLabel(String queue, String label, float capacity) {
|
|
|
setFloat(getNodeLabelPrefix(queue, label) + CAPACITY, capacity);
|
|
|
}
|
|
@@ -630,7 +628,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
set(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY,
|
|
|
absoluteResourceCapacity);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public int getUserLimit(String queue) {
|
|
|
int userLimit = getInt(getQueuePrefix(queue) + USER_LIMIT,
|
|
|
DEFAULT_USER_LIMIT);
|
|
@@ -643,12 +641,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public <S extends SchedulableEntity> OrderingPolicy<S> getAppOrderingPolicy(
|
|
|
String queue) {
|
|
|
-
|
|
|
+
|
|
|
String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY,
|
|
|
DEFAULT_APP_ORDERING_POLICY);
|
|
|
-
|
|
|
+
|
|
|
OrderingPolicy<S> orderingPolicy;
|
|
|
-
|
|
|
+
|
|
|
if (policyType.trim().equals(FIFO_APP_ORDERING_POLICY)) {
|
|
|
policyType = FifoOrderingPolicy.class.getName();
|
|
|
}
|
|
@@ -686,18 +684,18 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
LOG.debug("here setUserLimit: queuePrefix={}, userLimit={}",
|
|
|
getQueuePrefix(queue), getUserLimit(queue));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public float getUserLimitFactor(String queue) {
|
|
|
- float userLimitFactor =
|
|
|
- getFloat(getQueuePrefix(queue) + USER_LIMIT_FACTOR,
|
|
|
- DEFAULT_USER_LIMIT_FACTOR);
|
|
|
+ float userLimitFactor =
|
|
|
+ getFloat(getQueuePrefix(queue) + USER_LIMIT_FACTOR,
|
|
|
+ DEFAULT_USER_LIMIT_FACTOR);
|
|
|
return userLimitFactor;
|
|
|
}
|
|
|
|
|
|
public void setUserLimitFactor(String queue, float userLimitFactor) {
|
|
|
- setFloat(getQueuePrefix(queue) + USER_LIMIT_FACTOR, userLimitFactor);
|
|
|
+ setFloat(getQueuePrefix(queue) + USER_LIMIT_FACTOR, userLimitFactor);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public QueueState getConfiguredState(String queue) {
|
|
|
String state = get(getQueuePrefix(queue) + STATE);
|
|
|
if (state == null) {
|
|
@@ -725,12 +723,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
String str = StringUtils.join(",", labels);
|
|
|
set(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS, str);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public Set<String> getAccessibleNodeLabels(String queue) {
|
|
|
String accessibleLabelStr =
|
|
|
get(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS);
|
|
|
|
|
|
- // When accessible-label is null,
|
|
|
+ // When accessible-label is null,
|
|
|
if (accessibleLabelStr == null) {
|
|
|
// Only return null when queue is not ROOT
|
|
|
if (!queue.equals(ROOT)) {
|
|
@@ -757,7 +755,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
set.add(str.trim());
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// if labels contains "*", only keep ANY behind
|
|
|
if (set.contains(RMNodeLabelsManager.ANY)) {
|
|
|
set.clear();
|
|
@@ -813,15 +811,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
}
|
|
|
return capacity;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public float getLabeledQueueCapacity(String queue, String label) {
|
|
|
return internalGetLabeledQueueCapacity(queue, label, CAPACITY, 0f);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public float getLabeledQueueMaximumCapacity(String queue, String label) {
|
|
|
return internalGetLabeledQueueCapacity(queue, label, MAXIMUM_CAPACITY, 100f);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public String getDefaultNodeLabelExpression(String queue) {
|
|
|
String defaultLabelExpression = get(getQueuePrefix(queue)
|
|
|
+ DEFAULT_NODE_LABEL_EXPRESSION);
|
|
@@ -830,7 +828,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
}
|
|
|
return defaultLabelExpression.trim();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void setDefaultNodeLabelExpression(String queue, String exp) {
|
|
|
set(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION, exp);
|
|
|
}
|
|
@@ -860,7 +858,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
return getBoolean(RESERVE_CONT_LOOK_ALL_NODES,
|
|
|
DEFAULT_RESERVE_CONT_LOOK_ALL_NODES);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private static String getAclKey(QueueACL acl) {
|
|
|
return "acl_" + StringUtils.toLowerCase(acl.toString());
|
|
|
}
|
|
@@ -987,13 +985,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
|
|
|
return queues;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void setQueues(String queue, String[] subQueues) {
|
|
|
set(getQueuePrefix(queue) + QUEUES, StringUtils.arrayToString(subQueues));
|
|
|
LOG.debug("CSConf - setQueues: qPrefix={}, queues={}",
|
|
|
getQueuePrefix(queue), StringUtils.arrayToString(subQueues));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public Resource getMinimumAllocation() {
|
|
|
int minimumMemory = getInt(
|
|
|
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
|
@@ -1087,9 +1085,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
public ResourceCalculator getResourceCalculator() {
|
|
|
return ReflectionUtils.newInstance(
|
|
|
getClass(
|
|
|
- RESOURCE_CALCULATOR_CLASS,
|
|
|
- DEFAULT_RESOURCE_CALCULATOR_CLASS,
|
|
|
- ResourceCalculator.class),
|
|
|
+ RESOURCE_CALCULATOR_CLASS,
|
|
|
+ DEFAULT_RESOURCE_CALCULATOR_CLASS,
|
|
|
+ ResourceCalculator.class),
|
|
|
this);
|
|
|
}
|
|
|
|
|
@@ -1101,8 +1099,8 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
public void setResourceComparator(
|
|
|
Class<? extends ResourceCalculator> resourceCalculatorClass) {
|
|
|
setClass(
|
|
|
- RESOURCE_CALCULATOR_CLASS,
|
|
|
- resourceCalculatorClass,
|
|
|
+ RESOURCE_CALCULATOR_CLASS,
|
|
|
+ resourceCalculatorClass,
|
|
|
ResourceCalculator.class);
|
|
|
}
|
|
|
|
|
@@ -1488,18 +1486,18 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
* Sets the <em>disable_preemption</em> property in order to indicate
|
|
|
* whether or not container preemption will be disabled for the specified
|
|
|
* queue.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param queue queue path
|
|
|
* @param preemptionDisabled true if preemption is disabled on queue
|
|
|
*/
|
|
|
public void setPreemptionDisabled(String queue, boolean preemptionDisabled) {
|
|
|
setBoolean(getQueuePrefix(queue) + QUEUE_PREEMPTION_DISABLED,
|
|
|
- preemptionDisabled);
|
|
|
+ preemptionDisabled);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Indicates whether preemption is disabled on the specified queue.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param queue queue path to query
|
|
|
* @param defaultVal used as default if the <em>disable_preemption</em>
|
|
|
* is not set in the configuration
|
|
@@ -1533,7 +1531,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
public Set<String> getConfiguredNodeLabels(String queuePath) {
|
|
|
Set<String> configuredNodeLabels = new HashSet<String>();
|
|
|
Entry<String, String> e = null;
|
|
|
-
|
|
|
+
|
|
|
Iterator<Entry<String, String>> iter = iterator();
|
|
|
while (iter.hasNext()) {
|
|
|
e = iter.next();
|
|
@@ -1551,10 +1549,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|
|
configuredNodeLabels.add(labelName);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// always add NO_LABEL
|
|
|
configuredNodeLabels.add(RMNodeLabelsManager.NO_LABEL);
|
|
|
-
|
|
|
+
|
|
|
return configuredNodeLabels;
|
|
|
}
|
|
|
|