Browse Source

HADOOP-3479. Defines the configuration file for the resource manager in Hadoop. You can configure various parameters related to scheduling, such as queues and queue properties here. The properties for a queue follow a naming convention,such as, hadoop.rm.queue.queue-name.property-name. Contributed by Hemanth Yamijala.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@674442 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 years ago
parent
commit
ccb0022890

+ 6 - 0
CHANGES.txt

@@ -37,6 +37,12 @@ Trunk (unreleased changes)
       stream.reduce.output.field.separator
     All of them default to "\t". (Zheng Shao via omalley)
 
+    HADOOP-3479. Defines the configuration file for the resource manager in 
+    Hadoop. You can configure various parameters related to scheduling, such 
+    as queues and queue properties here. The properties for a queue follow a
+    naming convention,such as, hadoop.rm.queue.queue-name.property-name.
+    (Hemanth Yamijala via ddas)
+
   IMPROVEMENTS
 
     HADOOP-3577. Tools to inject blocks into name node and simulated

+ 86 - 0
conf/resource-manager-conf.xml

@@ -0,0 +1,86 @@
+<?xml version="1.0"?>
+
+<!-- This is the configuration file for the resource manager in Hadoop. -->
+<!-- You can configure various parameters related to scheduling, such as -->
+<!-- queues and queue properties here. -->
+<!-- The properties for a queue follow a naming convention,such as, -->
+<!-- hadoop.rm.queue.queue-name.property-name. -->
+
+<configuration>
+
+  <property>
+    <name>hadoop.rm.queue.names</name>
+    <value>default</value>
+    <description>Comma separated list of queue names that are configured
+      for this installation. Properties for a queue in this list will be 
+      listed as hadoop.rm.queue.queue-name.property-name.</description>
+  </property>
+  
+  <property>
+    <name>hadoop.rm.queue.default.guaranteed-capacity-maps</name>
+    <value></value>
+    <description>Guaranteed number of maps that will be available 
+      for jobs in this queue.
+    </description>    
+  </property>
+  
+  <property>
+    <name>hadoop.rm.queue.default.guaranteed-capacity-reduces</name>
+    <value></value>
+    <description>Guaranteed number of reduces that will be available 
+      for jobs in this queue.
+    </description>
+  </property>
+
+  <property>
+    <name>hadoop.rm.queue.default.reclaim-time-limit</name>
+    <value>0</value>
+    <description>The amount of time in seconds before which 
+      resources distributed to other queues will be reclaimed.
+    </description>
+  </property>
+
+  <property>
+    <name>hadoop.rm.queue.default.allowed-users</name>
+    <value></value>
+    <description>Comma separated list of users who can submit jobs 
+      to this queue. If empty, it means there are no ACLs 
+      configured.
+    </description>
+  </property>
+
+  <property>
+    <name>hadoop.rm.queue.default.denied-users</name>
+    <value></value>
+    <description>Comma separated list of users who cannot submit 
+      jobs to this queue. If empty, it means there are no ACLs 
+      configured.
+    </description>
+  </property>
+
+  <property>
+    <name>hadoop.rm.queue.default.allowed-users-override</name>
+    <value>false</value>
+    <description>If true, specifies that users in the allowed 
+      list will be able to submit, even if there names are in 
+      the denied list.
+    </description>
+  </property>
+
+  <property>
+    <name>hadoop.rm.queue.default.supports-priority</name>
+    <value>true</value>
+    <description>If true, priorities of jobs will be taken into 
+      account in scheduling decisions.
+    </description>
+  </property>
+
+  <property>
+    <name>hadoop.rm.queue.default.minimum-user-limit-percent</name>
+    <value>100</value>
+    <description>The minimum percentage of the cluster which can 
+      be utilized by a single user.
+    </description>
+  </property>
+  
+</configuration>

+ 35 - 12
src/core/org/apache/hadoop/conf/Configuration.java

@@ -72,8 +72,8 @@ import org.w3c.dom.Text;
  * <code>Path</code>, then the local filesystem is examined directly, without 
  * referring to the classpath.
  *
- * <p>Hadoop by default specifies two resources, loaded in-order from the
- * classpath: <ol>
+ * <p>Unless explicitly turned off, Hadoop by default specifies two 
+ * resources, loaded in-order from the classpath: <ol>
  * <li><tt><a href="{@docRoot}/../hadoop-default.html">hadoop-default.xml</a>
  * </tt>: Read-only defaults for hadoop.</li>
  * <li><tt>hadoop-site.xml</tt>: Site-specific configuration for a given hadoop
