Browse Source

HADOOP-3445. Add capacity scheduler that provides guaranteed capacities to
queues as a percentage of the cluster. (Vivek Ratan via omalley)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@694415 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 16 years ago
parent
commit
49b6353927

+ 50 - 0
conf/capacity-scheduler.xml.template

@@ -0,0 +1,50 @@
+<?xml version="1.0"?>
+
+<!-- This is the configuration file for the resource manager in Hadoop. -->
+<!-- You can configure various scheduling parameters related to queues. -->
+<!-- The properties for a queue follow a naming convention, such as, -->
+<!-- mapred.capacity-scheduler.queue.<queue-name>.property-name. -->
+
+<configuration>
+
+  <property>
+    <name>mapred.capacity-scheduler.queue.default.guaranteed-capacity</name>
+    <value>100</value>
+    <description>Percentage of the number of slots in the cluster that are
+      guaranteed to be available for jobs in this queue.
+    </description>    
+  </property>
+  
+  <property>
+    <name>mapred.capacity-scheduler.queue.default.reclaim-time-limit</name>
+    <value>300</value>
+    <description>The amount of time, in seconds, before which 
+      resources distributed to other queues will be reclaimed.
+    </description>
+  </property>
+
+  <property>
+    <name>mapred.capacity-scheduler.queue.default.supports-priority</name>
+    <value>false</value>
+    <description>If true, priorities of jobs will be taken into 
+      account in scheduling decisions.
+    </description>
+  </property>
+
+  <property>
+    <name>mapred.capacity-scheduler.queue.default.minimum-user-limit-percent</name>
+    <value>100</value>
+    <description> Each queue enforces a limit on the percentage of resources 
+    allocated to a user at any given time, if there is competition for them. 
+    This user limit can vary between a minimum and maximum value. The former
+    depends on the number of users who have submitted jobs, and the latter is
+    set to this property value. For example, suppose the value of this 
+    property is 25. If two users have submitted jobs to a queue, no single 
+    user can use more than 50% of the queue resources. If a third user submits
+    a job, no single user can use more than 33% of the queue resources. With 4 
+    or more users, no user can use more than 25% of the queue's resources. A 
+    value of 100 implies no user limits are imposed. 
+    </description>
+  </property>
+  
+</configuration>

+ 0 - 86
conf/resource-manager-conf.xml

@@ -1,86 +0,0 @@
-<?xml version="1.0"?>
-
-<!-- This is the configuration file for the resource manager in Hadoop. -->
-<!-- You can configure various parameters related to scheduling, such as -->
-<!-- queues and queue properties here. -->
-<!-- The properties for a queue follow a naming convention,such as, -->
-<!-- hadoop.rm.queue.queue-name.property-name. -->
-
-<configuration>
-
-  <property>
-    <name>hadoop.rm.queue.names</name>
-    <value>default</value>
-    <description>Comma separated list of queue names that are configured
-      for this installation. Properties for a queue in this list will be 
-      listed as hadoop.rm.queue.queue-name.property-name.</description>
-  </property>
-  
-  <property>
-    <name>hadoop.rm.queue.default.guaranteed-capacity-maps</name>
-    <value></value>
-    <description>Guaranteed number of maps that will be available 
-      for jobs in this queue.
-    </description>    
-  </property>
-  
-  <property>
-    <name>hadoop.rm.queue.default.guaranteed-capacity-reduces</name>
-    <value></value>
-    <description>Guaranteed number of reduces that will be available 
-      for jobs in this queue.
-    </description>
-  </property>
-
-  <property>
-    <name>hadoop.rm.queue.default.reclaim-time-limit</name>
-    <value>0</value>
-    <description>The amount of time in seconds before which 
-      resources distributed to other queues will be reclaimed.
-    </description>
-  </property>
-
-  <property>
-    <name>hadoop.rm.queue.default.allowed-users</name>
-    <value></value>
-    <description>Comma separated list of users who can submit jobs 
-      to this queue. If empty, it means there are no ACLs 
-      configured.
-    </description>
-  </property>
-
-  <property>
-    <name>hadoop.rm.queue.default.denied-users</name>
-    <value></value>
-    <description>Comma separated list of users who cannot submit 
-      jobs to this queue. If empty, it means there are no ACLs 
-      configured.
-    </description>
-  </property>
-
-  <property>
-    <name>hadoop.rm.queue.default.allowed-users-override</name>
-    <value>false</value>
-    <description>If true, specifies that users in the allowed 
-      list will be able to submit, even if there names are in 
-      the denied list.
-    </description>
-  </property>
-
-  <property>
-    <name>hadoop.rm.queue.default.supports-priority</name>
-    <value>true</value>
-    <description>If true, priorities of jobs will be taken into 
-      account in scheduling decisions.
-    </description>
-  </property>
-
-  <property>
-    <name>hadoop.rm.queue.default.minimum-user-limit-percent</name>
-    <value>100</value>
-    <description>The minimum percentage of the cluster which can 
-      be utilized by a single user.
-    </description>
-  </property>
-  
-</configuration>

+ 1 - 0
src/contrib/build.xml

@@ -48,6 +48,7 @@
     <subant target="test">
       <fileset dir="." includes="streaming/build.xml"/>
       <fileset dir="." includes="fairscheduler/build.xml"/>
+      <fileset dir="." includes="capacity-scheduler/build.xml"/>
     </subant>
   </target>
   

+ 135 - 0
src/contrib/capacity-scheduler/README

