Browse Source

Merge -r 771606:771607 from trunk to branch 0.20 to fix HADOOP-5726.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@771609 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala 16 years ago
parent
commit
3db789dc0a

+ 3 - 0
CHANGES.txt

@@ -4,6 +4,9 @@ Release 0.20.1 - Unreleased
 
   INCOMPATIBLE CHANGES
 
+    HADOOP-5726. Remove pre-emption from capacity scheduler code base.
+    (Rahul Kumar Singh via yhemanth)
+
   NEW FEATURES
 
   IMPROVEMENTS

+ 2 - 29
conf/capacity-scheduler.xml.template

@@ -8,21 +8,13 @@
 <configuration>
 
   <property>
-    <name>mapred.capacity-scheduler.queue.default.guaranteed-capacity</name>
+    <name>mapred.capacity-scheduler.queue.default.capacity</name>
     <value>100</value>
     <description>Percentage of the number of slots in the cluster that are
-      guaranteed to be available for jobs in this queue.
+      to be available for jobs in this queue.
     </description>    
   </property>
   
-  <property>
-    <name>mapred.capacity-scheduler.queue.default.reclaim-time-limit</name>
-    <value>300</value>
-    <description>The amount of time, in seconds, before which 
-      resources distributed to other queues will be reclaimed.
-    </description>
-  </property>
-
   <property>
     <name>mapred.capacity-scheduler.queue.default.supports-priority</name>
     <value>false</value>
@@ -54,28 +46,9 @@
     </description>
   </property>
   
-  
-  <property>
-    <name>mapred.capacity-scheduler.reclaimCapacity.interval</name>
-    <value>5</value>
-    <description>The time interval, in seconds, between which the scheduler
-     periodically determines whether capacity needs to be reclaimed for 
-     any queue.
-    </description>
-  </property>
-  
   <!-- The default configuration settings for the capacity task scheduler -->
   <!-- The default values would be applied to all the queues which don't have -->
   <!-- the appropriate property for the particular queue -->
-  <property>
-    <name>mapred.capacity-scheduler.default-reclaim-time-limit</name>
-    <value>300</value>
-    <description>The amount of time, in seconds, before which 
-    resources distributed to other queues will be reclaimed by default
-    in a job queue.
-    </description>
-  </property>
-  
   <property>
     <name>mapred.capacity-scheduler.default-supports-priority</name>
     <value>false</value>

+ 13 - 83
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java

