瀏覽代碼

commit 2845b2fe4208c72af24a511e2bcd98b82e195c58
Author: Arun C Murthy <acmurthy@apache.org>
Date: Tue Jul 20 23:08:02 2010 -0700

MAPREDUCE-1872. Hardened CapacityScheduler to have comprehensive, coherent limits on tasks/jobs for jobs/users/queues. Also, added the ability to refresh queue definitions without the need to restart the JobTracker.

+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1872. Hardened CapacityScheduler to have comprehensive, coherent
+ limits on tasks/jobs for jobs/users/queues. Also, added the ability to
+ refresh queue definitions without the need to restart the JobTracker.
+ (acmurthy)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077573 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 年之前
父節點
當前提交
8985ff2232

+ 119 - 29
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java

@@ -36,6 +36,8 @@ class CapacitySchedulerConf {
   
   private int defaultUlimitMinimum;
   
+  private float defaultUserLimitFactor;
+  
   private boolean defaultSupportPriority;
   
   private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = 
@@ -98,7 +100,14 @@ class CapacitySchedulerConf {
 
   private Configuration rmConf;
 
-  private int defaultMaxJobsPerUsersToInitialize;
+  private int defaultInitToAcceptJobsFactor;
+  private int defaultMaxActiveTasksPerUserToInitialize;
+  private int defaultMaxActiveTasksPerQueueToInitialize;
+  
+  static final String MAX_SYSTEM_JOBS_KEY = 
+    "mapred.capacity-scheduler.maximum-system-jobs";
+  
+  static final int DEFAULT_MAX_SYSTEM_JOBS = 5000;
   
   /**
    * Create a new Capacity scheduler conf.
@@ -130,13 +139,25 @@ class CapacitySchedulerConf {
    * which is used by the Capacity Scheduler.
    */
   private void initializeDefaults() {
-    defaultUlimitMinimum = rmConf.getInt(
-        "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
+    defaultUlimitMinimum = 
+      rmConf.getInt(
+          "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
+    defaultUserLimitFactor = 
+      rmConf.getFloat("mapred.capacity-scheduler.default-user-limit-factor", 
+          1.0f);
     defaultSupportPriority = rmConf.getBoolean(
         "mapred.capacity-scheduler.default-supports-priority", false);
-    defaultMaxJobsPerUsersToInitialize = rmConf.getInt(
-        "mapred.capacity-scheduler.default-maximum-initialized-jobs-per-user",
-        2);
+    defaultMaxActiveTasksPerQueueToInitialize = 
+      rmConf.getInt(
+          "mapred.capacity-scheduler.default-maximum-active-tasks-per-queue", 
+          200000);
+    defaultMaxActiveTasksPerUserToInitialize = 
+      rmConf.getInt(
+          "mapred.capacity-scheduler.default-maximum-active-tasks-per-user", 
+          100000);
+    defaultInitToAcceptJobsFactor =
+      rmConf.getInt("mapred.capacity-scheduler.default-init-accept-jobs-factor", 
+          10);
   }
   
   /**
@@ -293,6 +314,32 @@ class CapacitySchedulerConf {
                     value);
   }
   
+  /**
+   * Get the factor of queue capacity above which a single user in a queue
+   * can consume resources.
+   * 
+   * @param queue queue name
+   * @return factor of queue capacity above which a single user in a queue
+   *         can consume resources
+   */
+  public float getUserLimitFactor(String queue) {
+    return rmConf.getFloat(toFullPropertyName(queue, "user-limit-factor"), 
+        defaultUserLimitFactor);
+  }
+  
+  /**
+   * Set the factor of queue capacity above which a single user in a queue
+   * can consume resources.
+   * 
+   * @param queue queue name
+   * @param userLimitFactor factor of queue capacity above which a single user 
+   *                        in a queue can consume resources
+   */
+  public void setUserLimitFactor(String queue, float userLimitFactor) {
+    rmConf.setFloat(toFullPropertyName(queue, "user-limit-factor"), 
+        userLimitFactor);
+  }
+  
   /**
    * Reload configuration by clearing the information read from the 
    * underlying configuration file.
@@ -307,38 +354,81 @@ class CapacitySchedulerConf {
       return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
   }
 
+  public int getMaxSystemJobs() {
+    int maxSystemJobs = 
+      rmConf.getInt(MAX_SYSTEM_JOBS_KEY, DEFAULT_MAX_SYSTEM_JOBS);
+    if (maxSystemJobs <= 0) {
+      throw new IllegalArgumentException("Invalid maximum system jobs: " + 
+          maxSystemJobs);
+    }
+    
+    return maxSystemJobs;
+  }
+
+  public void setMaxSystemJobs(int maxSystemJobs) {
+    rmConf.setInt(MAX_SYSTEM_JOBS_KEY, maxSystemJobs);
+  }
+  
+  /**
+   * Get the init-to-accept jobs factor configured for a queue, falling back
+   * to the scheduler-wide default when the queue does not override it.
+   * 
+   * @param queue queue name
+   * @return the configured <code>init-accept-jobs-factor</code> of the queue
+   * @throws IllegalArgumentException if the configured factor is negative
+   *                                  or zero
+   */
+  public int getInitToAcceptJobsFactor(String queue) {
+    int initToAcceptFactor = 
+      rmConf.getInt(toFullPropertyName(queue, "init-accept-jobs-factor"), 
+          defaultInitToAcceptJobsFactor);
+    if (initToAcceptFactor <= 0) {
+      // Report the property that is actually wrong, and for which queue
+      throw new IllegalArgumentException(
+          "Invalid init-accept-jobs-factor configuration for queue '" + 
+          queue + "': " + initToAcceptFactor);
+    }
+    return initToAcceptFactor;
+  }
+  
+  public void setInitToAcceptJobsFactor(String queue, int initToAcceptFactor) {
+    rmConf.setInt(toFullPropertyName(queue, "init-accept-jobs-factor"), 
+        initToAcceptFactor);
+  }
+  
   /**
-   * Gets the maximum number of jobs which are allowed to initialize in the
-   * job queue.
+   * Get the maximum active tasks per queue to be initialized.
    * 
-   * @param queue queue name.
-   * @return maximum number of jobs allowed to be initialized per user.
-   * @throws IllegalArgumentException if maximum number of users is negative
-   * or zero.
+   * @param queue queue name
    */
-  public int getMaxJobsPerUserToInitialize(String queue) {
-    int maxJobsPerUser = rmConf.getInt(toFullPropertyName(queue,
-        "maximum-initialized-jobs-per-user"), 
-        defaultMaxJobsPerUsersToInitialize);
-    if(maxJobsPerUser <= 0) {
-      throw new IllegalArgumentException(
-          "Invalid maximum jobs per user configuration " + maxJobsPerUser);
-    }
-    return maxJobsPerUser;
+  public int getMaxInitializedActiveTasks(String queue) {
+    return rmConf.getInt(toFullPropertyName(queue, 
+                                            "maximum-initialized-active-tasks"), 
+                         defaultMaxActiveTasksPerQueueToInitialize);
   }
   
   /**
-   * Sets the maximum number of jobs which are allowed to be initialized 
-   * for a user in the queue.
+   * Set the maximum active tasks per queue to be initialized.
    * 
-   * @param queue queue name.
-   * @param value maximum number of jobs allowed to be initialized per user.
+   * @param queue queue name
+   * @param value maximum active tasks
    */
-  public void setMaxJobsPerUserToInitialize(String queue, int value) {
-    rmConf.setInt(toFullPropertyName(queue, 
-        "maximum-initialized-jobs-per-user"), value);
+  public void setMaxInitializedActiveTasks(String queue, int value) {
+    rmConf.setInt(toFullPropertyName(queue, "maximum-initialized-active-tasks"), 
+                  value);
   }
-
+  
+  /**
+   * Get the maximum active tasks per user to be initialized.
+   * 
+   * @param queue queue name
+   */
+  public int getMaxInitializedActiveTasksPerUser(String queue) {
+    return rmConf.getInt(toFullPropertyName(queue, 
+                                            "maximum-initialized-active-tasks-per-user"), 
+                         defaultMaxActiveTasksPerUserToInitialize);
+  }
+  
+  /**
+   * Set the maximum active tasks per user to be initialized.
+   * 
+   * @param queue queue name
+   * @param value maximum active tasks
+   */
+  public void setMaxInitializedActiveTasksPerUser(String queue, int value) {
+    rmConf.setInt(toFullPropertyName(queue, "maximum-initialized-active-tasks-per-user"), 
+                  value);
+  }
+  
   /**
    * Amount of time in milliseconds which poller thread and initialization
    * thread would sleep before looking at the queued jobs.

+ 1061 - 0
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java

@@ -0,0 +1,1061 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapred.CapacityTaskScheduler.TaskSchedulingMgr;
+import org.apache.hadoop.mapred.JobQueueJobInProgressListener.JobSchedulingInfo;
+
+
+/***********************************************************************
+ * Keeping track of scheduling information for queues
+ * 
+ * We need to maintain scheduling information relevant to a queue (its 
+ * name, capacity, etc), along with information specific to 
+ * each kind of task, Map or Reduce (num of running tasks, pending 
+ * tasks etc). 
+ * 
+ * This scheduling information is used to decide how to allocate
+ * tasks, redistribute capacity, etc.
+ *  
+ * A QueueSchedulingInfo(QSI) object represents scheduling information for
+ * a queue. A TaskSchedulingInfo (TSI) object represents scheduling 
+ * information for a particular kind of task (Map or Reduce).
+ *   
+ **********************************************************************/
+class CapacitySchedulerQueue {
+  
+  static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
+  
+  private static class SlotsUsage {
+    /** 
+     * the actual capacity, which depends on how many slots are available
+     * in the cluster at any given time. 
+     */
+    private int capacity = 0;
+    // number of running tasks
+    int numRunningTasks = 0;
+    // number of slots occupied by running tasks
+    int numSlotsOccupied = 0;
+  
+    //the actual maximum capacity which depends on how many slots are available
+    //in cluster at any given time.
+    private int maxCapacity = -1;
+  
+    /**
+     * for each user, we need to keep track of number of slots occupied by
+     * running tasks
+     */
+    Map<String, Integer> numSlotsOccupiedByUser = 
+      new HashMap<String, Integer>();
+  
+    /**
+     * reset the variables associated with tasks
+     */
+    void reset() {
+      numRunningTasks = 0;
+      numSlotsOccupied = 0;
+      numSlotsOccupiedByUser.clear();
+    }
+  
+  
+    /**
+     * Returns the actual capacity.
+     *
+     * @return the actual capacity, in slots
+     */
+    int getCapacity() {
+      return capacity;
+    }
+  
+    /**
+     * Mutator method for capacity
+     *
+     * @param capacity
+     */
+    void setCapacity(int capacity) {
+        this.capacity = capacity;
+    }
+  
+    /**
+     * @return the numRunningTasks
+     */
+    int getNumRunningTasks() {
+      return numRunningTasks;
+    }
+  
+    /**
+     * @return the numSlotsOccupied
+     */
+    int getNumSlotsOccupied() {
+      return numSlotsOccupied;
+    }
+  
+    /**
+     * return information about the tasks
+     */
+    @Override
+    public String toString() {
+      float occupiedSlotsAsPercent =
+          getCapacity() != 0 ?
+            ((float) numSlotsOccupied * 100 / getCapacity()) : 0;
+      StringBuffer sb = new StringBuffer();
+      
+      sb.append("Capacity: " + capacity + " slots\n");
+      
+      if(getMaxCapacity() >= 0) {
+        sb.append("Maximum capacity: " + getMaxCapacity() +" slots\n");
+      }
+      sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)\n",
+          Integer.valueOf(numSlotsOccupied), Float
+              .valueOf(occupiedSlotsAsPercent)));
+      sb.append(String.format("Running tasks: %d\n", Integer
+          .valueOf(numRunningTasks)));
+      // include info on active users
+      if (numSlotsOccupied != 0) {
+        sb.append("Active users:\n");
+        for (Map.Entry<String, Integer> entry : numSlotsOccupiedByUser
+            .entrySet()) {
+          if ((entry.getValue() == null) || (entry.getValue().intValue() <= 0)) {
+            // user has no tasks running
+            continue;
+          }
+          sb.append("User '" + entry.getKey() + "': ");
+          int numSlotsOccupiedByThisUser = entry.getValue().intValue();
+          float p =
+              (float) numSlotsOccupiedByThisUser * 100 / numSlotsOccupied;
+          sb.append(String.format("%d (%.1f%% of used capacity)\n", Long
+              .valueOf(numSlotsOccupiedByThisUser), Float.valueOf(p)));
+        }
+      }
+      return sb.toString();
+    }
+  
+    int getMaxCapacity() {
+      return maxCapacity;
+    }
+  
+    void setMaxCapacity(int maxCapacity) {
+      this.maxCapacity = maxCapacity;
+    }
+    
+    int getNumSlotsOccupiedByUser(String user) {
+      Integer slots = numSlotsOccupiedByUser.get(user);
+      return (slots != null) ? slots : 0;
+    }
+
+
+    /**
+     * Recompute the capacity and the maximum capacity from the configured
+     * percentages and the given cluster capacity.
+     *
+     * @param capacityPercent capacity as a percentage of 
+     *                        <code>clusterCapacity</code>
+     * @param maxCapacityPercent maximum capacity as a percentage of 
+     *                           <code>clusterCapacity</code>; a value that 
+     *                           is not positive means no maximum applies
+     * @param clusterCapacity capacity of the cluster the percentages are 
+     *                        applied to
+     */
+    void updateCapacities(float capacityPercent, float maxCapacityPercent, 
+                          int clusterCapacity) {
+      // compute new capacity
+      setCapacity((int)(capacityPercent*clusterCapacity/100));
+
+      // compute new max capacity; when no maximum is configured fall back to
+      // the initial -1 value so that a limit dropped by a queue refresh does
+      // not linger from an earlier update
+      if (maxCapacityPercent > 0) {
+        setMaxCapacity((int)(maxCapacityPercent*clusterCapacity / 100));
+      } else {
+        setMaxCapacity(-1);
+      }
+    }
+    
+    void updateSlotsUsage(String user, int numRunningTasks, int numSlotsOccupied) {
+      this.numRunningTasks += numRunningTasks;
+      this.numSlotsOccupied += numSlotsOccupied;
+      Integer i = this.numSlotsOccupiedByUser.get(user);
+      int slots = numSlotsOccupied + ((i == null) ? 0 : i.intValue());
+      this.numSlotsOccupiedByUser.put(user, slots);
+    }
+  }
+
+  // Queue name
+  final String queueName;
+
+  /**
+   * capacity(%) is set in the config
+   */
+  volatile float capacityPercent = 0;
+  
+  
+  /**
+   * maxCapacityPercent(%) is set in config as
+   * mapred.capacity-scheduler.<queue-name>.maximum-capacity
+   * maximum-capacity percent defines a limit beyond which a queue
+   * cannot expand. Remember this limit is dynamic and changes w.r.t
+   * cluster size.
+   */
+  volatile float maxCapacityPercent = -1;
+  
+  /** 
+   * to handle user limits, we need to know how many users have jobs in 
+   * the queue.
+   */  
+  Map<String, Integer> numJobsByUser = new HashMap<String, Integer>();
+    
+  /**
+   * min value of user limit (same for all users)
+   */
+  volatile int ulMin;
+
+  /**
+   * The factor of queue-capacity above which a single user can consume
+   * queue resources.
+   */
+  volatile float ulMinFactor;
+  
+  /**
+   * We keep a SlotsUsage object for each kind of task we support
+   */
+  CapacitySchedulerQueue.SlotsUsage mapSlots;
+  CapacitySchedulerQueue.SlotsUsage reduceSlots;
+  
+  /** 
+   * Whether the queue supports priorities.
+   */
+  final boolean supportsPriorities;
+  
+  /**
+   * Information required to track job, user, queue limits 
+   */
+  
+  Map<JobSchedulingInfo, JobInProgress> waitingJobs; // for waiting jobs
+  Map<JobSchedulingInfo, JobInProgress> runningJobs; // for running jobs
+  
+  /**
+   *  Active tasks in the queue
+   */
+  int activeTasks = 0;
+  
+  /**
+   *  Users in the queue
+   */
+  Map<String, UserInfo> users = new HashMap<String, UserInfo>();
+
+  /**
+   * Comparator for ordering jobs in this queue
+   */
+  public Comparator<JobSchedulingInfo> comparator;
+  
+  int maxJobsToInit;
+  int maxJobsToAccept;
+  int maxJobsPerUserToInit;
+  int maxJobsPerUserToAccept;
+  int maxActiveTasks;
+  int maxActiveTasksPerUser;
+
+  // comparator for jobs in queues that don't support priorities
+  private static final Comparator<JobSchedulingInfo> STARTTIME_JOB_COMPARATOR
+    = new Comparator<JobSchedulingInfo>() {
+    public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) {
+      // earlier start time sorts first
+      if (o1.getStartTime() < o2.getStartTime()) {
+        return -1;
+      }
+      // same start time: fall back to the job id to break the tie
+      if (o1.getStartTime() == o2.getStartTime()) {
+        return o1.getJobID().compareTo(o2.getJobID());
+      }
+      return 1;
+    }
+  };
+
+  public CapacitySchedulerQueue(String queueName, CapacitySchedulerConf conf) {
+    this.queueName = queueName;
+
+    // Do not allow changes to 'supportsPriorities'
+    supportsPriorities = conf.isPrioritySupported(queueName);
+
+    initializeQueue(conf);
+
+    if (supportsPriorities) {
+      // use the default priority-aware comparator
+      comparator = JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR;
+    }
+    else {
+      comparator = STARTTIME_JOB_COMPARATOR;
+    }
+    this.waitingJobs = 
+      new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
+    this.runningJobs = 
+      new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
+
+    this.mapSlots = new SlotsUsage();
+    this.reduceSlots = new SlotsUsage();    
+  }
+  
+  synchronized void init(float capacityPercent, float maxCapacityPercent,
+      int ulMin, float ulMinFactor,
+      int maxJobsToInit, int maxJobsPerUserToInit,
+      int maxActiveTasks, int maxActiveTasksPerUser,
+      int maxJobsToAccept, int maxJobsPerUserToAccept) {
+    this.capacityPercent = capacityPercent;
+    this.maxCapacityPercent = maxCapacityPercent;
+    this.ulMin = ulMin;
+    this.ulMinFactor = ulMinFactor;
+    
+    this.maxJobsToInit = maxJobsToInit;
+    this.maxJobsPerUserToInit = maxJobsPerUserToInit; 
+    this.maxActiveTasks = maxActiveTasks;
+    this.maxActiveTasksPerUser = maxActiveTasksPerUser; 
+    this.maxJobsToAccept = maxJobsToAccept;
+    this.maxJobsPerUserToAccept = maxJobsPerUserToAccept;
+    
+    LOG.info("Initialized '" + queueName + "' queue with " +
+        "cap=" + capacityPercent + ", " +
+        "maxCap=" + maxCapacityPercent + ", " +
+        "ulMin=" + ulMin + ", " +
+        "ulMinFactor=" + ulMinFactor + ", " +
+        "supportsPriorities=" + supportsPriorities + ", " +
+        "maxJobsToInit=" + maxJobsToInit + ", " +
+        "maxJobsToAccept=" + maxJobsToAccept + ", " +
+        "maxActiveTasks=" + maxActiveTasks + ", " +
+        "maxJobsPerUserToInit=" + maxJobsPerUserToInit + ", " +
+        "maxJobsPerUserToAccept=" + maxJobsPerUserToAccept + ", " +
+        "maxActiveTasksPerUser=" + maxActiveTasksPerUser
+    );
+  }
+  
+  synchronized void initializeQueue(CapacitySchedulerQueue other) {
+    init(other.capacityPercent, other.maxCapacityPercent, 
+        other.ulMin, other.ulMinFactor, 
+        other.maxJobsToInit, other.maxJobsPerUserToInit, 
+        other.maxActiveTasks, other.maxActiveTasksPerUser, 
+        other.maxJobsToAccept, other.maxJobsPerUserToAccept);
+  }
+  
+  synchronized void initializeQueue(CapacitySchedulerConf conf) {
+    float capacityPercent = conf.getCapacity(queueName);
+    float maxCapacityPercent = conf.getMaxCapacity(queueName);
+    int ulMin = conf.getMinimumUserLimitPercent(queueName);
+    float ulMinFactor = conf.getUserLimitFactor(queueName);
+    
+    int maxSystemJobs = conf.getMaxSystemJobs();
+    int maxJobsToInit = (int)Math.ceil(maxSystemJobs * capacityPercent/100.0);
+    int maxJobsPerUserToInit = 
+      (int)Math.ceil(maxSystemJobs * capacityPercent/100.0 * ulMin/100.0);
+    int maxActiveTasks = conf.getMaxInitializedActiveTasks(queueName);
+    int maxActiveTasksPerUser = 
+      conf.getMaxInitializedActiveTasksPerUser(queueName);
+
+    int jobInitToAcceptFactor = conf.getInitToAcceptJobsFactor(queueName);
+    int maxJobsToAccept = maxJobsToInit * jobInitToAcceptFactor;
+    int maxJobsPerUserToAccept = maxJobsPerUserToInit * jobInitToAcceptFactor;
+    
+    init(capacityPercent, maxCapacityPercent, 
+        ulMin, ulMinFactor, 
+        maxJobsToInit, maxJobsPerUserToInit, 
+        maxActiveTasks, maxActiveTasksPerUser, 
+        maxJobsToAccept, maxJobsPerUserToAccept);
+  }
+
+  /**
+   * @return the queueName
+   */
+  String getQueueName() {
+    return queueName;
+  }
+
+  /**
+   * @return the capacityPercent
+   */
+  float getCapacityPercent() {
+    return capacityPercent;
+  }
+
+  /**
+   * reset the variables associated with tasks
+   */
+  void resetSlotsUsage(TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      mapSlots.reset();
+    } else if (taskType == TaskType.REDUCE) {
+      reduceSlots.reset();
+    } else {    
+      throw new IllegalArgumentException("Illegal taskType=" + taskType);
+    }
+  }
+
+
+  /**
+   * Returns the actual capacity in terms of slots for the <code>taskType</code>.
+   * @param taskType
+   * @return actual capacity in terms of slots for the <code>taskType</code>
+   */
+  int getCapacity(TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      return mapSlots.getCapacity();
+    } else if (taskType == TaskType.REDUCE) {
+      return reduceSlots.getCapacity();
+    }
+
+    throw new IllegalArgumentException("Illegal taskType=" + taskType);
+  }
+
+  /**
+   * Get the number of running tasks of the given <code>taskType</code>.
+   * @param taskType
+   * @return
+   */
+  int getNumRunningTasks(TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      return mapSlots.getNumRunningTasks();
+    } else if (taskType == TaskType.REDUCE) {
+      return reduceSlots.getNumRunningTasks();
+    }
+    
+    throw new IllegalArgumentException("Illegal taskType=" + taskType);
+  }
+
+  /**
+   * Get number of slots occupied of the <code>taskType</code>.
+   * @param taskType
+   * @return number of slots occupied of the <code>taskType</code>
+   */
+  int getNumSlotsOccupied(TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      return mapSlots.getNumSlotsOccupied();
+    } else if (taskType == TaskType.REDUCE) {
+      return reduceSlots.getNumSlotsOccupied();
+    }
+    
+    throw new IllegalArgumentException("Illegal taskType=" + taskType);
+  }
+
+  /**
+   * Get maximum number of slots for the <code>taskType</code>.
+   * @param taskType
+   * @return maximum number of slots for the <code>taskType</code>
+   */
+  int getMaxCapacity(TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      return mapSlots.getMaxCapacity();
+    } else if (taskType == TaskType.REDUCE) {
+      return reduceSlots.getMaxCapacity();
+    }
+    
+    throw new IllegalArgumentException("Illegal taskType=" + taskType);
+  }
+
+  /**
+   * Get number of slots occupied by a <code>user</code> of 
+   * <code>taskType</code>.
+   * @param user
+   * @param taskType
+   * @return number of slots occupied by a <code>user</code> of 
+   *         <code>taskType</code>
+   */
+  int getNumSlotsOccupiedByUser(String user, TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      return mapSlots.getNumSlotsOccupiedByUser(user);
+    } else if (taskType == TaskType.REDUCE) {
+      return reduceSlots.getNumSlotsOccupiedByUser(user);
+    }
+    
+    throw new IllegalArgumentException("Illegal taskType=" + taskType);
+  }
+  
+  /**
+   * A new job is added to the queue.
+   * @param job
+   */
+  void jobAdded(JobInProgress job) {
+    // update user-specific info
+    String user = job.getProfile().getUser();
+
+    Integer jobsSoFar = numJobsByUser.get(user);
+    if (jobsSoFar != null) {
+      // known user: just bump the job count
+      numJobsByUser.put(user, jobsSoFar + 1);
+      return;
+    }
+
+    // first job of this user: start the slot counts at 0
+    mapSlots.numSlotsOccupiedByUser.put(user, 0);
+    reduceSlots.numSlotsOccupiedByUser.put(user, 0);
+    numJobsByUser.put(user, 1);
+  }
+  
+  int getNumJobsByUser(String user) {
+    Integer numJobs = numJobsByUser.get(user);
+    return (numJobs != null) ? numJobs : 0;
+  }
+  
+  /**
+   * A job from the queue has completed.
+   * @param job
+   */
+  void jobCompleted(JobInProgress job) {
+    String user = job.getProfile().getUser();
+    // update numJobsByUser
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Job to be removed for user " + user);
+    }
+    Integer numJobs = numJobsByUser.get(user);
+    if (numJobs == null) {
+      // jobAdded is expected to have counted this job already; if it did not,
+      // there is nothing to undo, so warn rather than fail with a
+      // NullPointerException while unboxing
+      LOG.warn("Job " + job.getJobID() + " completed for user " + user + 
+          " who has no jobs recorded in queue '" + queueName + "'");
+      return;
+    }
+
+    int remainingJobs = numJobs.intValue() - 1;
+    if (remainingJobs <= 0) {
+      numJobsByUser.remove(user);
+      // remove the user's footprint from the map and reduce slot usage
+      mapSlots.numSlotsOccupiedByUser.remove(user);
+      reduceSlots.numSlotsOccupiedByUser.remove(user);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No more jobs for user, number of users = " + 
+            numJobsByUser.size());
+      }
+    }
+    else {
+      numJobsByUser.put(user, remainingJobs);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("User still has " + remainingJobs + 
+            " jobs, number of users = " + numJobsByUser.size());
+      }
+    }
+  }
+  
+  /**
+   * Update queue usage.
+   * @param type
+   * @param user
+   * @param numRunningTasks
+   * @param numSlotsOccupied
+   */
+  void update(TaskType type, String user, 
+      int numRunningTasks, int numSlotsOccupied) {
+    if (type == TaskType.MAP) {
+      mapSlots.updateSlotsUsage(user, numRunningTasks, numSlotsOccupied);
+    } else if (type == TaskType.REDUCE) {
+      reduceSlots.updateSlotsUsage(user, numRunningTasks, numSlotsOccupied);
+    }
+  }
+  
+  /**
+   * Update queue usage across all running jobs.
+   * @param mapClusterCapacity
+   * @param reduceClusterCapacity
+   * @param mapScheduler
+   * @param reduceScheduler
+   */
+  void updateAll(int mapClusterCapacity, int reduceClusterCapacity, 
+      TaskSchedulingMgr mapScheduler, TaskSchedulingMgr reduceScheduler) {
+   // Compute new capacities for maps and reduces
+    mapSlots.updateCapacities(capacityPercent, maxCapacityPercent, 
+        mapClusterCapacity);
+    reduceSlots.updateCapacities(capacityPercent, maxCapacityPercent, 
+        reduceClusterCapacity);
+
+    // reset running/pending tasks, tasks per user
+    resetSlotsUsage(TaskType.MAP);
+    resetSlotsUsage(TaskType.REDUCE);
+    
+    Collection<JobInProgress> jobs = getRunningJobs(); // Safe to iterate since
+                                                       // we get a copy here
+    for (JobInProgress j : jobs) {
+      if (j.getStatus().getRunState() != JobStatus.RUNNING) {
+        continue;
+      }
+
+      int numMapsRunningForThisJob = mapScheduler.getRunningTasks(j);
+      int numReducesRunningForThisJob = reduceScheduler.getRunningTasks(j);
+      int numRunningMapSlots = 
+        numMapsRunningForThisJob * mapScheduler.getSlotsPerTask(j);
+      int numRunningReduceSlots =
+        numReducesRunningForThisJob * reduceScheduler.getSlotsPerTask(j);
+      int numMapSlotsForThisJob = mapScheduler.getSlotsOccupied(j);
+      int numReduceSlotsForThisJob = reduceScheduler.getSlotsOccupied(j);
+      int numReservedMapSlotsForThisJob = 
+        (mapScheduler.getNumReservedTaskTrackers(j) * 
+         mapScheduler.getSlotsPerTask(j)); 
+      int numReservedReduceSlotsForThisJob = 
+        (reduceScheduler.getNumReservedTaskTrackers(j) * 
+         reduceScheduler.getSlotsPerTask(j)); 
+      
+      j.setSchedulingInfo(
+          CapacityTaskScheduler.getJobQueueSchedInfo(numMapsRunningForThisJob, 
+              numRunningMapSlots,
+              numReservedMapSlotsForThisJob,
+              numReducesRunningForThisJob, 
+              numRunningReduceSlots,
+              numReservedReduceSlotsForThisJob));
+
+      update(TaskType.MAP, j.getProfile().getUser(), 
+          numMapsRunningForThisJob, numMapSlotsForThisJob);
+      update(TaskType.REDUCE, j.getProfile().getUser(), 
+          numReducesRunningForThisJob, numReduceSlotsForThisJob);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format(queueName + " - updateQSI: job %s: run(m)=%d, "
+            + "occupied(m)=%d, run(r)=%d, occupied(r)=%d, finished(m)=%d,"
+            + " finished(r)=%d, failed(m)=%d, failed(r)=%d, "
+            + "spec(m)=%d, spec(r)=%d, total(m)=%d, total(r)=%d", j
+            .getJobID().toString(), Integer
+            .valueOf(numMapsRunningForThisJob), Integer
+            .valueOf(numMapSlotsForThisJob), Integer
+            .valueOf(numReducesRunningForThisJob), Integer
+            .valueOf(numReduceSlotsForThisJob), Integer.valueOf(j
+            .finishedMaps()), Integer.valueOf(j.finishedReduces()), Integer
+            .valueOf(j.failedMapTasks),
+            Integer.valueOf(j.failedReduceTasks), Integer
+                .valueOf(j.speculativeMapTasks), Integer
+                .valueOf(j.speculativeReduceTasks), Integer
+                .valueOf(j.numMapTasks), Integer.valueOf(j.numReduceTasks)));
+      }
+    }
+  }
+  
+  boolean doesQueueSupportPriorities() {
+    return supportsPriorities;
+  }
+
+  /**
+   * return information about the queue
+   *
+   * @return a String representing the information about the queue
+   */
+  @Override
+  public String toString(){
+    // We print out the queue information first, followed by info
+    // on map and reduce tasks and job info
+    StringBuilder sb = new StringBuilder();
+    sb.append("Queue configuration\n");
+    sb.append("Capacity Percentage: ");
+    sb.append(capacityPercent);
+    sb.append("%\n");
+    sb.append("User Limit: " + ulMin + "%\n");
+    sb.append("Priority Supported: " +
+        (doesQueueSupportPriorities() ? "YES":"NO") + "\n");
+    sb.append("-------------\n");
+
+    sb.append("Map tasks\n");
+    sb.append(mapSlots.toString());
+    sb.append("-------------\n");
+    sb.append("Reduce tasks\n");
+    sb.append(reduceSlots.toString());
+    sb.append("-------------\n");
+    
+    sb.append("Job info\n");
+    sb.append("Number of Waiting Jobs: " + getNumWaitingJobs() + "\n");
+    sb.append("Number of users who have submitted jobs: " + 
+        numJobsByUser.size() + "\n");
+    return sb.toString();
+  }
+  
+  /**
+   * Functionality to deal with job initialization
+   */
+
+  
+  // per-user information
+  static class UserInfo {
+    int runningJobs;
+    int waitingJobs;
+    
+    int activeTasks;
+    
+    int getNumRunningJobs() {
+      return runningJobs;
+    }
+    
+    int getNumWaitingJobs() {
+      return waitingJobs;
+    }
+    
+    int getNumActiveTasks() {
+      return activeTasks;
+    }
+    
+    public void jobAdded(JobInProgress job) {
+      ++waitingJobs;
+    }
+    
+    public void jobInitialized(JobInProgress job) {
+      --waitingJobs;
+      
+      ++runningJobs;
+      activeTasks += job.desiredTasks();
+    }
+    
+    public void jobCompleted(JobInProgress job) {
+      --runningJobs;
+      activeTasks -= job.desiredTasks();
+    }
+    
+    boolean isInactive() {
+      return activeTasks == 0 && runningJobs == 0  && waitingJobs == 0;
+    }
+  }
+
+  synchronized Collection<JobInProgress> getWaitingJobs() {
+    return Collections.unmodifiableCollection(
+        new LinkedList<JobInProgress>(waitingJobs.values()));
+  }
+  
+  synchronized Collection<JobInProgress> getRunningJobs() {
+    return Collections.unmodifiableCollection(
+        new LinkedList<JobInProgress>(runningJobs.values())); 
+  }
+  
+  synchronized int getNumActiveTasks() {
+    return activeTasks;
+  }
+  
+  synchronized int getNumRunningJobs() {
+    return runningJobs.size();
+  }
+  
+  /**
+   * @param user user name to look up
+   * @return running jobs recorded for the user, or 0 if the queue has no
+   *         record of the user
+   */
+  synchronized int getNumRunningJobsByUser(String user) {
+    UserInfo info = users.get(user);
+    if (info != null) {
+      return info.getNumRunningJobs();
+    }
+    return 0;
+  }
+
+  /**
+   * @param user user name to look up
+   * @return active tasks recorded for the user, or 0 if the queue has no
+   *         record of the user
+   */
+  synchronized int getNumActiveTasksByUser(String user) {
+    UserInfo info = users.get(user);
+    if (info != null) {
+      return info.getNumActiveTasks();
+    }
+    return 0;
+  }
+
+  /**
+   * @param user user name to look up
+   * @return waiting jobs recorded for the user, or 0 if the queue has no
+   *         record of the user
+   */
+  synchronized int getNumWaitingJobsByUser(String user) {
+    UserInfo info = users.get(user);
+    if (info != null) {
+      return info.getNumWaitingJobs();
+    }
+    return 0;
+  }
+
+  /**
+   * Track <code>job</code> as running in this queue and update the queue
+   * and per-user counters. A job that is already tracked is only logged.
+   *
+   * @param job job to add to the running set
+   */
+  synchronized void addRunningJob(JobInProgress job) {
+    JobSchedulingInfo schedulingKey = new JobSchedulingInfo(job);
+
+    // Nothing to account for if the job is already in the running set
+    if (runningJobs.containsKey(schedulingKey)) {
+      LOG.info("job " + job.getJobID() + " already running in queue'" +
+          queueName + "'!");
+      return;
+    }
+
+    runningJobs.put(schedulingKey, job);
+
+    // Queue-level accounting
+    activeTasks += job.desiredTasks();
+
+    // User-level accounting
+    // NOTE(review): the lookup result is dereferenced without a null check;
+    // presumably addWaitingJob has already registered the user - confirm
+    String owner = job.getProfile().getUser();
+    users.get(owner).jobInitialized(job);
+  }
+  
+  /**
+   * Stop tracking a running job and release its share of the queue and
+   * per-user counters. When the owning user has no jobs or tasks left, the
+   * user's entry is dropped from the users map.
+   *
+   * @param jobInfo scheduling key of the job to remove
+   * @return the removed job, or <code>null</code> if the job was not in the
+   *         running set
+   */
+  synchronized JobInProgress removeRunningJob(JobSchedulingInfo jobInfo) {
+    JobInProgress job = runningJobs.remove(jobInfo); 
+
+    // We have to be careful, we might be trying to remove a job  
+    // which might not have been initialized
+    if (job != null) {
+      // Update user stats
+      String user = job.getProfile().getUser();
+      synchronized (users) {
+        UserInfo userInfo = users.get(user);
+
+        synchronized (userInfo) {
+          userInfo.jobCompleted(job);
+
+          if (userInfo.isInactive()) {
+            // The map is keyed by user name; removing by the UserInfo value
+            // would never match an entry and would leave the user behind
+            users.remove(user);
+          }
+        }
+      }
+
+      // Update queue stats
+      activeTasks -= job.desiredTasks();
+    }
+
+    return job;
+  }
+  
+  /**
+   * Accept <code>job</code> into the waiting set of this queue, provided the
+   * queue and user submission limits allow it, and update the per-user
+   * counters. A job that is already waiting is only logged.
+   *
+   * @param job job to add to the waiting set
+   * @throws IOException if {@link #checkJobSubmissionLimits} rejects the job
+   */
+  synchronized void addWaitingJob(JobInProgress job) throws IOException {
+    JobSchedulingInfo jobSchedInfo = new JobSchedulingInfo(job);
+    if (waitingJobs.containsKey(jobSchedInfo)) {
+      LOG.info("job " + job.getJobID() + " already waiting in queue '" + 
+          queueName + "'!");
+      return;
+    }
+    
+    String user = job.getProfile().getUser();
+
+    // Check acceptance limits
+    checkJobSubmissionLimits(job, user);
+    
+    // Insert under the same key that was used for the duplicate check
+    waitingJobs.put(jobSchedInfo, job);
+    
+    // Update user stats, registering the user on first submission
+    UserInfo userInfo = users.get(user);
+    if (userInfo == null) {
+      userInfo = new UserInfo();
+      users.put(user, userInfo);
+    }
+    userInfo.jobAdded(job);
+  }
+  
+  /**
+   * Remove a job from the waiting set.
+   *
+   * NOTE(review): only the waiting-jobs map is touched here; per-user
+   * counters are not adjusted in this method - confirm callers expect that.
+   *
+   * @param schedInfo scheduling key of the job to remove
+   * @return the removed job, or <code>null</code> if it was not waiting
+   */
+  synchronized JobInProgress removeWaitingJob(JobSchedulingInfo schedInfo) {
+    JobInProgress removed = waitingJobs.remove(schedInfo);
+    return removed;
+  }
+
+  synchronized int getNumActiveUsers() {
+    return users.size();
+  }
+  
+  synchronized int getNumWaitingJobs() {
+    return waitingJobs.size(); 
+  } 
+  
+  /**
+   * @return the comparator held by this queue for ordering
+   *         {@link JobSchedulingInfo} keys
+   */
+  Comparator<JobSchedulingInfo> getComparator() {
+    return this.comparator;
+  }
+  
+  /**
+   * Functions to deal with queue-limits.
+   */
+  
+  /**
+   * Check if the queue can be assigned <code>numSlots</code> 
+   * of the given <code>taskType</code> so that the queue doesn't exceed its
+   * configured maximum-capacity.
+   * 
+   * A maximum-capacity that is not positive is treated as "no maximum".
+   * 
+   * @param taskType type of slots being requested
+   * @param numSlots number of slots being requested
+   * @return <code>true</code> if slots can be assigned
+   */
+  boolean assignSlotsToQueue(TaskType taskType, int numSlots) {
+    // Check if the queue is running over it's maximum-capacity
+    if (getMaxCapacity(taskType) > 0) {  // Check if max capacity is enabled
+      if ((getNumSlotsOccupied(taskType) + numSlots) > 
+           getMaxCapacity(taskType)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Queue " + queueName + " " + "has reached its  max " + 
+              taskType + " capacity");
+          // Report the values that were actually compared above
+          LOG.debug("Current slots occupied " + 
+              getNumSlotsOccupied(taskType) + ", slots requested " + 
+              numSlots + ", max capacity " + getMaxCapacity(taskType));
+        }
+        return false;
+      }
+    }
+    
+    return true;
+  }
+  /**
+   * Check if the given <code>job</code>, <code>user</code> and queue can be
+   * assigned one more task's worth of slots of the given
+   * <code>taskType</code>.
+   * 
+   * The queue is checked against its max-capacity first, then the user is
+   * checked against the per-user limit computed below.
+   * 
+   * @param taskType type of slots being requested
+   * @param job job asking for the slots
+   * @param user owner of the job
+   * @return <code>true</code> if the given job/user/queue can be assigned 
+   * the requested number of slots, <code>false</code> otherwise
+   */
+  boolean assignSlotsToJob(TaskType taskType, JobInProgress job, String user) {
+    // The queue itself must stay within its max-capacity
+    if (!assignSlotsToQueue(taskType, job.getNumSlotsPerTask(taskType))) {
+      return false;
+    }
+    
+    // Current capacity is the queue-capacity while we run below it;
+    // once over capacity it is #occupied plus the slots this job's task
+    // would add.
+    int queueCapacity = getCapacity(taskType);
+    int currentCapacity = 
+      (getNumSlotsOccupied(taskType) < queueCapacity)
+        ? queueCapacity
+        : getNumSlotsOccupied(taskType) + job.getNumSlotsPerTask(taskType);
+    
+    // A user gets the larger of an equal share and the minimum user-limit
+    // percentage, but never more than queue-capacity * user-limit-factor.
+    int equalShare = divideAndCeil(currentCapacity, numJobsByUser.size());
+    int minimumShare = divideAndCeil(ulMin*currentCapacity, 100);
+    int hardCap = (int)(queueCapacity * ulMinFactor);
+    int limit = Math.min(Math.max(equalShare, minimumShare), hardCap);
+    
+    if (getNumSlotsOccupiedByUser(user, taskType) < limit) {
+      return true;
+    }
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("User " + user + " is over limit, num slots occupied=" + 
+          getNumSlotsOccupiedByUser(user, taskType) + 
+          ", limit=" + limit);
+    }
+    return false;
+  }
+  
+  /**
+   * Integer division rounded up.
+   * 
+   * This is *not* a general purpose utility: neither <code>a</code> nor
+   * <code>b</code> should be negative, and a zero divisor is logged and 
+   * answered with 0 rather than raising an error.
+   *  
+   * @param a dividend
+   * @param b divisor
+   * @return ceil of the result of a/b, or 0 when <code>b</code> is 0
+   */
+  private static int divideAndCeil(int a, int b) {
+    if (b != 0) {
+      return (a + b - 1) / b;
+    }
+    LOG.info("divideAndCeil called with a=" + a + " b=" + b);
+    return 0;
+  }
+
+  /**
+   * Check if the given <code>job</code> can be accepted to the 
+   * queue on behalf of the <code>user</code>.
+   * 
+   * The job is rejected when the queue's waiting plus running jobs have
+   * reached <code>maxJobsToAccept</code>, or when the user's waiting plus
+   * running jobs have reached <code>maxJobsPerUserToAccept</code>.
+   * 
+   * @param job job being submitted
+   * @param user user submitting the job
+   * @throws IOException if the queue-wide or the per-user acceptance limit
+   *         has been reached
+   */
+  synchronized void checkJobSubmissionLimits(JobInProgress job, String user) 
+  throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("checkJobSubmissionLimits - " +
+          "qWaitJobs=" + getNumWaitingJobs() + " " +
+          "qRunJobs=" + getNumRunningJobs() + " " +
+          "maxJobsToAccept=" + maxJobsToAccept + " " +
+          "user=" + user + " " +
+          "uWaitJobs=" +  getNumWaitingJobsByUser(user) + " " +
+          "uRunJobs=" + getNumRunningJobsByUser(user)  + " " +
+          "maxJobsPerUserToAccept=" + maxJobsPerUserToAccept);
+    }
+    
+    // Across all jobs in queue
+    if ((getNumWaitingJobs() + getNumRunningJobs()) >= maxJobsToAccept) {
+      throw new IOException(
+          "Job '" + job.getJobID() + "' from user '" + user  + 
+          "' rejected since queue '" + queueName + 
+          "' already has " + getNumWaitingJobs() + " waiting jobs and " + 
+          getNumRunningJobs() + " running jobs and exceeds limit of " +
+          maxJobsToAccept + " jobs to accept");
+    }
+    
+    // Across all jobs of the user
+    if ((getNumWaitingJobsByUser(user) + getNumRunningJobsByUser(user)) >= 
+        maxJobsPerUserToAccept) {
+      // Report the per-user limit that was actually compared against
+      throw new IOException(
+          "Job '" + job.getJobID() + "' rejected since user '" + user +  
+          "' already has " + getNumWaitingJobsByUser(user) + " waiting jobs" +
+          " and " + getNumRunningJobsByUser(user) + " running jobs," +
+          " it exceeds limit of " + maxJobsPerUserToAccept + 
+          " jobs to accept" +
+          " in queue '" + queueName + "'");
+    }
+  }
+  
+  /**
+   * Decide whether <code>job</code> may be initialized without pushing the
+   * queue past its initialized-jobs or active-tasks limits.
+   * 
+   * @param job candidate job
+   * @param currentlyInitializedJobs jobs already picked for initialization
+   *        by the caller but not yet counted as running
+   * @param currentlyInitializedTasks tasks of those picked jobs
+   * @return <code>true</code> if the job can be initialized, 
+   *         <code>false</code> otherwise
+   */
+  synchronized boolean initializeJobForQueue(JobInProgress job,
+      int currentlyInitializedJobs, int currentlyInitializedTasks) {
+    
+    // Job-count limit: running jobs plus those about to be initialized
+    int numRunning = getNumRunningJobs();
+    boolean jobLimitReached = 
+      (numRunning + currentlyInitializedJobs) >= maxJobsToInit;
+    if (jobLimitReached) {
+      LOG.info(getQueueName() + " already has " + numRunning + 
+          " running jobs and " + currentlyInitializedJobs + " jobs about to be" +
+          " initialized, cannot initialize " + job.getJobID() + 
+          " since it will exceeed limit of " + 
+          maxJobsToInit + " initialized jobs for this queue");
+      return false;
+    }
+    
+    // Task-count limit: active tasks, tasks about to be initialized and
+    // the tasks of this job
+    int projectedTasks = 
+      activeTasks + currentlyInitializedTasks + job.desiredTasks();
+    if (projectedTasks >= maxActiveTasks) {
+      LOG.info("Queue '" + getQueueName() + "' has " + activeTasks + 
+          " active tasks and " + currentlyInitializedTasks + " tasks about to" +
+          " be initialized, cannot initialize job '" + job.getJobID() + 
+          "' for user '" + job.getProfile().getUser() + "' with " +
+          job.desiredTasks() + " tasks since it will exceed limit of " + 
+          maxActiveTasks + " active tasks for this queue");
+      return false;
+    }
+    
+    return true;
+  }
+  
+  /**
+   * Decide whether <code>job</code> may be initialized without pushing 
+   * <code>user</code> past the per-user initialized-jobs or active-tasks
+   * limits of this queue.
+   * 
+   * @param job candidate job
+   * @param user owner of the job
+   * @param currentlyInitializedJobs jobs of this user already picked for
+   *        initialization by the caller
+   * @param currentlyInitializedTasks tasks of those picked jobs
+   * @return <code>true</code> if the job can be initialized, 
+   *         <code>false</code> otherwise
+   */
+  synchronized boolean initializeJobForUser(JobInProgress job, 
+      String user, int currentlyInitializedJobs, 
+      int currentlyInitializedTasks) {
+    
+    // Per-user job-count limit
+    int userRunningJobs = getNumRunningJobsByUser(user);
+    boolean jobLimitReached = 
+      (userRunningJobs + currentlyInitializedJobs) >= maxJobsPerUserToInit;
+    if (jobLimitReached) {
+      LOG.info(getQueueName() + " already has " + userRunningJobs + 
+          " running jobs and " + currentlyInitializedJobs + " jobs about to be" +
+          " initialized, for user " + user + 
+          "; cannot initialize " + job.getJobID() + 
+          " since it will exceeed limit of " + 
+          maxJobsPerUserToInit + 
+          " initialized jobs per user for this queue");
+      return false;
+    }
+    
+    // Per-user task-count limit
+    // NOTE(review): unlike the queue-level check, job.desiredTasks() is not
+    // part of this sum - confirm whether that is intended
+    int userActiveTasks = getNumActiveTasksByUser(user);
+    boolean taskLimitReached = 
+      (userActiveTasks + currentlyInitializedTasks) >= maxActiveTasksPerUser;
+    if (taskLimitReached) {
+      LOG.info(getQueueName() + " has " + userActiveTasks + 
+          " active tasks and " + currentlyInitializedTasks + " tasks about to" +
+          " be initialized for user " + user + 
+          ", cannot initialize " + job.getJobID() + " with " +
+          job.desiredTasks() + " tasks since it will exceed limit of " + 
+          maxActiveTasksPerUser + 
+          " active tasks per user for this queue");
+      return false;
+    }
+    
+    return true;
+  }
+
+}