@@ -0,0 +1,135 @@
+# Copyright 2008 The Apache Software Foundation Licensed under the
+# Apache License, Version 2.0 (the "License"); you may not use this
+# file except in compliance with the License.  You may obtain a copy
+# of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+# required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.  See the License for the specific language governing
+# permissions and limitations under the License.
+
+This package implements a scheduler for Map-Reduce jobs, called Capacity
+Task Scheduler (or just Capacity Scheduler), which provides a way to share 
+large clusters. The scheduler provides the following features (which are 
+described in detail in HADOOP-3421):
+
+* Support for queues, where a job is submitted to a queue. 
+* Queues are guaranteed a fraction of the capacity of the grid (their 
+ 'guaranteed capacity') in the sense that a certain capacity of resources 
+ will be at their disposal. All jobs submitted to the queues of an Org will 
+ have access to the capacity guaranteed to the Org.
+* Free resources can be allocated to any queue beyond its guaranteed capacity.
+ These excess allocated resources can be reclaimed and made available to 
+ another queue in order to meet its capacity guarantee.
+* The scheduler guarantees that excess resources taken from a queue will be 
+ restored to it within N minutes of its need for them.
+* Queues optionally support job priorities (disabled by default). 
+* Within a queue, jobs with higher priority will have access to the queue's 
+ resources before jobs with lower priority. However, once a job is running, it
+ will not be preempted for a higher priority job.
+* In order to prevent one or more users from monopolizing its resources, each 
+ queue enforces a limit on the percentage of resources allocated to a user at
+ any given time, if there is competition for them.
+* Support for memory-intensive jobs, wherein a job can optionally specify 
+ higher memory-requirements than the default, and the tasks of the job will 
+ only be run on TaskTrackers that have enough memory to spare. 
+
+Whenever a TaskTracker is free, the Capacity Scheduler first picks a queue 
+that needs to reclaim any resources the earliest. If no such queue is found, 
+it then picks a queue which has most free space (whose ratio of # of running
+slots to guaranteed capacity is the lowest). 
+
+--------------------------------------------------------------------------------
+
+BUILDING:
+
+In HADOOP_HOME, run ant package to build Hadoop and its contrib packages.
+
+--------------------------------------------------------------------------------
+
+INSTALLING:
+
+To run the capacity scheduler in your Hadoop installation, you need to put it 
+on the CLASSPATH. The easiest way is to copy the 
+hadoop-*-capacity-scheduler.jar from 
+HADOOP_HOME/build/contrib/capacity-scheduler to HADOOP_HOME/lib. Alternatively
+you can modify HADOOP_CLASSPATH to include this jar, in conf/hadoop-env.sh.
+
+You will also need to set the following property in the Hadoop config file
+(conf/hadoop-site.xml) to have Hadoop use the capacity scheduler:
+
+<property>
+  <name>mapred.jobtracker.taskScheduler</name>
+  <value>org.apache.hadoop.mapred.CapacityTaskScheduler</value>
+</property>
+
+--------------------------------------------------------------------------------
+
+CONFIGURATION:
+
+The following properties can be set in hadoop-site.xml to configure the
+scheduler:
+
+mapred.capacity-scheduler.reclaimCapacity.interval: 
+		The capacity scheduler checks, every 'interval' seconds, whether any 
+		capacity needs to be reclaimed. The default value is 5 seconds. 
+
+The scheduling information for queues is maintained in a configuration file
+called 'capacity-scheduler.xml'. Note that the queue names are set in 
+hadoop-site.xml. capacity-scheduler.xml sets the scheduling properties
+for each queue. See that file for configuration details, but the following 
+are the configuration options for each queue: 
+
+mapred.capacity-scheduler.queue.<queue-name>.guaranteed-capacity
+		Percentage of the number of slots in the cluster that are
+    guaranteed to be available for jobs in this queue.
+    The sum of guaranteed capacities for all queues should be less than or
+    equal to 100. 
+
+mapred.capacity-scheduler.queue.<queue-name>.reclaim-time-limit
+		The amount of time, in seconds, before which resources distributed to other 
+		queues will be reclaimed.
+
+mapred.capacity-scheduler.queue.<queue-name>.supports-priority
+		If true, priorities of jobs will be taken into account in scheduling 
+		decisions.
+		
+mapred.capacity-scheduler.queue.<queue-name>.minimum-user-limit-percent
+		Each queue enforces a limit on the percentage of resources 
+    allocated to a user at any given time, if there is competition for them. 
+    This user limit can vary between a minimum and maximum value. The former
+    depends on the number of users who have submitted jobs, and the latter is
+    set to this property value. For example, suppose the value of this 
+    property is 25. If two users have submitted jobs to a queue, no single 
+    user can use more than 50% of the queue resources. If a third user submits
+    a job, no single user can use more than 33% of the queue resources. With 4 
+    or more users, no user can use more than 25% of the queue's resources. A 
+    value of 100 implies no user limits are imposed.
+		
+
+--------------------------------------------------------------------------------
+
+IMPLEMENTATION:
+
+When a TaskTracker is free, the capacity scheduler does the following (note
+that many of these steps can be, and will be, enhanced over time to provide
+better algorithms): 
+1. Decide whether to give it a Map or Reduce task, depending on how many tasks
+the TT is already running of that type, with respect to the maximum tasks it 
+can run.
+2. The scheduler then picks a queue. Queues that need to reclaim capacity 
+sooner, come before queues that don't. For queues that don't, they're ordered 
+by a ratio of (# of running tasks)/Guaranteed capacity, which indicates how 
+much 'free space' the queue has, or how much it is over capacity.
+3. A job is picked in the queue based on its state (running jobs are picked
+first), its priority (if the queue supports priorities) or its submission 
+time, and whether the job's user is under or over limit. 
+4. A task is picked from the job in the same way it always has. 
+
+Periodically, a thread checks each queue to see if it needs to reclaim any 
+capacity. Queues that are running below capacity and that have tasks waiting,
+need to reclaim capacity within a certain period of time. If a queue hasn't 
+received enough tasks in a certain amount of time, tasks will be killed from 
+queues that are running over capacity.
+
+--------------------------------------------------------------------------------

+ 28 - 0
src/contrib/capacity-scheduler/build.xml

@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!-- 
+Before you can run these subtargets directly, you need 
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="capacity-scheduler" default="jar">
+
+  <import file="../build-contrib.xml"/>
+
+</project>

+ 222 - 0
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java

@@ -0,0 +1,222 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Class providing access to capacity scheduler configuration.
+ * 
+ * Capacity scheduler configuration involves setting up queues, and defining
+ * various properties for the queues. These are typically read from a file 
+ * called capacity-scheduler.xml that must be in the classpath of the
+ * application. The class provides APIs to get/set and reload the 
+ * configuration for the queues.
+ */
+class CapacitySchedulerConf {
+  
+  /** Default file name from which the capacity scheduler configuration is read. */ 
+  public static final String SCHEDULER_CONF_FILE = "capacity-scheduler.xml";
+
+  /** Default value for a queue's guaranteed capacity, as a percentage of
+   * cluster slots. The default value is set to 100, to represent the 
+   * entire cluster. 
+   */ 
+  public static final float DEFAULT_GUARANTEED_CAPACITY = 100;
+
+  /** Default value (in seconds) for reclaiming redistributed resources.
+   * The default value is set to <code>300</code>. 
+   */ 
+  public static final int DEFAULT_RECLAIM_TIME_LIMIT = 300;
+  
+  /** Default value for minimum resource limit per user per queue, as a 
+   * percentage.
+   * The default value is set to <code>100</code>, the idea
+   * being that the default is suitable for organizations that do not
+   * require setting up any queues.
+   */ 
+  public static final int DEFAULT_MIN_USER_LIMIT_PERCENT = 100;
+
+  /** Prefix shared by all per-queue property names; see
+   * {@link #toFullPropertyName(String, String)}. */
+  private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = 
+    "mapred.capacity-scheduler.queue.";
+
+  /** Underlying configuration holding the scheduler properties. */
+  private Configuration rmConf;
+  
+  /**
+   * Create a new CapacitySchedulerConf.
+   * This method reads from the default configuration file mentioned in
+   * {@link #SCHEDULER_CONF_FILE}, that must be present in the classpath of the
+   * application.
+   */
+  public CapacitySchedulerConf() {
+    rmConf = new Configuration(false);
+    rmConf.addResource(SCHEDULER_CONF_FILE);
+  }
+
+  /**
+   * Create a new CapacitySchedulerConf reading the specified configuration
+   * file.
+   * 
+   * @param configFile {@link Path} to the configuration file containing
+   * the capacity scheduler configuration.
+   */
+  public CapacitySchedulerConf(Path configFile) {
+    rmConf = new Configuration(false);
+    rmConf.addResource(configFile);
+  }
+  
+  /**
+   * Get the guaranteed percentage of the cluster for the specified queue.
+   * 
+   * This method defaults to {@link #DEFAULT_GUARANTEED_CAPACITY} if
+   * no value is specified in the configuration for this queue. If the queue
+   * name is unknown, this method throws a {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @return guaranteed percent of the cluster for the queue.
+   */
+  public float getGuaranteedCapacity(String queue) {
+    checkQueue(queue);
+    float result = rmConf.getFloat(toFullPropertyName(queue, 
+                                   "guaranteed-capacity"), 
+                                   DEFAULT_GUARANTEED_CAPACITY);
+    // reject values outside the valid percentage range [0, 100]
+    if (result < 0.0 || result > 100.0) {
+      throw new IllegalArgumentException("Illegal capacity for queue " + queue +
+                                         " of " + result);
+    }
+    return result;
+  }
+  
+  /**
+   * Get the amount of time before which redistributed resources must be
+   * reclaimed for the specified queue.
+   * 
+   * The resource manager distributes spare capacity from a free queue
+   * to ones which are in need for more resources. However, if a job 
+   * submitted to the first queue requires back the resources, they must
+   * be reclaimed within the specified configuration time limit.
+   * 
+   * This method defaults to {@link #DEFAULT_RECLAIM_TIME_LIMIT} if
+   * no value is specified in the configuration for this queue. If the queue
+   * name is unknown, this method throws a {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @return reclaim time limit for this queue.
+   */
+  public int getReclaimTimeLimit(String queue) {
+    checkQueue(queue);
+    return rmConf.getInt(toFullPropertyName(queue, "reclaim-time-limit"), 
+                          DEFAULT_RECLAIM_TIME_LIMIT);
+  }
+  
+  /**
+   * Set the amount of time before which redistributed resources must be
+   * reclaimed for the specified queue.
+   * @param queue Name of the queue
+   * @param value Amount of time before which the redistributed resources
+   * must be retained.
+   */
+  public void setReclaimTimeLimit(String queue, int value) {
+    checkQueue(queue);
+    rmConf.setInt(toFullPropertyName(queue, "reclaim-time-limit"), value);
+  }
+  
+  /**
+   * Get whether priority is supported for this queue.
+   * 
+   * If this value is false, then job priorities will be ignored in 
+   * scheduling decisions. This method defaults to <code>false</code> if 
+   * the property is not configured for this queue. If the queue name is 
+   * unknown, this method throws a {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @return Whether this queue supports priority or not.
+   */
+  public boolean isPrioritySupported(String queue) {
+    checkQueue(queue);
+    return rmConf.getBoolean(toFullPropertyName(queue, "supports-priority"),
+                              false);  
+  }
+  
+  /**
+   * Set whether priority is supported for this queue.
+   * 
+   * If the queue name is unknown, this method throws a 
+   * {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @param value true, if the queue must support priorities, false otherwise.
+   */
+  public void setPrioritySupported(String queue, boolean value) {
+    checkQueue(queue);
+    rmConf.setBoolean(toFullPropertyName(queue, "supports-priority"), value);
+  }
+  
+  /**
+   * Get the minimum limit of resources for any user submitting jobs in 
+   * this queue, in percentage.
+   * 
+   * This method defaults to {@link #DEFAULT_MIN_USER_LIMIT_PERCENT} if
+   * no value is specified in the configuration for this queue. If the queue
+   * name is unknown, this method throws a {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @return minimum limit of resources, in percentage, that will be 
+   * available for a user.
+   * 
+   */
+  public int getMinimumUserLimitPercent(String queue) {
+    checkQueue(queue);
+    return rmConf.getInt(toFullPropertyName(queue, 
+                          "minimum-user-limit-percent"), 
+                          DEFAULT_MIN_USER_LIMIT_PERCENT);
+  }
+  
+  /**
+   * Set the minimum limit of resources for any user submitting jobs in
+   * this queue, in percentage.
+   * 
+   * If the queue name is unknown, this method throws a 
+   * {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @param value minimum limit of resources for any user submitting jobs
+   * in this queue
+   */
+  public void setMinimumUserLimitPercent(String queue, int value) {
+    checkQueue(queue);
+    rmConf.setInt(toFullPropertyName(queue, "minimum-user-limit-percent"), 
+                    value);
+  }
+  
+  /**
+   * Reload configuration by clearing the information read from the 
+   * underlying configuration file.
+   */
+  public synchronized void reloadConfiguration() {
+    rmConf.reloadConfiguration();
+  }
+  
+  // NOTE(review): queue-name validation is currently disabled - the body is
+  // commented out, so every getter/setter accepts any queue name despite the
+  // javadoc's mention of IllegalArgumentException. The commented code also
+  // references a 'queues' field that is not declared in this class.
+  private synchronized void checkQueue(String queue) {
+    /*if (queues == null) {
+      queues = getQueues();
+    }
+    if (!queues.contains(queue)) {
+      throw new IllegalArgumentException("Queue " + queue + " is undefined.");
+    }*/
+  }
+
+  /** Builds the fully qualified per-queue property name, e.g.
+   * "mapred.capacity-scheduler.queue.&lt;queue&gt;.&lt;property&gt;". */
+  private static final String toFullPropertyName(String queue, 
+                                                  String property) {
+      return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
+  }
+}

+ 1068 - 0
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

@@ -0,0 +1,1068 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A {@link TaskScheduler} that implements the requirements in HADOOP-3421
+ * and provides a HOD-less way to share large clusters. This scheduler 
+ * provides the following features: 
+ *  * support for queues, where a job is submitted to a queue. 
+ *  * Queues are guaranteed a fraction of the capacity of the grid (their 
+ *  'guaranteed capacity') in the sense that a certain capacity of resources 
+ *  will be at their disposal. All jobs submitted to the queues of an Org 
+ *  will have access to the capacity guaranteed to the Org.
+ *  * Free resources can be allocated to any queue beyond its guaranteed 
+ *  capacity. These excess allocated resources can be reclaimed and made 
+ *  available to another queue in order to meet its capacity guarantee.
+ *  * The scheduler guarantees that excess resources taken from a queue will 
+ *  be restored to it within N minutes of its need for them.
+ *  * Queues optionally support job priorities (disabled by default). 
+ *  * Within a queue, jobs with higher priority will have access to the 
+ *  queue's resources before jobs with lower priority. However, once a job 
+ *  is running, it will not be preempted for a higher priority job.
+ *  * In order to prevent one or more users from monopolizing its resources, 
+ *  each queue enforces a limit on the percentage of resources allocated to a 
+ *  user at any given time, if there is competition for them.
+ *  
+ */
+class CapacityTaskScheduler extends TaskScheduler {
+  
+  /** 
+   * For keeping track of reclaimed capacity. 
+   * Whenever slots need to be reclaimed, we create one of these objects. 
+   * As the queue gets slots, the amount to reclaim gets decremented. If 
+   * we haven't reclaimed enough within a certain time, we need to kill 
+   * tasks. This object 'expires' either if all resources are reclaimed
+   * before the deadline, or the deadline passes. 
+   */
+  private static class ReclaimedResource {
+    // how much resource to reclaim
+    public int originalAmount;
+    // how much is to be reclaimed currently
+    public int currentAmount;
+    // the time, in millisecs, when this object expires. 
+    // This time is equal to the time when the object was created, plus
+    // the reclaim-time SLA for the queue.  
+    public long whenToExpire;
+    // we also keep track of when to kill tasks, in millisecs. This is a 
+    // fraction of 'whenToExpire', but we store it here so we don't 
+    // recompute it every time. 
+    public long whenToKill;
+    // whether tasks have been killed for this resource
+    boolean tasksKilled;
+    
+    // amount: slots to reclaim; expiryTime/whenToKill: absolute times in ms
+    public ReclaimedResource(int amount, long expiryTime, 
+        long whenToKill) {
+      this.originalAmount = amount;
+      this.currentAmount = amount;
+      this.whenToExpire = expiryTime;
+      this.whenToKill = whenToKill;
+      this.tasksKilled = false;
+    }
+  }
+
+  /** 
+   * This class keeps track of scheduling info for each queue for either 
+   * Map or Reduce tasks. 
+   * This scheduling information is used by the JT to decide how to allocate
+   * tasks, redistribute capacity, etc. 
+   */
+  private static class QueueSchedulingInfo {
+    // name of the queue this info belongs to
+    String queueName;
+
+    /** guaranteed capacity(%) is set at config time */ 
+    float guaranteedCapacityPercent = 0;
+    /** 
+     * the actual gc, which depends on how many slots are available
+     * in the cluster at any given time. 
+     */
+    int guaranteedCapacity = 0;
+    
+    /** 
+     * we also keep track of how many tasks are running for all jobs in 
+     * the queue, and how many overall tasks there are. This info is 
+     * available for each job, but keeping a sum makes our algos faster.
+     */  
+    // number of running tasks
+    int numRunningTasks = 0;
+    // number of pending tasks
+    int numPendingTasks = 0;
+    
+    /** 
+     * to handle user limits, we need to know how many users have jobs in 
+     * the queue.
+     */  
+    Map<String, Integer> numJobsByUser = new HashMap<String, Integer>();
+    /** for each user, we need to keep track of number of running tasks */
+    Map<String, Integer> numRunningTasksByUser = 
+      new HashMap<String, Integer>();
+      
+    /** min value of user limit (same for all users) */
+    int ulMin;
+    
+    /**
+     * We need to keep track of resources to reclaim. 
+     * Whenever a queue is under capacity and has tasks pending, we offer it 
+     * an SLA that gives it free slots equal to or greater than the gap in 
+     * its capacity, within a period of time (reclaimTime). 
+     * To do this, we periodically check if queues need to reclaim capacity. 
+     * If they do, we create a ResourceReclaim object. We also periodically
+     * check if a queue has received enough free slots within, say, 80% of 
+     * its reclaimTime. If not, we kill enough tasks to make up the 
+     * difference. 
+     * We keep two queues of ResourceReclaim objects. when an object is 
+     * created, it is placed in one queue. Once we kill tasks to recover 
+     * resources for that object, it is placed in an expiry queue. we need
+     * to do this to prevent creating spurious ResourceReclaim objects. We 
+     * keep a count of total resources that are being reclaimed. This count 
+     * is decremented when an object expires. 
+     */
+    
+    /**
+     * reclaim time limit (in msec). This time represents the SLA we offer 
+     * a queue - a queue gets back any lost capacity within this period 
+     * of time.  
+     */ 
+    long reclaimTime;  
+    /**
+     * the list of resources to reclaim. This list is always sorted so that
+     * resources that need to be reclaimed sooner occur earlier in the list.
+     */
+    LinkedList<ReclaimedResource> reclaimList = 
+      new LinkedList<ReclaimedResource>();
+    /**
+     * the list of resources to expire. This list is always sorted so that
+     * resources that need to be expired sooner occur earlier in the list.
+     */
+    LinkedList<ReclaimedResource> reclaimExpireList = 
+      new LinkedList<ReclaimedResource>();
+    /** 
+     * sum of all resources that are being reclaimed. 
+     * We keep this to prevent unnecessary ReclaimResource objects from being
+     * created.  
+     */
+    int numReclaimedResources = 0;
+    
+    public QueueSchedulingInfo(String queueName, float guaranteedCapacity, 
+        int ulMin, long reclaimTime) {
+      this.queueName = new String(queueName);
+      this.guaranteedCapacityPercent = guaranteedCapacity;
+      this.ulMin = ulMin;
+      this.reclaimTime = reclaimTime;
+    }
+  }
+
+  /** 
+   * This class handles the scheduling algorithms. 
+   * The algos are the same for both Map and Reduce tasks. 
+   * There may be slight variations later, in which case we can make this
+   * an abstract base class and have derived classes for Map and Reduce.  
+   */
+  private static abstract class TaskSchedulingMgr {
+
+    /** quick way to get qsi object given a queue name */
+    private Map<String, QueueSchedulingInfo> queueInfoMap = 
+      new HashMap<String, QueueSchedulingInfo>();
+    /** we keep track of the number of map or reduce slots we saw last */
+    private int numSlots = 0;
+    /** our enclosing TaskScheduler object */
+    protected CapacityTaskScheduler scheduler;
+    // for debugging
+    protected String type = null;
+
+    abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
+        JobInProgress job) throws IOException; 
+    abstract int getClusterCapacity();
+    abstract int getRunningTasks(JobInProgress job);
+    abstract int getPendingTasks(JobInProgress job);
+    abstract int killTasksFromJob(JobInProgress job, int tasksToKill);
+
+    /**
+     * List of QSIs for assigning tasks.
+     * This list is ordered such that queues that need to reclaim capacity
+     * sooner, come before queues that don't. For queues that don't, they're
+     * ordered by a ratio of (# of running tasks)/Guaranteed capacity, which
+     * indicates how much 'free space' the queue has, or how much it is over
+     * capacity. This ordered list is iterated over, when assigning tasks.
+     */  
+    private List<QueueSchedulingInfo> qsiForAssigningTasks = 
+      new ArrayList<QueueSchedulingInfo>();  
+    /** comparator to sort queues */ 
+    private static final class QueueComparator 
+      implements Comparator<QueueSchedulingInfo> {
+      public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) {
+        // if one queue needs to reclaim something and the other one doesn't, 
+        // the former is first
+        if ((0 == q1.reclaimList.size()) && (0 != q2.reclaimList.size())) {
+          return 1;
+        }
+        else if ((0 != q1.reclaimList.size()) && (0 == q2.reclaimList.size())){
+          return -1;
+        }
+        else if ((0 == q1.reclaimList.size()) && (0 == q2.reclaimList.size())){
+          // neither needs to reclaim. look at how much capacity they've filled
+          double r1 = (double)q1.numRunningTasks/(double)q1.guaranteedCapacity;
+          double r2 = (double)q2.numRunningTasks/(double)q2.guaranteedCapacity;
+          if (r1<r2) return -1;
+          else if (r1>r2) return 1;
+          else return 0;
+        }
+        else {
+          // both have to reclaim. Look at which one needs to reclaim earlier
+          long t1 = q1.reclaimList.get(0).whenToKill;
+          long t2 = q2.reclaimList.get(0).whenToKill;
+          if (t1<t2) return -1;
+          else if (t1>t2) return 1;
+          else return 0;
+        }
+      }
+    }
+    private final static QueueComparator queueComparator = new QueueComparator();
+
+   
+    TaskSchedulingMgr(CapacityTaskScheduler sched) {
+      scheduler = sched;
+    }
+    
+    private void add(QueueSchedulingInfo qsi) {
+      queueInfoMap.put(qsi.queueName, qsi);
+      qsiForAssigningTasks.add(qsi);
+    }
+    private int getNumQueues() {
+      return queueInfoMap.size();
+    }
+    private boolean isQueuePresent(String queueName) {
+      return queueInfoMap.containsKey(queueName);
+    }
+
+    /** 
+     * Periodically, we walk through our queues to do the following: 
+     * a. Check if a queue needs to reclaim any resources within a period
+     * of time (because it's running below capacity and more tasks are
+     * waiting)
+     * b. Check if a queue hasn't received enough of the resources it needed
+     * to be reclaimed and thus tasks need to be killed.
+     */
+    private synchronized void reclaimCapacity() {
+      int tasksToKill = 0;
+      // with only one queue, there's nothing to do
+      if (queueInfoMap.size() < 2) {
+        return;
+      }
+      QueueSchedulingInfo lastQsi = 
+        qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
+      long currentTime = scheduler.clock.getTime();
+      for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
+        // is there any resource that needs to be reclaimed? 
+        if ((!qsi.reclaimList.isEmpty()) &&  
+            (qsi.reclaimList.getFirst().whenToKill < 
+              currentTime + CapacityTaskScheduler.RECLAIM_CAPACITY_INTERVAL)) {
+          // make a note of how many tasks to kill to claim resources
+          tasksToKill += qsi.reclaimList.getFirst().currentAmount;
+          // move this to expiry list
+          ReclaimedResource r = qsi.reclaimList.remove();
+          qsi.reclaimExpireList.add(r);
+        }
+        // is there any resource that needs to be expired?
+        if ((!qsi.reclaimExpireList.isEmpty()) && 
+            (qsi.reclaimExpireList.getFirst().whenToExpire <= currentTime)) {
+          ReclaimedResource r = qsi.reclaimExpireList.remove();
+          qsi.numReclaimedResources -= r.originalAmount;
+        }
+        // do we need to reclaim a resource later? 
+        // if no queue is over capacity, there's nothing to reclaim
+        if (lastQsi.numRunningTasks <= lastQsi.guaranteedCapacity) {
+          continue;
+        }
+        if (qsi.numRunningTasks < qsi.guaranteedCapacity) {
+          // usedCap is how much capacity is currently accounted for
+          int usedCap = qsi.numRunningTasks + qsi.numReclaimedResources;
+          // see if we have remaining capacity and if we have enough pending 
+          // tasks to use up remaining capacity
+          if ((usedCap < qsi.guaranteedCapacity) && 
+              ((qsi.numPendingTasks - qsi.numReclaimedResources)>0)) {
+            // create a request for resources to be reclaimed
+            int amt = Math.min((qsi.guaranteedCapacity-usedCap), 
+                (qsi.numPendingTasks - qsi.numReclaimedResources));
+            // create a resource object that needs to be reclaimed some time
+            // in the future
+            long whenToKill = qsi.reclaimTime - 
+              (CapacityTaskScheduler.HEARTBEATS_LEFT_BEFORE_KILLING * 
+                  scheduler.taskTrackerManager.getNextHeartbeatInterval());
+            if (whenToKill < 0) whenToKill = 0;
+            qsi.reclaimList.add(new ReclaimedResource(amt, 
+                currentTime + qsi.reclaimTime, 
+                currentTime + whenToKill));
+            qsi.numReclaimedResources += amt;
+            LOG.debug("Queue " + qsi.queueName + " needs to reclaim " + 
+                amt + " resources");
+          }
+        }
+      }
+      // kill tasks to reclaim capacity
+      if (0 != tasksToKill) {
+        killTasks(tasksToKill);
+      }
+    }
+
+    // kill 'tasksToKill' tasks 
+    private void killTasks(int tasksToKill)
+    {
+      /* 
+       * There are a number of fair ways in which one can figure out how
+       * many tasks to kill from which queue, so that the total number of
+       * tasks killed is equal to 'tasksToKill'.
+       * Maybe the best way is to keep a global ordering of running tasks
+       * and kill the ones that ran last, irrespective of what queue or 
+       * job they belong to. 
+       * What we do here is look at how many tasks is each queue running
+       * over capacity, and use that as a weight to decide how many tasks
+       * to kill from that queue.
+       */ 
+      
+      // first, find out all queues over capacity
+      int loc;
+      for (loc=0; loc<qsiForAssigningTasks.size(); loc++) {
+        QueueSchedulingInfo qsi = qsiForAssigningTasks.get(loc);
+        if (qsi.numRunningTasks > qsi.guaranteedCapacity) {
+          // all queues from here onwards are running over cap
+          break;
+        }
+      }
+      // if some queue needs to reclaim cap, there must be at least one queue
+      // over cap. But check, just in case. 
+      if (loc == qsiForAssigningTasks.size()) {
+        LOG.warn("In Capacity scheduler, we need to kill " + tasksToKill + 
+            " tasks but there is no queue over capacity.");
+        return;
+      }
+      // calculate how many total tasks are over cap
+      int tasksOverCap = 0;
+      for (int i=loc; i<qsiForAssigningTasks.size(); i++) {
+        QueueSchedulingInfo qsi = qsiForAssigningTasks.get(i);
+        tasksOverCap += (qsi.numRunningTasks - qsi.guaranteedCapacity);
+      }
+      // now kill tasks from each queue, proportionally to how far over
+      // capacity it is running
+      for (int i=loc; i<qsiForAssigningTasks.size(); i++) {
+        QueueSchedulingInfo qsi = qsiForAssigningTasks.get(i);
+        killTasksFromQueue(qsi, (int)Math.round(
+            ((double)(qsi.numRunningTasks - qsi.guaranteedCapacity))*
+            tasksToKill/(double)tasksOverCap));
+      }
+    }
+
+    // kill 'tasksToKill' tasks from queue represented by qsi
+    private void killTasksFromQueue(QueueSchedulingInfo qsi, int tasksToKill) {
+      // we start killing as many tasks as possible from the jobs that started
+      // last. This way, we let long-running jobs complete faster.
+      int tasksKilled = 0;
+      JobInProgress jobs[] = scheduler.jobQueuesManager.
+        getRunningJobQueue(qsi.queueName).toArray(new JobInProgress[0]);
+      for (int i=jobs.length-1; i>=0; i--) {
+        if (jobs[i].getStatus().getRunState() != JobStatus.RUNNING) {
+          continue;
+        }
+        tasksKilled += killTasksFromJob(jobs[i], tasksToKill-tasksKilled);
+        if (tasksKilled >= tasksToKill) break;
+      }
+    }
+   
+    // return the TaskAttemptID of the running task, if any, that has made 
+    // the least progress.
+    TaskAttemptID getRunningTaskWithLeastProgress(TaskInProgress tip) {
+      double leastProgress = 1;
+      TaskAttemptID tID = null;
+      for (Iterator<TaskAttemptID> it = 
+        tip.getActiveTasks().keySet().iterator(); it.hasNext();) {
+        TaskAttemptID taskid = it.next();
+        TaskStatus status = tip.getTaskStatus(taskid);
+        if (status.getRunState() == TaskStatus.State.RUNNING) {
+          if (status.getProgress() < leastProgress) {
+            leastProgress = status.getProgress();
+            tID = taskid;
+          }
+        }
+      }
+      return tID;
+    }
+    
+    
+    /**
+     * Update individual QSI objects.
+     * We don't need exact information for all variables, just enough for us
+     * to make scheduling decisions. For example, we don't need an exact count
+     * of numRunningTasks. Once we count upto the grid capacity (gcSum), any
+     * number beyond that will make no difference.
+     * */
+    private synchronized void updateQSIObjects() {
+      // if # of slots have changed since last time, update. 
+      // First, compute whether the total number of TT slots have changed
+      int slotsDiff = getClusterCapacity()- numSlots;
+      numSlots += slotsDiff;
+      for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
+        // compute new GCs and ACs, if TT slots have changed
+        if (slotsDiff != 0) {
+          qsi.guaranteedCapacity +=
+            (qsi.guaranteedCapacityPercent*slotsDiff/100);
+        }
+        qsi.numRunningTasks = 0;
+        qsi.numPendingTasks = 0;
+        for (String s: qsi.numRunningTasksByUser.keySet()) {
+          qsi.numRunningTasksByUser.put(s, 0);
+        }
+        // update stats on running jobs
+        for (JobInProgress j: 
+          scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+          if (j.getStatus().getRunState() != JobStatus.RUNNING) {
+            continue;
+          }
+          qsi.numRunningTasks += getRunningTasks(j);
+          Integer i = qsi.numRunningTasksByUser.get(j.getProfile().getUser());
+          if (null == i) {
+            // defensive: the user may not (yet) have an entry in the map
+            i = 0;
+          }
+          qsi.numRunningTasksByUser.put(j.getProfile().getUser(), 
+              i+getRunningTasks(j));
+          qsi.numPendingTasks += getPendingTasks(j);
+          LOG.debug("updateQSI: job " + j.toString() + ": run(m) = " + 
+              j.runningMaps() + ", run(r) = " + j.runningReduces() + 
+              ", finished(m) = " + j.finishedMaps() + ", finished(r)= " + 
+              j.finishedReduces() + ", failed(m) = " + j.failedMapTasks + 
+              ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " + 
+              j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks 
+              + ", total(m) = " + j.numMapTasks + ", total(r) = " + 
+              j.numReduceTasks);
+          /* 
+           * it's fine walking down the entire list of running jobs - there
+           * probably will not be many, plus, we may need to go through the
+           * list to compute numRunningTasksByUser. If this is expensive, we
+           * can keep a list of running jobs per user. Then we only need to
+           * consider the first few jobs per user.
+           */ 
+        }
+        // update stats on waiting jobs
+        for (JobInProgress j: 
+          scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) {
+          // pending tasks
+          if (qsi.numPendingTasks > getClusterCapacity()) {
+            // that's plenty. no need for more computation
+            break;
+          }
+          qsi.numPendingTasks += getPendingTasks(j);
+        }
+      }
+    }
+
+    
+    void jobAdded(JobInProgress job) {
+      // update qsi 
+      QueueSchedulingInfo qsi = 
+        queueInfoMap.get(job.getProfile().getQueueName());
+      // qsi shouldn't be null
+      
+      // update user-specific info
+      Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
+      if (null == i) {
+        qsi.numJobsByUser.put(job.getProfile().getUser(), 1);
+        qsi.numRunningTasksByUser.put(job.getProfile().getUser(), 0);
+      }
+      else {
+        // Integer is immutable: a bare i++ only rebinds the local variable,
+        // so the incremented count must be written back into the map
+        qsi.numJobsByUser.put(job.getProfile().getUser(), i + 1);
+      }
+    }
+    void jobRemoved(JobInProgress job) {
+      // update qsi 
+      QueueSchedulingInfo qsi = 
+        queueInfoMap.get(job.getProfile().getQueueName());
+      // qsi shouldn't be null
+      
+      // update numJobsByUser
+      LOG.debug("Job to be removed for user " + job.getProfile().getUser());
+      Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
+      i--;
+      if (0 == i.intValue()) {
+        qsi.numJobsByUser.remove(job.getProfile().getUser());
+        qsi.numRunningTasksByUser.remove(job.getProfile().getUser());
+        LOG.debug("No more jobs for user, number of users = " + qsi.numJobsByUser.size());
+      }
+      else {
+        // Integer is immutable: the decremented count must be written back
+        // into the map, otherwise per-user job counts never go down
+        qsi.numJobsByUser.put(job.getProfile().getUser(), i);
+        LOG.debug("User still has jobs, number of users = " + qsi.numJobsByUser.size());
+      }
+    }
+
+    // called when a task is allocated to queue represented by qsi. 
+    // update our info about reclaimed resources
+    private synchronized void updateReclaimedResources(QueueSchedulingInfo qsi) {
+      // if we needed to reclaim resources, we have reclaimed one
+      if (qsi.reclaimList.isEmpty()) {
+        return;
+      }
+      ReclaimedResource res = qsi.reclaimList.getFirst();
+      res.currentAmount--;
+      if (0 == res.currentAmount) {
+        // move this resource to the expiry list
+        ReclaimedResource r = qsi.reclaimList.remove();
+        qsi.reclaimExpireList.add(r);
+      }
+    }
+
+    private synchronized void updateCollectionOfQSIs() {
+      Collections.sort(qsiForAssigningTasks, queueComparator);
+    }
+
+
+    private boolean isUserOverLimit(JobInProgress j, QueueSchedulingInfo qsi) {
+      // what is our current capacity? It's GC if we're running below GC. 
+      // If we're running over GC, then its #running plus 1 (which is the 
+      // extra slot we're getting). 
+      int currentCapacity;
+      if (qsi.numRunningTasks < qsi.guaranteedCapacity) {
+        currentCapacity = qsi.guaranteedCapacity;
+      }
+      else {
+        currentCapacity = qsi.numRunningTasks+1;
+      }
+      int limit = Math.max((int)(Math.ceil((double)currentCapacity/
+          (double)qsi.numJobsByUser.size())), 
+          (int)(Math.ceil((double)(qsi.ulMin*currentCapacity)/100.0)));
+      if (qsi.numRunningTasksByUser.get(
+          j.getProfile().getUser()) >= limit) {
+        LOG.debug("User " + j.getProfile().getUser() + 
+            " is over limit, num running tasks = " + 
+            qsi.numRunningTasksByUser.get(j.getProfile().getUser()) + 
+            ", limit = " + limit);
+        return true;
+      }
+      else {
+        return false;
+      }
+    }
+    
+    private Task getTaskFromQueue(TaskTrackerStatus taskTracker, 
+        QueueSchedulingInfo qsi) throws IOException {
+      Task t = null;
+      // keep track of users over limit
+      Set<String> usersOverLimit = new HashSet<String>();
+      // look at running jobs first
+      for (JobInProgress j:
+        scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+        // some jobs may be in the running queue but may have completed 
+        // and not yet have been removed from the running queue
+        if (j.getStatus().getRunState() != JobStatus.RUNNING) {
+          continue;
+        }
+        // is this job's user over limit?
+        if (isUserOverLimit(j, qsi)) {
+          // user over limit. 
+          usersOverLimit.add(j.getProfile().getUser());
+          continue;
+        }
+        // We found a suitable job. Get task from it.
+        t = obtainNewTask(taskTracker, j);
+        if (t != null) {
+          LOG.debug("Got task from job " + 
+              j.getJobID().toStringWOPrefix() + " in queue " + qsi.queueName);
+          return t;
+        }
+      }
+      
+      // if we're here, we found nothing in the running jobs. Time to 
+      // look at waiting jobs. Get first job of a user that is not over limit
+      for (JobInProgress j: 
+        scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) {
+        // is this job's user over limit?
+        if (usersOverLimit.contains(j.getProfile().getUser())) {
+          // user over limit. 
+          continue;
+        }
+        // this job is a candidate for running. Initialize it, move it
+        // to run queue
+        j.initTasks();
+        scheduler.jobQueuesManager.jobUpdated(j);
+        // We found a suitable job. Get task from it.
+        t = obtainNewTask(taskTracker, j);
+        if (t != null) {
+          LOG.debug("Getting task from job " + 
+              j.getJobID().toStringWOPrefix() + " in queue " + qsi.queueName);
+          return t;
+        }
+      }
+      
+      // if we're here, we haven't found anything. This could be because 
+      // there is nothing to run, or that the user limit for some user is 
+      // too strict, i.e., there's at least one user who doesn't have
+      // enough tasks to satisfy his limit. If it's the later case, look at 
+      // jobs without considering user limits, and get task from first 
+      // eligible job
+      if (usersOverLimit.size() > 0) {
+        for (JobInProgress j:
+          scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+          if ((j.getStatus().getRunState() == JobStatus.RUNNING) && 
+              (usersOverLimit.contains(j.getProfile().getUser()))) {
+            t = obtainNewTask(taskTracker, j);
+            if (t != null) {
+              LOG.debug("Getting task from job " + 
+                  j.getJobID().toStringWOPrefix() + " in queue " + qsi.queueName);
+              return t;
+            }
+          }
+        }
+        // look at waiting jobs the same way
+        for (JobInProgress j: 
+          scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) {
+          if (usersOverLimit.contains(j.getProfile().getUser())) {
+            j.initTasks();
+            scheduler.jobQueuesManager.jobUpdated(j);
+            t = obtainNewTask(taskTracker, j);
+            if (t != null) {
+              LOG.debug("Getting task from job " + 
+                  j.getJobID().toStringWOPrefix() + " in queue " + qsi.queueName);
+              return t;
+            }
+          }
+        }
+      }
+      
+      return null;
+    }
+    
+    private List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException {
+      Task t = null;
+
+      /* 
+       * update all our QSI objects.
+       * This involves updating each qsi structure. This operation depends
+       * on the number of running jobs in a queue, and some waiting jobs. If it
+       * becomes expensive, do it once every few heartbeats only.
+       */ 
+      updateQSIObjects();
+      LOG.debug("After updating QSI objects:");
+      printQSIs();
+      /*
+       * sort list of queues first, as we want queues that need the most to
+       * get first access. If this is expensive, sort every few heartbeats.
+       * We're only sorting a collection of queues - there shouldn't be many.
+       */
+      updateCollectionOfQSIs();
+      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
+        t = getTaskFromQueue(taskTracker, qsi);
+        if (t!= null) {
+          // we have a task. Update reclaimed resource info
+          updateReclaimedResources(qsi);
+          return Collections.singletonList(t);
+        }
+      }        
+
+      // nothing to give
+      return null;
+    }
+    
+    private void printQSIs() {
+      StringBuffer s = new StringBuffer();
+      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
+        Collection<JobInProgress> runJobs = 
+          scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
+        Collection<JobInProgress> waitJobs = 
+          scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName);
+        s.append(" Queue '" + qsi.queueName + "'(" + this.type + "): run=" + 
+            qsi.numRunningTasks + ", gc=" + qsi.guaranteedCapacity + 
+            ", wait=" + qsi.numPendingTasks + ", run jobs="+ runJobs.size() + 
+            ", wait jobs=" + waitJobs.size() + "*** ");
+      }
+      LOG.debug(s);
+    }
+
+  }
+
+  /**
+   * The scheduling algorithms for map tasks. 
+   */
+  private static class MapSchedulingMgr extends TaskSchedulingMgr {
+    MapSchedulingMgr(CapacityTaskScheduler dad) {
+      super(dad);
+      // a string literal suffices; new String() creates a needless copy
+      type = "map";
+    }
+    Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
+    throws IOException {
+      ClusterStatus clusterStatus = 
+        scheduler.taskTrackerManager.getClusterStatus();
+      int numTaskTrackers = clusterStatus.getTaskTrackers();
+      return job.obtainNewMapTask(taskTracker, numTaskTrackers, 
+          scheduler.taskTrackerManager.getNumberOfUniqueHosts());
+    }
+    int getClusterCapacity() {
+      return scheduler.taskTrackerManager.getClusterStatus().getMaxMapTasks();
+    }
+    int getRunningTasks(JobInProgress job) {
+      return job.runningMaps();
+    }
+    int getPendingTasks(JobInProgress job) {
+      return job.pendingMaps();
+    }
+    int killTasksFromJob(JobInProgress job, int tasksToKill) {
+      /*
+       * We'd like to kill tasks that ran the last, or that have made the
+       * least progress.
+       * Ideally, each job would have a list of tasks, sorted by start 
+       * time or progress. That's a lot of state to keep, however. 
+       * For now, we do something a little different. We first try and kill
+       * non-local tasks, as these can be run anywhere. For each TIP, we 
+       * kill the task that has made the least progress, if the TIP has
+       * more than one active task. 
+       * We then look at tasks in runningMapCache.
+       */
+      int tasksKilled = 0;
+      
+      /* 
+       * For non-local running maps, we 'cheat' a bit. We know that the set
+       * of non-local running maps has an insertion order such that tasks 
+       * that ran last are at the end. So we iterate through the set in 
+       * reverse. This is OK because even if the implementation changes, 
+       * we're still using generic set iteration and are no worse off.
+       */ 
+      TaskInProgress[] tips = 
+        job.getNonLocalRunningMaps().toArray(new TaskInProgress[0]);
+      for (int i=tips.length-1; i>=0; i--) {
+        // pick the task attempt that has progressed least
+        TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]);
+        if (null != tid) {
+          if (tips[i].killTask(tid, false)) {
+            if (++tasksKilled >= tasksToKill) {
+              return tasksKilled;
+            }
+          }
+        }
+      }
+      // now look at other running tasks
+      for (Set<TaskInProgress> s: job.getRunningMapCache().values()) {
+        for (TaskInProgress tip: s) {
+          TaskAttemptID tid = getRunningTaskWithLeastProgress(tip);
+          if (null != tid) {
+            if (tip.killTask(tid, false)) {
+              if (++tasksKilled >= tasksToKill) {
+                return tasksKilled;
+              }
+            }
+          }
+        }
+      }
+      return tasksKilled;
+    }
+
+  }
+
+  /**
+   * The scheduling algorithms for reduce tasks. 
+   */
+  private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
+    ReduceSchedulingMgr(CapacityTaskScheduler dad) {
+      super(dad);
+      // a string literal suffices; new String() creates a needless copy
+      type = "reduce";
+    }
+    Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
+    throws IOException {
+      ClusterStatus clusterStatus = 
+        scheduler.taskTrackerManager.getClusterStatus();
+      int numTaskTrackers = clusterStatus.getTaskTrackers();
+      return job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
+          scheduler.taskTrackerManager.getNumberOfUniqueHosts());
+    }
+    int getClusterCapacity() {
+      return scheduler.taskTrackerManager.getClusterStatus().getMaxReduceTasks();
+    }
+    int getRunningTasks(JobInProgress job) {
+      return job.runningReduces();
+    }
+    int getPendingTasks(JobInProgress job) {
+      return job.pendingReduces();
+    }
+    int killTasksFromJob(JobInProgress job, int tasksToKill) {
+      /* 
+       * For reduces, we 'cheat' a bit. We know that the set
+       * of running reduces has an insertion order such that tasks 
+       * that ran last are at the end. So we iterate through the set in 
+       * reverse. This is OK because even if the implementation changes, 
+       * we're still using generic set iteration and are no worse off.
+       */ 
+      int tasksKilled = 0;
+      TaskInProgress[] tips = 
+        job.getRunningReduces().toArray(new TaskInProgress[0]);
+      for (int i=tips.length-1; i>=0; i--) {
+        // pick the task attempt that has progressed least
+        TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]);
+        if (null != tid) {
+          if (tips[i].killTask(tid, false)) {
+            if (++tasksKilled >= tasksToKill) {
+              return tasksKilled;
+            }
+          }
+        }
+      }
+      return tasksKilled;
+    }
+  }
+  
+  /** the scheduling mgrs for Map and Reduce tasks */ 
+  protected TaskSchedulingMgr mapScheduler = new MapSchedulingMgr(this);
+  protected TaskSchedulingMgr reduceScheduler = new ReduceSchedulingMgr(this);
+  
+  /** name of the default queue. */ 
+  static final String DEFAULT_QUEUE_NAME = "default";
+  
+  /** how often does redistribution thread run (in msecs).
+   *  Set in start() from config, where the value is given in seconds. */
+  private static long RECLAIM_CAPACITY_INTERVAL;
+  /** we start killing tasks to reclaim capacity when we have so many 
+   * heartbeats left. */
+  private static final int HEARTBEATS_LEFT_BEFORE_KILLING = 3;
+
+  private static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
+  /** listener that maintains per-queue lists of running/waiting jobs */
+  protected JobQueuesManager jobQueuesManager;
+  /** scheduler config (queue capacities, user limits, reclaim times) */
+  protected CapacitySchedulerConf rmConf;
+  /** whether scheduler has started or not */
+  private boolean started = false;
+  
+  /**
+   * Used to distribute/reclaim excess capacity among queues
+   */ 
+  class ReclaimCapacity implements Runnable {
+    public ReclaimCapacity() {
+    }
+    /** Invoke reclaimCapacity() once per interval until told to stop. */
+    public void run() {
+      for (;;) {
+        try {
+          // sleep first, then check the stop flag, then do one reclaim pass
+          Thread.sleep(RECLAIM_CAPACITY_INTERVAL);
+          if (stopReclaim) { 
+            break;
+          }
+          reclaimCapacity();
+        } catch (InterruptedException ie) {
+          // an interrupt ends the thread
+          break;
+        } catch (Throwable t) {
+          // one failed pass must not kill the thread; log and continue
+          LOG.error("Error in redistributing capacity:\n" +
+                    StringUtils.stringifyException(t));
+        }
+      }
+    }
+  }
+  private Thread reclaimCapacityThread = null;
+  /** variable to indicate that thread should stop. Declared volatile so the
+   *  write from terminate() is guaranteed visible to the reclaim thread. */
+  private volatile boolean stopReclaim = false;
+
+  /**
+   * A clock class - can be mocked out for testing.
+   */
+  static class Clock {
+    /** @return the current wall-clock time in milliseconds */
+    long getTime() {
+      return System.currentTimeMillis();
+    }
+  }
+  private Clock clock;
+
+  
+  /** Creates a scheduler that uses the real system clock. */
+  public CapacityTaskScheduler() {
+    this(new Clock());
+  }
+  
+  /** Constructor taking an explicit {@link Clock}; lets tests inject a
+   *  mock clock instead of the system clock. */
+  public CapacityTaskScheduler(Clock clock) {
+    this.jobQueuesManager = new JobQueuesManager(this);
+    this.clock = clock;
+  }
+  
+  /** mostly for testing purposes: injects a pre-built scheduler config so
+   *  that start() does not construct one itself. */
+  public void setResourceManagerConf(CapacitySchedulerConf conf) {
+    this.rmConf = conf;
+  }
+  
+  /**
+   * Starts the scheduler: reads queue configuration, creates per-queue
+   * scheduling state for maps and reduces, registers the job listener, and
+   * launches the capacity-reclaim thread. Idempotent: a second call while
+   * started is a no-op.
+   * @throws IOException from the superclass start
+   * @throws IllegalArgumentException if queue capacities sum to over 100%
+   * @throws IllegalStateException if no queues, or no default queue, exist
+   */
+  @Override
+  public synchronized void start() throws IOException {
+    if (started) return;
+    super.start();
+    // config value is in seconds; converted to msecs just below
+    RECLAIM_CAPACITY_INTERVAL = 
+      conf.getLong("mapred.capacity-scheduler.reclaimCapacity.interval", 5);
+    RECLAIM_CAPACITY_INTERVAL *= 1000;
+    // initialize our queues from the config settings
+    if (null == rmConf) {
+      rmConf = new CapacitySchedulerConf();
+    }
+    // read queue info from config file
+    Set<String> queues = taskTrackerManager.getQueueManager().getQueues();
+    float totalCapacity = 0.0f;
+    for (String queueName: queues) {
+      float gc = rmConf.getGuaranteedCapacity(queueName); 
+      totalCapacity += gc;
+      int ulMin = rmConf.getMinimumUserLimitPercent(queueName); 
+      long reclaimTimeLimit = rmConf.getReclaimTimeLimit(queueName);
+      // reclaimTimeLimit is the time(in millisec) within which we need to
+      // reclaim capacity. 
+      // create queue scheduling objects for Map and Reduce
+      mapScheduler.add(new QueueSchedulingInfo(queueName, gc, 
+          ulMin, reclaimTimeLimit));
+      reduceScheduler.add(new QueueSchedulingInfo(queueName, gc, 
+          ulMin, reclaimTimeLimit));
+
+      // create the queues of job objects
+      boolean supportsPrio = rmConf.isPrioritySupported(queueName);
+      jobQueuesManager.createQueue(queueName, supportsPrio);
+    }
+    // capacities are percentages, so their sum must not exceed 100
+    if (totalCapacity > 100.0) {
+      throw new IllegalArgumentException("Sum of queue capacities over 100% at "
+                                         + totalCapacity);
+    }
+    
+    // Sanity check: there should be at least one queue. 
+    if (0 == mapScheduler.getNumQueues()) {
+      throw new IllegalStateException("System has no queue configured");
+    }
+    
+    // check if there's a queue with the default name. If not, we quit.
+    if (!mapScheduler.isQueuePresent(DEFAULT_QUEUE_NAME)) {
+      throw new IllegalStateException("System has no default queue configured");
+    }
+    
+    // listen to job changes
+    taskTrackerManager.addJobInProgressListener(jobQueuesManager);
+
+    // start thread for redistributing capacity
+    this.reclaimCapacityThread = 
+      new Thread(new ReclaimCapacity(),"reclaimCapacity");
+    this.reclaimCapacityThread.start();
+    started = true;
+    LOG.info("Capacity scheduler initialized " + queues.size() + " queues");
+  }
+  
+  /**
+   * Stops the scheduler: unregisters the job listener and signals the
+   * capacity-reclaim thread to exit. No-op if not started.
+   */
+  @Override
+  public synchronized void terminate() throws IOException {
+    if (!started) return;
+    if (jobQueuesManager != null) {
+      taskTrackerManager.removeJobInProgressListener(
+          jobQueuesManager);
+    }
+    // tell the reclaim thread to stop
+    stopReclaim = true;
+    // also interrupt it so it wakes from its sleep and exits promptly,
+    // instead of waiting out the remainder of RECLAIM_CAPACITY_INTERVAL
+    if (reclaimCapacityThread != null) {
+      reclaimCapacityThread.interrupt();
+    }
+    started = false;
+    super.terminate();
+  }
+  
+  @Override
+  public synchronized void setConf(Configuration conf) {
+    // no scheduler-specific handling; defer entirely to the base class
+    super.setConf(conf);
+  }
+
+  /** Runs one capacity-reclaim pass on both the map and reduce schedulers.
+   *  Invoked periodically from the ReclaimCapacity thread. */
+  void reclaimCapacity() {
+    mapScheduler.reclaimCapacity();
+    reduceScheduler.reclaimCapacity();
+  }
+  
+  /**
+   * provided for the test classes
+   * lets you update the QSI objects and sorted collection
+   */ 
+  void updateQSIInfo() {
+    // refresh per-queue counters first, then re-sort the queue orderings
+    mapScheduler.updateQSIObjects();
+    mapScheduler.updateCollectionOfQSIs();
+    reduceScheduler.updateQSIObjects();
+    reduceScheduler.updateCollectionOfQSIs();
+  }
+  
+  /* 
+   * The grand plan for assigning a task. 
+   * First, decide whether a Map or Reduce task should be given to a TT 
+   * (if the TT can accept either). 
+   * Next, pick a queue. We only look at queues that need a slot. Among
+   * these, we first look at queues whose ac is less than gc (queues that 
+   * gave up capacity in the past). Next, we look at any other queue that
+   * needs a slot. 
+   * Next, pick a job in a queue. we pick the job at the front of the queue
+   * unless its user is over the user limit. 
+   * Finally, given a job, pick a task from the job. 
+   *  
+   */
+  @Override
+  public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
+      throws IOException {
+    
+    /*
+     * A TT may have both a map slot and a reduce slot free; we then have to
+     * decide which kind of task to try first. The choice here is based on
+     * which side has more free slots on this tracker (ties go to maps).
+     */
+    LOG.debug("TT asking for task, max maps=" + taskTracker.getMaxMapTasks() + 
+        ", run maps=" + taskTracker.countMapTasks() + ", max reds=" + 
+        taskTracker.getMaxReduceTasks() + ", run reds=" + 
+        taskTracker.countReduceTasks() + ", map cap=" + 
+        mapScheduler.getClusterCapacity() + ", red cap = " + 
+        reduceScheduler.getClusterCapacity());
+    final int freeMapSlots = 
+      taskTracker.getMaxMapTasks() - taskTracker.countMapTasks();
+    final int freeReduceSlots = 
+      taskTracker.getMaxReduceTasks() - taskTracker.countReduceTasks();
+    List<Task> result;
+    if (freeReduceSlots > freeMapSlots) {
+      // more room on the reduce side: try reduces, then fall back to maps
+      result = reduceScheduler.assignTasks(taskTracker);
+      if ((result == null) && (freeMapSlots > 0)) {
+        result = mapScheduler.assignTasks(taskTracker);
+      }
+    }
+    else {
+      // default to maps; fall back to reduces if the TT still has room
+      result = mapScheduler.assignTasks(taskTracker);
+      if ((result == null) && (freeReduceSlots > 0)) {
+        result = reduceScheduler.assignTasks(taskTracker);
+      }
+    }
+    return result;
+  }
+  
+  /** Called when a job is added; forwards to both schedulers so they can
+   *  update their user-specific bookkeeping. */
+  synchronized void jobAdded(JobInProgress job) {
+    // let our map and reduce schedulers know this, so they can update 
+    // user-specific info
+    mapScheduler.jobAdded(job);
+    reduceScheduler.jobAdded(job);
+  }
+
+  /** Called when a job is removed; forwards to both schedulers so they can
+   *  update their user-specific bookkeeping. */
+  synchronized void jobRemoved(JobInProgress job) {
+    // let our map and reduce schedulers know this, so they can update 
+    // user-specific info
+    mapScheduler.jobRemoved(job);
+    reduceScheduler.jobRemoved(job);
+  }
+  
+}
+