@@ -34,8 +34,6 @@ class CapacitySchedulerConf {
   /** Default file name from which the resource manager configuration is read. */ 
   public static final String SCHEDULER_CONF_FILE = "capacity-scheduler.xml";
   
-  private int defaultReclaimTime;
-  
   private int defaultUlimitMinimum;
   
   private boolean defaultSupportPriority;
@@ -117,8 +115,6 @@ class CapacitySchedulerConf {
    * which is used by the Capacity Scheduler.
    */
   private void initializeDefaults() {
-    defaultReclaimTime = rmConf.getInt(
-        "mapred.capacity-scheduler.default-reclaim-time-limit",300);
     defaultUlimitMinimum = rmConf.getInt(
         "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
     defaultSupportPriority = rmConf.getBoolean(
@@ -129,33 +125,33 @@ class CapacitySchedulerConf {
   }
   
   /**
-   * Get the guaranteed percentage of the cluster for the specified queue.
+   * Get the percentage of the cluster for the specified queue.
    * 
-   * This method defaults to configured default Guaranteed Capacity if
+   * This method defaults to configured default Capacity if
    * no value is specified in the configuration for this queue. 
    * If the configured capacity is negative value or greater than 100 an
    * {@link IllegalArgumentException} is thrown.
    * 
-   * If default Guaranteed capacity is not configured for a queue, then
+   * If default capacity is not configured for a queue, then
    * system allocates capacity based on what is free at the time of 
    * capacity scheduler start
    * 
    * 
    * @param queue name of the queue
-   * @return guaranteed percent of the cluster for the queue.
+   * @return percent of the cluster for the queue.
    */
-  public float getGuaranteedCapacity(String queue) {
-    //Check done in order to return default GC which can be negative
-    //In case of both GC and default GC not configured.
+  public float getCapacity(String queue) {
+    //Check done in order to return default capacity which can be negative
+    //In case of both capacity and default capacity not configured.
     //Last check is if the configuration is specified and is marked as
     //negative we throw exception
     String raw = rmConf.getRaw(toFullPropertyName(queue, 
-        "guaranteed-capacity"));
+        "capacity"));
     if(raw == null) {
       return -1;
     }
     float result = rmConf.getFloat(toFullPropertyName(queue, 
-                                   "guaranteed-capacity"), 
+                                   "capacity"), 
                                    -1);
     if (result < 0.0 || result > 100.0) {
       throw new IllegalArgumentException("Illegal capacity for queue " + queue +
@@ -165,53 +161,13 @@ class CapacitySchedulerConf {
   }
   
   /**
-   * Sets the Guaranteed capacity of the given queue.
-   * 
-   * @param queue name of the queue
-   * @param gc guaranteed percent of the cluster for the queue.
-   */
-  public void setGuaranteedCapacity(String queue,float gc) {
-    rmConf.setFloat(toFullPropertyName(queue, "guaranteed-capacity"),gc);
-  }
-  
-  
-  /**
-   * Get the amount of time before which redistributed resources must be
-   * reclaimed for the specified queue.
-   * 
-   * The resource manager distributes spare capacity from a free queue
-   * to ones which are in need for more resources. However, if a job 
-   * submitted to the first queue requires back the resources, they must
-   * be reclaimed within the specified configuration time limit.
-   * 
-   * This method defaults to configured default reclaim time limit if
-   * no value is specified in the configuration for this queue.
-   * 
-   * Throws an {@link IllegalArgumentException} when invalid value is 
-   * configured.
+   * Sets the capacity of the given queue.
    * 
    * @param queue name of the queue
-   * @return reclaim time limit for this queue.
-   */
-  public int getReclaimTimeLimit(String queue) {
-    int reclaimTimeLimit = rmConf.getInt(toFullPropertyName(queue, "reclaim-time-limit"), 
-        defaultReclaimTime);
-    if(reclaimTimeLimit <= 0) {
-      throw new IllegalArgumentException("Invalid reclaim time limit : " 
-          + reclaimTimeLimit + " for queue : " + queue);
-    }
-    return reclaimTimeLimit;
-  }
-  
-  /**
-   * Set the amount of time before which redistributed resources must be
-   * reclaimed for the specified queue.
-   * @param queue Name of the queue
-   * @param value Amount of time before which the redistributed resources
-   * must be retained.
+   * @param gc percent of the cluster for the queue.
    */
-  public void setReclaimTimeLimit(String queue, int value) {
-    rmConf.setInt(toFullPropertyName(queue, "reclaim-time-limit"), value);
+  public void setCapacity(String queue,float gc) {
+    rmConf.setFloat(toFullPropertyName(queue, "capacity"),gc);
   }
   
   /**
@@ -435,30 +391,4 @@ class CapacitySchedulerConf {
   public void setDefaultPercentOfPmemInVmem(float value) {
     rmConf.setFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY, value);
   }
-  
-  /**
-   * Gets the reclaim capacity thread interval.
-   * 
-   * @return reclaim capacity interval
-   */
-
-  public long getReclaimCapacityInterval() {
-    long reclaimCapacityInterval = 
-      rmConf.getLong("mapred.capacity-scheduler.reclaimCapacity.interval", 5);
-    
-    if(reclaimCapacityInterval <= 0) {
-      throw new IllegalArgumentException("Invalid reclaim capacity " +
-      		"interval, should be greater than zero");
-    }
-    return reclaimCapacityInterval;
-  }
-  /**
-   * Sets the reclaim capacity thread interval.
-   * 
-   * @param value
-   */
-  public void setReclaimCapacityInterval(long value) {
-    rmConf.setLong("mapred.capacity-scheduler.reclaimCapacity.interval", 
-        value);
-  }
 }

+ 69 - 527
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

@@ -42,15 +42,12 @@ import org.apache.hadoop.util.StringUtils;
  * and provides a HOD-less way to share large clusters. This scheduler 
  * provides the following features: 
  *  * support for queues, where a job is submitted to a queue. 
- *  * Queues are guaranteed a fraction of the capacity of the grid (their 
- *  'guaranteed capacity') in the sense that a certain capacity of resources 
+ *  * Queues are assigned a fraction of the capacity of the grid (their
+ *  'capacity') in the sense that a certain capacity of resources 
  *  will be at their disposal. All jobs submitted to the queues of an Org 
- *  will have access to the capacity guaranteed to the Org.
- *  * Free resources can be allocated to any queue beyond its guaranteed 
- *  capacity. These excess allocated resources can be reclaimed and made 
- *  available to another queue in order to meet its capacity guarantee.
- *  * The scheduler guarantees that excess resources taken from a queue will 
- *  be restored to it within N minutes of its need for them.
+ *  will have access to the capacity to the Org.
+ *  * Free resources can be allocated to any queue beyond its 
+ *  capacity.
  *  * Queues optionally support job priorities (disabled by default). 
  *  * Within a queue, jobs with higher priority will have access to the 
  *  queue's resources before jobs with lower priority. However, once a job 
@@ -62,42 +59,11 @@ import org.apache.hadoop.util.StringUtils;
  */
 class CapacityTaskScheduler extends TaskScheduler {
   
-  /** 
-   * For keeping track of reclaimed capacity. 
-   * Whenever slots need to be reclaimed, we create one of these objects. 
-   * As the queue gets slots, the amount to reclaim gets decremented. if 
-   * we haven't reclaimed enough within a certain time, we need to kill 
-   * tasks. This object 'expires' either if all resources are reclaimed
-   * before the deadline, or the deadline passes . 
-   */
-  private static class ReclaimedResource {
-    // how much resource to reclaim
-    public int originalAmount;
-    // how much is to be reclaimed currently
-    public int currentAmount;
-    // the time, in millisecs, when this object expires. 
-    // This time is equal to the time when the object was created, plus
-    // the reclaim-time SLA for the queue.  
-    public long whenToExpire;
-    // we also keep track of when to kill tasks, in millisecs. This is a 
-    // fraction of 'whenToExpire', but we store it here so we don't 
-    // recompute it every time. 
-    public long whenToKill;
-    
-    public ReclaimedResource(int amount, long expiryTime, 
-        long whenToKill) {
-      this.originalAmount = amount;
-      this.currentAmount = amount;
-      this.whenToExpire = expiryTime;
-      this.whenToKill = whenToKill;
-    }
-  }
-
   /***********************************************************************
    * Keeping track of scheduling information for queues
    * 
    * We need to maintain scheduling information relevant to a queue (its 
-   * name, guaranteed capacity, etc), along with information specific to 
+   * name, capacity, etc), along with information specific to 
    * each kind of task, Map or Reduce (num of running tasks, pending 
    * tasks etc). 
    * 
@@ -115,58 +81,18 @@ class CapacityTaskScheduler extends TaskScheduler {
      * the actual gc, which depends on how many slots are available
      * in the cluster at any given time. 
      */
-    int guaranteedCapacity = 0;
+    int capacity = 0;
     // number of running tasks
     int numRunningTasks = 0;
-    // number of pending tasks
-    int numPendingTasks = 0;
     /** for each user, we need to keep track of number of running tasks */
     Map<String, Integer> numRunningTasksByUser = 
       new HashMap<String, Integer>();
     
-    /**
-     * We need to keep track of resources to reclaim. 
-     * Whenever a queue is under capacity and has tasks pending, we offer it 
-     * an SLA that gives it free slots equal to or greater than the gap in 
-     * its capacity, within a period of time (reclaimTime). 
-     * To do this, we periodically check if queues need to reclaim capacity. 
-     * If they do, we create a ResourceReclaim object. We also periodically
-     * check if a queue has received enough free slots within, say, 80% of 
-     * its reclaimTime. If not, we kill enough tasks to make up the 
-     * difference. 
-     * We keep two queues of ResourceReclaim objects. when an object is 
-     * created, it is placed in one queue. Once we kill tasks to recover 
-     * resources for that object, it is placed in an expiry queue. we need
-     * to do this to prevent creating spurious ResourceReclaim objects. We 
-     * keep a count of total resources that are being reclaimed. This count 
-     * is decremented when an object expires. 
-     */
-    
-    /**
-     * the list of resources to reclaim. This list is always sorted so that
-     * resources that need to be reclaimed sooner occur earlier in the list.
-     */
-    LinkedList<ReclaimedResource> reclaimList = 
-      new LinkedList<ReclaimedResource>();
-    /**
-     * the list of resources to expire. This list is always sorted so that
-     * resources that need to be expired sooner occur earlier in the list.
-     */
-    LinkedList<ReclaimedResource> reclaimExpireList = 
-      new LinkedList<ReclaimedResource>();
-    /** 
-     * sum of all resources that are being reclaimed. 
-     * We keep this to prevent unnecessary ReclaimResource objects from being
-     * created.  
-     */
-    int numReclaimedResources = 0;
-    
     /**
      * reset the variables associated with tasks
      */
     void resetTaskVars() {
       numRunningTasks = 0;
-      numPendingTasks = 0;
       for (String s: numRunningTasksByUser.keySet()) {
         numRunningTasksByUser.put(s, 0);
       }
@@ -176,11 +102,11 @@ class CapacityTaskScheduler extends TaskScheduler {
      * return information about the tasks
      */
     public String toString(){
-      float runningTasksAsPercent = guaranteedCapacity!= 0 ? 
-          ((float)numRunningTasks * 100/guaranteedCapacity):0;
+      float runningTasksAsPercent = capacity!= 0 ?
+          ((float)numRunningTasks * 100/capacity):0;
       StringBuffer sb = new StringBuffer();
-      sb.append("Guaranteed Capacity: " + guaranteedCapacity + "\n");
-      sb.append(String.format("Running tasks: %.1f%% of Guaranteed Capacity\n",
+      sb.append("Capacity: " + capacity + "\n");
+      sb.append(String.format("Running tasks: %.1f%% of Capacity\n",
           runningTasksAsPercent));
       // include info on active users
       if (numRunningTasks != 0) {
@@ -202,8 +128,8 @@ class CapacityTaskScheduler extends TaskScheduler {
   private static class QueueSchedulingInfo {
     String queueName;
 
-    /** guaranteed capacity(%) is set in the config */ 
-    float guaranteedCapacityPercent = 0;
+    /** capacity(%) is set in the config */ 
+    float capacityPercent = 0;
     
     /** 
      * to handle user limits, we need to know how many users have jobs in 
@@ -214,13 +140,6 @@ class CapacityTaskScheduler extends TaskScheduler {
     /** min value of user limit (same for all users) */
     int ulMin;
     
-    /**
-     * reclaim time limit (in msec). This time represents the SLA we offer 
-     * a queue - a queue gets back any lost capacity withing this period 
-     * of time.  
-     */ 
-    long reclaimTime;
-    
     /**
      * We keep track of the JobQueuesManager only for reporting purposes 
      * (in toString()). 
@@ -234,11 +153,10 @@ class CapacityTaskScheduler extends TaskScheduler {
     TaskSchedulingInfo reduceTSI;
     
     public QueueSchedulingInfo(String queueName, float gcPercent, 
-        int ulMin, long reclaimTime, JobQueuesManager jobQueuesManager) {
+        int ulMin, JobQueuesManager jobQueuesManager) {
       this.queueName = new String(queueName);
-      this.guaranteedCapacityPercent = gcPercent;
+      this.capacityPercent = gcPercent;
       this.ulMin = ulMin;
-      this.reclaimTime = reclaimTime;
       this.jobQueuesManager = jobQueuesManager;
       this.mapTSI = new TaskSchedulingInfo();
       this.reduceTSI = new TaskSchedulingInfo();
@@ -253,12 +171,10 @@ class CapacityTaskScheduler extends TaskScheduler {
       StringBuffer sb = new StringBuffer();
       sb.append("Queue configuration\n");
       //sb.append("Name: " + queueName + "\n");
-      sb.append("Guaranteed Capacity Percentage: ");
-      sb.append(guaranteedCapacityPercent);
+      sb.append("Capacity Percentage: ");
+      sb.append(capacityPercent);
       sb.append("%\n");
       sb.append(String.format("User Limit: %d%s\n",ulMin, "%"));
-      sb.append(String.format("Reclaim Time limit: %s\n", 
-          StringUtils.formatTime(reclaimTime)));
       sb.append(String.format("Priority Supported: %s\n",
           (jobQueuesManager.doesQueueSupportPriorities(queueName))?
               "YES":"NO"));
@@ -300,9 +216,8 @@ class CapacityTaskScheduler extends TaskScheduler {
     public String toString(){
       // note that we do not call updateQSIObjects() here for performance
       // reasons. This means that the data we print out may be slightly
-      // stale. This data is updated whenever assignTasks() is called, or
-      // whenever the reclaim capacity thread runs, which should be fairly
-      // often. If neither of these happen, the data gets stale. If we see
+      // stale. This data is updated whenever assignTasks() is called
+      // If this doesn't happen, the data gets stale. If we see
       // this often, we may need to detect this situation and call 
       // updateQSIObjects(), or just call it each time. 
       return scheduler.getDisplayInfo(queueName);
@@ -372,14 +287,11 @@ class CapacityTaskScheduler extends TaskScheduler {
     abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
         JobInProgress job) throws IOException; 
     abstract int getPendingTasks(JobInProgress job);
-    abstract int killTasksFromJob(JobInProgress job, int tasksToKill);
     abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
 
     /**
      * List of QSIs for assigning tasks.
-     * This list is ordered such that queues that need to reclaim capacity
-     * sooner, come before queues that don't. For queues that don't, they're
-     * ordered by a ratio of (# of running tasks)/Guaranteed capacity, which
+     * Queues are ordered by a ratio of (# of running tasks)/capacity, which
      * indicates how much 'free space' the queue has, or how much it is over
      * capacity. This ordered list is iterated over, when assigning tasks.
      */  
@@ -396,34 +308,15 @@ class CapacityTaskScheduler extends TaskScheduler {
       public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) {
         TaskSchedulingInfo t1 = getTSI(q1);
         TaskSchedulingInfo t2 = getTSI(q2);
-        // if one queue needs to reclaim something and the other one doesn't, 
-        // the former is first
-        if ((0 == t1.reclaimList.size()) && (0 != t2.reclaimList.size())) {
-          return 1;
-        }
-        else if ((0 != t1.reclaimList.size()) && (0 == t2.reclaimList.size())){
-          return -1;
-        }
-        else if ((0 == t1.reclaimList.size()) && (0 == t2.reclaimList.size())){
-          // neither needs to reclaim. 
-          // look at how much capacity they've filled. Treat a queue with gc=0 
-          // equivalent to a queue running at capacity
-          double r1 = (0 == t1.guaranteedCapacity)? 1.0f: 
-            (double)t1.numRunningTasks/(double)t1.guaranteedCapacity;
-          double r2 = (0 == t2.guaranteedCapacity)? 1.0f:
-            (double)t2.numRunningTasks/(double)t2.guaranteedCapacity;
-          if (r1<r2) return -1;
-          else if (r1>r2) return 1;
-          else return 0;
-        }
-        else {
-          // both have to reclaim. Look at which one needs to reclaim earlier
-          long tm1 = t1.reclaimList.get(0).whenToKill;
-          long tm2 = t2.reclaimList.get(0).whenToKill;
-          if (tm1<tm2) return -1;
-          else if (tm1>tm2) return 1;
-          else return 0;
-        }
+        // look at how much capacity they've filled. Treat a queue with gc=0 
+        // equivalent to a queue running at capacity
+        double r1 = (0 == t1.capacity)? 1.0f:
+          (double)t1.numRunningTasks/(double)t1.capacity;
+        double r2 = (0 == t2.capacity)? 1.0f:
+          (double)t2.numRunningTasks/(double)t2.capacity;
+        if (r1<r2) return -1;
+        else if (r1>r2) return 1;
+        else return 0;
       }
     }
     // subclass for map and reduce comparators
@@ -454,197 +347,19 @@ class CapacityTaskScheduler extends TaskScheduler {
       Collections.sort(qsiForAssigningTasks, queueComparator);
     }
     
-    /** 
-     * Periodically, we walk through our queues to do the following: 
-     * a. Check if a queue needs to reclaim any resources within a period
-     * of time (because it's running below capacity and more tasks are
-     * waiting)
-     * b. Check if a queue hasn't received enough of the resources it needed
-     * to be reclaimed and thus tasks need to be killed.
-     * The caller is responsible for ensuring that the QSI objects and the 
-     * collections are up-to-date.
-     * 
-     * Make sure that we do not make any calls to scheduler.taskTrackerManager
-     * as this can result in a deadlock (see HADOOP-4977). 
-     */
-    private synchronized void reclaimCapacity(int nextHeartbeatInterval) {
-      int tasksToKill = 0;
-      
-      QueueSchedulingInfo lastQsi = 
-        qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
-      TaskSchedulingInfo lastTsi = getTSI(lastQsi);
-      long currentTime = scheduler.clock.getTime();
-      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
-        TaskSchedulingInfo tsi = getTSI(qsi);
-        if (tsi.guaranteedCapacity <= 0) {
-          // no capacity, hence nothing can be reclaimed.
-          continue;
-        }
-        // is there any resource that needs to be reclaimed? 
-        if ((!tsi.reclaimList.isEmpty()) &&  
-            (tsi.reclaimList.getFirst().whenToKill < 
-              currentTime + CapacityTaskScheduler.RECLAIM_CAPACITY_INTERVAL)) {
-          // make a note of how many tasks to kill to claim resources
-          tasksToKill += tsi.reclaimList.getFirst().currentAmount;
-          // move this to expiry list
-          ReclaimedResource r = tsi.reclaimList.remove();
-          tsi.reclaimExpireList.add(r);
-        }
-        // is there any resource that needs to be expired?
-        if ((!tsi.reclaimExpireList.isEmpty()) && 
-            (tsi.reclaimExpireList.getFirst().whenToExpire <= currentTime)) {
-          ReclaimedResource r = tsi.reclaimExpireList.remove();
-          tsi.numReclaimedResources -= r.originalAmount;
-        }
-        // do we need to reclaim a resource later? 
-        // if no queue is over capacity, there's nothing to reclaim
-        if (lastTsi.numRunningTasks <= lastTsi.guaranteedCapacity) {
-          continue;
-        }
-        if (tsi.numRunningTasks < tsi.guaranteedCapacity) {
-          // usedCap is how much capacity is currently accounted for
-          int usedCap = tsi.numRunningTasks + tsi.numReclaimedResources;
-          // see if we have remaining capacity and if we have enough pending 
-          // tasks to use up remaining capacity
-          if ((usedCap < tsi.guaranteedCapacity) && 
-              ((tsi.numPendingTasks - tsi.numReclaimedResources)>0)) {
-            // create a request for resources to be reclaimed
-            int amt = Math.min((tsi.guaranteedCapacity-usedCap), 
-                (tsi.numPendingTasks - tsi.numReclaimedResources));
-            // create a resource object that needs to be reclaimed some time
-            // in the future
-            long whenToKill = qsi.reclaimTime - 
-              (CapacityTaskScheduler.HEARTBEATS_LEFT_BEFORE_KILLING * 
-                  nextHeartbeatInterval);
-            if (whenToKill < 0) whenToKill = 0;
-            tsi.reclaimList.add(new ReclaimedResource(amt, 
-                currentTime + qsi.reclaimTime, 
-                currentTime + whenToKill));
-            tsi.numReclaimedResources += amt;
-            LOG.debug("Queue " + qsi.queueName + " needs to reclaim " + 
-                amt + " resources");
-          }
-        }
-      }
-      // kill tasks to reclaim capacity
-      if (0 != tasksToKill) {
-        killTasks(tasksToKill);
-      }
-    }
-
-    // kill 'tasksToKill' tasks 
-    private void killTasks(int tasksToKill)
-    {
-      /* 
-       * There are a number of fair ways in which one can figure out how
-       * many tasks to kill from which queue, so that the total number of
-       * tasks killed is equal to 'tasksToKill'.
-       * Maybe the best way is to keep a global ordering of running tasks
-       * and kill the ones that ran last, irrespective of what queue or 
-       * job they belong to. 
-       * What we do here is look at how many tasks is each queue running
-       * over capacity, and use that as a weight to decide how many tasks
-       * to kill from that queue.
-       */ 
-      
-      // first, find out all queues over capacity
-      int loc;
-      for (loc=0; loc<qsiForAssigningTasks.size(); loc++) {
-        QueueSchedulingInfo qsi = qsiForAssigningTasks.get(loc);
-        if (getTSI(qsi).numRunningTasks > getTSI(qsi).guaranteedCapacity) {
-          // all queues from here onwards are running over cap
-          break;
-        }
-      }
-      // if some queue needs to reclaim cap, there must be at least one queue
-      // over cap. But check, just in case. 
-      if (loc == qsiForAssigningTasks.size()) {
-        LOG.warn("In Capacity scheduler, we need to kill " + tasksToKill + 
-            " tasks but there is no queue over capacity.");
-        return;
-      }
-      // calculate how many total tasks are over cap
-      int tasksOverCap = 0;
-      for (int i=loc; i<qsiForAssigningTasks.size(); i++) {
-        QueueSchedulingInfo qsi = qsiForAssigningTasks.get(i);
-        tasksOverCap += 
-          (getTSI(qsi).numRunningTasks - getTSI(qsi).guaranteedCapacity);
-      }
-      // now kill tasks from each queue
-      for (int i=loc; i<qsiForAssigningTasks.size(); i++) {
-        QueueSchedulingInfo qsi = qsiForAssigningTasks.get(i);
-        killTasksFromQueue(qsi, (int)Math.round(
-            ((double)(getTSI(qsi).numRunningTasks - 
-                getTSI(qsi).guaranteedCapacity))*
-            tasksToKill/(double)tasksOverCap));
-      }
-    }
-
-    // kill 'tasksToKill' tasks from queue represented by qsi
-    private void killTasksFromQueue(QueueSchedulingInfo qsi, int tasksToKill) {
-      // we start killing as many tasks as possible from the jobs that started
-      // last. This way, we let long-running jobs complete faster.
-      int tasksKilled = 0;
-      JobInProgress jobs[] = scheduler.jobQueuesManager.
-        getRunningJobQueue(qsi.queueName).toArray(new JobInProgress[0]);
-      for (int i=jobs.length-1; i>=0; i--) {
-        if (jobs[i].getStatus().getRunState() != JobStatus.RUNNING) {
-          continue;
-        }
-        tasksKilled += killTasksFromJob(jobs[i], tasksToKill-tasksKilled);
-        if (tasksKilled >= tasksToKill) break;
-      }
-    }
-   
-    // return the TaskAttemptID of the running task, if any, that has made 
-    // the least progress.
-    TaskAttemptID getRunningTaskWithLeastProgress(TaskInProgress tip) {
-      double leastProgress = 1;
-      TaskAttemptID tID = null;
-      for (Iterator<TaskAttemptID> it = 
-        tip.getActiveTasks().keySet().iterator(); it.hasNext();) {
-        TaskAttemptID taskid = it.next();
-        TaskStatus status = tip.getTaskStatus(taskid);
-        if (status.getRunState() == TaskStatus.State.RUNNING) {
-          if (status.getProgress() < leastProgress) {
-            leastProgress = status.getProgress();
-            tID = taskid;
-          }
-        }
-      }
-      return tID;
-    }
-    
-    // called when a task is allocated to queue represented by qsi. 
-    // update our info about reclaimed resources
-    private synchronized void updateReclaimedResources(QueueSchedulingInfo qsi) {
-      TaskSchedulingInfo tsi = getTSI(qsi);
-      // if we needed to reclaim resources, we have reclaimed one
-      if (tsi.reclaimList.isEmpty()) {
-        return;
-      }
-      ReclaimedResource res = tsi.reclaimList.getFirst();
-      res.currentAmount--;
-      if (0 == res.currentAmount) {
-        // move this resource to the expiry list
-        ReclaimedResource r = tsi.reclaimList.remove();
-        tsi.reclaimExpireList.add(r);
-      }
-    }
-
     private synchronized void updateCollectionOfQSIs() {
       Collections.sort(qsiForAssigningTasks, queueComparator);
     }
 
 
     private boolean isUserOverLimit(String user, QueueSchedulingInfo qsi) {
-      // what is our current capacity? It's GC if we're running below GC. 
-      // If we're running over GC, then its #running plus 1 (which is the 
+      // what is our current capacity? It's capacity if we're running below capacity.
+      // If we're running over capacity, then its #running plus 1 (which is the
       // extra slot we're getting). 
       int currentCapacity;
       TaskSchedulingInfo tsi = getTSI(qsi);
-      if (tsi.numRunningTasks < tsi.guaranteedCapacity) {
-        currentCapacity = tsi.guaranteedCapacity;
+      if (tsi.numRunningTasks < tsi.capacity) {
+        currentCapacity = tsi.capacity;
       }
       else {
         currentCapacity = tsi.numRunningTasks+1;
@@ -760,7 +475,7 @@ class CapacityTaskScheduler extends TaskScheduler {
       for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
         // we may have queues with gc=0. We shouldn't look at jobs from 
         // these queues
-        if (0 == getTSI(qsi).guaranteedCapacity) {
+        if (0 == getTSI(qsi).capacity) {
           continue;
         }
         TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
@@ -772,8 +487,6 @@ class CapacityTaskScheduler extends TaskScheduler {
 
         // if we find a task, return
         if (lookUpStatus == TaskLookupResult.LookUpStatus.TASK_FOUND) {
-          // we have a task. Update reclaimed resource info
-          updateReclaimedResources(qsi);
           return tlr;
         }
         // if there was a memory mismatch, return
@@ -795,8 +508,8 @@ class CapacityTaskScheduler extends TaskScheduler {
         Collection<JobInProgress> runJobs = 
           scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
         s.append(" Queue '" + qsi.queueName + "'(" + this.type + "): run=" + 
-            tsi.numRunningTasks + ", gc=" + tsi.guaranteedCapacity + 
-            ", wait=" + tsi.numPendingTasks + ", run jobs="+ runJobs.size() + 
+            tsi.numRunningTasks + ", gc=" + tsi.capacity
+            + ", run jobs="+ runJobs.size() +
             "*** ");
       }
       LOG.debug(s);
@@ -830,55 +543,7 @@ class CapacityTaskScheduler extends TaskScheduler {
     int getPendingTasks(JobInProgress job) {
       return job.pendingMaps();
     }
-    int killTasksFromJob(JobInProgress job, int tasksToKill) {
-      /*
-       * We'd like to kill tasks that ran the last, or that have made the
-       * least progress.
-       * Ideally, each job would have a list of tasks, sorted by start 
-       * time or progress. That's a lot of state to keep, however. 
-       * For now, we do something a little different. We first try and kill
-       * non-local tasks, as these can be run anywhere. For each TIP, we 
-       * kill the task that has made the least progress, if the TIP has
-       * more than one active task. 
-       * We then look at tasks in runningMapCache.
-       */
-      int tasksKilled = 0;
-      
-      /* 
-       * For non-local running maps, we 'cheat' a bit. We know that the set
-       * of non-local running maps has an insertion order such that tasks 
-       * that ran last are at the end. So we iterate through the set in 
-       * reverse. This is OK because even if the implementation changes, 
-       * we're still using generic set iteration and are no worse of.
-       */ 
-      TaskInProgress[] tips = 
-        job.getNonLocalRunningMaps().toArray(new TaskInProgress[0]);
-      for (int i=tips.length-1; i>=0; i--) {
-        // pick the tast attempt that has progressed least
-        TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]);
-        if (null != tid) {
-          if (tips[i].killTask(tid, false)) {
-            if (++tasksKilled >= tasksToKill) {
-              return tasksKilled;
-            }
-          }
-        }
-      }
-      // now look at other running tasks
-      for (Set<TaskInProgress> s: job.getRunningMapCache().values()) {
-        for (TaskInProgress tip: s) {
-          TaskAttemptID tid = getRunningTaskWithLeastProgress(tip);
-          if (null != tid) {
-            if (tip.killTask(tid, false)) {
-              if (++tasksKilled >= tasksToKill) {
-                return tasksKilled;
-              }
-            }
-          }
-        }
-      }
-      return tasksKilled;
-    }
+
     TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
       return qsi.mapTSI;
     }
@@ -911,30 +576,7 @@ class CapacityTaskScheduler extends TaskScheduler {
     int getPendingTasks(JobInProgress job) {
       return job.pendingReduces();
     }
-    int killTasksFromJob(JobInProgress job, int tasksToKill) {
-      /* 
-       * For reduces, we 'cheat' a bit. We know that the set
-       * of running reduces has an insertion order such that tasks 
-       * that ran last are at the end. So we iterate through the set in 
-       * reverse. This is OK because even if the implementation changes, 
-       * we're still using generic set iteration and are no worse of.
-       */ 
-      int tasksKilled = 0;
-      TaskInProgress[] tips = 
-        job.getRunningReduces().toArray(new TaskInProgress[0]);
-      for (int i=tips.length-1; i>=0; i--) {
-        // pick the tast attempt that has progressed least
-        TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]);
-        if (null != tid) {
-          if (tips[i].killTask(tid, false)) {
-            if (++tasksKilled >= tasksToKill) {
-              return tasksKilled;
-            }
-          }
-        }
-      }
-      return tasksKilled;
-    }
+
     TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
       return qsi.reduceTSI;
     }
@@ -953,45 +595,12 @@ class CapacityTaskScheduler extends TaskScheduler {
   /** name of the default queue. */ 
   static final String DEFAULT_QUEUE_NAME = "default";
   
-  /** how often does redistribution thread run (in msecs)*/
-  private static long RECLAIM_CAPACITY_INTERVAL;
-  /** we start killing tasks to reclaim capacity when we have so many 
-   * heartbeats left. */
-  private static final int HEARTBEATS_LEFT_BEFORE_KILLING = 3;
-
   static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
   protected JobQueuesManager jobQueuesManager;
   protected CapacitySchedulerConf schedConf;
   /** whether scheduler has started or not */
   private boolean started = false;
   
-  /**
-   * Used to distribute/reclaim excess capacity among queues
-   */ 
-  class ReclaimCapacity implements Runnable {
-    public ReclaimCapacity() {
-    }
-    public void run() {
-      while (true) {
-        try {
-          Thread.sleep(RECLAIM_CAPACITY_INTERVAL);
-          if (stopReclaim) { 
-            break;
-          }
-          reclaimCapacity();
-        } catch (InterruptedException t) {
-          break;
-        } catch (Throwable t) {
-          LOG.error("Error in redistributing capacity:\n" +
-                    StringUtils.stringifyException(t));
-        }
-      }
-    }
-  }
-  private Thread reclaimCapacityThread = null;
-  /** variable to indicate that thread should stop */
-  private boolean stopReclaim = false;
-
   /**
    * A clock class - can be mocked out for testing.
    */
@@ -1067,9 +676,6 @@ class CapacityTaskScheduler extends TaskScheduler {
 
     initializeMemoryRelatedConf();
     
-    RECLAIM_CAPACITY_INTERVAL = schedConf.getReclaimCapacityInterval();
-    RECLAIM_CAPACITY_INTERVAL *= 1000;
-
     // read queue info from config file
     QueueManager queueManager = taskTrackerManager.getQueueManager();
     Set<String> queues = queueManager.getQueues();
@@ -1078,20 +684,19 @@ class CapacityTaskScheduler extends TaskScheduler {
       throw new IllegalStateException("System has no queue configured");
     }
 
-    Set<String> queuesWithoutConfiguredGC = new HashSet<String>();
+    Set<String> queuesWithoutConfiguredCapacity = new HashSet<String>();
     float totalCapacity = 0.0f;
     for (String queueName: queues) {
-      float gc = schedConf.getGuaranteedCapacity(queueName); 
+      float gc = schedConf.getCapacity(queueName);
       if(gc == -1.0) {
-        queuesWithoutConfiguredGC.add(queueName);
+        queuesWithoutConfiguredCapacity.add(queueName);
       }else {
         totalCapacity += gc;
       }
       int ulMin = schedConf.getMinimumUserLimitPercent(queueName); 
-      long reclaimTimeLimit = schedConf.getReclaimTimeLimit(queueName) * 1000;
       // create our QSI and add to our hashmap
       QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, gc, 
-          ulMin, reclaimTimeLimit, jobQueuesManager);
+                                                    ulMin, jobQueuesManager);
       queueInfoMap.put(queueName, qsi);
 
       // create the queues of job objects
@@ -1105,11 +710,11 @@ class CapacityTaskScheduler extends TaskScheduler {
     }
     float remainingQuantityToAllocate = 100 - totalCapacity;
     float quantityToAllocate = 
-      remainingQuantityToAllocate/queuesWithoutConfiguredGC.size();
-    for(String queue: queuesWithoutConfiguredGC) {
+      remainingQuantityToAllocate/queuesWithoutConfiguredCapacity.size();
+    for(String queue: queuesWithoutConfiguredCapacity) {
       QueueSchedulingInfo qsi = queueInfoMap.get(queue); 
-      qsi.guaranteedCapacityPercent = quantityToAllocate;
-      schedConf.setGuaranteedCapacity(queue, quantityToAllocate);
+      qsi.capacityPercent = quantityToAllocate;
+      schedConf.setCapacity(queue, quantityToAllocate);
     }    
     
     // check if there's a queue with the default name. If not, we quit.
@@ -1137,19 +742,9 @@ class CapacityTaskScheduler extends TaskScheduler {
     initializationPoller.setDaemon(true);
     initializationPoller.start();
 
-    // start thread for redistributing capacity if we have more than 
-    // one queue
-    if (queueInfoMap.size() > 1) {
-      this.reclaimCapacityThread = 
-        new Thread(new ReclaimCapacity(),"reclaimCapacity");
-      this.reclaimCapacityThread.start();
-    }
-    else {
-      LOG.info("Only one queue present. Reclaim capacity thread not started.");
-    }
-    
     started = true;
-    LOG.info("Capacity scheduler initialized " + queues.size() + " queues");  }
+    LOG.info("Capacity scheduler initialized " + queues.size() + " queues");  
+  }
   
   /** mostly for testing purposes */
   void setInitializationPoller(JobInitializationPoller p) {
@@ -1163,8 +758,6 @@ class CapacityTaskScheduler extends TaskScheduler {
       taskTrackerManager.removeJobInProgressListener(
           jobQueuesManager);
     }
-    // tell the reclaim thread to stop
-    stopReclaim = true;
     started = false;
     initializationPoller.terminate();
     super.terminate();
@@ -1175,27 +768,6 @@ class CapacityTaskScheduler extends TaskScheduler {
     super.setConf(conf);
   }
 
-  /**
-   * Reclaim capacity for both map & reduce tasks. 
-   * Do not make this synchronized, since we call taskTrackerManager 
-   * (see HADOOP-4977). 
-   */
-  void reclaimCapacity() {
-    // get the cluster capacity
-    ClusterStatus c = taskTrackerManager.getClusterStatus();
-    int mapClusterCapacity = c.getMaxMapTasks();
-    int reduceClusterCapacity = c.getMaxReduceTasks();
-    int nextHeartbeatInterval = taskTrackerManager.getNextHeartbeatInterval();
-    // update the QSI objects
-    updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
-    // update the qsi collections, since we depend on their ordering 
-    mapScheduler.updateCollectionOfQSIs();
-    reduceScheduler.updateCollectionOfQSIs();
-    // now, reclaim
-    mapScheduler.reclaimCapacity(nextHeartbeatInterval);
-    reduceScheduler.reclaimCapacity(nextHeartbeatInterval);
-  }
-  
   /**
    * provided for the test classes
    * lets you update the QSI objects and sorted collections
@@ -1216,31 +788,27 @@ class CapacityTaskScheduler extends TaskScheduler {
    * to make scheduling decisions. For example, we don't need an exact count
    * of numRunningTasks. Once we count upto the grid capacity, any
    * number beyond that will make no difference.
-   * 
-   * The pending task count is only required in reclaim capacity. So 
-   * if the computation becomes expensive, we can add a boolean to 
-   * denote if pending task computation is required or not.
-   * 
+   *
    **/
-  private synchronized void updateQSIObjects(int mapClusterCapacity, 
+  private synchronized void updateQSIObjects(int mapClusterCapacity,
       int reduceClusterCapacity) {
-    // if # of slots have changed since last time, update. 
+    // if # of slots have changed since last time, update.
     // First, compute whether the total number of TT slots have changed
     for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
-      // compute new GCs, if TT slots have changed
+      // compute new capacities, if TT slots have changed
       if (mapClusterCapacity != prevMapClusterCapacity) {
-        qsi.mapTSI.guaranteedCapacity =
-          (int)(qsi.guaranteedCapacityPercent*mapClusterCapacity/100);
+        qsi.mapTSI.capacity =
+          (int)(qsi.capacityPercent*mapClusterCapacity/100);
       }
       if (reduceClusterCapacity != prevReduceClusterCapacity) {
-        qsi.reduceTSI.guaranteedCapacity =
-          (int)(qsi.guaranteedCapacityPercent*reduceClusterCapacity/100);
+        qsi.reduceTSI.capacity =
+          (int)(qsi.capacityPercent*reduceClusterCapacity/100);
       }
       // reset running/pending tasks, tasks per user
       qsi.mapTSI.resetTaskVars();
       qsi.reduceTSI.resetTaskVars();
       // update stats on running jobs
-      for (JobInProgress j: 
+      for (JobInProgress j:
         jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
@@ -1249,62 +817,36 @@ class CapacityTaskScheduler extends TaskScheduler {
         int runningReduces = j.runningReduces();
         qsi.mapTSI.numRunningTasks += runningMaps;
         qsi.reduceTSI.numRunningTasks += runningReduces;
-        Integer i = 
+        Integer i =
           qsi.mapTSI.numRunningTasksByUser.get(j.getProfile().getUser());
-        qsi.mapTSI.numRunningTasksByUser.put(j.getProfile().getUser(), 
+        qsi.mapTSI.numRunningTasksByUser.put(j.getProfile().getUser(),
             i+runningMaps);
         i = qsi.reduceTSI.numRunningTasksByUser.get(j.getProfile().getUser());
-        qsi.reduceTSI.numRunningTasksByUser.put(j.getProfile().getUser(), 
+        qsi.reduceTSI.numRunningTasksByUser.put(j.getProfile().getUser(),
             i+runningReduces);
-        qsi.mapTSI.numPendingTasks += j.pendingMaps();
-        qsi.reduceTSI.numPendingTasks += j.pendingReduces();
         LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) = " +
-            j.runningMaps() + ", run(r) = " + j.runningReduces() + 
-            ", finished(m) = " + j.finishedMaps() + ", finished(r)= " + 
-            j.finishedReduces() + ", failed(m) = " + j.failedMapTasks + 
-            ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " + 
-            j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks 
-            + ", total(m) = " + j.numMapTasks + ", total(r) = " + 
+            j.runningMaps() + ", run(r) = " + j.runningReduces() +
+            ", finished(m) = " + j.finishedMaps() + ", finished(r)= " +
+            j.finishedReduces() + ", failed(m) = " + j.failedMapTasks +
+            ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " +
+            j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks
+            + ", total(m) = " + j.numMapTasks + ", total(r) = " +
             j.numReduceTasks);
-        /* 
+        /*
          * it's fine walking down the entire list of running jobs - there
          * probably will not be many, plus, we may need to go through the
          * list to compute numRunningTasksByUser. If this is expensive, we
          * can keep a list of running jobs per user. Then we only need to
          * consider the first few jobs per user.
-         */ 
-      }
-      
-      //update stats on waiting jobs
-      for(JobInProgress j: jobQueuesManager.getWaitingJobs(qsi.queueName)) {
-        // pending tasks
-        if ((qsi.mapTSI.numPendingTasks > mapClusterCapacity) &&
-            (qsi.reduceTSI.numPendingTasks > reduceClusterCapacity)) {
-          // that's plenty. no need for more computation
-          break;
-        }
-        /*
-         * Consider only the waiting jobs in the job queue. Job queue can
-         * contain:
-         * 1. Jobs which are in running state but not scheduled
-         * (these would also be present in running queue), the pending 
-         * task count of these jobs is computed when scheduler walks
-         * through running job queue.
-         * 2. Jobs which are killed by user, but waiting job initialization
-         * poller to walk through the job queue to clean up killed jobs.
          */
-        if (j.getStatus().getRunState() == JobStatus.PREP) {
-          qsi.mapTSI.numPendingTasks += j.pendingMaps();
-          qsi.reduceTSI.numPendingTasks += j.pendingReduces();
-        }
       }
     }
-    
+
     prevMapClusterCapacity = mapClusterCapacity;
     prevReduceClusterCapacity = reduceClusterCapacity;
   }
 
-  /* 
+  /*
    * The grand plan for assigning a task. 
    * First, decide whether a Map or Reduce task should be given to a TT 
    * (if the TT can accept either). 

+ 137 - 434
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -493,15 +493,12 @@ public class TestCapacityScheduler extends TestCase {
   static class FakeQueueInfo {
     String queueName;
     float gc;
-    int reclaimTimeLimit;
     boolean supportsPrio;
     int ulMin;
 
-    public FakeQueueInfo(String queueName, float gc,
-        int reclaimTimeLimit, boolean supportsPrio, int ulMin) {
+    public FakeQueueInfo(String queueName, float gc, boolean supportsPrio, int ulMin) {
       this.queueName = queueName;
       this.gc = gc;
-      this.reclaimTimeLimit = reclaimTimeLimit;
       this.supportsPrio = supportsPrio;
       this.ulMin = ulMin;
     }
@@ -514,7 +511,6 @@ public class TestCapacityScheduler extends TestCase {
       new LinkedHashMap<String, FakeQueueInfo>();
     String firstQueue;
     
-    private long reclaimCapacityInterval = 1000;
     
     void setFakeQueues(List<FakeQueueInfo> queues) {
       for (FakeQueueInfo q: queues) {
@@ -531,17 +527,13 @@ public class TestCapacityScheduler extends TestCase {
       return firstQueue;
     }*/
     
-    public float getGuaranteedCapacity(String queue) {
+    public float getCapacity(String queue) {
       if(queueMap.get(queue).gc == -1) {
-        return super.getGuaranteedCapacity(queue);
+        return super.getCapacity(queue);
       }
       return queueMap.get(queue).gc;
     }
     
-    public int getReclaimTimeLimit(String queue) {
-      return queueMap.get(queue).reclaimTimeLimit;
-    }
-    
     public int getMinimumUserLimitPercent(String queue) {
       return queueMap.get(queue).ulMin;
     }
@@ -559,16 +551,6 @@ public class TestCapacityScheduler extends TestCase {
     public int getMaxWorkerThreads() {
       return 1;
     }
-    
-    @Override
-    public long getReclaimCapacityInterval() {
-      return reclaimCapacityInterval ;
-    }
-    
-    @Override
-    public void setReclaimCapacityInterval(long value) {
-      this.reclaimCapacityInterval = value;
-    }
   }
 
   protected class FakeClock extends CapacityTaskScheduler.Clock {
@@ -677,7 +659,7 @@ public class TestCapacityScheduler extends TestCase {
     // start the scheduler
     taskTrackerManager.addQueues(new String[] {"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, 1, true, 1));
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 1));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -814,7 +796,7 @@ public class TestCapacityScheduler extends TestCase {
     taskTrackerManager.addQueues(new String[] {"default"});
     
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 10, true, 25));
+    queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -869,8 +851,8 @@ public class TestCapacityScheduler extends TestCase {
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -893,7 +875,7 @@ public class TestCapacityScheduler extends TestCase {
     String[] qs = { "default" };
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, 300, true, 100));
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -916,29 +898,29 @@ public class TestCapacityScheduler extends TestCase {
         subJobsList.get("u1").containsAll(jobs));
   }
   
-  //Basic test to test GC allocation across the queues which have no
-  //GC configured.
+  //Basic test to test capacity allocation across the queues which have no
+  //capacity configured.
   
-  public void testGCAllocationToQueues() throws Exception {
+  public void testCapacityAllocationToQueues() throws Exception {
     String[] qs = {"default","q1","q2","q3","q4"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default",25.0f,5000,true,25));
-    queues.add(new FakeQueueInfo("q1",-1.0f,5000,true,25));
-    queues.add(new FakeQueueInfo("q2",-1.0f,5000,true,25));
-    queues.add(new FakeQueueInfo("q3",-1.0f,5000,true,25));
-    queues.add(new FakeQueueInfo("q4",-1.0f,5000,true,25));
+    queues.add(new FakeQueueInfo("default",25.0f,true,25));
+    queues.add(new FakeQueueInfo("q1",-1.0f,true,25));
+    queues.add(new FakeQueueInfo("q2",-1.0f,true,25));
+    queues.add(new FakeQueueInfo("q3",-1.0f,true,25));
+    queues.add(new FakeQueueInfo("q4",-1.0f,true,25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start(); 
-    assertEquals(18.75f, resConf.getGuaranteedCapacity("q1"));
-    assertEquals(18.75f, resConf.getGuaranteedCapacity("q2"));
-    assertEquals(18.75f, resConf.getGuaranteedCapacity("q3"));
-    assertEquals(18.75f, resConf.getGuaranteedCapacity("q4"));
+    assertEquals(18.75f, resConf.getCapacity("q1"));
+    assertEquals(18.75f, resConf.getCapacity("q2"));
+    assertEquals(18.75f, resConf.getCapacity("q3"));
+    assertEquals(18.75f, resConf.getCapacity("q4"));
   }
 
-  // Tests how GC is computed and assignment of tasks done
-  // on the basis of the GC.
+  // Tests how capacity is computed and assignment of tasks done
+  // on the basis of the capacity.
   public void testCapacityBasedAllocation() throws Exception {
     // set up some queues
     String[] qs = {"default", "q2"};
@@ -946,8 +928,8 @@ public class TestCapacityScheduler extends TestCase {
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     // set the gc % as 10%, so that gc will be zero initially as 
     // the cluster capacity increase slowly.
-    queues.add(new FakeQueueInfo("default", 10.0f, 5000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 90.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("default", 10.0f, true, 25));
+    queues.add(new FakeQueueInfo("q2", 90.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -960,35 +942,35 @@ public class TestCapacityScheduler extends TestCase {
     
     // job from q2 runs first because it has some non-zero capacity.
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    verifyGuaranteedCapacity("0", "default");
-    verifyGuaranteedCapacity("3", "q2");
+    verifyCapacity("0", "default");
+    verifyCapacity("3", "q2");
     
     // add another tt to increase tt slots
     taskTrackerManager.addTaskTracker("tt3");
     checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
-    verifyGuaranteedCapacity("0", "default");
-    verifyGuaranteedCapacity("5", "q2");
+    verifyCapacity("0", "default");
+    verifyCapacity("5", "q2");
     
     // add another tt to increase tt slots
     taskTrackerManager.addTaskTracker("tt4");
     checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
-    verifyGuaranteedCapacity("0", "default");
-    verifyGuaranteedCapacity("7", "q2");
+    verifyCapacity("0", "default");
+    verifyCapacity("7", "q2");
     
     // add another tt to increase tt slots
     taskTrackerManager.addTaskTracker("tt5");
     // now job from default should run, as it is furthest away
     // in terms of runningMaps / gc.
     checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
-    verifyGuaranteedCapacity("1", "default");
-    verifyGuaranteedCapacity("9", "q2");
+    verifyCapacity("1", "default");
+    verifyCapacity("9", "q2");
   }
   
-  private void verifyGuaranteedCapacity(String expectedCapacity, 
+  private void verifyCapacity(String expectedCapacity,
                                           String queue) throws IOException {
     String schedInfo = taskTrackerManager.getQueueManager().
-                          getSchedulerInfo(queue).toString();
-    assertTrue(schedInfo.contains("Map tasks\nGuaranteed Capacity: " 
+                          getSchedulerInfo(queue).toString();    
+    assertTrue(schedInfo.contains("Map tasks\nCapacity: " 
         + expectedCapacity));
   }
   
@@ -998,15 +980,15 @@ public class TestCapacityScheduler extends TestCase {
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
     // submit a job  
     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // for queue 'q2', the GC for maps is 2. Since we're the only user, 
+    // for queue 'q2', the capacity for maps is 2. Since we're the only user,
     // we should get a task 
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     // I should get another map task. 
@@ -1024,15 +1006,15 @@ public class TestCapacityScheduler extends TestCase {
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
     // submit a job  
     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // for queue 'q2', the GC for maps is 2. Since we're the only user, 
+    // for queue 'q2', the capacity for maps is 2. Since we're the only user,
     // we should get a task 
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     // Submit another job, from a different user
@@ -1052,15 +1034,15 @@ public class TestCapacityScheduler extends TestCase {
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
     // submit a job  
     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // for queue 'q2', the GC for maps is 2. Since we're the only user, 
+    // for queue 'q2', the capacity for maps is 2. Since we're the only user,
     // we should get a task 
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     // since we're the only job, we get another map
@@ -1080,15 +1062,15 @@ public class TestCapacityScheduler extends TestCase {
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
     // submit a job  
     FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // for queue 'q2', the GC for maps is 2. Since we're the only user, 
+    // for queue 'q2', the capacity for maps is 2. Since we're the only user,
     // we should get a task 
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     // since we're the only job, we get another map
@@ -1119,7 +1101,7 @@ public class TestCapacityScheduler extends TestCase {
     String[] qs = {"default"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, 10000, true, 25));
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -1170,283 +1152,7 @@ public class TestCapacityScheduler extends TestCase {
     // first in the queue
     checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4");
   }
-
-  // test code to reclaim capacity
-  public void testReclaimCapacity() throws Exception {
-    // set up some queues
-    String[] qs = {"default", "q2", "q3"};
-    taskTrackerManager.addQueues(qs);
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 25.0f, 1000, true, 25));
-    queues.add(new FakeQueueInfo("q3", 25.0f, 1000, true, 25));
-    resConf.setFakeQueues(queues);
-    resConf.setReclaimCapacityInterval(500);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start();
-
-    // set up a situation where q2 is under capacity, and default & q3
-    // are at/over capacity
-    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
-    FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
-    // now submit a job to q2
-    FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // get scheduler to notice that q2 needs to reclaim
-    scheduler.reclaimCapacity();
-    // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
-    // we start reclaiming when 15 secs are left. 
-    clock.advance(400000);
-    scheduler.reclaimCapacity();
-    // no tasks should have been killed yet
-    assertEquals(j1.runningMapTasks, 3);
-    assertEquals(j2.runningMapTasks, 1);
-    clock.advance(200000);
-    scheduler.reclaimCapacity();
-    // task from j1 will be killed
-    assertEquals(j1.runningMapTasks, 2);
-    assertEquals(j2.runningMapTasks, 1);
-    
-  }
-
-  // test code to reclaim multiple capacity 
-  public void testReclaimCapacity2() throws Exception {
-    // set up some queues
-    String[] qs = {"default", "q2", "q3", "q4"};
-    taskTrackerManager.addQueues(qs);
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 20.0f, 1000, true, 25));
-    queues.add(new FakeQueueInfo("q3", 20.0f, 1000, true, 25));
-    queues.add(new FakeQueueInfo("q4", 10.0f, 1000, true, 25));
-    resConf.setFakeQueues(queues);
-    resConf.setReclaimCapacityInterval(500);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start();
-    
-    // add some more TTs so our total map capacity is 10
-    taskTrackerManager.addTaskTracker("tt3");
-    taskTrackerManager.addTaskTracker("tt4");
-    taskTrackerManager.addTaskTracker("tt5");
-
-    // q2 has nothing running, default is under cap, q3 and q4 are over cap
-    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 2, 2, null, "u1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q4", "u1");
-    checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
-    checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
-    checkAssignment("tt4", "attempt_test_0003_m_000002_0 on tt4");
-    checkAssignment("tt4", "attempt_test_0002_m_000004_0 on tt4");
-    checkAssignment("tt5", "attempt_test_0002_m_000005_0 on tt5");
-    checkAssignment("tt5", "attempt_test_0003_m_000003_0 on tt5");
-    // at this point, q3 is running 5 tasks (with a cap of 2), q4 is
-    // running 3 tasks (with a cap of 1). 
-    // If we submit a job to 'default', we need to get 3 slots back. 
-    FakeJobInProgress j4 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
-    // get scheduler to notice that q2 needs to reclaim
-    scheduler.reclaimCapacity();
-    // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
-    // we start reclaiming when 15 secs are left. 
-    clock.advance(400000);
-    scheduler.reclaimCapacity();
-    // nothing should have happened
-    assertEquals(j2.runningMapTasks, 5);
-    assertEquals(j3.runningMapTasks, 3);
-    // 3 tasks to kill, 5 running over cap. q3 should give up 3*3/5 = 2 slots.
-    // q4 should give up 2*3/5 = 1 slot. 
-    clock.advance(200000);
-    scheduler.reclaimCapacity();
-    assertEquals(j2.runningMapTasks, 3);
-    assertEquals(j3.runningMapTasks, 2);
-    
-  }
-
-  // test code to reclaim capacity when the cluster is completely occupied
-  public void testReclaimCapacityWithFullCluster() throws Exception {
-    // set up some queues
-    String[] qs = {"default", "queue"};
-    taskTrackerManager.addQueues(qs);
-    int maxSlots = taskTrackerManager.maxMapTasksPerTracker 
-                   * taskTrackerManager.taskTrackers().size();
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
-    queues.add(new FakeQueueInfo("queue", 50.0f, 1000, true, 25));
-    resConf.setFakeQueues(queues);
-    resConf.setReclaimCapacityInterval(500);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start();
-    
-    // now submit 1 job to queue "default" which should take up the cluster
-    FakeJobInProgress j1 = 
-      submitJobAndInit(JobStatus.PREP, maxSlots, 0, "default", "u1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
-    
-    // now submit a job to queue "queue"
-    submitJobAndInit(JobStatus.PREP, maxSlots, 0, "queue", "u2");
-    
-    scheduler.reclaimCapacity();
-    
-    clock.advance(scheduler.schedConf.getReclaimTimeLimit("default") * 1000);
-    
-    scheduler.reclaimCapacity();
-    
-    // check if the tasks are killed 
-    assertEquals("Failed to reclaim tasks", j1.runningMapTasks, 2);
-  }
-  
-  // test code to reclaim capacity in steps
-  public void testReclaimCapacityInSteps() throws Exception {
-    // set up some queues
-    String[] qs = {"default", "q2"};
-    taskTrackerManager.addQueues(qs);
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
-    resConf.setFakeQueues(queues);
-    resConf.setReclaimCapacityInterval(500);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start();
-
-    // set up a situation where q2 is under capacity, and default is
-    // at/over capacity
-    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
-    // now submit a job to q2
-    FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 1, 1, "q2", "u1");
-    // get scheduler to notice that q2 needs to reclaim
-    scheduler.reclaimCapacity();
-    // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
-    // we start reclaiming when 15 secs are left. 
-    clock.advance(400000);
-    // submit another job to q2 which causes more capacity to be reclaimed
-    j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
-    clock.advance(200000);
-    scheduler.reclaimCapacity();
-    // one task from j1 will be killed
-    assertEquals(j1.runningMapTasks, 3);
-    clock.advance(300000);
-    scheduler.reclaimCapacity();
-    // timer for 2nd job hasn't fired, so nothing killed
-    assertEquals(j1.runningMapTasks, 3);
-    clock.advance(400000);
-    scheduler.reclaimCapacity();
-    // one task from j1 will be killed
-    assertEquals(j1.runningMapTasks, 2);
-    
-  }
   
-  /*
-   * Test case for checking the reclaim capacity with uninitalized jobs.
-   * 
-   * Configure 2 queue with capacity scheduler.
-   * 
-   * Submit a single job to the default queue and make it go above the gc
-   * of the queue.
-   * 
-   * Then submit another job to the second queue but don't initialize it.
-   * 
-   * Run reclaim capacity thread for the scheduler, in order to let scheduler
-   * know that it has to reclaim capacity.
-   * 
-   * Advance the scheduler clock by appropriate milliseconds.
-   * 
-   * Run scheduler.reclaimCapacity() to kill the appropriate tasks.
-   * 
-   * Check running task count of the running job.
-   * 
-   */
-  public void testReclaimCapacityWithUninitializedJobs() throws IOException {
-    String[] qs = {"default", "q2"};
-    taskTrackerManager.addQueues(qs);
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start();
-
-    //Submit one job to the default queue and get the capacity over the 
-    //gc of the particular queue.
-    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
-    
-    //Submit another job to the second queue but not initialize it.
-    submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
-    
-    //call scheduler's reclaim capacity in order to start reclaim capacity
-    //process.
-    scheduler.reclaimCapacity();
-    //advance the clock to the position when the two task of the job would
-    //be killed.
-    clock.advance(600000);
-    //run reclaim capacity
-    scheduler.reclaimCapacity();
-    //check the count of the running tasks.
-    assertEquals(j1.runningMapTasks, 2);
-    
-  }
-  
-  // test code to reclaim capacity with one queue haveing zero GC 
-  // (HADOOP-4988). 
-  // Simple test: reclaim capacity should work even if one of the 
-  // queues has a gc of 0. 
-  public void testReclaimCapacityWithZeroGC() throws Exception {
-    // set up some queues
-    String[] qs = {"default", "q2", "q3"};
-    taskTrackerManager.addQueues(qs);
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    // we want q3 to have 0 GC. Map slots = 4. 
-    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 40.0f, 1000, true, 25));
-    queues.add(new FakeQueueInfo("q3", 10.0f, 1000, true, 25));
-    // note: because of the way we convert gc% into actual gc, q2's gc
-    // will be 1, not 2. 
-    resConf.setFakeQueues(queues);
-    resConf.setReclaimCapacityInterval(500);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start();
-
-    // set up a situation where q2 is under capacity, and default 
-    // is over capacity
-    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
-    //FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
-    // now submit a job to q2
-    FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // get scheduler to notice that q2 needs to reclaim
-    scheduler.reclaimCapacity();
-    // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
-    // we start reclaiming when 15 secs are left. 
-    clock.advance(400000);
-    scheduler.reclaimCapacity();
-    // no tasks should have been killed yet
-    assertEquals(j1.runningMapTasks, 4);
-    clock.advance(200000);
-    scheduler.reclaimCapacity();
-    // task from j1 will be killed
-    assertEquals(j1.runningMapTasks, 3);
-    
-  }
-
   /*
    * Following is the testing strategy for testing scheduling information.
    * - start capacity scheduler with two queues.
@@ -1470,9 +1176,9 @@ public class TestCapacityScheduler extends TestCase {
    * - Now fail a job which has not been initialized at all.
    * - Run the poller, so that it can clean up the job queue.
    * - Check the count, the waiting job count should be 2.
-   * - Now raise status change events to move the initialized jobs which 
+   * - Now raise status change events to move the initialized jobs which
    * should be two in count to running queue.
-   * - Then schedule a map of the job in running queue. 
+   * - Then schedule a map of the job in running queue.
    * - Run the poller because the poller is responsible for waiting
    * jobs count. Check the count, it should be using 100% map and one
    * waiting job
@@ -1480,15 +1186,15 @@ public class TestCapacityScheduler extends TestCase {
    * - Check the count, it should be now one waiting job and zero running
    * tasks
    */
-  
+
   public void testSchedulingInformation() throws Exception {
     String[] qs = {"default", "q2"};
     taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -1501,167 +1207,164 @@ public class TestCapacityScheduler extends TestCase {
     String schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo();
     String schedulingInfo2 = queueManager.getJobQueueInfo("q2").getSchedulingInfo();
     String[] infoStrings = schedulingInfo.split("\n");
-    
-    assertEquals(infoStrings.length, 17);
-    assertEquals(infoStrings[1] , "Guaranteed Capacity Percentage: 50.0%");
-    assertEquals(infoStrings[7] , "Guaranteed Capacity: " + totalMaps * 50/100);
-    assertEquals(infoStrings[11] , "Guaranteed Capacity: " + totalReduces * 50/100);
+    assertEquals(infoStrings.length, 16);
+    assertEquals(infoStrings[1] , "Capacity Percentage: 50.0%");
+    assertEquals(infoStrings[6] , "Capacity: " + totalMaps * 50/100);
+    assertEquals(infoStrings[10] , "Capacity: " + totalReduces * 50/100);
     assertEquals(infoStrings[2] , "User Limit: 25%");
-    assertEquals(infoStrings[3] , "Reclaim Time limit: " + 
-        StringUtils.formatTime(1000000));
-    assertEquals(infoStrings[4] , "Priority Supported: YES");
-    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 0");
-    assertEquals(infoStrings[16] , "Number of users who have submitted jobs: 0");
+    assertEquals(infoStrings[3] , "Priority Supported: YES");
+    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
+    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
+    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 0");
+    assertEquals(infoStrings[15] , "Number of users who have submitted jobs: 0");
     assertEquals(schedulingInfo, schedulingInfo2);
-    
+
     //Testing with actual job submission.
-    ArrayList<FakeJobInProgress> userJobs = 
+    ArrayList<FakeJobInProgress> userJobs =
       submitJobs(1, 5, "default").get("u1");
-    schedulingInfo = 
+    schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    
+
     //waiting job should be equal to number of jobs submitted.
-    assertEquals(infoStrings.length, 17);
-    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 5");
-    
+    assertEquals(infoStrings.length, 16);
+    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
+    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
+    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5");
+
     //Initalize the jobs but don't raise events
     controlledInitializationPoller.selectJobsToInitialize();
-    
-    schedulingInfo = 
+
+    schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 17);
+    assertEquals(infoStrings.length, 16);
     //should be previous value as nothing is scheduled because no events
     //has been raised after initialization.
-    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 5");
-    
+    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
+    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
+    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5");
+
     //Raise status change event so that jobs can move to running queue.
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
     //assign one job
     Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     //Initalize extra job.
     controlledInitializationPoller.selectJobsToInitialize();
-    
+
     //Get scheduling information, now the number of waiting job should have
     //changed to 4 as one is scheduled and has become running.
     // make sure we update our stats
     scheduler.updateQSIInfoForTests();
-    schedulingInfo = 
+    schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 19);
-    assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[14],"Running tasks: 0.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[17] , "Number of Waiting Jobs: 4");
-    
+    assertEquals(infoStrings.length, 18);
+    assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
+    assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity");
+    assertEquals(infoStrings[16] , "Number of Waiting Jobs: 4");
+
     //assign a reduce task
     Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     // make sure we update our stats
     scheduler.updateQSIInfoForTests();
-    schedulingInfo = 
+    schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 21);
-    assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[14],"Running tasks: 100.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[19] , "Number of Waiting Jobs: 4");
-    
+    assertEquals(infoStrings.length, 20);
+    assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
+    assertEquals(infoStrings[13],"Running tasks: 100.0% of Capacity");
+    assertEquals(infoStrings[18] , "Number of Waiting Jobs: 4");
+
     //Complete the job and check the running tasks count
     FakeJobInProgress u1j1 = userJobs.get(0);
     taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1j1);
     taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1j1);
     taskTrackerManager.finalizeJob(u1j1);
-    
+
     // make sure we update our stats
     scheduler.updateQSIInfoForTests();
-    schedulingInfo = 
+    schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 17);
-    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 4");
-    
+    assertEquals(infoStrings.length, 16);
+    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
+    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
+    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 4");
+
     //Fail a job which is initialized but not scheduled and check the count.
     FakeJobInProgress u1j2 = userJobs.get(1);
-    assertTrue("User1 job 2 not initalized ", 
+    assertTrue("User1 job 2 not initalized ",
         u1j2.getStatus().getRunState() == JobStatus.RUNNING);
     taskTrackerManager.finalizeJob(u1j2, JobStatus.FAILED);
     //Run initializer to clean up failed jobs
     controlledInitializationPoller.selectJobsToInitialize();
     // make sure we update our stats
     scheduler.updateQSIInfoForTests();
-    schedulingInfo = 
+    schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 17);
-    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 3");
-    
+    assertEquals(infoStrings.length, 16);
+    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
+    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
+    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 3");
+
     //Fail a job which is not initialized but is in the waiting queue.
     FakeJobInProgress u1j5 = userJobs.get(4);
-    assertFalse("User1 job 5 initalized ", 
+    assertFalse("User1 job 5 initalized ",
         u1j5.getStatus().getRunState() == JobStatus.RUNNING);
-    
+
     taskTrackerManager.finalizeJob(u1j5, JobStatus.FAILED);
     //run initializer to clean up failed job
     controlledInitializationPoller.selectJobsToInitialize();
     // make sure we update our stats
     scheduler.updateQSIInfoForTests();
-    schedulingInfo = 
+    schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 17);
-    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 2");
-    
+    assertEquals(infoStrings.length, 16);
+    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
+    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
+    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 2");
+
     //Raise status change events as none of the intialized jobs would be
     //in running queue as we just failed the second job which was initialized
     //and completed the first one.
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
-    
+
     //Now schedule a map should be job3 of the user as job1 succeeded job2
     //failed and now job3 is running
     t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
     FakeJobInProgress u1j3 = userJobs.get(2);
-    assertTrue("User Job 3 not running ", 
+    assertTrue("User Job 3 not running ",
         u1j3.getStatus().getRunState() == JobStatus.RUNNING);
-    
+
     //now the running count of map should be one and waiting jobs should be
     //one. run the poller as it is responsible for waiting count
     controlledInitializationPoller.selectJobsToInitialize();
     // make sure we update our stats
     scheduler.updateQSIInfoForTests();
-    schedulingInfo = 
+    schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 19);
-    assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[14],"Running tasks: 0.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[17] , "Number of Waiting Jobs: 1");
-    
+    assertEquals(infoStrings.length, 18);
+    assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
+    assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity");
+    assertEquals(infoStrings[16] , "Number of Waiting Jobs: 1");
+
     //Fail the executing job
     taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
     // make sure we update our stats
     scheduler.updateQSIInfoForTests();
     //Now running counts should become zero
-    schedulingInfo = 
+    schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 17);
-    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
-    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 1");
-    
+    assertEquals(infoStrings.length, 16);
+    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
+    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
+    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 1");
+
   }
 
   /**
@@ -1683,7 +1386,7 @@ public class TestCapacityScheduler extends TestCase {
 
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -1729,7 +1432,7 @@ public class TestCapacityScheduler extends TestCase {
 
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -1778,7 +1481,7 @@ public class TestCapacityScheduler extends TestCase {
 
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
@@ -1863,7 +1566,7 @@ public class TestCapacityScheduler extends TestCase {
 
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
@@ -1926,7 +1629,7 @@ public class TestCapacityScheduler extends TestCase {
     ttStatus.setReservedPhysicalMemory(0);
 
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
     taskTrackerManager.addQueues(new String[] { "default" });
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -1993,7 +1696,7 @@ public class TestCapacityScheduler extends TestCase {
     ttStatus.setReservedPhysicalMemory(0);
 
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
     taskTrackerManager.addQueues(new String[] { "default" });
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -2092,7 +1795,7 @@ public class TestCapacityScheduler extends TestCase {
     scheduler.setTaskTrackerManager(taskTrackerManager);
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -2217,7 +1920,7 @@ public class TestCapacityScheduler extends TestCase {
     String[] qs = { "default"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -2264,7 +1967,7 @@ public class TestCapacityScheduler extends TestCase {
     String[] qs = { "default"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();

+ 22 - 97
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java

@@ -49,13 +49,11 @@ public class TestCapacitySchedulerConf extends TestCase {
   
   public TestCapacitySchedulerConf() {
     defaultProperties = setupQueueProperties(
-        new String[] { "guaranteed-capacity", 
-                       "reclaim-time-limit",
+        new String[] { "capacity", 
                        "supports-priority",
                        "minimum-user-limit-percent",
                        "maximum-initialized-jobs-per-user"}, 
         new String[] { "100", 
-                        "300",
                         "false", 
                         "100",
                         "2" }
@@ -85,26 +83,22 @@ public class TestCapacitySchedulerConf extends TestCase {
   public void testQueues() {
 
     Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity", 
-                       "reclaim-time-limit",
+        new String[] { "capacity", 
                        "supports-priority",
                        "minimum-user-limit-percent",
                        "maximum-initialized-jobs-per-user"}, 
         new String[] { "10", 
-                        "600",
                         "true",
                         "25",
                         "4"}
                       );
 
     Map<String, String> q2Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity", 
-                       "reclaim-time-limit",
+        new String[] { "capacity", 
                        "supports-priority",
                        "minimum-user-limit-percent",
                        "maximum-initialized-jobs-per-user"}, 
         new String[] { "100", 
-                        "6000",
                         "false", 
                         "50",
                         "1"}
@@ -126,7 +120,7 @@ public class TestCapacitySchedulerConf extends TestCase {
   
   public void testQueueWithDefaultProperties() {
     Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity", 
+        new String[] { "capacity", 
                        "minimum-user-limit-percent" }, 
         new String[] { "20", 
                         "75" }
@@ -143,7 +137,6 @@ public class TestCapacitySchedulerConf extends TestCase {
     for (String key : q1Props.keySet()) {
       expProperties.put(key, q1Props.get(key));
     }
-    expProperties.put("reclaim-time-limit", "300");
     expProperties.put("supports-priority", "false");
     expProperties.put("maximum-initialized-jobs-per-user", "2");
     queueDetails.put("default", expProperties);
@@ -156,23 +149,19 @@ public class TestCapacitySchedulerConf extends TestCase {
     
     // write new values to the file...
     Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity", 
-                       "reclaim-time-limit",
+        new String[] { "capacity", 
                        "supports-priority",
                        "minimum-user-limit-percent" }, 
         new String[] { "20.5", 
-                        "600",
                         "true", 
                         "40" }
                       );
 
     Map<String, String> q2Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity", 
-                       "reclaim-time-limit",
+        new String[] { "capacity", 
                        "supports-priority",
                        "minimum-user-limit-percent" }, 
         new String[] { "100", 
-                        "3000",
                         "false",
                         "50" }
                       );
@@ -198,23 +187,19 @@ public class TestCapacitySchedulerConf extends TestCase {
     endConfig();
 
     Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity", 
-                       "reclaim-time-limit",
+        new String[] { "capacity",
                        "supports-priority",
                        "minimum-user-limit-percent" }, 
         new String[] { "-1", 
-                        "800",
                         "true", 
                         "50" }
                       );
 
     Map<String, String> q2Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity", 
-                       "reclaim-time-limit",
+        new String[] { "capacity",
                        "supports-priority",
                        "minimum-user-limit-percent" }, 
         new String[] { "-1", 
-                        "800",
                         "true",
                         "50" }
                       );
@@ -235,18 +220,16 @@ public class TestCapacitySchedulerConf extends TestCase {
     startConfig();
     writeUserDefinedDefaultConfiguration();
     Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity", 
-                       "reclaim-time-limit",
+        new String[] { "capacity",
                        "supports-priority",
                        "minimum-user-limit-percent" }, 
         new String[] { "-1", 
-                        "800",
                         "true", 
                         "50" }
                       );
 
     Map<String, String> q2Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity", 
+        new String[] { "capacity", 
                        "supports-priority",
                        "minimum-user-limit-percent" }, 
         new String[] { "40", 
@@ -254,12 +237,10 @@ public class TestCapacitySchedulerConf extends TestCase {
                         "50" }
                       );
     Map<String, String> q3Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity", 
-                       "reclaim-time-limit",
+        new String[] { "capacity", 
                        "supports-priority",
                        "minimum-user-limit-percent" }, 
         new String[] { "40", 
-                       "500",
                         "true",
                         "50" }
                       );
@@ -269,7 +250,6 @@ public class TestCapacitySchedulerConf extends TestCase {
     testConf = new CapacitySchedulerConf(new Path(testConfFile));
     Map<String, Map<String, String>> queueDetails
               = new HashMap<String, Map<String,String>>();
-    q2Props.put("reclaim-time-limit", "800");
     queueDetails.put("default", q1Props);
     queueDetails.put("production", q2Props);
     queueDetails.put("test", q3Props);
@@ -280,12 +260,10 @@ public class TestCapacitySchedulerConf extends TestCase {
     openFile();
     startConfig();
     Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity", 
-                       "reclaim-time-limit",
+        new String[] { "capacity", 
                        "supports-priority",
                        "minimum-user-limit-percent" }, 
-        new String[] { "-1", 
-                        "800",
+        new String[] { "-1",
                         "true", 
                         "-50" }
                       );
@@ -299,29 +277,6 @@ public class TestCapacitySchedulerConf extends TestCase {
       assertTrue(true);
     }
   }
-  public void testInvalidReclaimTimeLimit() throws IOException {
-    openFile();
-    startConfig();
-    Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity", 
-                       "reclaim-time-limit",
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "-1", 
-                        "-800",
-                        "true", 
-                        "50" }
-                      );
-    writeQueueDetails("default", q1Props);
-    endConfig();
-    try {
-      testConf = new CapacitySchedulerConf(new Path(testConfFile));
-      testConf.getReclaimTimeLimit("default");
-      fail("Expect Invalid reclaim time limit to raise Exception");
-    }catch(IllegalArgumentException e) {
-      assertTrue(true);
-    }
-  }
   
   public void testInitializationPollerProperties() 
     throws Exception {
@@ -372,42 +327,16 @@ public class TestCapacitySchedulerConf extends TestCase {
     } catch (IllegalArgumentException e) {}
   }
   
-  public void testInvalidReclaimCapacityInterval() throws IOException {
-    openFile();
-    startConfig();
-    Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "guaranteed-capacity", 
-                       "reclaim-time-limit",
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "-1", 
-                        "-800",
-                        "true", 
-                        "50" }
-                      );
-    writeQueueDetails("default", q1Props);
-    writeProperty("mapred.capacity-scheduler.reclaimCapacity.interval", "0");
-    endConfig();
-    try {
-      testConf = new CapacitySchedulerConf(new Path(testConfFile));
-      testConf.getReclaimCapacityInterval();
-      fail("Expect Invalid reclaim capacity interval raise Exception");
-    }catch(IllegalArgumentException e) {
-      assertTrue(true);
-    }
-  }
-  
+
   private void checkQueueProperties(
                         CapacitySchedulerConf testConf,
                         Map<String, Map<String, String>> queueDetails) {
     for (String queueName : queueDetails.keySet()) {
       Map<String, String> map = queueDetails.get(queueName);
-      assertEquals(Float.parseFloat(map.get("guaranteed-capacity")),
-           testConf.getGuaranteedCapacity(queueName));
+      assertEquals(Float.parseFloat(map.get("capacity")),
+           testConf.getCapacity(queueName));
       assertEquals(Integer.parseInt(map.get("minimum-user-limit-percent")),
           testConf.getMinimumUserLimitPercent(queueName));
-      assertEquals(Integer.parseInt(map.get("reclaim-time-limit")),
-          testConf.getReclaimTimeLimit(queueName));
       assertEquals(Boolean.parseBoolean(map.get("supports-priority")),
           testConf.isPrioritySupported(queueName));
     }
@@ -451,25 +380,21 @@ public class TestCapacitySchedulerConf extends TestCase {
   
   
   private void writeDefaultConfiguration() {
-    writeProperty("mapred.capacity-scheduler.default-reclaim-time-limit"
-        , "300");
     writeProperty("mapred.capacity-scheduler.default-supports-priority"
         , "false");
     writeProperty("mapred.capacity-scheduler.default-minimum-user-limit-percent"
-        , "100");  
+        , "100");
   }
-  
-  
+
+
   private void writeUserDefinedDefaultConfiguration() {
-    writeProperty("mapred.capacity-scheduler.default-reclaim-time-limit"
-        , "800");
     writeProperty("mapred.capacity-scheduler.default-supports-priority"
         , "true");
     writeProperty("mapred.capacity-scheduler.default-minimum-user-limit-percent"
-        , "50"); 
+        , "50");
   }
-  
-  
+
+
   private void writeProperty(String name, String value) {
     writer.println("<property>");
     writer.println("<name> " + name + "</name>");