|
@@ -17,6 +17,8 @@
|
|
|
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
|
|
@@ -25,7 +27,7 @@ import org.apache.hadoop.fs.Path;
|
|
|
*
|
|
|
* Resource manager configuration involves setting up queues, and defining
|
|
|
* various properties for the queues. These are typically read from a file
|
|
|
- * called resource-manager-conf.xml that must be in the classpath of the
|
|
|
+ * called capacity-scheduler.xml that must be in the classpath of the
|
|
|
* application. The class provides APIs to get/set and reload the
|
|
|
* configuration for the queues.
|
|
|
*/
|
|
@@ -33,25 +35,13 @@ class CapacitySchedulerConf {
|
|
|
|
|
|
/** Default file name from which the resource manager configuration is read. */
|
|
|
public static final String SCHEDULER_CONF_FILE = "capacity-scheduler.xml";
|
|
|
-
|
|
|
- /** Default value for guaranteed capacity of maps (as percentage).
|
|
|
- * The default value is set to 100, to represent the entire queue.
|
|
|
- */
|
|
|
- public static final float DEFAULT_GUARANTEED_CAPACITY = 100;
|
|
|
-
|
|
|
- /** Default value for reclaiming redistributed resources.
|
|
|
- * The default value is set to <code>300</code>.
|
|
|
- */
|
|
|
- public static final int DEFAULT_RECLAIM_TIME_LIMIT = 300;
|
|
|
-
|
|
|
- /** Default value for minimum resource limit per user per queue, as a
|
|
|
- * percentage.
|
|
|
- * The default value is set to <code>100</code>, the idea
|
|
|
- * being that the default is suitable for organizations that do not
|
|
|
- * require setting up any queues.
|
|
|
- */
|
|
|
- public static final int DEFAULT_MIN_USER_LIMIT_PERCENT = 100;
|
|
|
-
|
|
|
+
|
|
|
+ private int defaultReclaimTime;
|
|
|
+
|
|
|
+ private int defaultUlimitMinimum;
|
|
|
+
|
|
|
+ private boolean defaultSupportPriority;
|
|
|
+
|
|
|
private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX =
|
|
|
"mapred.capacity-scheduler.queue.";
|
|
|
|
|
@@ -66,6 +56,7 @@ class CapacitySchedulerConf {
|
|
|
public CapacitySchedulerConf() {
|
|
|
rmConf = new Configuration(false);
|
|
|
rmConf.addResource(SCHEDULER_CONF_FILE);
|
|
|
+ initializeDefaults();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -78,22 +69,51 @@ class CapacitySchedulerConf {
|
|
|
public CapacitySchedulerConf(Path configFile) {
|
|
|
rmConf = new Configuration(false);
|
|
|
rmConf.addResource(configFile);
|
|
|
+ initializeDefaults();
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Method used to initialize the default values and the queue list
|
|
|
+ * which is used by the Capacity Scheduler.
|
|
|
+ */
|
|
|
+ private void initializeDefaults() {
|
|
|
+ defaultReclaimTime = rmConf.getInt(
|
|
|
+ "mapred.capacity-scheduler.default-reclaim-time-limit",300);
|
|
|
+ defaultUlimitMinimum = rmConf.getInt(
|
|
|
+ "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
|
|
|
+ defaultSupportPriority = rmConf.getBoolean(
|
|
|
+ "mapred.capacity-scheduler.default-supports-priority", false);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Get the guaranteed percentage of the cluster for the specified queue.
|
|
|
*
|
|
|
- * This method defaults to {@link #DEFAULT_GUARANTEED_CAPACITY} if
|
|
|
- * no value is specified in the configuration for this queue. If the queue
|
|
|
- * name is unknown, this method throws a {@link IllegalArgumentException}
|
|
|
+ * This method defaults to configured default Guaranteed Capacity if
|
|
|
+ * no value is specified in the configuration for this queue.
|
|
|
+ * If the configured capacity is negative value or greater than 100 an
|
|
|
+ * {@link IllegalArgumentException} is thrown.
|
|
|
+ *
|
|
|
+ * If default Guaranteed capacity is not configured for a queue, then
|
|
|
+ * system allocates capacity based on what is free at the time of
|
|
|
+ * capacity scheduler start
|
|
|
+ *
|
|
|
+ *
|
|
|
* @param queue name of the queue
|
|
|
* @return guaranteed percent of the cluster for the queue.
|
|
|
*/
|
|
|
public float getGuaranteedCapacity(String queue) {
|
|
|
- checkQueue(queue);
|
|
|
+ //Check done in order to return default GC which can be negative
|
|
|
+ //In case of both GC and default GC not configured.
|
|
|
+ //Last check is if the configuration is specified and is marked as
|
|
|
+ //negative we throw exception
|
|
|
+ String raw = rmConf.getRaw(toFullPropertyName(queue,
|
|
|
+ "guaranteed-capacity"));
|
|
|
+ if(raw == null) {
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
float result = rmConf.getFloat(toFullPropertyName(queue,
|
|
|
"guaranteed-capacity"),
|
|
|
- DEFAULT_GUARANTEED_CAPACITY);
|
|
|
+ -1);
|
|
|
if (result < 0.0 || result > 100.0) {
|
|
|
throw new IllegalArgumentException("Illegal capacity for queue " + queue +
|
|
|
" of " + result);
|
|
@@ -101,6 +121,17 @@ class CapacitySchedulerConf {
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Sets the Guaranteed capacity of the given queue.
|
|
|
+ *
|
|
|
+ * @param queue name of the queue
|
|
|
+ * @param gc guaranteed percent of the cluster for the queue.
|
|
|
+ */
|
|
|
+ public void setGuaranteedCapacity(String queue,float gc) {
|
|
|
+ rmConf.setFloat(toFullPropertyName(queue, "guaranteed-capacity"),gc);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* Get the amount of time before which redistributed resources must be
|
|
|
* reclaimed for the specified queue.
|
|
@@ -110,16 +141,23 @@ class CapacitySchedulerConf {
|
|
|
* submitted to the first queue requires back the resources, they must
|
|
|
* be reclaimed within the specified configuration time limit.
|
|
|
*
|
|
|
- * This method defaults to {@link #DEFAULT_RECLAIM_TIME_LIMIT} if
|
|
|
- * no value is specified in the configuration for this queue. If the queue
|
|
|
- * name is unknown, this method throws a {@link IllegalArgumentException}
|
|
|
+ * This method defaults to configured default reclaim time limit if
|
|
|
+ * no value is specified in the configuration for this queue.
|
|
|
+ *
|
|
|
+ * Throws an {@link IllegalArgumentException} when invalid value is
|
|
|
+ * configured.
|
|
|
+ *
|
|
|
* @param queue name of the queue
|
|
|
* @return reclaim time limit for this queue.
|
|
|
*/
|
|
|
public int getReclaimTimeLimit(String queue) {
|
|
|
- checkQueue(queue);
|
|
|
- return rmConf.getInt(toFullPropertyName(queue, "reclaim-time-limit"),
|
|
|
- DEFAULT_RECLAIM_TIME_LIMIT);
|
|
|
+ int reclaimTimeLimit = rmConf.getInt(toFullPropertyName(queue, "reclaim-time-limit"),
|
|
|
+ defaultReclaimTime);
|
|
|
+ if(reclaimTimeLimit <= 0) {
|
|
|
+ throw new IllegalArgumentException("Invalid reclaim time limit : "
|
|
|
+ + reclaimTimeLimit + " for queue : " + queue);
|
|
|
+ }
|
|
|
+ return reclaimTimeLimit;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -130,7 +168,6 @@ class CapacitySchedulerConf {
|
|
|
* must be retained.
|
|
|
*/
|
|
|
public void setReclaimTimeLimit(String queue, int value) {
|
|
|
- checkQueue(queue);
|
|
|
rmConf.setInt(toFullPropertyName(queue, "reclaim-time-limit"), value);
|
|
|
}
|
|
|
|
|
@@ -139,27 +176,23 @@ class CapacitySchedulerConf {
|
|
|
*
|
|
|
* If this value is false, then job priorities will be ignored in
|
|
|
* scheduling decisions. This method defaults to <code>false</code> if
|
|
|
- * the property is not configured for this queue. If the queue name is
|
|
|
- * unknown, this method throws a {@link IllegalArgumentException}
|
|
|
+ * the property is not configured for this queue.
|
|
|
* @param queue name of the queue
|
|
|
* @return Whether this queue supports priority or not.
|
|
|
*/
|
|
|
public boolean isPrioritySupported(String queue) {
|
|
|
- checkQueue(queue);
|
|
|
return rmConf.getBoolean(toFullPropertyName(queue, "supports-priority"),
|
|
|
- false);
|
|
|
+ defaultSupportPriority);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Set whether priority is supported for this queue.
|
|
|
*
|
|
|
- * If the queue name is unknown, this method throws a
|
|
|
- * {@link IllegalArgumentException}
|
|
|
+ *
|
|
|
* @param queue name of the queue
|
|
|
* @param value true, if the queue must support priorities, false otherwise.
|
|
|
*/
|
|
|
public void setPrioritySupported(String queue, boolean value) {
|
|
|
- checkQueue(queue);
|
|
|
rmConf.setBoolean(toFullPropertyName(queue, "supports-priority"), value);
|
|
|
}
|
|
|
|
|
@@ -167,33 +200,36 @@ class CapacitySchedulerConf {
|
|
|
* Get the minimum limit of resources for any user submitting jobs in
|
|
|
* this queue, in percentage.
|
|
|
*
|
|
|
- * This method defaults to {@link #DEFAULT_MIN_USER_LIMIT_PERCENT} if
|
|
|
- * no value is specified in the configuration for this queue. If the queue
|
|
|
- * name is unknown, this method throws a {@link IllegalArgumentException}
|
|
|
+ * This method defaults to default user limit configured if
|
|
|
+ * no value is specified in the configuration for this queue.
|
|
|
+ *
|
|
|
+ * Throws an {@link IllegalArgumentException} when invalid value is
|
|
|
+ * configured.
|
|
|
+ *
|
|
|
* @param queue name of the queue
|
|
|
* @return minimum limit of resources, in percentage, that will be
|
|
|
* available for a user.
|
|
|
*
|
|
|
*/
|
|
|
public int getMinimumUserLimitPercent(String queue) {
|
|
|
- checkQueue(queue);
|
|
|
- return rmConf.getInt(toFullPropertyName(queue,
|
|
|
- "minimum-user-limit-percent"),
|
|
|
- DEFAULT_MIN_USER_LIMIT_PERCENT);
|
|
|
+ int userLimit = rmConf.getInt(toFullPropertyName(queue,
|
|
|
+ "minimum-user-limit-percent"), defaultUlimitMinimum);
|
|
|
+ if(userLimit <= 0 || userLimit > 100) {
|
|
|
+ throw new IllegalArgumentException("Invalid user limit : "
|
|
|
+ + userLimit + " for queue : " + queue);
|
|
|
+ }
|
|
|
+ return userLimit;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Set the minimum limit of resources for any user submitting jobs in
|
|
|
* this queue, in percentage.
|
|
|
*
|
|
|
- * If the queue name is unknown, this method throws a
|
|
|
- * {@link IllegalArgumentException}
|
|
|
* @param queue name of the queue
|
|
|
* @param value minimum limit of resources for any user submitting jobs
|
|
|
* in this queue
|
|
|
*/
|
|
|
public void setMinimumUserLimitPercent(String queue, int value) {
|
|
|
- checkQueue(queue);
|
|
|
rmConf.setInt(toFullPropertyName(queue, "minimum-user-limit-percent"),
|
|
|
value);
|
|
|
}
|
|
@@ -204,19 +240,12 @@ class CapacitySchedulerConf {
|
|
|
*/
|
|
|
public synchronized void reloadConfiguration() {
|
|
|
rmConf.reloadConfiguration();
|
|
|
+ initializeDefaults();
|
|
|
}
|
|
|
|
|
|
- private synchronized void checkQueue(String queue) {
|
|
|
- /*if (queues == null) {
|
|
|
- queues = getQueues();
|
|
|
- }
|
|
|
- if (!queues.contains(queue)) {
|
|
|
- throw new IllegalArgumentException("Queue " + queue + " is undefined.");
|
|
|
- }*/
|
|
|
- }
|
|
|
-
|
|
|
private static final String toFullPropertyName(String queue,
|
|
|
String property) {
|
|
|
return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
|
|
|
}
|
|
|
+
|
|
|
}
|