+ 216 - 0
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java

@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A {@link JobInProgressListener} that maintains the jobs being managed in
+ * one or more queues. 
+ */
+class JobQueuesManager extends JobInProgressListener {
+
+  /* 
+   * If a queue supports priorities, waiting jobs must be 
+   * sorted on priorities, and then on their start times (technically, 
+   * their insertion time).  
+   * If a queue doesn't support priorities, waiting jobs are
+   * sorted based on their start time.  
+   * Running jobs are not sorted. A job that started running earlier
+   * is ahead in the queue, so insertion should be at the tail.
+   */
+  
+  // comparator for jobs in queues that support priorities
+  private static final Comparator<JobInProgress> PRIORITY_JOB_COMPARATOR
+    = new Comparator<JobInProgress>() {
+    public int compare(JobInProgress o1, JobInProgress o2) {
+      // Look at priority.
+      int res = o1.getPriority().compareTo(o2.getPriority());
+      if (res == 0) {
+        // the job that started earlier wins
+        if (o1.getStartTime() < o2.getStartTime()) {
+          res = -1;
+        } else {
+          res = (o1.getStartTime() == o2.getStartTime() ? 0 : 1);
+        }
+      }
+      if (res == 0) {
+        // tie-break on job ID so that distinct jobs never compare as
+        // equal (the TreeSet would otherwise treat them as duplicates)
+        res = o1.getJobID().compareTo(o2.getJobID());
+      }
+      return res;
+    }
+  };
+  // comparator for jobs in queues that don't support priorities
+  private static final Comparator<JobInProgress> STARTTIME_JOB_COMPARATOR
+    = new Comparator<JobInProgress>() {
+    public int compare(JobInProgress o1, JobInProgress o2) {
+      // the job that started earlier wins
+      if (o1.getStartTime() < o2.getStartTime()) {
+        return -1;
+      } else if (o1.getStartTime() > o2.getStartTime()) {
+        return 1;
+      } else {
+        // tie-break on job ID, as in PRIORITY_JOB_COMPARATOR; without
+        // this, two distinct jobs with the same start time compare as
+        // equal and the waiting TreeSet silently drops the second one
+        return o1.getJobID().compareTo(o2.getJobID());
+      }
+    }
+  };
+  
+  // class to store queue info
+  private static class QueueInfo {
+
+    // whether the queue supports priorities
+    boolean supportsPriorities;
+    // maintain separate collections of running & waiting jobs. This we do 
+    // mainly because when a new job is added, it cannot supersede a running 
+    // job, even though the latter may be a lower priority. If this is ever
+    // changed, we may get by with one collection. 
+    Collection<JobInProgress> waitingJobs;
+    Collection<JobInProgress> runningJobs;
+    
+    QueueInfo(boolean prio) {
+      this.supportsPriorities = prio;
+      if (supportsPriorities) {
+        this.waitingJobs = new TreeSet<JobInProgress>(PRIORITY_JOB_COMPARATOR);
+      }
+      else {
+        this.waitingJobs = new TreeSet<JobInProgress>(STARTTIME_JOB_COMPARATOR);
+      }
+      this.runningJobs = new LinkedList<JobInProgress>();
+    }
+
+    /**
+     * we need to delete an object from our TreeSet based on referential
+     * equality, rather than value equality that the TreeSet uses. 
+     * Another way to do this is to extend the TreeSet and override remove().
+     * @return true if the object was found and removed
+     */
+    static private boolean removeOb(Collection<JobInProgress> c, Object o) {
+      Iterator<JobInProgress> i = c.iterator();
+      while (i.hasNext()) {
+          if (i.next() == o) {
+              i.remove();
+              return true;
+          }
+      }
+      return false;
+    }
+    
+  }
+  
+  // we maintain a hashmap of queue-names to queue info
+  private Map<String, QueueInfo> jobQueues = 
+    new HashMap<String, QueueInfo>();
+  private static final Log LOG = LogFactory.getLog(JobQueuesManager.class);
+  // scheduler to notify when jobs are added/removed
+  private CapacityTaskScheduler scheduler;
+
+  
+  JobQueuesManager(CapacityTaskScheduler s) {
+    this.scheduler = s;
+  }
+  
+  /**
+   * create an empty queue with the default comparator
+   * @param queueName The name of the queue
+   * @param supportsPriorities whether the queue supports priorities
+   */
+  public void createQueue(String queueName, boolean supportsPriorities) {
+    jobQueues.put(queueName, new QueueInfo(supportsPriorities));
+  }
+  
+  /**
+   * Returns the queue of running jobs associated with the name
+   */
+  public Collection<JobInProgress> getRunningJobQueue(String queueName) {
+    return jobQueues.get(queueName).runningJobs;
+  }
+  
+  /**
+   * Returns the queue of waiting jobs associated with the name
+   */
+  public Collection<JobInProgress> getWaitingJobQueue(String queueName) {
+    return jobQueues.get(queueName).waitingJobs;
+  }
+  
+  @Override
+  public void jobAdded(JobInProgress job) {
+    LOG.info("Job submitted to queue " + job.getProfile().getQueueName());
+    // add job to the right queue
+    QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
+    if (null == qi) {
+      // job was submitted to a queue we're not aware of
+      LOG.warn("Invalid queue " + job.getProfile().getQueueName() + 
+          " specified for job" + job.getProfile().getJobID() + 
+          ". Ignoring job.");
+      return;
+    }
+    // add job to waiting queue. It will end up in the right place, 
+    // based on priority. 
+    qi.waitingJobs.add(job);
+    // let scheduler know. 
+    scheduler.jobAdded(job);
+  }
+
+  @Override
+  public void jobRemoved(JobInProgress job) {
+    QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
+    if (null == qi) {
+      // can't find queue for job. Shouldn't happen. 
+      LOG.warn("Could not find queue " + job.getProfile().getQueueName() + 
+          " when removing job " + job.getProfile().getJobID());
+      return;
+    }
+    // job could be in running or waiting queue
+    if (!qi.runningJobs.remove(job)) {
+      QueueInfo.removeOb(qi.waitingJobs, job);
+    }
+    // let scheduler know
+    scheduler.jobRemoved(job);
+  }
+  
+  @Override
+  public void jobUpdated(JobInProgress job) {
+    QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
+    if (null == qi) {
+      // can't find queue for job. Shouldn't happen. 
+      LOG.warn("Could not find queue " + job.getProfile().getQueueName() + 
+          " when updating job " + job.getProfile().getJobID());
+      return;
+    }
+    // this is called when a job's priority or state is changed.
+    // since we don't know the job's previous state, we need to 
+    // find out in which queue it was earlier, and then place it in the
+    // right queue: PREP jobs belong in waitingJobs, everything else in
+    // runningJobs.
+    Collection<JobInProgress> dest = (job.getStatus().getRunState() == 
+      JobStatus.PREP)? qi.waitingJobs: qi.runningJobs;
+    // We use our own version of removing objects based on referential
+    // equality, since the 'job' object has already been changed. 
+    if (!QueueInfo.removeOb(qi.waitingJobs, job)) {
+      qi.runningJobs.remove(job);
+    }
+    dest.add(job);
+  }
+  
+}