@@ -153,13 +153,26 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
   
   /** A new configuration. */
   public Configuration() {
+    this(true);
+  }
+
+  /** A new configuration where the behavior of reading from the default 
+   * resources can be turned off.
+   * 
+   * If the parameter {@code loadDefaults} is false, the new instance
+   * will not load resources from the default files. 
+   * @param loadDefaults specifies whether to load from the default files
+   */
+  public Configuration(boolean loadDefaults) {
     if (LOG.isDebugEnabled()) {
       LOG.debug(StringUtils.stringifyException(new IOException("config()")));
     }
-    resources.add("hadoop-default.xml");
-    resources.add("hadoop-site.xml");
+    if (loadDefaults) {
+      resources.add("hadoop-default.xml");
+      resources.add("hadoop-site.xml");
+    }
   }
-
+  
   /** 
    * A new configuration with the same settings cloned from another.
    * 
@@ -189,7 +202,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
    *             with that name.
    */
   public void addResource(String name) {
-    addResource(resources, name);
+    addResourceObject(name);
   }
 
   /**
@@ -203,7 +216,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
    *            the classpath.
    */
   public void addResource(URL url) {
-    addResource(resources, url);
+    addResourceObject(url);
   }
 
   /**
@@ -217,17 +230,27 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
    *             the classpath.
    */
   public void addResource(Path file) {
-    addResource(resources, file);
+    addResourceObject(file);
   }
 