+ 15 - 17
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerServlet.java

@@ -30,8 +30,7 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.hadoop.mapred.CapacityTaskScheduler.QueueSchedulingInfo;
-import org.apache.hadoop.mapred.CapacityTaskScheduler.TaskSchedulingInfo;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapred.JobHistory.JobInfo;
 import org.apache.hadoop.util.StringUtils;
 
@@ -121,34 +120,33 @@ public class CapacitySchedulerServlet extends HttpServlet {
                 "<th>Reduce Task Used Capacity</th>" +
                 "<th>Running Reduces </tr>\n");
       JobQueuesManager queuesManager = scheduler.getJobQueuesManager();
-      for (QueueSchedulingInfo qsi : scheduler.getQueueInfoMap().values()) {
-        String queueName = qsi.getQueueName();
-        TaskSchedulingInfo maptsi = scheduler.getMapScheduler().getTSI(qsi);
-        TaskSchedulingInfo redtsi = scheduler.getReduceScheduler().getTSI(qsi);
+      for (CapacitySchedulerQueue queue : scheduler.getQueueInfoMap().values()) {
+        String queueName = queue.getQueueName();
         out.print("<tr>\n");
         out.printf(
             "<td><a href=\"jobqueue_details.jsp?queueName=%s\">%s</a></td>\n",
             queueName, queueName);
-        out.printf("<td>%s</td>\n", queuesManager.getNumRunningJobs(queueName));
-        out.printf("<td>%s</td>\n", queuesManager.getNumWaitingJobs(queueName));
-        out.printf("<td>%.1f%%</td>\n", qsi.getCapacityPercent());
-        int mapCapacity = maptsi.getCapacity();
-        int mapSlotsOccupied = maptsi.getNumSlotsOccupied();
-        int reduceSlotsOccupied = redtsi.getNumSlotsOccupied();
+        out.printf("<td>%s</td>\n", queue.getNumRunningJobs());
+        out.printf("<td>%s</td>\n", queue.getNumWaitingJobs());
+        out.printf("<td>%.1f%%</td>\n", queue.getCapacityPercent());
+        int mapCapacity = queue.getCapacity(TaskType.MAP);
+        int mapSlotsOccupied = queue.getNumSlotsOccupied(TaskType.MAP);
+        int reduceSlotsOccupied = queue.getNumSlotsOccupied(TaskType.REDUCE);
         float occupiedSlotsAsPercent = 
             mapCapacity != 0 ? ((float) mapSlotsOccupied * 100 / mapCapacity)
             : 0;
         out.printf("<td>%s</td>\n", mapCapacity);
         out.printf("<td>%s (%.1f%% of Capacity)</td>\n", mapSlotsOccupied,
             occupiedSlotsAsPercent);
-        out.printf("<td>%s</td>\n", maptsi.getNumRunningTasks());
-        int reduceCapacity = redtsi.getCapacity();
-        float redOccupiedSlotsAsPercent = reduceCapacity != 0 ? ((float) reduceSlotsOccupied * 100 / mapCapacity)
-            : 0;
+        out.printf("<td>%s</td>\n", queue.getNumRunningTasks(TaskType.MAP));
+        int reduceCapacity = queue.getCapacity(TaskType.REDUCE);
+        float redOccupiedSlotsAsPercent = 
+          (reduceCapacity != 0 ? ((float)reduceSlotsOccupied*100 / mapCapacity)
+            : 0);
         out.printf("<td>%s</td>\n", reduceCapacity);
         out.printf("<td>%s (%.1f%% of Capacity)</td>\n", reduceSlotsOccupied,
             redOccupiedSlotsAsPercent);
-        out.printf("<td>%s</td>\n", redtsi.getNumRunningTasks());
+        out.printf("<td>%s</td>\n", queue.getNumRunningTasks(TaskType.REDUCE));
       }
       out.print("</table>\n");
     }