+ 792 - 0
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -0,0 +1,792 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+//import org.apache.hadoop.mapred.CapacityTaskScheduler;
+import org.apache.hadoop.conf.Configuration;
+
+public class TestCapacityScheduler extends TestCase {
+  
+  private static int jobCounter;
+  
+  static class FakeJobInProgress extends JobInProgress {
+    
+    private FakeTaskTrackerManager taskTrackerManager;
+    private int mapTaskCtr;
+    private int redTaskCtr;
+    private Set<TaskInProgress> mapTips = 
+      new HashSet<TaskInProgress>();
+    private Set<TaskInProgress> reduceTips = 
+      new HashSet<TaskInProgress>();
+    
+    public FakeJobInProgress(JobID jId, JobConf jobConf,
+        FakeTaskTrackerManager taskTrackerManager, String user) 
+    throws IOException {
+      super(jId, jobConf);
+      this.taskTrackerManager = taskTrackerManager;
+      this.startTime = System.currentTimeMillis();
+      this.status = new JobStatus();
+      this.status.setRunState(JobStatus.PREP);
+      if (null == jobConf.getQueueName()) {
+        this.profile = new JobProfile(user, jId, 
+            null, null, null);
+      }
+      else {
+        this.profile = new JobProfile(user, jId, 
+            null, null, null, jobConf.getQueueName());
+      }
+      mapTaskCtr = 0;
+      redTaskCtr = 0;
+    }
+    
+    @Override
+    public synchronized void initTasks() throws IOException {
+      getStatus().setRunState(JobStatus.RUNNING);
+    }
+
+    @Override
+    public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
+        int ignored) throws IOException {
+      if (runningMapTasks == numMapTasks) return null;
+      TaskAttemptID attemptId = getTaskAttemptID(true);
+      Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
+        @Override
+        public String toString() {
+          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+        }
+      };
+      taskTrackerManager.startTask(tts.getTrackerName(), task);
+      runningMapTasks++;
+      // create a fake TIP and keep track of it
+      mapTips.add(new FakeTaskInProgress(getJobID(), 
+          getJobConf(), task, true, this));
+      return task;
+    }
+    
+    @Override
+    public Task obtainNewReduceTask(final TaskTrackerStatus tts,
+        int clusterSize, int ignored) throws IOException {
+      if (runningReduceTasks == numReduceTasks) return null;
+      TaskAttemptID attemptId = getTaskAttemptID(false);
+      Task task = new ReduceTask("", attemptId, 0, 10) {
+        @Override
+        public String toString() {
+          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+        }
+      };
+      taskTrackerManager.startTask(tts.getTrackerName(), task);
+      runningReduceTasks++;
+      // create a fake TIP and keep track of it
+      reduceTips.add(new FakeTaskInProgress(getJobID(), 
+          getJobConf(), task, false, this));
+      return task;
+    }
+    
+    public void mapTaskFinished() {
+      runningMapTasks--;
+      finishedMapTasks++;
+    }
+    
+    public void reduceTaskFinished() {
+      runningReduceTasks--;
+      finishedReduceTasks++;
+    }
+    
+    private TaskAttemptID getTaskAttemptID(boolean isMap) {
+      JobID jobId = getJobID();
+      return new TaskAttemptID(jobId.getJtIdentifier(),
+          jobId.getId(), isMap, (isMap)?++mapTaskCtr: ++redTaskCtr, 0);
+    }
+    
+    @Override
+    Set<TaskInProgress> getNonLocalRunningMaps() {
+      return (Set<TaskInProgress>)mapTips;
+    }
+    @Override
+    Set<TaskInProgress> getRunningReduces() {
+      return (Set<TaskInProgress>)reduceTips;
+    }
+  }
+  
+  static class FakeTaskInProgress extends TaskInProgress {
+    private boolean isMap;
+    private FakeJobInProgress fakeJob;
+    private TreeMap<TaskAttemptID, String> activeTasks;
+    private TaskStatus taskStatus;
+    FakeTaskInProgress(JobID jId, JobConf jobConf, Task t, 
+        boolean isMap, FakeJobInProgress job) {
+      super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0);
+      this.isMap = isMap;
+      this.fakeJob = job;
+      activeTasks = new TreeMap<TaskAttemptID, String>();
+      activeTasks.put(t.getTaskID(), "tt");
+      // create a fake status for a task that is running for a bit
+      this.taskStatus = TaskStatus.createTaskStatus(isMap);
+      taskStatus.setProgress(0.5f);
+      taskStatus.setRunState(TaskStatus.State.RUNNING);
+    }
+    
+    @Override
+    TreeMap<TaskAttemptID, String> getActiveTasks() {
+      return activeTasks;
+    }
+    @Override
+    public TaskStatus getTaskStatus(TaskAttemptID taskid) {
+      // return a status for a task that has run a bit
+      return taskStatus;
+    }
+    @Override
+    boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
+      if (isMap) {
+        fakeJob.mapTaskFinished();
+      }
+      else {
+        fakeJob.reduceTaskFinished();
+      }
+      return true;
+    }
+  }
+  
+  static class FakeQueueManager extends QueueManager {
+    private Set<String> queues = null;
+    FakeQueueManager() {
+      super(new Configuration());
+    }
+    void setQueues(Set<String> queues) {
+      this.queues = queues;
+    }
+    public synchronized Set<String> getQueues() {
+      return queues;
+    }
+  }
+  
+  static class FakeTaskTrackerManager implements TaskTrackerManager {
+    int maps = 0;
+    int reduces = 0;
+    int maxMapTasksPerTracker = 2;
+    int maxReduceTasksPerTracker = 1;
+    List<JobInProgressListener> listeners =
+      new ArrayList<JobInProgressListener>();
+    FakeQueueManager qm = new FakeQueueManager();
+    
+    private Map<String, TaskTrackerStatus> trackers =
+      new HashMap<String, TaskTrackerStatus>();
+    private Map<String, TaskStatus> taskStatuses = 
+      new HashMap<String, TaskStatus>();
+
+    public FakeTaskTrackerManager() {
+      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
+          new ArrayList<TaskStatus>(), 0,
+          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+      trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
+          new ArrayList<TaskStatus>(), 0,
+          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+    }
+    
+    public void addTaskTracker(String ttName) {
+      trackers.put(ttName, new TaskTrackerStatus(ttName, ttName + ".host", 1,
+          new ArrayList<TaskStatus>(), 0,
+          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+    }
+    
+    public ClusterStatus getClusterStatus() {
+      int numTrackers = trackers.size();
+      return new ClusterStatus(numTrackers, maps, reduces,
+          numTrackers * maxMapTasksPerTracker,
+          numTrackers * maxReduceTasksPerTracker,
+          JobTracker.State.RUNNING);
+    }
+
+    public int getNumberOfUniqueHosts() {
+      return 0;
+    }
+
+    public int getNextHeartbeatInterval() {
+      return MRConstants.HEARTBEAT_INTERVAL_MIN;
+    }
+    
+    public Collection<TaskTrackerStatus> taskTrackers() {
+      return trackers.values();
+    }
+
+
+    public void addJobInProgressListener(JobInProgressListener listener) {
+      listeners.add(listener);
+    }
+
+    public void removeJobInProgressListener(JobInProgressListener listener) {
+      listeners.remove(listener);
+    }
+    
+    public void submitJob(JobInProgress job) {
+      for (JobInProgressListener listener : listeners) {
+        listener.jobAdded(job);
+      }
+    }
+    
+    public TaskTrackerStatus getTaskTracker(String trackerID) {
+      return trackers.get(trackerID);
+    }
+    
+    public void startTask(String taskTrackerName, final Task t) {
+      if (t.isMapTask()) {
+        maps++;
+      } else {
+        reduces++;
+      }
+      TaskStatus status = new TaskStatus() {
+        @Override
+        public boolean getIsMap() {
+          return t.isMapTask();
+        }
+      };
+      taskStatuses.put(t.getTaskID().toString(), status);
+      status.setRunState(TaskStatus.State.RUNNING);
+      trackers.get(taskTrackerName).getTaskReports().add(status);
+    }
+    
+    public void finishTask(String taskTrackerName, String tipId, 
+        FakeJobInProgress j) {
+      TaskStatus status = taskStatuses.get(tipId);
+      if (status.getIsMap()) {
+        maps--;
+        j.mapTaskFinished();
+      } else {
+        reduces--;
+        j.reduceTaskFinished();
+      }
+      status.setRunState(TaskStatus.State.SUCCEEDED);
+    }
+    
+    void addQueues(String[] arr) {
+      Set<String> queues = new HashSet<String>();
+      for (String s: arr) {
+        queues.add(s);
+      }
+      qm.setQueues(queues);
+    }
+    
+    public QueueManager getQueueManager() {
+      return qm;
+    }
+  }
+  
+  // represents a fake queue configuration info
+  static class FakeQueueInfo {
+    String queueName;
+    float gc;
+    int reclaimTimeLimit;
+    boolean supportsPrio;
+    int ulMin;
+
+    public FakeQueueInfo(String queueName, float gc,
+        int reclaimTimeLimit, boolean supportsPrio, int ulMin) {
+      this.queueName = queueName;
+      this.gc = gc;
+      this.reclaimTimeLimit = reclaimTimeLimit;
+      this.supportsPrio = supportsPrio;
+      this.ulMin = ulMin;
+    }
+  }
+  
+  static class FakeResourceManagerConf extends CapacitySchedulerConf {
+  
+    // map of queue names to queue info
+    private Map<String, FakeQueueInfo> queueMap = 
+      new LinkedHashMap<String, FakeQueueInfo>();
+    String firstQueue;
+    
+    void setFakeQueues(List<FakeQueueInfo> queues) {
+      for (FakeQueueInfo q: queues) {
+        queueMap.put(q.queueName, q);
+      }
+      firstQueue = new String(queues.get(0).queueName);
+    }
+    
+    public synchronized Set<String> getQueues() {
+      return queueMap.keySet();
+    }
+    
+    /*public synchronized String getFirstQueue() {
+      return firstQueue;
+    }*/
+    
+    public float getGuaranteedCapacity(String queue) {
+      return queueMap.get(queue).gc;
+    }
+    
+    public int getReclaimTimeLimit(String queue) {
+      return queueMap.get(queue).reclaimTimeLimit;
+    }
+    
+    public int getMinimumUserLimitPercent(String queue) {
+      return queueMap.get(queue).ulMin;
+    }
+    
+    public boolean isPrioritySupported(String queue) {
+      return queueMap.get(queue).supportsPrio;
+    }
+  }
+
+  protected class FakeClock extends CapacityTaskScheduler.Clock {
+    private long time = 0;
+    
+    public void advance(long millis) {
+      time += millis;
+    }
+
+    @Override
+    long getTime() {
+      return time;
+    }
+  }
+
+  
+  protected JobConf conf;
+  protected CapacityTaskScheduler scheduler;
+  private FakeTaskTrackerManager taskTrackerManager;
+  private FakeResourceManagerConf resConf;
+  private FakeClock clock;
+
+  @Override
+  protected void setUp() throws Exception {
+    jobCounter = 0;
+    taskTrackerManager = new FakeTaskTrackerManager();
+    clock = new FakeClock();
+    scheduler = new CapacityTaskScheduler(clock);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+
+    conf = new JobConf();
+    // set interval to a large number so thread doesn't interfere with us
+    conf.setLong("mapred.capacity-scheduler.reclaimCapacity.interval", 500);
+    scheduler.setConf(conf);
+    
+  }
+  
+  @Override
+  protected void tearDown() throws Exception {
+    if (scheduler != null) {
+      scheduler.terminate();
+    }
+  }
+  
+  private FakeJobInProgress submitJob(int state, int maps, int reduces, 
+      String queue, String user) throws IOException {
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setNumMapTasks(maps);
+    jobConf.setNumReduceTasks(reduces);
+    if (queue != null)
+      jobConf.setQueueName(queue);
+    FakeJobInProgress job = new FakeJobInProgress(
+        new JobID("test", ++jobCounter), jobConf, taskTrackerManager, user);
+    job.getStatus().setRunState(state);
+    taskTrackerManager.submitJob(job);
+    return job;
+  }
+  
+  /*protected void submitJobs(int number, int state, int maps, int reduces)
+    throws IOException {
+    for (int i = 0; i < number; i++) {
+      submitJob(state, maps, reduces);
+    }
+  }*/
+  
+  // basic tests, should be able to submit to queues
+  public void testSubmitToQueues() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // submit a job with no queue specified. It should be accepted
+    // and given to the default queue. 
+    JobInProgress j = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    // when we ask for a task, we should get one, from the job submitted
+    Task t;
+    t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // submit another job, to a different queue
+    j = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    // now when we get a task, it should be from the second job
+    t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+  }
+  
+  // test capacity transfer
+  public void testCapacityTransfer() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // submit a job  
+    submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    // for queue 'q2', the GC for maps is 2. Since we're the only user, 
+    // we should get a task 
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // I should get another map task. 
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    // Now we're at full capacity for maps. If I ask for another map task,
+    // I should get a map task from the default queue's capacity. 
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    // and another
+    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+  }
+
+  // test user limits
+  public void testUserLimits() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // submit a job  
+    submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    // for queue 'q2', the GC for maps is 2. Since we're the only user, 
+    // we should get a task 
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // Submit another job, from a different user
+    submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+    // Now if I ask for a map task, it should come from the second job 
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    // Now we're at full capacity for maps. If I ask for another map task,
+    // I should get a map task from the default queue's capacity. 
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    // and another
+    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+  }
+
+  // test user limits when a 2nd job is submitted much after first job 
+  public void testUserLimits2() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // submit a job  
+    submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    // for queue 'q2', the GC for maps is 2. Since we're the only user, 
+    // we should get a task 
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // since we're the only job, we get another map
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    // Submit another job, from a different user
+    submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+    // Now if I ask for a map task, it should come from the second job 
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    // and another
+    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+  }
+
+  // test user limits when a 2nd job is submitted much after first job 
+  // and we need to wait for first job's task to complete
+  public void testUserLimits3() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // submit a job  
+    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    // for queue 'q2', the GC for maps is 2. Since we're the only user, 
+    // we should get a task 
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // since we're the only job, we get another map
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    // we get two more maps from 'default queue'
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    // Submit another job, from a different user
+    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+    // one of the task finishes
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
+    // Now if I ask for a map task, it should come from the second job 
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    // another task from job1 finishes, another new task to job2
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
+    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    // now we have equal number of tasks from each job. Whichever job's
+    // task finishes, that job gets a new task
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000003_0", j1);
+    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
+    checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
+  }
+
+  // test user limits with many users, more slots
+  public void testUserLimits4() throws Exception {
+    // set up one queue, with 10 slots
+    String[] qs = {"default"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 10000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+    // add some more TTs 
+    taskTrackerManager.addTaskTracker("tt3");
+    taskTrackerManager.addTaskTracker("tt4");
+    taskTrackerManager.addTaskTracker("tt5");
+
+    // u1 submits job
+    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    // it gets the first 5 slots
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
+    // u2 submits job with 4 slots
+    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 4, 4, null, "u2");
+    // u2 should get next 4 slots
+    checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
+    checkAssignment("tt4", "attempt_test_0002_m_000002_0 on tt4");
+    checkAssignment("tt4", "attempt_test_0002_m_000003_0 on tt4");
+    checkAssignment("tt5", "attempt_test_0002_m_000004_0 on tt5");
+    // last slot should go to u1, since u2 has no more tasks
+    checkAssignment("tt5", "attempt_test_0001_m_000006_0 on tt5");
+    // u1 finishes a task
+    taskTrackerManager.finishTask("tt5", "attempt_test_0001_m_000006_0", j1);
+    // u1 submits a few more jobs 
+    submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    // u2 also submits a job
+    submitJob(JobStatus.PREP, 10, 10, null, "u2");
+    // now u3 submits a job
+    submitJob(JobStatus.PREP, 2, 2, null, "u3");
+    // next slot should go to u3, even though u2 has an earlier job, since
+    // user limits have changed and u1/u2 are over limits
+    checkAssignment("tt5", "attempt_test_0007_m_000001_0 on tt5");
+    // some other task finishes and u3 gets it
+    taskTrackerManager.finishTask("tt5", "attempt_test_0002_m_000004_0", j1);
+    checkAssignment("tt5", "attempt_test_0007_m_000002_0 on tt5");
+    // now, u2 finishes a task
+    taskTrackerManager.finishTask("tt4", "attempt_test_0002_m_000002_0", j1);
+    // next slot will go to u1, since u3 has nothing to run and u1's job is 
+    // first in the queue
+    checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4");
+  }
+
+  // test code to reclaim capacity
+  public void testReclaimCapacity() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2", "q3"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 25.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q3", 25.0f, 1000000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // set up a situation where q2 is under capacity, and default & q3
+    // are at/over capacity
+    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q3", "u1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    // now submit a job to q2
+    FakeJobInProgress j3 = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    // update our structures
+    scheduler.updateQSIInfo();
+    // get scheduler to notice that q2 needs to reclaim
+    scheduler.reclaimCapacity();
+    // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
+    // we start reclaiming when 15 secs are left. 
+    clock.advance(400000);
+    scheduler.reclaimCapacity();
+    // no tasks should have been killed yet
+    assertEquals(j1.runningMapTasks, 3);
+    assertEquals(j2.runningMapTasks, 1);
+    clock.advance(200000);
+    scheduler.reclaimCapacity();
+    // task from j1 will be killed
+    assertEquals(j1.runningMapTasks, 2);
+    assertEquals(j2.runningMapTasks, 1);
+    
+  }
+
+  // test code to reclaim multiple capacity 
+  public void testReclaimCapacity2() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2", "q3", "q4"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 20.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q3", 20.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q4", 10.0f, 1000000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+    
+    // add some more TTs so our total map capacity is 10
+    taskTrackerManager.addTaskTracker("tt3");
+    taskTrackerManager.addTaskTracker("tt4");
+    taskTrackerManager.addTaskTracker("tt5");
+
+    // q2 has nothing running, default is under cap, q3 and q4 are over cap
+    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 2, 2, null, "u1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q3", "u1");
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    FakeJobInProgress j3 = submitJob(JobStatus.PREP, 10, 10, "q4", "u1");
+    checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
+    checkAssignment("tt4", "attempt_test_0003_m_000002_0 on tt4");
+    checkAssignment("tt4", "attempt_test_0002_m_000004_0 on tt4");
+    checkAssignment("tt5", "attempt_test_0002_m_000005_0 on tt5");
+    checkAssignment("tt5", "attempt_test_0003_m_000003_0 on tt5");
+    // at this point, q3 is running 5 tasks (with a cap of 2), q4 is
+    // running 3 tasks (with a cap of 1). 
+    // If we submit a job to 'default', we need to get 3 slots back. 
+    FakeJobInProgress j4 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    // update our structures
+    scheduler.updateQSIInfo();
+    // get scheduler to notice that 'default' needs to reclaim
+    scheduler.reclaimCapacity();
+    // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
+    // we start reclaiming when 15 secs are left. 
+    clock.advance(400000);
+    scheduler.reclaimCapacity();
+    // nothing should have happened
+    assertEquals(j2.runningMapTasks, 5);
+    assertEquals(j3.runningMapTasks, 3);
+    // 3 tasks to kill, 5 running over cap. q3 should give up 3*3/5 = 2 slots.
+    // q4 should give up 2*3/5 = 1 slot. 
+    clock.advance(200000);
+    scheduler.reclaimCapacity();
+    assertEquals(j2.runningMapTasks, 3);
+    assertEquals(j3.runningMapTasks, 2);
+    
+  }
+
+  // test code to reclaim capacity in steps
+  public void testReclaimCapacityInSteps() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 1000000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // set up a situation where q2 is under capacity, and default is
+    // at/over capacity
+    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    // now submit a job to q2
+    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 1, 1, "q2", "u1");
+    // update our structures
+    scheduler.updateQSIInfo();
+    // get scheduler to notice that q2 needs to reclaim
+    scheduler.reclaimCapacity();
+    // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
+    // we start reclaiming when 15 secs are left. 
+    clock.advance(400000);
+    // submit another job to q2 which causes more capacity to be reclaimed
+    j2 = submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+    // update our structures
+    scheduler.updateQSIInfo();
+    clock.advance(200000);
+    scheduler.reclaimCapacity();
+    // one task from j1 will be killed
+    assertEquals(j1.runningMapTasks, 3);
+    clock.advance(300000);
+    scheduler.reclaimCapacity();
+    // timer for 2nd job hasn't fired, so nothing killed
+    assertEquals(j1.runningMapTasks, 3);
+    clock.advance(400000);
+    scheduler.reclaimCapacity();
+    // one task from j1 will be killed
+    assertEquals(j1.runningMapTasks, 2);
+    
+  }
+
+  protected TaskTrackerStatus tracker(String taskTrackerName) {
+    return taskTrackerManager.getTaskTracker(taskTrackerName);
+  }
+  
+  protected Task checkAssignment(String taskTrackerName,
+      String expectedTaskString) throws IOException {
+    List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+    assertNotNull(expectedTaskString, tasks);
+    assertEquals(expectedTaskString, 1, tasks.size());
+    assertEquals(expectedTaskString, tasks.get(0).toString());
+    return tasks.get(0);
+  }
+  
+}