-  private synchronized void addResource(ArrayList<Object> resources,
-                                        Object resource) {
-    
-    resources.add(resource);                      // add to resources
+  /**
+   * Reload configuration from previously added resources.
+   *
+   * This method will clear all the configuration read from the added 
+   * resources, and final parameters. This will make the resources to 
+   * be read again before accessing the values. Values that are added
+   * via set methods will overlay values read from the resources.
+   */
+  public synchronized void reloadConfiguration() {
     properties = null;                            // trigger reload
     finalParameters.clear();                      // clear site-limits
   }
   
+  private synchronized void addResourceObject(Object resource) {
+    resources.add(resource);                      // add to resources
+    reloadConfiguration();
+  }
+  
   private static Pattern varPat = Pattern.compile("\\$\\{[^\\}\\$\u0020]+\\}");
   private static int MAX_SUBST = 20;
 

+ 447 - 0
src/mapred/org/apache/hadoop/mapred/ResourceManagerConf.java

@@ -0,0 +1,447 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Class providing access to resource manager configuration.
+ * 
+ * Resource manager configuration involves setting up queues, and defining
+ * various properties for the queues. These are typically read from a file 
+ * called resource-manager-conf.xml that must be in the classpath of the
+ * application. The class provides APIs to get/set and reload the 
+ * configuration for the queues.
+ */
+class ResourceManagerConf {
+  
+  /** Default file name from which the resource manager configuration is read. */ 
+  public static final String RM_CONF_FILE = "resource-manager-conf.xml";
+
+  /** Default value for guaranteed capacity of maps.
+   * The default value is set to <code>Integer.MAX_VALUE</code>, the idea
+   * being that the default is suitable for organizations that do not
+   * require setting up any queues. 
+   */ 
+  public static final int DEFAULT_GUARANTEED_CAPACITY_MAPS = 
+                                                    Integer.MAX_VALUE;
+
+  /** Default value for guaranteed capacity of reduces.
+   * The default value is set to <code>Integer.MAX_VALUE</code>, the idea
+   * being that the default is suitable for organizations that do not
+   * require setting up any queues. 
+   */ 
+  public static final int DEFAULT_GUARANTEED_CAPACITY_REDUCES = 
+                                                    Integer.MAX_VALUE;
+
+  /** Default value for reclaiming redistributed resources.
+   * The default value is set to <code>0</code>, the idea
+   * being that the default is suitable for organizations that do not
+   * require setting up any queues.
+   */ 
+  public static final int DEFAULT_RECLAIM_TIME_LIMIT = 0;
+  
+  /** Default value for minimum resource limit per user per queue, as a 
+   * percentage.
+   * The default value is set to <code>100</code>, the idea
+   * being that the default is suitable for organizations that do not
+   * require setting up any queues.
+   */ 
+  public static final int DEFAULT_MIN_USER_LIMIT_PERCENT = 100;
+
+  private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX 
+                                              = "hadoop.rm.queue.";
+
+  private Configuration rmConf;
+  private Set<String> queues;
+  
+  /**
+   * Create a new ResourceManagerConf.
+   * This method reads from the default configuration file mentioned in
+   * {@link RM_CONF_FILE}, that must be present in the classpath of the
+   * application.
+   */
+  public ResourceManagerConf() {
+    rmConf = new Configuration(false);
+    rmConf.addResource(RM_CONF_FILE);
+  }
+
+  /**
+   * Create a new ResourceManagerConf reading the specified configuration
+   * file.
+   * 
+   * @param configFile {@link Path} to the configuration file containing
+   * the resource manager configuration.
+   */
+  public ResourceManagerConf(Path configFile) {
+    rmConf = new Configuration(false);
+    rmConf.addResource(configFile);
+  }
+  
+  /**
+   * Return the set of configured queue names.
+   * @return set of configured queue names.
+   */
+  public synchronized Set<String> getQueues() {
+    if (queues == null) {
+      String[] vals = rmConf.getStrings("hadoop.rm.queue.names");
+      queues = new TreeSet<String>();
+      for (String val : vals) {
+        queues.add(val);
+      }
+    }
+    return queues;
+  }
+  
+  /**
+   * Define the set of queues known to this configuration.
+   * This will override the queue names that are read from the
+   * configuration file. 
+   * @param newQueues Set of queue names
+   */
+  public synchronized void setQueues(Set<String> newQueues) {
+    StringBuffer queueSb = new StringBuffer();
+    for (String queue : newQueues) {
+      queueSb.append(queue).append(',');
+    }
+    if (newQueues.size() > 0) {
+      String value = queueSb.substring(0, queueSb.length());
+      rmConf.set("hadoop.rm.queue.names", value);
+      queues = null;
+    }
+  }
+  
+  /**
+   * Get the guaranteed number of maps for the specified queue.
+   * 
+   * This method defaults to {@link #DEFAULT_GUARANTEED_CAPACITY_MAPS} if
+   * no value is specified in the configuration for this queue. If the queue
+   * name is unknown, this method throws a {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @return guaranteed number of maps for this queue.
+   */
+  public int getGuaranteedCapacityMaps(String queue) {
+    checkQueue(queue);
+    return rmConf.getInt(toFullPropertyName(queue, 
+                          "guaranteed-capacity-maps"), 
+                          DEFAULT_GUARANTEED_CAPACITY_MAPS);
+  }
+
+  /**
+   * Set the guaranteed number of maps for the specified queue
+   * @param queue Name of the queue
+   * @param value Guaranteed number of maps.
+   */
+  public void setGuaranteedCapacityMaps(String queue, int value) {
+    checkQueue(queue);
+    rmConf.setInt(toFullPropertyName(queue,"guaranteed-capacity-maps"), 
+                    value);
+  }
+  
+  /**
+   * Get the guaranteed number of reduces for the specified queue.
+   * 
+   * This method defaults to {@link #DEFAULT_GUARANTEED_CAPACITY_REDUCES} if
+   * no value is specified in the configuration for this queue. If the queue
+   * name is unknown, this method throws a {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @return guaranteed number of reduces for this queue.
+   */
+  public int getGuaranteedCapacityReduces(String queue) {
+    checkQueue(queue);
+    return rmConf.getInt(toFullPropertyName(queue,
+                          "guaranteed-capacity-reduces"), 
+                          DEFAULT_GUARANTEED_CAPACITY_REDUCES);
+  }
+
+  /**
+   * Set the guaranteed number of reduces for the specified queue
+   * @param queue Name of the queue
+   * @param value Guaranteed number of reduces.
+   */
+  public void setGuaranteedCapacityReduces(String queue, int value) {
+    checkQueue(queue);
+    rmConf.setInt(toFullPropertyName(queue,"guaranteed-capacity-reduces"), 
+                    value);
+  }
+  
+  /**
+   * Get the amount of time before which redistributed resources must be
+   * reclaimed for the specified queue.
+   * 
+   * The resource manager distributes spare capacity from a free queue
+   * to ones which are in need for more resources. However, if a job 
+   * submitted to the first queue requires back the resources, they must
+   * be reclaimed within the specified configuration time limit.
+   * 
+   * This method defaults to {@link #DEFAULT_RECLAIM_TIME_LIMIT} if
+   * no value is specified in the configuration for this queue. If the queue
+   * name is unknown, this method throws a {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @return reclaim time limit for this queue.
+   */
+  public int getReclaimTimeLimit(String queue) {
+    checkQueue(queue);
+    return rmConf.getInt(toFullPropertyName(queue, "reclaim-time-limit"), 
+                          DEFAULT_RECLAIM_TIME_LIMIT);
+  }
+  
+  /**
+   * Set the amount of time before which redistributed resources must be
+   * reclaimed for the specified queue.
+   * @param queue Name of the queue
+   * @param value Amount of time before which the redistributed resources
+   * must be retained.
+   */
+  public void setReclaimTimeLimit(String queue, int value) {
+    checkQueue(queue);
+    rmConf.setInt(toFullPropertyName(queue, "reclaim-time-limit"), value);
+  }
+  
+  /**
+   * Get the list of users who can submit jobs to this queue.
+   * 
+   * This method defaults to <code>null</code> if no value is specified 
+   * in the configuration for this queue. If the queue
+   * name is unknown, this method throws a {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @return list of users who can submit jobs to this queue.
+   */
+  public String[] getAllowedUsers(String queue) {
+    checkQueue(queue);
+    return rmConf.getStrings(toFullPropertyName(queue, "allowed-users"), 
+                              (String[])null);
+  }
+  
+  /**
+   * Set the list of users who can submit jobs to this queue.
+   * 
+   * If the queue name is unknown, this method throws a 
+   * {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @param values list of users allowed to submit jobs to this queue.
+   */
+  public void setAllowedUsers(String queue, String... values) {
+    checkQueue(queue);
+    rmConf.setStrings(toFullPropertyName(queue, "allowed-users"), values);
+  }
+  
+  /**
+   * Get the list of users who cannot submit jobs to this queue.
+   * 
+   * This method defaults to <code>null</code> if no value is specified 
+   * in the configuration for this queue. If the queue
+   * name is unknown, this method throws a {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @return list of users who cannot submit jobs to this queue.
+   */
+  public String[] getDeniedUsers(String queue) {
+    checkQueue(queue);
+    return rmConf.getStrings(toFullPropertyName(queue, "denied-users"), 
+                              (String[])null);
+  }
+
+  /**
+   * Set the list of users who cannot submit jobs to this queue.
+   * 
+   * If the queue name is unknown, this method throws a 
+   * {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @param values list of users denied to submit jobs to this queue.
+   */
+  public void setDeniedUsers(String queue, String... values) {
+    checkQueue(queue);
+    rmConf.setStrings(toFullPropertyName(queue, "denied-users"), values);
+  }
+
+  /**
+   * Get whether users present in allowed users list will override values
+   * in the denied user list for this queue.
+   * 
+   * If a user name is specified in both the allowed user list and the denied
+   * user list, this configuration determines whether to honor the allowed
+   * user list or the denied user list. This method defaults to 
+   * <code>false</code> if no value is specified in the configuration for 
+   * this queue. If the queue name is unknown, this method throws a 
+   * {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @return 
+   */
+  public boolean doAllowedUsersOverride(String queue) {
+    checkQueue(queue);
+    return rmConf.getBoolean(toFullPropertyName(queue, 
+                              "allowed-users-override"), false);
+  }
+
+  /**
+   * Set whether users present in allowed users list will override values
+   * in the denied user list for this queue.
+   * 
+   * If the queue name is unknown, this method throws a 
+   * {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @param value true if the allowed users take priority, false otherwise.
+   */
+  public void setAllowedUsersOverride(String queue, boolean value) {
+    checkQueue(queue);
+    rmConf.setBoolean(toFullPropertyName(queue, "allowed-users-override"),
+                        value);
+  }
+  
+  /**
+   * Get whether priority is supported for this queue.
+   * 
+   * If this value is false, then job priorities will be ignored in 
+   * scheduling decisions. This method defaults to <code>false</code> if 
+   * the property is not configured for this queue. If the queue name is 
+   * unknown, this method throws a {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @return Whether this queue supports priority or not.
+   */
+  public boolean isPrioritySupported(String queue) {
+    checkQueue(queue);
+    return rmConf.getBoolean(toFullPropertyName(queue, "supports-priority"),
+                              false);  
+  }
+  
+  /**
+   * Set whether priority is supported for this queue.
+   * 
+   * If the queue name is unknown, this method throws a 
+   * {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @param value true, if the queue must support priorities, false otherwise.
+   */
+  public void setPrioritySupported(String queue, boolean value) {
+    checkQueue(queue);
+    rmConf.setBoolean(toFullPropertyName(queue, "supports-priority"), value);
+  }
+  
+  /**
+   * Get the minimum limit of resources for any user submitting jobs in 
+   * this queue, in percentage.
+   * 
+   * This method defaults to {@link #DEFAULT_MIN_USER_LIMIT_PERCENT} if
+   * no value is specified in the configuration for this queue. If the queue
+   * name is unknown, this method throws a {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @return minimum limit of resources, in percentage, that will be 
+   * available for a user.
+   * 
+   */
+  public int getMinimumUserLimitPercent(String queue) {
+    checkQueue(queue);
+    return rmConf.getInt(toFullPropertyName(queue, 
+                          "minimum-user-limit-percent"), 
+                          DEFAULT_MIN_USER_LIMIT_PERCENT);
+  }
+  
+  /**
+   * Set the minimum limit of resources for any user submitting jobs in
+   * this queue, in percentage.
+   * 
+   * If the queue name is unknown, this method throws a 
+   * {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @param value minimum limit of resources for any user submitting jobs
+   * in this queue
+   */
+  public void setMinimumUserLimitPercent(String queue, int value) {
+    checkQueue(queue);
+    rmConf.setInt(toFullPropertyName(queue, "minimum-user-limit-percent"), 
+                    value);
+  }
+  
+  /**
+   * Reload configuration by clearing the information read from the 
+   * underlying configuration file.
+   */
+  public synchronized void reloadConfiguration() {
+    queues = null;
+    rmConf.reloadConfiguration();
+  }
+  
+  /**
+   * Print configuration read from the underlying configuration file.
+   * 
+   * @param writer {@link PrintWriter} to which the output must be written.
+   */
+  public void printConfiguration(PrintWriter writer) {
+    
+    Set<String> queueSet = getQueues();
+    if (queueSet == null) {
+      writer.println("No queues configured.");
+      return;
+    }
+    StringBuffer sb = new StringBuffer();
+    for (String queue : queueSet) {
+      sb.append(queue).append(",");
+    }
+    writer.println("hadoop.rm.queue.names: " + sb.substring(0, sb.length()-1));
+    
+    for (String queue : queueSet) {
+      writer.println(toFullPropertyName(queue, "guaranteed-capacity-maps") + 
+                      ": " + getGuaranteedCapacityMaps(queue));
+      writer.println(toFullPropertyName(queue, "guaranteed-capacity-reduces") + 
+                      ": " + getGuaranteedCapacityReduces(queue));
+      writer.println(toFullPropertyName(queue, "reclaim-time-limit") + 
+                      ": " + getReclaimTimeLimit(queue));
+      writer.println(toFullPropertyName(queue, "minimum-user-limit-percent") +
+                      ": " + getMinimumUserLimitPercent(queue));
+      writer.println(toFullPropertyName(queue, "supports-priority") + 
+                      ": " + isPrioritySupported(queue));
+      writer.println(toFullPropertyName(queue, "allowed-users-override") +
+                      ": " + doAllowedUsersOverride(queue));
+      printUserList(writer, queue, "allowed-users", getAllowedUsers(queue));
+      printUserList(writer, queue, "denied-users", getDeniedUsers(queue));
+    }
+  }
+
+  private void printUserList(PrintWriter writer, String queue, 
+                              String listType, String[] users) {
+    if (users == null) {
+      writer.println(toFullPropertyName(queue, listType) + ": No users configured.");
+    } else {
+      writer.println(toFullPropertyName(queue, listType) + ": ");
+      Arrays.sort(users);
+      for (String user : users) {
+        writer.println("\t" + user);
+      }
+    }
+  }
+  
+  private synchronized void checkQueue(String queue) {
+    if (queues == null) {
+      queues = getQueues();
+    }
+    if (!queues.contains(queue)) {
+      throw new IllegalArgumentException("Queue " + queue + " is undefined.");
+    }
+  }
+
+  private static final String toFullPropertyName(String queue, 
+                                                  String property) {
+      return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
+  }
+}

+ 42 - 1
src/test/org/apache/hadoop/conf/TestConfiguration.java

@@ -292,7 +292,48 @@ public class TestConfiguration extends TestCase {
     assertEquals(-20, conf.getInt("test.int3", 0));
     assertEquals(-20, conf.getLong("test.int3", 0));
   }
-	  
+	
+  public void testReload() throws IOException {
+    out=new BufferedWriter(new FileWriter(CONFIG));
+    startConfig();
+    appendProperty("test.key1", "final-value1", true);
+    appendProperty("test.key2", "value2");
+    endConfig();
+    Path fileResource = new Path(CONFIG);
+    conf.addResource(fileResource);
+    
+    out=new BufferedWriter(new FileWriter(CONFIG2));
+    startConfig();
+    appendProperty("test.key1", "value1");
+    appendProperty("test.key3", "value3");
+    endConfig();
+    Path fileResource1 = new Path(CONFIG2);
+    conf.addResource(fileResource1);
+    
+    // add a few values via set.
+    conf.set("test.key3", "value4");
+    conf.set("test.key4", "value5");
+    
+    assertEquals("final-value1", conf.get("test.key1"));
+    assertEquals("value2", conf.get("test.key2"));
+    assertEquals("value4", conf.get("test.key3"));
+    assertEquals("value5", conf.get("test.key4"));
+    
+    // change values in the test file...
+    out=new BufferedWriter(new FileWriter(CONFIG));
+    startConfig();
+    appendProperty("test.key1", "final-value1");
+    appendProperty("test.key3", "final-value3", true);
+    endConfig();
+    
+    conf.reloadConfiguration();
+    assertEquals("value1", conf.get("test.key1"));
+    // overlayed property overrides.
+    assertEquals("value4", conf.get("test.key3"));
+    assertEquals(null, conf.get("test.key2"));
+    assertEquals("value5", conf.get("test.key4"));
+  }
+  
   public static void main(String[] argv) throws Exception {
     junit.textui.TestRunner.main(new String[]{
       TestConfiguration.class.getName()

+ 447 - 0
src/test/org/apache/hadoop/mapred/TestResourceManagerConf.java

@@ -0,0 +1,447 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PipedReader;
+import java.io.PipedWriter;
+import java.io.PrintWriter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+
+public class TestResourceManagerConf extends TestCase {
+
+  private static final String TEST_CONF_FILE = 
+                new File(".", "test-conf.xml").getAbsolutePath();
+  private PrintWriter writer;
+  private Map<String, String> defaultProperties;
+  private ResourceManagerConf testConf;
+  
+  public TestResourceManagerConf() {
+    defaultProperties = setupQueueProperties(
+        new String[] { "guaranteed-capacity-maps", 
+                       "guaranteed-capacity-reduces",
+                       "reclaim-time-limit",
+                       "allowed-users",
+                       "denied-users",
+                       "allowed-users-override",
+                       "supports-priority",
+                       "minimum-user-limit-percent" }, 
+        new String[] { String.valueOf(Integer.MAX_VALUE), 
+                        String.valueOf(Integer.MAX_VALUE),
+                        "0", null, null, "false",
+                        "true", "100" }
+                      );
+  }
+  
+  public void setUp() throws IOException {
+    openFile();
+  }
+  
+  public void tearDown() throws IOException {
+    File testConfFile = new File(TEST_CONF_FILE);
+    if (testConfFile.exists()) {
+      testConfFile.delete();  
+    }
+  }
+  
+  public void testDefaults() {
+    testConf = new ResourceManagerConf();
+    Map<String, Map<String, String>> queueDetails
+                            = new HashMap<String, Map<String,String>>();
+    queueDetails.put("default", defaultProperties);
+    checkQueueProperties(testConf, queueDetails);
+  }
+  
+  public void testQueues() {
+
+    Map<String, String> q1Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity-maps", 
+                       "guaranteed-capacity-reduces",
+                       "reclaim-time-limit",
+                       "allowed-users",
+                       "denied-users",
+                       "allowed-users-override",
+                       "supports-priority",
+                       "minimum-user-limit-percent" }, 
+        new String[] { "10", "5", "600", "u1,u2,u3", "u1,g1", "false",
+                        "true", "25" }
+                      );
+
+    Map<String, String> q2Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity-maps", 
+                       "guaranteed-capacity-reduces",
+                       "reclaim-time-limit",
+                       "allowed-users",
+                       "denied-users",
+                       "allowed-users-override",
+                       "supports-priority",
+                       "minimum-user-limit-percent" }, 
+        new String[] { "100", "50", "6000", "u4,u5,u6", "u6,g2", "true",
+                        "false", "50" }
+                      );
+
+    startConfig();
+    writeQueueDetails("default", q1Props);
+    writeQueueDetails("research", q2Props);
+    writeQueues("default,research");
+    endConfig();
+
+    testConf = new ResourceManagerConf(new Path(TEST_CONF_FILE));
+
+    Map<String, Map<String, String>> queueDetails
+              = new HashMap<String, Map<String,String>>();
+    queueDetails.put("default", q1Props);
+    queueDetails.put("research", q2Props);
+    checkQueueProperties(testConf, queueDetails);
+  }
+  
+  public void testQueueWithDefaultProperties() {
+    Map<String, String> q1Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity-maps", 
+                       "guaranteed-capacity-reduces",
+                       "allowed-users",
+                       "allowed-users-override",
+                       "minimum-user-limit-percent" }, 
+        new String[] { "20", "10", "u7,u8,u9", "false", "75" }
+                      );
+    startConfig();
+    writeQueueDetails("default", q1Props);
+    writeQueues("default");
+    endConfig();
+
+    testConf = new ResourceManagerConf(new Path(TEST_CONF_FILE));
+
+    Map<String, Map<String, String>> queueDetails
+              = new HashMap<String, Map<String,String>>();
+    Map<String, String> expProperties = new HashMap<String, String>();
+    for (String key : q1Props.keySet()) {
+      expProperties.put(key, q1Props.get(key));
+    }
+    expProperties.put("reclaim-time-limit", "0");
+    expProperties.put("denied-users", null);
+    expProperties.put("supports-priority", "false");
+    queueDetails.put("default", expProperties);
+    checkQueueProperties(testConf, queueDetails);
+  }
+
+  public void testInvalidQueue() {
+    try {
+      testConf = new ResourceManagerConf();
+      testConf.getGuaranteedCapacityMaps("invalid");
+      fail("Should not return value for invalid queue");
+    } catch (IllegalArgumentException iae) {
+      assertEquals("Queue invalid is undefined.", iae.getMessage());
+    }
+  }
+
+  public void testQueueAddition() {
+    testConf = new ResourceManagerConf();
+    Set<String> queues = new HashSet<String>();
+    queues.add("default");
+    queues.add("newqueue");
+    testConf.setQueues(queues);
+    
+    testConf.setGuaranteedCapacityMaps("newqueue", 1);
+    testConf.setGuaranteedCapacityReduces("newqueue", 1);
+    testConf.setReclaimTimeLimit("newqueue", 30);
+    testConf.setMinimumUserLimitPercent("newqueue", 40);
+    testConf.setAllowedUsers("newqueue", "a", "b", "c");
+    testConf.setDeniedUsers("newqueue", "d", "e", "f");
+    testConf.setAllowedUsersOverride("newqueue", true);
+    testConf.setPrioritySupported("newqueue", false);
+    
+    Map<String, String> q1Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity-maps", 
+                       "guaranteed-capacity-reduces",
+                       "reclaim-time-limit",
+                       "allowed-users",
+                       "denied-users",
+                       "allowed-users-override",
+                       "supports-priority",
+                       "minimum-user-limit-percent" }, 
+        new String[] { "1", "1", "30", "a,b,c", "d,e,f", "true",
+                        "false", "40" }
+                      );
+    
+    Map<String, Map<String, String>> queueDetails = 
+                  new HashMap<String, Map<String, String>>();
+    queueDetails.put("default", defaultProperties);
+    queueDetails.put("newqueue", q1Props);
+    checkQueueProperties(testConf, queueDetails);
+  }
+  
+  public void testReload() throws IOException {
+    // use the setup in the test case testQueues as a base...
+    testQueues();
+    
+    // write new values to the file...
+    Map<String, String> q1Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity-maps", 
+                       "guaranteed-capacity-reduces",
+                       "reclaim-time-limit",
+                       "allowed-users",
+                       "denied-users",
+                       "allowed-users-override",
+                       "supports-priority",
+                       "minimum-user-limit-percent" }, 
+        new String[] { "20", "5", "600", "u1,u2,u3,u4", "u1,g1", "false",
+                        "true", "40" }
+                      );
+
+    Map<String, String> q2Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity-maps", 
+                       "guaranteed-capacity-reduces",
+                       "reclaim-time-limit",
+                       "allowed-users",
+                       "denied-users",
+                       "allowed-users-override",
+                       "supports-priority",
+                       "minimum-user-limit-percent" }, 
+        new String[] { "1000", "500", "3000", "u4,u6", "g2", "false",
+                        "false", "50" }
+                      );
+
+    openFile();
+    startConfig();
+    writeQueueDetails("default", q1Props);
+    writeQueueDetails("production", q2Props);
+    writeQueues("default,production");
+    endConfig();
+    
+    testConf.reloadConfiguration();
+    
+    Map<String, Map<String, String>> queueDetails 
+                      = new HashMap<String, Map<String, String>>();
+    queueDetails.put("default", q1Props);
+    queueDetails.put("production", q2Props);
+    checkQueueProperties(testConf, queueDetails);
+  }
+
+  public void testPrint() throws IOException, InterruptedException {
+    
+    Map<String, String> q1Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity-maps", 
+                       "guaranteed-capacity-reduces",
+                       "reclaim-time-limit",
+                       "allowed-users",
+                       "denied-users",
+                       "allowed-users-override",
+                       "supports-priority",
+                       "minimum-user-limit-percent" }, 
+        new String[] { "20", "5", "600", "u2,a1,c3,b4", "u1,g1", "false",
+                        "true", "40" }
+                      );
+
+    Map<String, String> q2Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity-maps", 
+                       "guaranteed-capacity-reduces",
+                       "reclaim-time-limit",
+                       "allowed-users",
+                       "denied-users",
+                       "allowed-users-override",
+                       "supports-priority",
+                       "minimum-user-limit-percent" }, 
+        new String[] { "1000", "500", "3000", "e5,d6", "g2", "false",
+                        "false", "50" }
+                      );
+    
+    startConfig();
+    writeQueueDetails("default", q1Props);
+    writeQueueDetails("research", q2Props);
+    writeQueues("default,research");
+    endConfig();
+
+    testConf = new ResourceManagerConf(new Path(TEST_CONF_FILE));
+    
+    PipedWriter pw = new PipedWriter();
+    PrintWriter writer = new PrintWriter(pw);
+    PipedReader pr = new PipedReader(pw);
+    BufferedReader br = new BufferedReader(pr);
+    
+    PrintReaderThread prThread = new PrintReaderThread(br);
+    prThread.start();
+
+    testConf.printConfiguration(writer);
+    writer.close();
+    
+    prThread.join();
+    ArrayList<String> output = prThread.getOutput();
+    
+    ListIterator iter = output.listIterator();
+    
+    assertEquals("hadoop.rm.queue.names: default,research", iter.next());
+    checkPrintOutput("default", q1Props, iter,
+                      new String[] {"a1", "b4", "c3", "u2"},
+                      new String[] {"g1", "u1"});
+    checkPrintOutput("research", q2Props, iter,
+                      new String[] {"d6", "e5" },
+                      new String[] {"g2"});
+  }
+  
+  private void checkPrintOutput(String queue, 
+                                Map<String, String> queueProps,
+                                ListIterator iter,
+                                String[] allowedUsers,
+                                String[] deniedUsers) {
+    
+    String[] propsOrder = {"guaranteed-capacity-maps", 
+                             "guaranteed-capacity-reduces",
+                             "reclaim-time-limit",
+                             "minimum-user-limit-percent",
+                             "supports-priority",
+                             "allowed-users-override"};
+    
+    for (String prop : propsOrder) {
+      assertEquals(("hadoop.rm.queue." + queue + "." + prop + ": " 
+                    + queueProps.get(prop)), iter.next());
+    }
+    
+    checkUserList(queue, iter, allowedUsers, "allowed-users");
+    checkUserList(queue, iter, deniedUsers, "denied-users");
+  }
+
+  private void checkUserList(String queue, 
+                              ListIterator iter, 
+                              String[] userList,
+                              String listType) {
+    assertEquals("hadoop.rm.queue." + queue + "." + listType + ": ",
+                    iter.next());
+    for (String user : userList) {
+      assertEquals("\t" + user, iter.next());
+    }
+  }
+
+  private void checkQueueProperties(
+                        ResourceManagerConf testConf,
+                        Map<String, Map<String, String>> queueDetails) {
+    Set<String> queues = testConf.getQueues();
+    assertEquals(queueDetails.keySet().size(), queues.size());
+    for (String name : queueDetails.keySet()) {
+      assertTrue(queues.contains(name));  
+    }
+    
+    for (String queueName : queueDetails.keySet()) {
+      Map<String, String> map = queueDetails.get(queueName);
+      assertEquals(Integer.parseInt(map.get("guaranteed-capacity-maps")),
+          testConf.getGuaranteedCapacityMaps(queueName));
+      assertEquals(Integer.parseInt(map.get("guaranteed-capacity-reduces")),
+          testConf.getGuaranteedCapacityReduces(queueName));
+      assertEquals(Integer.parseInt(map.get("minimum-user-limit-percent")),
+          testConf.getMinimumUserLimitPercent(queueName));
+      assertEquals(Integer.parseInt(map.get("reclaim-time-limit")),
+          testConf.getReclaimTimeLimit(queueName));
+      assertEquals(Boolean.parseBoolean(map.get("supports-priority")),
+          testConf.isPrioritySupported(queueName));
+      String[] expAllowedUsers = StringUtils.getStrings(
+                                    map.get("allowed-users"));
+      String[] allowedUsers = testConf.getAllowedUsers(queueName);
+      assertTrue(Arrays.equals(expAllowedUsers, allowedUsers));
+      String[] expDeniedUsers = StringUtils.getStrings(
+                                    map.get("denied-users"));
+      String[] deniedUsers = testConf.getDeniedUsers(queueName);
+      assertTrue(Arrays.equals(expDeniedUsers, deniedUsers));
+      assertEquals(Boolean.parseBoolean(map.get("allowed-users-override")),
+          testConf.doAllowedUsersOverride(queueName));
+    }
+  }
+  
+  private Map<String, String> setupQueueProperties(String[] keys, 
+                                                String[] values) {
+    HashMap<String, String> map = new HashMap<String, String>();
+    for(int i=0; i<keys.length; i++) {
+      map.put(keys[i], values[i]);
+    }
+    return map;
+  }
+
+  private void openFile() throws IOException {
+    FileWriter fw = new FileWriter(TEST_CONF_FILE);
+    BufferedWriter bw = new BufferedWriter(fw);
+    writer = new PrintWriter(bw);
+  }
+  
+  private void startConfig() {
+    writer.println("<?xml version=\"1.0\"?>");
+    writer.println("<configuration>");
+  }
+  
+  private void writeQueues(String queueList) {
+    writer.println("<property>");
+    writer.println("<name>hadoop.rm.queue.names</name>");
+    writer.println("<value>"+queueList+"</value>");
+    writer.println("</property>");
+  }
+  
+  private void writeQueueDetails(String queue, Map<String, String> props) {
+    for (String key : props.keySet()) {
+      writer.println("<property>");
+      writer.println("<name>hadoop.rm.queue." + queue + "." + key +
+                    "</name>");
+      writer.println("<value>"+props.get(key)+"</value>");
+      writer.println("</property>");
+    }
+  }
+  
+  private void endConfig() {
+    writer.println("</configuration>");
+    writer.close();
+  }
+  
+  class PrintReaderThread extends Thread {
+    
+    private BufferedReader reader;
+    private ArrayList<String> lines;
+    
+    public PrintReaderThread(BufferedReader reader) {
+      this.reader = reader;
+      lines = new ArrayList<String>();
+    }
+    
+    public void run() {
+      try {
+        String line = reader.readLine();
+        while (line != null) {
+          lines.add(line);
+          line = reader.readLine();
+        }
+      } catch (IOException ioe) {
+        lines.clear();
+      }
+    }
+    
+    public ArrayList<String> getOutput() {
+      return lines;
+    }
+  }
+}