文件差異過大導致無法顯示
+ 190 - 654
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java


+ 97 - 104
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java

@@ -19,11 +19,15 @@ package org.apache.hadoop.mapred;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -68,16 +72,6 @@ public class JobInitializationPoller extends Thread {
   private static final Log LOG = LogFactory
       .getLog(JobInitializationPoller.class.getName());
 
-  /*
-   * The poller picks up jobs across users to initialize based on user limits.
-   * Suppose the user limit for a queue is 25%, it means atmost 4 users' jobs
-   * can run together. However, in order to account for jobs from a user that
-   * might complete faster than others, it initializes jobs from an additional
-   * number of users as a backlog. This variable defines the additional
-   * number of users whose jobs can be considered for initializing. 
-   */
-  private static final int MAX_ADDITIONAL_USERS_TO_INIT = 2;
-
   private JobQueuesManager jobQueueManager;
   private long sleepInterval;
   private int poolSize;
@@ -100,11 +94,12 @@ public class JobInitializationPoller extends Thread {
      * The hash map which maintains relationship between queue to jobs to
      * initialize per queue.
      */
-    private HashMap<String, TreeMap<JobSchedulingInfo, JobInProgress>> jobsPerQueue;
+    private Map<String, Map<JobSchedulingInfo, JobInProgress>> jobsPerQueue;
 
     public JobInitializationThread() {
       startIniting = true;
-      jobsPerQueue = new HashMap<String, TreeMap<JobSchedulingInfo, JobInProgress>>();
+      jobsPerQueue = 
+        new ConcurrentHashMap<String, Map<JobSchedulingInfo, JobInProgress>>();
     }
 
     @Override
@@ -156,8 +151,7 @@ public class JobInitializationPoller extends Thread {
      * @return First job in the queue and removes it.
      */
     private JobInProgress getFirstJobInQueue(String queue) {
-      TreeMap<JobSchedulingInfo, JobInProgress> jobsList = jobsPerQueue
-          .get(queue);
+      Map<JobSchedulingInfo, JobInProgress> jobsList = jobsPerQueue.get(queue);
       synchronized (jobsList) {
         if (jobsList.isEmpty()) {
           return null;
@@ -186,8 +180,7 @@ public class JobInitializationPoller extends Thread {
     }
 
     void addJobsToQueue(String queue, JobInProgress job) {
-      TreeMap<JobSchedulingInfo, JobInProgress> jobs = jobsPerQueue
-          .get(queue);
+      Map<JobSchedulingInfo, JobInProgress> jobs = jobsPerQueue.get(queue);
       if (jobs == null) {
         LOG.error("Invalid queue passed to the thread : " + queue
             + " For job :: " + job.getJobID());
@@ -199,43 +192,20 @@ public class JobInitializationPoller extends Thread {
       }
     }
 
-    void addQueue(String queue) {
-      TreeMap<JobSchedulingInfo, JobInProgress> jobs = new TreeMap<JobSchedulingInfo, JobInProgress>(
-          jobQueueManager.getComparator(queue));
-      jobsPerQueue.put(queue, jobs);
-    }
-  }
+    void addQueue(String queueName) {
+      CapacitySchedulerQueue queue = jobQueueManager.getQueue(queueName);
 
-  /**
-   * The queue information class maintains following information per queue:
-   * Maximum users allowed to initialize job in the particular queue. Maximum
-   * jobs allowed to be initialize per user in the queue.
-   * 
-   */
-  private class QueueInfo {
-    String queue;
-    int maxUsersAllowedToInitialize;
-    int maxJobsPerUserToInitialize;
-
-    public QueueInfo(String queue, int maxUsersAllowedToInitialize,
-        int maxJobsPerUserToInitialize) {
-      this.queue = queue;
-      this.maxJobsPerUserToInitialize = maxJobsPerUserToInitialize;
-      this.maxUsersAllowedToInitialize = maxUsersAllowedToInitialize;
+      TreeMap<JobSchedulingInfo, JobInProgress> jobs = 
+        new TreeMap<JobSchedulingInfo, JobInProgress>(queue.getComparator());
+      jobsPerQueue.put(queueName, jobs);
     }
   }
 
-  /**
-   * Map which contains the configuration used for initializing jobs
-   * in that associated to a particular job queue.
-   */
-  private HashMap<String, QueueInfo> jobQueues;
-
   /**
    * Set of jobs which have been passed to Initialization threads.
    * This is maintained so that we dont call initTasks() for same job twice.
    */
-  private HashMap<JobID,JobInProgress> initializedJobs;
+  private HashMap<JobID, JobInProgress> initializedJobs;
 
   private volatile boolean running;
 
@@ -244,41 +214,34 @@ public class JobInitializationPoller extends Thread {
    * The map which provides information which thread should be used to
    * initialize jobs for a given job queue.
    */
-  private HashMap<String, JobInitializationThread> threadsToQueueMap;
+  private Map<String, JobInitializationThread> threadsToQueueMap;
 
   public JobInitializationPoller(JobQueuesManager mgr,
       CapacitySchedulerConf rmConf, Set<String> queue, 
       TaskTrackerManager ttm) {
     initializedJobs = new HashMap<JobID,JobInProgress>();
-    jobQueues = new HashMap<String, QueueInfo>();
     this.jobQueueManager = mgr;
-    threadsToQueueMap = new HashMap<String, JobInitializationThread>();
+    threadsToQueueMap = 
+      Collections.synchronizedMap(new HashMap<String, 
+          JobInitializationThread>());
     super.setName("JobInitializationPollerThread");
     running = true;
     this.ttm = ttm;
   }
 
+  /**
+   * Replace the task tracker manager held by this poller.
+   *
+   * @param ttm manager to use from now on
+   */
+  void setTaskTrackerManager(TaskTrackerManager ttm) {
+    this.ttm = ttm;
+  }
+  
   /*
    * method to read all configuration values required by the initialisation
    * poller
    */
 
-  void init(Set<String> queues, 
+  void init(int numQueues, 
             CapacitySchedulerConf capacityConf) {
-    for (String queue : queues) {
-      int userlimit = capacityConf.getMinimumUserLimitPercent(queue);
-      int maxUsersToInitialize = ((100 / userlimit) + MAX_ADDITIONAL_USERS_TO_INIT);
-      int maxJobsPerUserToInitialize = capacityConf
-          .getMaxJobsPerUserToInitialize(queue);
-      QueueInfo qi = new QueueInfo(queue, maxUsersToInitialize,
-          maxJobsPerUserToInitialize);
-      jobQueues.put(queue, qi);
-    }
     sleepInterval = capacityConf.getSleepInterval();
-    poolSize = capacityConf.getMaxWorkerThreads();
-    if (poolSize > queues.size()) {
-      poolSize = queues.size();
-    }
+    poolSize = Math.min(capacityConf.getMaxWorkerThreads(), numQueues);
     assignThreadsToQueues();
     Collection<JobInitializationThread> threads = threadsToQueueMap.values();
     for (JobInitializationThread t : threads) {
@@ -289,6 +252,20 @@ public class JobInitializationPoller extends Thread {
     }
   }
 
+  /**
+   * Hand every queue in <code>queues</code> that has no initialization
+   * thread yet to one of the existing threads, cycling through the
+   * currently mapped threads.
+   *
+   * NOTE(review): with no threads mapped yet the modulo below divides by
+   * zero - presumably init() always runs first; confirm.
+   *
+   * @param queues names of the queues that should be served after the call
+   */
+  void reinit(Set<String> queues) {
+    Set<String> knownQueues = threadsToQueueMap.keySet();
+    JobInitializationThread[] workers = 
+      threadsToQueueMap.values().toArray(new JobInitializationThread[0]);
+    int next = 0;
+    for (String candidate : queues) {
+      // Queues that already have a thread keep it
+      if (knownQueues.contains(candidate)) {
+        continue;
+      }
+      JobInitializationThread worker = workers[next % workers.length];
+      next++;
+      worker.addQueue(candidate);
+      threadsToQueueMap.put(candidate, worker);
+    }
+  }
+  
   /**
    * This is main thread of initialization poller, We essentially do 
    * following in the main threads:
@@ -323,7 +300,7 @@ public class JobInitializationPoller extends Thread {
    * 
    */
   void selectJobsToInitialize() {
-    for (String queue : jobQueues.keySet()) {
+    for (String queue : jobQueueManager.getAllQueues()) {
       ArrayList<JobInProgress> jobsToInitialize = getJobsToInitialize(queue);
       printJobs(jobsToInitialize);
       JobInitializationThread t = threadsToQueueMap.get(queue);
@@ -368,8 +345,9 @@ public class JobInitializationPoller extends Thread {
    * 
    */
   private void assignThreadsToQueues() {
-    int countOfQueues = jobQueues.size();
-    String[] queues = (String[]) jobQueues.keySet().toArray(
+    Collection<String> queueNames = jobQueueManager.getAllQueues();
+    int countOfQueues = queueNames.size();
+    String[] queues = (String[]) queueNames.toArray(
         new String[countOfQueues]);
     int numberOfQueuesPerThread = countOfQueues / poolSize;
     int numberOfQueuesAssigned = 0;
@@ -425,22 +403,21 @@ public class JobInitializationPoller extends Thread {
    * already been initialized. The latter user's initialized jobs are redundant,
    * but we'll leave them initialized.
    * 
-   * @param queue name of the queue to pick the jobs to initialize.
+   * @param queueName name of the queue to pick the jobs to initialize.
    * @return list of jobs to be initalized in a queue. An empty queue is
    *         returned if no jobs are found.
    */
-  ArrayList<JobInProgress> getJobsToInitialize(String queue) {
-    QueueInfo qi = jobQueues.get(queue);
+  ArrayList<JobInProgress> getJobsToInitialize(String queueName) {
+    CapacitySchedulerQueue queue = jobQueueManager.getQueue(queueName);
     ArrayList<JobInProgress> jobsToInitialize = new ArrayList<JobInProgress>();
-    // use the configuration parameter which is configured for the particular
-    // queue.
-    int maximumUsersAllowedToInitialize = qi.maxUsersAllowedToInitialize;
-    int maxJobsPerUserAllowedToInitialize = qi.maxJobsPerUserToInitialize;
-    int maxJobsPerQueueToInitialize = maximumUsersAllowedToInitialize
-        * maxJobsPerUserAllowedToInitialize;
+
     int countOfJobsInitialized = 0;
-    HashMap<String, Integer> userJobsInitialized = new HashMap<String, Integer>();
-    Collection<JobInProgress> jobs = jobQueueManager.getWaitingJobs(queue);
+    int countOfTasksInitialized = 0;
+    Map<String, Integer> userJobsInitialized = new HashMap<String, Integer>();
+    Map<String, Integer> userTasksInitialized = new HashMap<String, Integer>();
+    Set<String> usersOverLimit = new HashSet<String>();
+    Collection<JobInProgress> jobs = queue.getWaitingJobs();
+    
     /*
      * Walk through the collection of waiting jobs.
      *  We maintain a map of jobs that have already been initialized. If a 
@@ -456,40 +433,54 @@ public class JobInitializationPoller extends Thread {
      */
     for (JobInProgress job : jobs) {
       String user = job.getProfile().getUser();
-      int numberOfJobs = userJobsInitialized.get(user) == null ? 0
-          : userJobsInitialized.get(user);
-      // If the job is already initialized then add the count against user
-      // then continue.
+      // If the job is already initialized then continue.
       if (initializedJobs.containsKey(job.getJobID())) {
-        userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
-        countOfJobsInitialized++;
         continue;
       }
-      boolean isUserPresent = userJobsInitialized.containsKey(user);
-      if (!isUserPresent
-          && userJobsInitialized.size() < maximumUsersAllowedToInitialize) {
-        // this is a new user being considered and the number of users
-        // is within limits.
-        userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
-        jobsToInitialize.add(job);
-        initializedJobs.put(job.getJobID(),job);
-        countOfJobsInitialized++;
-      } else if (isUserPresent
-          && numberOfJobs < maxJobsPerUserAllowedToInitialize) {
-        userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
-        jobsToInitialize.add(job);
-        initializedJobs.put(job.getJobID(),job);
-        countOfJobsInitialized++;
-      }
-      /*
-       * if the maximum number of jobs to initalize for a queue is reached
-       * then we stop looking at further jobs. The jobs beyond this number
-       * can be initialized.
+
+      /** 
+       * Ensure we will not exceed queue limits
        */
-      if(countOfJobsInitialized > maxJobsPerQueueToInitialize) {
+      if (!queue.initializeJobForQueue(job, 
+          countOfJobsInitialized, countOfTasksInitialized)) {
         break;
       }
+      
+      
+      /**
+       *  Ensure we will not exceed user limits
+       */
+      
+      // Ensure we don't process a user's jobs out of order 
+      if (usersOverLimit.contains(user)) {
+        continue;
+      }
+      
+      Integer userJobs = userJobsInitialized.get(user);
+      if (userJobs == null) {
+        userJobs = 0;
+      }
+      Integer userTasks = userTasksInitialized.get(user);
+      if (userTasks == null) {
+        userTasks = 0;
+      }
+      if (!queue.initializeJobForUser(job, user, userJobs, userTasks)) {
+        usersOverLimit.add(user);   // Note down the user
+        continue;
+      }
+      
+      // Ready to initialize!
+      initializedJobs.put(job.getJobID(), job);
+      jobsToInitialize.add(job);
+      
+      // Update queue & user counts
+      countOfJobsInitialized++;
+      countOfTasksInitialized += job.desiredTasks();
+      
+      userJobsInitialized.put(user, userJobs+1);
+      userTasksInitialized.put(user, (userTasks + job.desiredTasks()));
     }
+    
     return jobsToInitialize;
   }
 
@@ -536,7 +527,9 @@ public class JobInitializationPoller extends Thread {
           LOG.info("Removing scheduled jobs from waiting queue"
               + job.getJobID());
           jobsIterator.remove();
-          jobQueueManager.removeJobFromWaitingQueue(job);
+          CapacitySchedulerQueue queue = 
+            jobQueueManager.getQueue(job.getProfile().getQueueName());
+          queue.removeWaitingJob(new JobSchedulingInfo(job));
           continue;
         }
       }

+ 48 - 175
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -25,6 +26,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,160 +38,32 @@ import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
  * one or more queues. 
  */
 class JobQueuesManager extends JobInProgressListener {
-
-  /* 
-   * If a queue supports priorities, jobs must be 
-   * sorted on priorities, and then on their start times (technically, 
-   * their insertion time.  
-   * If a queue doesn't support priorities, jobs are
-   * sorted based on their start time.  
-   */
-  
-  // comparator for jobs in queues that don't support priorities
-  private static final Comparator<JobSchedulingInfo> STARTTIME_JOB_COMPARATOR
-    = new Comparator<JobSchedulingInfo>() {
-    public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) {
-      // the job that started earlier wins
-      if (o1.getStartTime() < o2.getStartTime()) {
-        return -1;
-      } else {
-        return (o1.getStartTime() == o2.getStartTime() 
-                ? o1.getJobID().compareTo(o2.getJobID()) 
-                : 1);
-      }
-    }
-  };
   
-  // class to store queue info
-  private static class QueueInfo {
-
-    // whether the queue supports priorities
-    boolean supportsPriorities;
-    Map<JobSchedulingInfo, JobInProgress> waitingJobs; // for waiting jobs
-    Map<JobSchedulingInfo, JobInProgress> runningJobs; // for running jobs
-    
-    public Comparator<JobSchedulingInfo> comparator;
-    
-    QueueInfo(boolean prio) {
-      this.supportsPriorities = prio;
-      if (supportsPriorities) {
-        // use the default priority-aware comparator
-        comparator = JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR;
-      }
-      else {
-        comparator = STARTTIME_JOB_COMPARATOR;
-      }
-      waitingJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
-      runningJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
-    }
-    
-    Collection<JobInProgress> getWaitingJobs() {
-      synchronized (waitingJobs) {
-        return Collections.unmodifiableCollection(
-            new LinkedList<JobInProgress>(waitingJobs.values()));
-      }
-    }
-    
-    int getNumWaitingJobs() {
-      synchronized (waitingJobs) {
-        return waitingJobs.size();
-      }
-    }
-    
-    Collection<JobInProgress> getRunningJobs() {
-      synchronized (runningJobs) {
-       return Collections.unmodifiableCollection(
-           new LinkedList<JobInProgress>(runningJobs.values())); 
-      }
-    }
-    
-    int getNumRunningJobs() {
-      synchronized (runningJobs) {
-        return runningJobs.size();
-      }
-    }
-    
-    void addRunningJob(JobInProgress job) {
-      synchronized (runningJobs) {
-       runningJobs.put(new JobSchedulingInfo(job),job); 
-      }
-    }
-    
-    JobInProgress removeRunningJob(JobSchedulingInfo jobInfo) {
-      synchronized (runningJobs) {
-        return runningJobs.remove(jobInfo); 
-      }
-    }
-    
-    JobInProgress removeWaitingJob(JobSchedulingInfo schedInfo) {
-      synchronized (waitingJobs) {
-        return waitingJobs.remove(schedInfo);
-      }
-    }
-    
-    void addWaitingJob(JobInProgress job) {
-      synchronized (waitingJobs) {
-        waitingJobs.put(new JobSchedulingInfo(job), job);
-      }
-    }
-    
-    int getWaitingJobCount() {
-      synchronized (waitingJobs) {
-       return waitingJobs.size(); 
-      }
-    }
-    
-  }
-  
-  // we maintain a hashmap of queue-names to queue info
-  private Map<String, QueueInfo> jobQueues = 
-    new HashMap<String, QueueInfo>();
   private static final Log LOG = LogFactory.getLog(JobQueuesManager.class);
   private CapacityTaskScheduler scheduler;
+  // Queues in the system
+  private Collection<String> jobQueueNames;
+  private Map<String, CapacitySchedulerQueue> jobQueues = 
+    new HashMap<String, CapacitySchedulerQueue>();
 
   
   JobQueuesManager(CapacityTaskScheduler s) {
     this.scheduler = s;
   }
   
-  /**
-   * create an empty queue with the default comparator
-   * @param queueName The name of the queue
-   * @param supportsPriotities whether the queue supports priorities
-   */
-  public void createQueue(String queueName, boolean supportsPriotities) {
-    jobQueues.put(queueName, new QueueInfo(supportsPriotities));
-  }
-  
-  /**
-   * Returns the queue of running jobs associated with the name
-   */
-  public Collection<JobInProgress> getRunningJobQueue(String queueName) {
-    return jobQueues.get(queueName).getRunningJobs();
-  }
-  
-  public int getNumRunningJobs(String queueName) {
-    return jobQueues.get(queueName).getNumRunningJobs();
-  }
-  
-  /**
-   * Returns the queue of waiting jobs associated with queue name.
-   * 
-   */
-  Collection<JobInProgress> getWaitingJobs(String queueName) {
-    return jobQueues.get(queueName).getWaitingJobs();
-  }
-  
-  public int getNumWaitingJobs(String queueName) {
-    return jobQueues.get(queueName).getNumWaitingJobs();
+  void setQueues(Map<String, CapacitySchedulerQueue> queues) {
+    this.jobQueues = queues;
+    this.jobQueueNames = new ArrayList<String>(queues.keySet());
   }
   
   @Override
   public void jobAdded(JobInProgress job) throws IOException {
-    LOG.info("Job submitted to queue " + job.getProfile().getQueueName());
+    LOG.info("Job " + job.getJobID() + " submitted to queue " + 
+        job.getProfile().getQueueName());
+    
     // add job to the right queue
-    QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
-    if (null == qi) {
+    CapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName());
+    if (null == queue) {
       // job was submitted to a queue we're not aware of
       LOG.warn("Invalid queue " + job.getProfile().getQueueName() + 
           " specified for job" + job.getProfile().getJobID() + 
@@ -198,7 +72,7 @@ class JobQueuesManager extends JobInProgressListener {
     }
     // add job to waiting queue. It will end up in the right place, 
     // based on priority. 
-    qi.addWaitingJob(job);
+    queue.addWaitingJob(job);
     // let scheduler know. 
     scheduler.jobAdded(job);
   }
@@ -208,15 +82,18 @@ class JobQueuesManager extends JobInProgressListener {
    * job queue manager.
    */
   private void jobCompleted(JobInProgress job, JobSchedulingInfo oldInfo, 
-                            QueueInfo qi) {
+      CapacitySchedulerQueue queue) {
     LOG.info("Job " + job.getJobID().toString() + " submitted to queue " 
         + job.getProfile().getQueueName() + " has completed");
     //remove jobs from both queue's a job can be in
     //running and waiting queue at the same time.
-    qi.removeRunningJob(oldInfo);
-    qi.removeWaitingJob(oldInfo);
-    // let scheduler know
-    scheduler.jobCompleted(job);
+    JobInProgress runningJob = queue.removeRunningJob(oldInfo);
+    JobInProgress waitingJob = queue.removeWaitingJob(oldInfo);
+    // let scheduler know if necessary
+    // sometimes this isn't necessary if the job was rejected during submission
+    if (runningJob != null || waitingJob != null) {
+      scheduler.jobCompleted(job);
+    }
   }
   
   // Note that job is removed when the job completes i.e in jobUpated()
@@ -226,27 +103,34 @@ class JobQueuesManager extends JobInProgressListener {
   // This is used to reposition a job in the queue. A job can get repositioned 
   // because of the change in the job priority or job start-time.
   private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo, 
-                           QueueInfo qi) {
+      CapacitySchedulerQueue queue) {
     
-    if(qi.removeWaitingJob(oldInfo) != null) {
-      qi.addWaitingJob(job);
+    if(queue.removeWaitingJob(oldInfo) != null) {
+      try {
+        queue.addWaitingJob(job);
+      } catch (IOException ioe) {
+        // Not expected (job was just removed from this queue); keep the cause
+        LOG.warn("Couldn't change priority!", ioe);
+        return;
+      }
     }
-    if(qi.removeRunningJob(oldInfo) != null) {
-      qi.addRunningJob(job);
+    if(queue.removeRunningJob(oldInfo) != null) {
+      queue.addRunningJob(job);
     }
   }
   
   // This is used to move a job from the waiting queue to the running queue.
   private void makeJobRunning(JobInProgress job, JobSchedulingInfo oldInfo, 
-                              QueueInfo qi) {
+                              CapacitySchedulerQueue queue) {
     // Removing of the job from job list is responsibility of the
     //initialization poller.
     // Add the job to the running queue
-    qi.addRunningJob(job);
+    queue.addRunningJob(job);
   }
   
   // Update the scheduler as job's state has changed
-  private void jobStateChanged(JobStatusChangeEvent event, QueueInfo qi) {
+  private void jobStateChanged(JobStatusChangeEvent event, 
+                               CapacitySchedulerQueue queue) {
     JobInProgress job = event.getJobInProgress();
     JobSchedulingInfo oldJobStateInfo = 
       new JobSchedulingInfo(event.getOldStatus());
@@ -255,16 +139,16 @@ class JobQueuesManager extends JobInProgressListener {
     if (event.getEventType() == EventType.PRIORITY_CHANGED 
         || event.getEventType() == EventType.START_TIME_CHANGED) {
       // Make a priority change
-      reorderJobs(job, oldJobStateInfo, qi);
+      reorderJobs(job, oldJobStateInfo, queue);
     } else if (event.getEventType() == EventType.RUN_STATE_CHANGED) {
       // Check if the job is complete
       int runState = job.getStatus().getRunState();
       if (runState == JobStatus.SUCCEEDED
           || runState == JobStatus.FAILED
           || runState == JobStatus.KILLED) {
-        jobCompleted(job, oldJobStateInfo, qi);
+        jobCompleted(job, oldJobStateInfo, queue);
       } else if (runState == JobStatus.RUNNING) {
-        makeJobRunning(job, oldJobStateInfo, qi);
+        makeJobRunning(job, oldJobStateInfo, queue);
       }
     }
   }
@@ -272,8 +156,8 @@ class JobQueuesManager extends JobInProgressListener {
   @Override
   public void jobUpdated(JobChangeEvent event) {
     JobInProgress job = event.getJobInProgress();
-    QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
-    if (null == qi) {
+    CapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName());
+    if (null == queue) {
       // can't find queue for job. Shouldn't happen. 
       LOG.warn("Could not find queue " + job.getProfile().getQueueName() + 
           " when updating job " + job.getProfile().getJobID());
@@ -282,26 +166,15 @@ class JobQueuesManager extends JobInProgressListener {
     
     // Check if this is the status change
     if (event instanceof JobStatusChangeEvent) {
-      jobStateChanged((JobStatusChangeEvent)event, qi);
+      jobStateChanged((JobStatusChangeEvent)event, queue);
     }
   }
   
-  void removeJobFromWaitingQueue(JobInProgress job) {
-    String queue = job.getProfile().getQueueName();
-    QueueInfo qi = jobQueues.get(queue);
-    qi.removeWaitingJob(new JobSchedulingInfo(job));
+  CapacitySchedulerQueue getQueue(String queue) {
+    return jobQueues.get(queue);
   }
   
-  Comparator<JobSchedulingInfo> getComparator(String queue) {
-    return jobQueues.get(queue).comparator;
-  }
-  
-  int getWaitingJobCount(String queue) {
-    QueueInfo qi = jobQueues.get(queue);
-    return qi.getWaitingJobCount();
-  }
-
-  boolean doesQueueSupportPriorities(String queueName) {
-    return jobQueues.get(queueName).supportsPriorities;
+  Collection<String> getAllQueues() {
+    return Collections.unmodifiableCollection(jobQueueNames);
   }
 }

文件差異過大導致無法顯示
+ 282 - 148
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java


+ 1 - 1
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java

@@ -60,7 +60,7 @@ public class TestCapacitySchedulerWithJobTracker extends
           .getTaskScheduler();
       JobQueuesManager mgr = scheduler.jobQueuesManager;
       assertEquals("Failed job present in Waiting queue", 0, mgr
-          .getWaitingJobCount("default"));
+          .getQueue("default").getNumWaitingJobs());
     }
   }
 

+ 11 - 2
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -803,6 +803,15 @@ public class JobInProgress {
     return numReduceTasks - runningReduceTasks - failedReduceTIPs - 
     finishedReduceTasks + speculativeReduceTasks;
   }
+  
+  /**
+   * Compute the combined number of map and reduce tasks this job wants.
+   * @return the sum of {@link #desiredMaps()} and {@link #desiredReduces()}
+   */
+  public int desiredTasks() {
+    final int maps = desiredMaps();
+    final int reduces = desiredReduces();
+    return maps + reduces;
+  }
+  
   public int getNumSlotsPerTask(TaskType taskType) {
     if (taskType == TaskType.MAP) {
       return numSlotsPerMap;
@@ -1361,7 +1370,7 @@ public class JobInProgress {
   
   /**
    * Check if we can schedule an off-switch task for this job.
-   * @param numTaskTrackers.
+   * @param numTaskTrackers number of tasktrackers
    * 
    * We check the number of missed opportunities for the job. 
    * If it has 'waited' long enough we go ahead and schedule.
@@ -1559,7 +1568,7 @@ public class JobInProgress {
       LOG.info("Exceeded limit for reduce input size: Estimated:" + 
           estimatedReduceInputSize + " Limit: " + 
           reduce_input_limit + " Failing Job " + jobId);
-      status.setFailureInfo("Job Exceeded Reduce Input limit " 
+      status.setFailureInfo("Job exceeded Reduce Input limit " 
           + " Limit:  " + reduce_input_limit + 
           " Estimated: " + estimatedReduceInputSize);
       jobtracker.failJob(this);

+ 22 - 0
src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java

@@ -54,6 +54,28 @@ class JobQueueJobInProgressListener extends JobInProgressListener {
     JobPriority getPriority() {return priority;}
     long getStartTime() {return startTime;}
     JobID getJobID() {return id;}
+    
+    /**
+     * Compares this scheduling info with another object.
+     * Two instances are equal when the other object is exactly of class
+     * {@code JobSchedulingInfo} and has the same job id, start time and
+     * priority. An instance is always equal to itself.
+     *
+     * @param obj the object to compare against; may be null
+     * @return true if obj is this instance, or a JobSchedulingInfo with the
+     *         same id, start time and priority; false otherwise
+     */
+    @Override
+    public boolean equals(Object obj) {
+      // Identity first, so that equals() stays reflexive for every instance
+      if (obj == this) {
+        return true;
+      }
+      // Exact class match only; this also rejects null
+      if (obj == null || obj.getClass() != JobSchedulingInfo.class) {
+        return false;
+      }
+      JobSchedulingInfo that = (JobSchedulingInfo)obj;
+      return (this.id.equals(that.id) && 
+              this.startTime == that.startTime && 
+              this.priority == that.priority);
+    }
+
+    /**
+     * Hash code built from the job id, the priority and the start time.
+     *
+     * @return the id hash multiplied by the priority hash, plus the start
+     *         time, truncated to an int
+     */
+    @Override
+    public int hashCode() {
+      // The product is computed in int arithmetic; the sum is widened to
+      // long by startTime and then narrowed back to int.
+      int idTimesPriority = id.hashCode() * priority.hashCode();
+      long combined = idTimesPriority + startTime;
+      return (int) combined;
+    }
+
   }
   
   static final Comparator<JobSchedulingInfo> FIFO_JOB_QUEUE_COMPARATOR

+ 25 - 11
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -3717,7 +3717,20 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         jobInfo.write(out);
         out.close();
       }
-      return addJob(jobId, job);
+      
+      // Submit the job
+      JobStatus status;
+      try {
+        status = addJob(jobId, job);
+      } catch (IOException ioe) {
+        LOG.info("Job " + jobId + " submission failed!", ioe);
+        status = job.getStatus();
+        status.setFailureInfo(StringUtils.stringifyException(ioe));
+        failJob(job);
+        throw ioe;
+      }
+      
+      return status;
     }
   }
 
@@ -3753,19 +3766,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    * adding a job. This is the core job submission logic
    * @param jobId The id for the job submitted which needs to be added
    */
-  private synchronized JobStatus addJob(JobID jobId, JobInProgress job) {
+  private synchronized JobStatus addJob(JobID jobId, JobInProgress job) 
+  throws IOException {
     totalSubmissions++;
 
     synchronized (jobs) {
       synchronized (taskScheduler) {
         jobs.put(job.getProfile().getJobID(), job);
         for (JobInProgressListener listener : jobInProgressListeners) {
-          try {
-            listener.jobAdded(job);
-          } catch (IOException ioe) {
-            LOG.warn("Failed to add and so skipping the job : "
-                + job.getJobID() + ". Exception : " + ioe);
-          }
+          listener.jobAdded(job);
         }
       }
     }
@@ -3946,8 +3955,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
           StringUtils.stringifyException(kie));
       killJob(job);
     } catch (Throwable t) {
-      String failureInfo = "Job initialization failed:\n" +
-      StringUtils.stringifyException(t);
+      String failureInfo = 
+        "Job initialization failed:\n" + StringUtils.stringifyException(t);
       // If the job initialization is failed, job state will be FAILED
       LOG.error(failureInfo);
       job.getStatus().setFailureInfo(failureInfo);
@@ -4889,7 +4898,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   public void refreshQueues() throws IOException {
     LOG.info("Refreshing queue information. requested by : " +
         UserGroupInformation.getCurrentUser().getShortUserName());
-    this.queueManager.refreshQueues(new Configuration(this.conf));
+    this.queueManager.refreshQueues(new Configuration());
+    
+    synchronized (taskScheduler) {
+      taskScheduler.refresh();
+    }
+
   }
   
   synchronized String getReasonsForBlacklisting(String host) {

+ 41 - 21
src/mapred/org/apache/hadoop/mapred/QueueManager.java

@@ -28,6 +28,7 @@ import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Queue.QueueState;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.util.StringUtils;
@@ -61,7 +62,8 @@ class QueueManager {
   /** Whether ACLs are enabled in the system or not. */
   private boolean aclsEnabled;
   /** Map of a queue name and Queue object */
-  final HashMap<String,Queue> queues;
+  final HashMap<String,Queue> queues = new HashMap<String,Queue>();
+  
   /**
    * Enum representing an AccessControlList that drives set of operations that
    * can be performed on a queue.
@@ -98,20 +100,24 @@ class QueueManager {
   public QueueManager(Configuration conf) {
     checkDeprecation(conf);
     conf.addResource(QUEUE_ACLS_FILE_NAME);
-    queues = new HashMap<String,Queue>();
+    
+    // Get configured ACLs and state for each queue
+    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
+
+    queues.putAll(parseQueues(conf)); 
+  }
+  
+  synchronized private Map<String, Queue> parseQueues(Configuration conf) {
+    Map<String, Queue> queues = new HashMap<String, Queue>();
     // First get the queue names
     String[] queueNameValues = conf.getStrings("mapred.queue.names",
         new String[]{JobConf.DEFAULT_QUEUE_NAME});
-    // Get configured ACLs and state for each queue
-    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
     for (String name : queueNameValues) {
-      try {
-        queues.put(name, new Queue(name, getQueueAcls(name, conf),
-              getQueueState(name, conf)));
-      } catch (Throwable t) {
-        LOG.warn("Not able to initialize queue " + name, t);
-      }
+      queues.put(name, new Queue(name, getQueueAcls(name, conf),
+          getQueueState(name, conf)));
     }
+    
+    return queues;
   }
   
   /**
@@ -218,6 +224,7 @@ class QueueManager {
    * @throws IOException when queue ACL configuration file is invalid.
    */
   synchronized void refreshQueues(Configuration conf) throws IOException {
+    
     // First check if things are configured in mapred-site.xml,
     // so we can print out a deprecation warning.
     // This check is needed only until we support the configuration
@@ -228,23 +235,36 @@ class QueueManager {
     // will be overridden.
     conf.addResource(QUEUE_ACLS_FILE_NAME);
 
+    // Now parse the queues and check to ensure no queue has been deleted
+    Map<String, Queue> newQueues = parseQueues(conf);
+    checkQueuesForDeletion(queues, newQueues);
+
     // Now we refresh the properties of the queues. Note that we
     // do *not* refresh the queue names or the acls flag. Instead
     // we use the older values configured for them.
-    LOG.info("Refreshing acls and state for configured queues.");
-    try {
-      for (String qName : getQueues()) {
-        Queue q = queues.get(qName);
-        q.setAcls(getQueueAcls(qName, conf));
-        q.setState(getQueueState(qName, conf));
+    queues.clear();
+    queues.putAll(newQueues);
+    LOG.info("Queues acls, state and configs refreshed: " + 
+        queues.size() + " queues present now.");
+  }
+
+  private void checkQueuesForDeletion(Map<String, Queue> currentQueues,
+      Map<String, Queue> newQueues) {
+    for (String queue : currentQueues.keySet()) {
+      if (!newQueues.containsKey(queue)) {
+        throw new IllegalArgumentException("Couldn't find queue '" + queue + 
+            "' during refresh!");
+      }
+    }
+    
+    // Mark new queues as STOPPED
+    for (String queue : newQueues.keySet()) {
+      if (!currentQueues.containsKey(queue)) {
+        newQueues.get(queue).setState(QueueState.STOPPED);
       }
-    } catch (Throwable t) {
-      LOG.warn("Invalid queue configuration", t);
-      throw new IOException("Invalid queue configuration", t);
     }
-
   }
-
+  
   private void checkDeprecation(Configuration conf) {
     // check if queues are defined.
     String[] queues = conf.getStrings("mapred.queue.names");

+ 6 - 1
src/mapred/org/apache/hadoop/mapred/TaskScheduler.java

@@ -91,5 +91,10 @@ abstract class TaskScheduler implements Configurable {
    * @return
    */
   public abstract Collection<JobInProgress> getJobs(String queueName);
-    
+
+  /**
+   * Refresh the configuration of the scheduler.
+   * This default implementation has an empty body and does nothing.
+   *
+   * @throws IOException never thrown by this default implementation; the
+   *         clause is part of the declared signature
+   */
+  public void refresh() throws IOException {}
+  
 }

部分文件因文件數量過多而無法顯示