+ 245 - 0
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java

@@ -0,0 +1,245 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+
+public class TestCapacitySchedulerConf extends TestCase {
+
+  private static String testDataDir = System.getProperty("test.build.data");
+  private static String testConfFile;
+  
+  private Map<String, String> defaultProperties;
+  private CapacitySchedulerConf testConf;
+  private PrintWriter writer;
+  
+  static {
+    if (testDataDir == null) {
+      testDataDir = ".";
+    } else {
+      new File(testDataDir).mkdirs();
+    }
+    testConfFile = new File(testDataDir, "test-conf.xml").getAbsolutePath();
+  }
+  
+  public TestCapacitySchedulerConf() {
+    defaultProperties = setupQueueProperties(
+        new String[] { "guaranteed-capacity", 
+                       "reclaim-time-limit",
+                       "supports-priority",
+                       "minimum-user-limit-percent" }, 
+        new String[] { "100", 
+                        "300",
+                        "false", 
+                        "100" }
+                      );
+  }
+
+  
+  public void setUp() throws IOException {
+    openFile();
+  }
+  
+  public void tearDown() throws IOException {
+    File confFile = new File(testConfFile);
+    if (confFile.exists()) {
+      confFile.delete();  
+    }
+  }
+  
+  public void testDefaults() {
+    testConf = new CapacitySchedulerConf();
+    Map<String, Map<String, String>> queueDetails
+                            = new HashMap<String, Map<String,String>>();
+    queueDetails.put("default", defaultProperties);
+    checkQueueProperties(testConf, queueDetails);
+  }
+  
+  public void testQueues() {
+
+    Map<String, String> q1Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity", 
+                       "reclaim-time-limit",
+                       "supports-priority",
+                       "minimum-user-limit-percent" }, 
+        new String[] { "10", 
+                        "600",
+                        "true",
+                        "25" }
+                      );
+
+    Map<String, String> q2Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity", 
+                       "reclaim-time-limit",
+                       "supports-priority",
+                       "minimum-user-limit-percent" }, 
+        new String[] { "100", 
+                        "6000",
+                        "false", 
+                        "50" }
+                      );
+
+    startConfig();
+    writeQueueDetails("default", q1Props);
+    writeQueueDetails("research", q2Props);
+    endConfig();
+
+    testConf = new CapacitySchedulerConf(new Path(testConfFile));
+
+    Map<String, Map<String, String>> queueDetails
+              = new HashMap<String, Map<String,String>>();
+    queueDetails.put("default", q1Props);
+    queueDetails.put("research", q2Props);
+    checkQueueProperties(testConf, queueDetails);
+  }
+  
+  public void testQueueWithDefaultProperties() {
+    Map<String, String> q1Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity", 
+                       "minimum-user-limit-percent" }, 
+        new String[] { "20", 
+                        "75" }
+                      );
+    startConfig();
+    writeQueueDetails("default", q1Props);
+    endConfig();
+
+    testConf = new CapacitySchedulerConf(new Path(testConfFile));
+
+    Map<String, Map<String, String>> queueDetails
+              = new HashMap<String, Map<String,String>>();
+    Map<String, String> expProperties = new HashMap<String, String>();
+    for (String key : q1Props.keySet()) {
+      expProperties.put(key, q1Props.get(key));
+    }
+    expProperties.put("reclaim-time-limit", "300");
+    expProperties.put("supports-priority", "false");
+    queueDetails.put("default", expProperties);
+    checkQueueProperties(testConf, queueDetails);
+  }
+
+  public void testReload() throws IOException {
+    // use the setup in the test case testQueues as a base...
+    testQueues();
+    
+    // write new values to the file...
+    Map<String, String> q1Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity", 
+                       "reclaim-time-limit",
+                       "supports-priority",
+                       "minimum-user-limit-percent" }, 
+        new String[] { "20.5", 
+                        "600",
+                        "true", 
+                        "40" }
+                      );
+
+    Map<String, String> q2Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity", 
+                       "reclaim-time-limit",
+                       "supports-priority",
+                       "minimum-user-limit-percent" }, 
+        new String[] { "100", 
+                        "3000",
+                        "false",
+                        "50" }
+                      );
+
+    openFile();
+    startConfig();
+    writeQueueDetails("default", q1Props);
+    writeQueueDetails("production", q2Props);
+    endConfig();
+    
+    testConf.reloadConfiguration();
+    
+    Map<String, Map<String, String>> queueDetails 
+                      = new HashMap<String, Map<String, String>>();
+    queueDetails.put("default", q1Props);
+    queueDetails.put("production", q2Props);
+    checkQueueProperties(testConf, queueDetails);
+  }
+
+  private void checkQueueProperties(
+                        CapacitySchedulerConf testConf,
+                        Map<String, Map<String, String>> queueDetails) {
+    for (String queueName : queueDetails.keySet()) {
+      Map<String, String> map = queueDetails.get(queueName);
+      assertEquals(Float.parseFloat(map.get("guaranteed-capacity")),
+           testConf.getGuaranteedCapacity(queueName));
+      assertEquals(Integer.parseInt(map.get("minimum-user-limit-percent")),
+          testConf.getMinimumUserLimitPercent(queueName));
+      assertEquals(Integer.parseInt(map.get("reclaim-time-limit")),
+          testConf.getReclaimTimeLimit(queueName));
+      assertEquals(Boolean.parseBoolean(map.get("supports-priority")),
+          testConf.isPrioritySupported(queueName));
+    }
+  }
+  
+  private Map<String, String> setupQueueProperties(String[] keys, 
+                                                String[] values) {
+    HashMap<String, String> map = new HashMap<String, String>();
+    for(int i=0; i<keys.length; i++) {
+      map.put(keys[i], values[i]);
+    }
+    return map;
+  }
+
+  private void openFile() throws IOException {
+    
+    if (testDataDir != null) {
+      File f = new File(testDataDir);
+      f.mkdirs();
+    }
+    FileWriter fw = new FileWriter(testConfFile);
+    BufferedWriter bw = new BufferedWriter(fw);
+    writer = new PrintWriter(bw);
+  }
+  
+  private void startConfig() {
+    writer.println("<?xml version=\"1.0\"?>");
+    writer.println("<configuration>");
+  }
+  
+  private void writeQueueDetails(String queue, Map<String, String> props) {
+    for (String key : props.keySet()) {
+      writer.println("<property>");
+      writer.println("<name>mapred.capacity-scheduler.queue." 
+                        + queue + "." + key +
+                    "</name>");
+      writer.println("<value>"+props.get(key)+"</value>");
+      writer.println("</property>");
+    }
+  }
+  
+  private void endConfig() {
+    writer.println("</configuration>");
+    writer.close();
+  }
+  
+}

+ 5 - 0
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

@@ -156,6 +156,11 @@ public class TestFairScheduler extends TestCase {
       listeners.remove(listener);
     }
     
+    @Override
+    public int getNextHeartbeatInterval() {
+      return MRConstants.HEARTBEAT_INTERVAL_MIN;
+    }
+    
     // Test methods
     
     public void submitJob(JobInProgress job) {

+ 54 - 3
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -72,6 +72,10 @@ class JobInProgress {
   int finishedReduceTasks = 0;
   int failedMapTasks = 0; 
   int failedReduceTasks = 0;
+  // runningMapTasks includes speculative tasks, so we need to capture 
+  // speculative tasks separately 
+  int speculativeMapTasks = 0;
+  int speculativeReduceTasks = 0;
   
   int mapFailuresPercent = 0;
   int reduceFailuresPercent = 0;
@@ -441,6 +445,14 @@ class JobInProgress {
   public synchronized int finishedReduces() {
     return finishedReduceTasks;
   }
+  public synchronized int pendingMaps() {
+    return numMapTasks - runningMapTasks - failedMapTasks - 
+    finishedMapTasks + speculativeMapTasks;
+  }
+  public synchronized int pendingReduces() {
+    return numReduceTasks - runningReduceTasks - failedReduceTasks - 
+    finishedReduceTasks + speculativeReduceTasks;
+  }
   public JobPriority getPriority() {
     return this.priority;
   }
@@ -484,7 +496,34 @@ class JobInProgress {
   TaskInProgress[] getReduceTasks() {
     return reduces;
   }
-    
+
+  /**
+   * Return the nonLocalRunningMaps
+   * @return
+   */
+  Set<TaskInProgress> getNonLocalRunningMaps()
+  {
+    return nonLocalRunningMaps;
+  }
+  
+  /**
+   * Return the runningMapCache
+   * @return
+   */
+  Map<Node, Set<TaskInProgress>> getRunningMapCache()
+  {
+    return runningMapCache;
+  }
+  
+  /**
+   * Return runningReduces
+   * @return
+   */
+  Set<TaskInProgress> getRunningReduces()
+  {
+    return runningReduces;
+  }
+  
   /**
    * Get the job configuration
    * @return the job's configuration
@@ -738,6 +777,8 @@ class JobInProgress {
     Task result = maps[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
       runningMapTasks += 1;
+      if (maps[target].getActiveTasks().size() > 1)
+        speculativeMapTasks++;
       if (maps[target].isFirstAttempt(result.getTaskID())) {
         JobHistory.Task.logStarted(maps[target].getTIPId(), Values.MAP.name(),
                                    System.currentTimeMillis(),
@@ -849,6 +890,8 @@ class JobInProgress {
     Task result = reduces[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
       runningReduceTasks += 1;
+      if (reduces[target].getActiveTasks().size() > 1)
+        speculativeReduceTasks++;
       if (reduces[target].isFirstAttempt(result.getTaskID())) {
         JobHistory.Task.logStarted(reduces[target].getTIPId(), Values.REDUCE.name(),
                                    System.currentTimeMillis(), "");
@@ -1467,6 +1510,7 @@ class JobInProgress {
                                          JobTrackerInstrumentation metrics) 
   {
     TaskAttemptID taskid = status.getTaskID();
+    int oldNumAttempts = tip.getActiveTasks().size();
         
     // Sanity check: is the TIP already complete? 
     // It _is_ safe to not decrement running{Map|Reduce}Tasks and
@@ -1514,10 +1558,14 @@ class JobInProgress {
                                   status.getCounters()); 
     }
         
-    // Update the running/finished map/reduce counts
+    int newNumAttempts = tip.getActiveTasks().size();
     if (!tip.isCleanupTask()) {
       if (tip.isMapTask()) {
         runningMapTasks -= 1;
+        // check if this was a speculative task
+        if (oldNumAttempts > 1) {
+          speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
+        }
         finishedMapTasks += 1;
         metrics.completeMap(taskid);
         // remove the completed map from the resp running caches
@@ -1527,6 +1575,9 @@ class JobInProgress {
         }
       } else {
         runningReduceTasks -= 1;
+        if (oldNumAttempts > 1) {
+          speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
+        }
         finishedReduceTasks += 1;
         metrics.completeReduce(taskid);
         // remove the completed reduces from the running reducers set
@@ -1620,7 +1671,7 @@ class JobInProgress {
       jobKilled = true;
     }
   }
-
+  
   /**
    * A task assigned to this JobInProgress has reported in as failed.
    * Most of the time, we'll just reschedule execution.  However, after

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -1272,7 +1272,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    * Heartbeat interval is incremented 1second for every 50 nodes. 
    * @return next heartbeat interval.
    */
-  private int getNextHeartbeatInterval() {
+  public int getNextHeartbeatInterval() {
     // get the no of task trackers
     int clusterSize = getClusterStatus().getTaskTrackers();
     int heartbeatInterval =  Math.max(

+ 0 - 447
src/mapred/org/apache/hadoop/mapred/ResourceManagerConf.java

@@ -1,447 +0,0 @@
-/** Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.PrintWriter;
-import java.util.Arrays;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Class providing access to resource manager configuration.
- * 
- * Resource manager configuration involves setting up queues, and defining
- * various properties for the queues. These are typically read from a file 
- * called resource-manager-conf.xml that must be in the classpath of the
- * application. The class provides APIs to get/set and reload the 
- * configuration for the queues.
- */
-class ResourceManagerConf {
-  
-  /** Default file name from which the resource manager configuration is read. */ 
-  public static final String RM_CONF_FILE = "resource-manager-conf.xml";
-
-  /** Default value for guaranteed capacity of maps.
-   * The default value is set to <code>Integer.MAX_VALUE</code>, the idea
-   * being that the default is suitable for organizations that do not
-   * require setting up any queues. 
-   */ 
-  public static final int DEFAULT_GUARANTEED_CAPACITY_MAPS = 
-                                                    Integer.MAX_VALUE;
-
-  /** Default value for guaranteed capacity of reduces.
-   * The default value is set to <code>Integer.MAX_VALUE</code>, the idea
-   * being that the default is suitable for organizations that do not
-   * require setting up any queues. 
-   */ 
-  public static final int DEFAULT_GUARANTEED_CAPACITY_REDUCES = 
-                                                    Integer.MAX_VALUE;
-
-  /** Default value for reclaiming redistributed resources.
-   * The default value is set to <code>0</code>, the idea
-   * being that the default is suitable for organizations that do not
-   * require setting up any queues.
-   */ 
-  public static final int DEFAULT_RECLAIM_TIME_LIMIT = 0;
-  
-  /** Default value for minimum resource limit per user per queue, as a 
-   * percentage.
-   * The default value is set to <code>100</code>, the idea
-   * being that the default is suitable for organizations that do not
-   * require setting up any queues.
-   */ 
-  public static final int DEFAULT_MIN_USER_LIMIT_PERCENT = 100;
-
-  private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX 
-                                              = "hadoop.rm.queue.";
-
-  private Configuration rmConf;
-  private Set<String> queues;
-  
-  /**
-   * Create a new ResourceManagerConf.
-   * This method reads from the default configuration file mentioned in
-   * {@link RM_CONF_FILE}, that must be present in the classpath of the
-   * application.
-   */
-  public ResourceManagerConf() {
-    rmConf = new Configuration(false);
-    rmConf.addResource(RM_CONF_FILE);
-  }
-
-  /**
-   * Create a new ResourceManagerConf reading the specified configuration
-   * file.
-   * 
-   * @param configFile {@link Path} to the configuration file containing
-   * the resource manager configuration.
-   */
-  public ResourceManagerConf(Path configFile) {
-    rmConf = new Configuration(false);
-    rmConf.addResource(configFile);
-  }
-  
-  /**
-   * Return the set of configured queue names.
-   * @return set of configured queue names.
-   */
-  public synchronized Set<String> getQueues() {
-    if (queues == null) {
-      String[] vals = rmConf.getStrings("hadoop.rm.queue.names");
-      queues = new TreeSet<String>();
-      for (String val : vals) {
-        queues.add(val);
-      }
-    }
-    return queues;
-  }
-  
-  /**
-   * Define the set of queues known to this configuration.
-   * This will override the queue names that are read from the
-   * configuration file. 
-   * @param newQueues Set of queue names
-   */
-  public synchronized void setQueues(Set<String> newQueues) {
-    StringBuffer queueSb = new StringBuffer();
-    for (String queue : newQueues) {
-      queueSb.append(queue).append(',');
-    }
-    if (newQueues.size() > 0) {
-      String value = queueSb.substring(0, queueSb.length());
-      rmConf.set("hadoop.rm.queue.names", value);
-      queues = null;
-    }
-  }
-  
-  /**
-   * Get the guaranteed number of maps for the specified queue.
-   * 
-   * This method defaults to {@link #DEFAULT_GUARANTEED_CAPACITY_MAPS} if
-   * no value is specified in the configuration for this queue. If the queue
-   * name is unknown, this method throws a {@link IllegalArgumentException}
-   * @param queue name of the queue
-   * @return guaranteed number of maps for this queue.
-   */
-  public int getGuaranteedCapacityMaps(String queue) {
-    checkQueue(queue);
-    return rmConf.getInt(toFullPropertyName(queue, 
-                          "guaranteed-capacity-maps"), 
-                          DEFAULT_GUARANTEED_CAPACITY_MAPS);
-  }
-
-  /**
-   * Set the guaranteed number of maps for the specified queue
-   * @param queue Name of the queue
-   * @param value Guaranteed number of maps.
-   */
-  public void setGuaranteedCapacityMaps(String queue, int value) {
-    checkQueue(queue);
-    rmConf.setInt(toFullPropertyName(queue,"guaranteed-capacity-maps"), 
-                    value);
-  }
-  
-  /**
-   * Get the guaranteed number of reduces for the specified queue.
-   * 
-   * This method defaults to {@link #DEFAULT_GUARANTEED_CAPACITY_REDUCES} if
-   * no value is specified in the configuration for this queue. If the queue
-   * name is unknown, this method throws a {@link IllegalArgumentException}
-   * @param queue name of the queue
-   * @return guaranteed number of reduces for this queue.
-   */
-  public int getGuaranteedCapacityReduces(String queue) {
-    checkQueue(queue);
-    return rmConf.getInt(toFullPropertyName(queue,
-                          "guaranteed-capacity-reduces"), 
-                          DEFAULT_GUARANTEED_CAPACITY_REDUCES);
-  }
-
-  /**
-   * Set the guaranteed number of reduces for the specified queue
-   * @param queue Name of the queue
-   * @param value Guaranteed number of reduces.
-   */
-  public void setGuaranteedCapacityReduces(String queue, int value) {
-    checkQueue(queue);
-    rmConf.setInt(toFullPropertyName(queue,"guaranteed-capacity-reduces"), 
-                    value);
-  }
-  
-  /**
-   * Get the amount of time before which redistributed resources must be
-   * reclaimed for the specified queue.
-   * 
-   * The resource manager distributes spare capacity from a free queue
-   * to ones which are in need for more resources. However, if a job 
-   * submitted to the first queue requires back the resources, they must
-   * be reclaimed within the specified configuration time limit.
-   * 
-   * This method defaults to {@link #DEFAULT_RECLAIM_TIME_LIMIT} if
-   * no value is specified in the configuration for this queue. If the queue
-   * name is unknown, this method throws a {@link IllegalArgumentException}
-   * @param queue name of the queue
-   * @return reclaim time limit for this queue.
-   */
-  public int getReclaimTimeLimit(String queue) {
-    checkQueue(queue);
-    return rmConf.getInt(toFullPropertyName(queue, "reclaim-time-limit"), 
-                          DEFAULT_RECLAIM_TIME_LIMIT);
-  }
-  
-  /**
-   * Set the amount of time before which redistributed resources must be
-   * reclaimed for the specified queue.
-   * @param queue Name of the queue
-   * @param value Amount of time before which the redistributed resources
-   * must be retained.
-   */
-  public void setReclaimTimeLimit(String queue, int value) {
-    checkQueue(queue);
-    rmConf.setInt(toFullPropertyName(queue, "reclaim-time-limit"), value);
-  }
-  
-  /**
-   * Get the list of users who can submit jobs to this queue.
-   * 
-   * This method defaults to <code>null</code> if no value is specified 
-   * in the configuration for this queue. If the queue
-   * name is unknown, this method throws a {@link IllegalArgumentException}
-   * @param queue name of the queue
-   * @return list of users who can submit jobs to this queue.
-   */
-  public String[] getAllowedUsers(String queue) {
-    checkQueue(queue);
-    return rmConf.getStrings(toFullPropertyName(queue, "allowed-users"), 
-                              (String[])null);
-  }
-  
-  /**
-   * Set the list of users who can submit jobs to this queue.
-   * 
-   * If the queue name is unknown, this method throws a 
-   * {@link IllegalArgumentException}
-   * @param queue name of the queue
-   * @param values list of users allowed to submit jobs to this queue.
-   */
-  public void setAllowedUsers(String queue, String... values) {
-    checkQueue(queue);
-    rmConf.setStrings(toFullPropertyName(queue, "allowed-users"), values);
-  }
-  
-  /**
-   * Get the list of users who cannot submit jobs to this queue.
-   * 
-   * This method defaults to <code>null</code> if no value is specified 
-   * in the configuration for this queue. If the queue
-   * name is unknown, this method throws a {@link IllegalArgumentException}
-   * @param queue name of the queue
-   * @return list of users who cannot submit jobs to this queue.
-   */
-  public String[] getDeniedUsers(String queue) {
-    checkQueue(queue);
-    return rmConf.getStrings(toFullPropertyName(queue, "denied-users"), 
-                              (String[])null);
-  }
-
-  /**
-   * Set the list of users who cannot submit jobs to this queue.
-   * 
-   * If the queue name is unknown, this method throws a 
-   * {@link IllegalArgumentException}
-   * @param queue name of the queue
-   * @param values list of users denied to submit jobs to this queue.
-   */
-  public void setDeniedUsers(String queue, String... values) {
-    checkQueue(queue);
-    rmConf.setStrings(toFullPropertyName(queue, "denied-users"), values);
-  }
-
-  /**
-   * Get whether users present in allowed users list will override values
-   * in the denied user list for this queue.
-   * 
-   * If a user name is specified in both the allowed user list and the denied
-   * user list, this configuration determines whether to honor the allowed
-   * user list or the denied user list. This method defaults to 
-   * <code>false</code> if no value is specified in the configuration for 
-   * this queue. If the queue name is unknown, this method throws a 
-   * {@link IllegalArgumentException}
-   * @param queue name of the queue
-   * @return 
-   */
-  public boolean doAllowedUsersOverride(String queue) {
-    checkQueue(queue);
-    return rmConf.getBoolean(toFullPropertyName(queue, 
-                              "allowed-users-override"), false);
-  }
-
-  /**
-   * Set whether users present in allowed users list will override values
-   * in the denied user list for this queue.
-   * 
-   * If the queue name is unknown, this method throws a 
-   * {@link IllegalArgumentException}
-   * @param queue name of the queue
-   * @param value true if the allowed users take priority, false otherwise.
-   */
-  public void setAllowedUsersOverride(String queue, boolean value) {
-    checkQueue(queue);
-    rmConf.setBoolean(toFullPropertyName(queue, "allowed-users-override"),
-                        value);
-  }
-  
-  /**
-   * Get whether priority is supported for this queue.
-   * 
-   * If this value is false, then job priorities will be ignored in 
-   * scheduling decisions. This method defaults to <code>false</code> if 
-   * the property is not configured for this queue. If the queue name is 
-   * unknown, this method throws a {@link IllegalArgumentException}
-   * @param queue name of the queue
-   * @return Whether this queue supports priority or not.
-   */
-  public boolean isPrioritySupported(String queue) {
-    checkQueue(queue);
-    return rmConf.getBoolean(toFullPropertyName(queue, "supports-priority"),
-                              false);  
-  }
-  
-  /**
-   * Set whether priority is supported for this queue.
-   * 
-   * If the queue name is unknown, this method throws a 
-   * {@link IllegalArgumentException}
-   * @param queue name of the queue
-   * @param value true, if the queue must support priorities, false otherwise.
-   */
-  public void setPrioritySupported(String queue, boolean value) {
-    checkQueue(queue);
-    rmConf.setBoolean(toFullPropertyName(queue, "supports-priority"), value);
-  }
-  
-  /**
-   * Get the minimum limit of resources for any user submitting jobs in 
-   * this queue, in percentage.
-   * 
-   * This method defaults to {@link #DEFAULT_MIN_USER_LIMIT_PERCENT} if
-   * no value is specified in the configuration for this queue. If the queue
-   * name is unknown, this method throws a {@link IllegalArgumentException}
-   * @param queue name of the queue
-   * @return minimum limit of resources, in percentage, that will be 
-   * available for a user.
-   * 
-   */
-  public int getMinimumUserLimitPercent(String queue) {
-    checkQueue(queue);
-    return rmConf.getInt(toFullPropertyName(queue, 
-                          "minimum-user-limit-percent"), 
-                          DEFAULT_MIN_USER_LIMIT_PERCENT);
-  }
-  
-  /**
-   * Set the minimum limit of resources for any user submitting jobs in
-   * this queue, in percentage.
-   * 
-   * If the queue name is unknown, this method throws a 
-   * {@link IllegalArgumentException}
-   * @param queue name of the queue
-   * @param value minimum limit of resources for any user submitting jobs
-   * in this queue
-   */
-  public void setMinimumUserLimitPercent(String queue, int value) {
-    checkQueue(queue);
-    rmConf.setInt(toFullPropertyName(queue, "minimum-user-limit-percent"), 
-                    value);
-  }
-  
-  /**
-   * Reload configuration by clearing the information read from the 
-   * underlying configuration file.
-   */
-  public synchronized void reloadConfiguration() {
-    queues = null;
-    rmConf.reloadConfiguration();
-  }
-  
-  /**
-   * Print configuration read from the underlying configuration file.
-   * 
-   * @param writer {@link PrintWriter} to which the output must be written.
-   */
-  public void printConfiguration(PrintWriter writer) {
-    
-    Set<String> queueSet = getQueues();
-    if (queueSet == null) {
-      writer.println("No queues configured.");
-      return;
-    }
-    StringBuffer sb = new StringBuffer();
-    for (String queue : queueSet) {
-      sb.append(queue).append(",");
-    }
-    writer.println("hadoop.rm.queue.names: " + sb.substring(0, sb.length()-1));
-    
-    for (String queue : queueSet) {
-      writer.println(toFullPropertyName(queue, "guaranteed-capacity-maps") + 
-                      ": " + getGuaranteedCapacityMaps(queue));
-      writer.println(toFullPropertyName(queue, "guaranteed-capacity-reduces") + 
-                      ": " + getGuaranteedCapacityReduces(queue));
-      writer.println(toFullPropertyName(queue, "reclaim-time-limit") + 
-                      ": " + getReclaimTimeLimit(queue));
-      writer.println(toFullPropertyName(queue, "minimum-user-limit-percent") +
-                      ": " + getMinimumUserLimitPercent(queue));
-      writer.println(toFullPropertyName(queue, "supports-priority") + 
-                      ": " + isPrioritySupported(queue));
-      writer.println(toFullPropertyName(queue, "allowed-users-override") +
-                      ": " + doAllowedUsersOverride(queue));
-      printUserList(writer, queue, "allowed-users", getAllowedUsers(queue));
-      printUserList(writer, queue, "denied-users", getDeniedUsers(queue));
-    }
-  }
-
-  private void printUserList(PrintWriter writer, String queue, 
-                              String listType, String[] users) {
-    if (users == null) {
-      writer.println(toFullPropertyName(queue, listType) + ": No users configured.");
-    } else {
-      writer.println(toFullPropertyName(queue, listType) + ": ");
-      Arrays.sort(users);
-      for (String user : users) {
-        writer.println("\t" + user);
-      }
-    }
-  }
-  
-  private synchronized void checkQueue(String queue) {
-    if (queues == null) {
-      queues = getQueues();
-    }
-    if (!queues.contains(queue)) {
-      throw new IllegalArgumentException("Queue " + queue + " is undefined.");
-    }
-  }
-
-  private static final String toFullPropertyName(String queue, 
-                                                  String property) {
-      return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
-  }
-}

+ 8 - 0
src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java

@@ -63,4 +63,12 @@ interface TaskTrackerManager {
    * @return the {@link QueueManager}
    */
   public QueueManager getQueueManager();
+  
+  /**
+   * Return the current heartbeat interval that's used by {@link TaskTracker}s.
+   *
+   * @return the heartbeat interval used by {@link TaskTracker}s
+   */
+  public int getNextHeartbeatInterval();
+  
 }

+ 5 - 0
src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java

@@ -144,6 +144,11 @@ public class TestJobQueueTaskScheduler extends TestCase {
       return null;
     }
     
+    @Override
+    public int getNextHeartbeatInterval() {
+      return MRConstants.HEARTBEAT_INTERVAL_MIN;
+    }
+    
     // Test methods
     
     public void submitJob(JobInProgress job) {

+ 0 - 447
src/test/org/apache/hadoop/mapred/TestResourceManagerConf.java

@@ -1,447 +0,0 @@
-/** Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.PipedReader;
-import java.io.PipedWriter;
-import java.io.PrintWriter;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.StringUtils;
-
-public class TestResourceManagerConf extends TestCase {
-
-  private static final String TEST_CONF_FILE = 
-                new File(".", "test-conf.xml").getAbsolutePath();
-  private PrintWriter writer;
-  private Map<String, String> defaultProperties;
-  private ResourceManagerConf testConf;
-  
-  public TestResourceManagerConf() {
-    defaultProperties = setupQueueProperties(
-        new String[] { "guaranteed-capacity-maps", 
-                       "guaranteed-capacity-reduces",
-                       "reclaim-time-limit",
-                       "allowed-users",
-                       "denied-users",
-                       "allowed-users-override",
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { String.valueOf(Integer.MAX_VALUE), 
-                        String.valueOf(Integer.MAX_VALUE),
-                        "0", null, null, "false",
-                        "true", "100" }
-                      );
-  }
-  
-  public void setUp() throws IOException {
-    openFile();
-  }
-  
-  public void tearDown() throws IOException {
-    File testConfFile = new File(TEST_CONF_FILE);
-    if (testConfFile.exists()) {
-      testConfFile.delete();  
-    }
-  }
-  
-  public void testDefaults() {
-    testConf = new ResourceManagerConf();
-    Map<String, Map<String, String>> queueDetails
-                            = new HashMap<String, Map<String,String>>();
-    queueDetails.put("default", defaultProperties);
-    checkQueueProperties(testConf, queueDetails);
-  }
-  
-  public void testQueues() {
-
-    Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity-maps", 
-                       "guaranteed-capacity-reduces",
-                       "reclaim-time-limit",
-                       "allowed-users",
-                       "denied-users",
-                       "allowed-users-override",
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "10", "5", "600", "u1,u2,u3", "u1,g1", "false",
-                        "true", "25" }
-                      );
-
-    Map<String, String> q2Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity-maps", 
-                       "guaranteed-capacity-reduces",
-                       "reclaim-time-limit",
-                       "allowed-users",
-                       "denied-users",
-                       "allowed-users-override",
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "100", "50", "6000", "u4,u5,u6", "u6,g2", "true",
-                        "false", "50" }
-                      );
-
-    startConfig();
-    writeQueueDetails("default", q1Props);
-    writeQueueDetails("research", q2Props);
-    writeQueues("default,research");
-    endConfig();
-
-    testConf = new ResourceManagerConf(new Path(TEST_CONF_FILE));
-
-    Map<String, Map<String, String>> queueDetails
-              = new HashMap<String, Map<String,String>>();
-    queueDetails.put("default", q1Props);
-    queueDetails.put("research", q2Props);
-    checkQueueProperties(testConf, queueDetails);
-  }
-  
-  public void testQueueWithDefaultProperties() {
-    Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity-maps", 
-                       "guaranteed-capacity-reduces",
-                       "allowed-users",
-                       "allowed-users-override",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "20", "10", "u7,u8,u9", "false", "75" }
-                      );
-    startConfig();
-    writeQueueDetails("default", q1Props);
-    writeQueues("default");
-    endConfig();
-
-    testConf = new ResourceManagerConf(new Path(TEST_CONF_FILE));
-
-    Map<String, Map<String, String>> queueDetails
-              = new HashMap<String, Map<String,String>>();
-    Map<String, String> expProperties = new HashMap<String, String>();
-    for (String key : q1Props.keySet()) {
-      expProperties.put(key, q1Props.get(key));
-    }
-    expProperties.put("reclaim-time-limit", "0");
-    expProperties.put("denied-users", null);
-    expProperties.put("supports-priority", "false");
-    queueDetails.put("default", expProperties);
-    checkQueueProperties(testConf, queueDetails);
-  }
-
-  public void testInvalidQueue() {
-    try {
-      testConf = new ResourceManagerConf();
-      testConf.getGuaranteedCapacityMaps("invalid");
-      fail("Should not return value for invalid queue");
-    } catch (IllegalArgumentException iae) {
-      assertEquals("Queue invalid is undefined.", iae.getMessage());
-    }
-  }
-
-  public void testQueueAddition() {
-    testConf = new ResourceManagerConf();
-    Set<String> queues = new HashSet<String>();
-    queues.add("default");
-    queues.add("newqueue");
-    testConf.setQueues(queues);
-    
-    testConf.setGuaranteedCapacityMaps("newqueue", 1);
-    testConf.setGuaranteedCapacityReduces("newqueue", 1);
-    testConf.setReclaimTimeLimit("newqueue", 30);
-    testConf.setMinimumUserLimitPercent("newqueue", 40);
-    testConf.setAllowedUsers("newqueue", "a", "b", "c");
-    testConf.setDeniedUsers("newqueue", "d", "e", "f");
-    testConf.setAllowedUsersOverride("newqueue", true);
-    testConf.setPrioritySupported("newqueue", false);
-    
-    Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity-maps", 
-                       "guaranteed-capacity-reduces",
-                       "reclaim-time-limit",
-                       "allowed-users",
-                       "denied-users",
-                       "allowed-users-override",
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "1", "1", "30", "a,b,c", "d,e,f", "true",
-                        "false", "40" }
-                      );
-    
-    Map<String, Map<String, String>> queueDetails = 
-                  new HashMap<String, Map<String, String>>();
-    queueDetails.put("default", defaultProperties);
-    queueDetails.put("newqueue", q1Props);
-    checkQueueProperties(testConf, queueDetails);
-  }
-  
-  public void testReload() throws IOException {
-    // use the setup in the test case testQueues as a base...
-    testQueues();
-    
-    // write new values to the file...
-    Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity-maps", 
-                       "guaranteed-capacity-reduces",
-                       "reclaim-time-limit",
-                       "allowed-users",
-                       "denied-users",
-                       "allowed-users-override",
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "20", "5", "600", "u1,u2,u3,u4", "u1,g1", "false",
-                        "true", "40" }
-                      );
-
-    Map<String, String> q2Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity-maps", 
-                       "guaranteed-capacity-reduces",
-                       "reclaim-time-limit",
-                       "allowed-users",
-                       "denied-users",
-                       "allowed-users-override",
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "1000", "500", "3000", "u4,u6", "g2", "false",
-                        "false", "50" }
-                      );
-
-    openFile();
-    startConfig();
-    writeQueueDetails("default", q1Props);
-    writeQueueDetails("production", q2Props);
-    writeQueues("default,production");
-    endConfig();
-    
-    testConf.reloadConfiguration();
-    
-    Map<String, Map<String, String>> queueDetails 
-                      = new HashMap<String, Map<String, String>>();
-    queueDetails.put("default", q1Props);
-    queueDetails.put("production", q2Props);
-    checkQueueProperties(testConf, queueDetails);
-  }
-
-  public void testPrint() throws IOException, InterruptedException {
-    
-    Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity-maps", 
-                       "guaranteed-capacity-reduces",
-                       "reclaim-time-limit",
-                       "allowed-users",
-                       "denied-users",
-                       "allowed-users-override",
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "20", "5", "600", "u2,a1,c3,b4", "u1,g1", "false",
-                        "true", "40" }
-                      );
-
-    Map<String, String> q2Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity-maps", 
-                       "guaranteed-capacity-reduces",
-                       "reclaim-time-limit",
-                       "allowed-users",
-                       "denied-users",
-                       "allowed-users-override",
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "1000", "500", "3000", "e5,d6", "g2", "false",
-                        "false", "50" }
-                      );
-    
-    startConfig();
-    writeQueueDetails("default", q1Props);
-    writeQueueDetails("research", q2Props);
-    writeQueues("default,research");
-    endConfig();
-
-    testConf = new ResourceManagerConf(new Path(TEST_CONF_FILE));
-    
-    PipedWriter pw = new PipedWriter();
-    PrintWriter writer = new PrintWriter(pw);
-    PipedReader pr = new PipedReader(pw);
-    BufferedReader br = new BufferedReader(pr);
-    
-    PrintReaderThread prThread = new PrintReaderThread(br);
-    prThread.start();
-
-    testConf.printConfiguration(writer);
-    writer.close();
-    
-    prThread.join();
-    ArrayList<String> output = prThread.getOutput();
-    
-    ListIterator iter = output.listIterator();
-    
-    assertEquals("hadoop.rm.queue.names: default,research", iter.next());
-    checkPrintOutput("default", q1Props, iter,
-                      new String[] {"a1", "b4", "c3", "u2"},
-                      new String[] {"g1", "u1"});
-    checkPrintOutput("research", q2Props, iter,
-                      new String[] {"d6", "e5" },
-                      new String[] {"g2"});
-  }
-  
-  private void checkPrintOutput(String queue, 
-                                Map<String, String> queueProps,
-                                ListIterator iter,
-                                String[] allowedUsers,
-                                String[] deniedUsers) {
-    
-    String[] propsOrder = {"guaranteed-capacity-maps", 
-                             "guaranteed-capacity-reduces",
-                             "reclaim-time-limit",
-                             "minimum-user-limit-percent",
-                             "supports-priority",
-                             "allowed-users-override"};
-    
-    for (String prop : propsOrder) {
-      assertEquals(("hadoop.rm.queue." + queue + "." + prop + ": " 
-                    + queueProps.get(prop)), iter.next());
-    }
-    
-    checkUserList(queue, iter, allowedUsers, "allowed-users");
-    checkUserList(queue, iter, deniedUsers, "denied-users");
-  }
-
-  private void checkUserList(String queue, 
-                              ListIterator iter, 
-                              String[] userList,
-                              String listType) {
-    assertEquals("hadoop.rm.queue." + queue + "." + listType + ": ",
-                    iter.next());
-    for (String user : userList) {
-      assertEquals("\t" + user, iter.next());
-    }
-  }
-
-  private void checkQueueProperties(
-                        ResourceManagerConf testConf,
-                        Map<String, Map<String, String>> queueDetails) {
-    Set<String> queues = testConf.getQueues();
-    assertEquals(queueDetails.keySet().size(), queues.size());
-    for (String name : queueDetails.keySet()) {
-      assertTrue(queues.contains(name));  
-    }
-    
-    for (String queueName : queueDetails.keySet()) {
-      Map<String, String> map = queueDetails.get(queueName);
-      assertEquals(Integer.parseInt(map.get("guaranteed-capacity-maps")),
-          testConf.getGuaranteedCapacityMaps(queueName));
-      assertEquals(Integer.parseInt(map.get("guaranteed-capacity-reduces")),
-          testConf.getGuaranteedCapacityReduces(queueName));
-      assertEquals(Integer.parseInt(map.get("minimum-user-limit-percent")),
-          testConf.getMinimumUserLimitPercent(queueName));
-      assertEquals(Integer.parseInt(map.get("reclaim-time-limit")),
-          testConf.getReclaimTimeLimit(queueName));
-      assertEquals(Boolean.parseBoolean(map.get("supports-priority")),
-          testConf.isPrioritySupported(queueName));
-      String[] expAllowedUsers = StringUtils.getStrings(
-                                    map.get("allowed-users"));
-      String[] allowedUsers = testConf.getAllowedUsers(queueName);
-      assertTrue(Arrays.equals(expAllowedUsers, allowedUsers));
-      String[] expDeniedUsers = StringUtils.getStrings(
-                                    map.get("denied-users"));
-      String[] deniedUsers = testConf.getDeniedUsers(queueName);
-      assertTrue(Arrays.equals(expDeniedUsers, deniedUsers));
-      assertEquals(Boolean.parseBoolean(map.get("allowed-users-override")),
-          testConf.doAllowedUsersOverride(queueName));
-    }
-  }
-  
-  private Map<String, String> setupQueueProperties(String[] keys, 
-                                                String[] values) {
-    HashMap<String, String> map = new HashMap<String, String>();
-    for(int i=0; i<keys.length; i++) {
-      map.put(keys[i], values[i]);
-    }
-    return map;
-  }
-
-  private void openFile() throws IOException {
-    FileWriter fw = new FileWriter(TEST_CONF_FILE);
-    BufferedWriter bw = new BufferedWriter(fw);
-    writer = new PrintWriter(bw);
-  }
-  
-  private void startConfig() {
-    writer.println("<?xml version=\"1.0\"?>");
-    writer.println("<configuration>");
-  }
-  
-  private void writeQueues(String queueList) {
-    writer.println("<property>");
-    writer.println("<name>hadoop.rm.queue.names</name>");
-    writer.println("<value>"+queueList+"</value>");
-    writer.println("</property>");
-  }
-  
-  private void writeQueueDetails(String queue, Map<String, String> props) {
-    for (String key : props.keySet()) {
-      writer.println("<property>");
-      writer.println("<name>hadoop.rm.queue." + queue + "." + key +
-                    "</name>");
-      writer.println("<value>"+props.get(key)+"</value>");
-      writer.println("</property>");
-    }
-  }
-  
-  private void endConfig() {
-    writer.println("</configuration>");
-    writer.close();
-  }
-  
-  class PrintReaderThread extends Thread {
-    
-    private BufferedReader reader;
-    private ArrayList<String> lines;
-    
-    public PrintReaderThread(BufferedReader reader) {
-      this.reader = reader;
-      lines = new ArrayList<String>();
-    }
-    
-    public void run() {
-      try {
-        String line = reader.readLine();
-        while (line != null) {
-          lines.add(line);
-          line = reader.readLine();
-        }
-      } catch (IOException ioe) {
-        lines.clear();
-      }
-    }
-    
-    public ArrayList<String> getOutput() {
-      return lines;
-    }
-  }
-}