HADOOP-3746. Add a fair share scheduler. (Matei Zaharia via omalley)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@690093 13f79535-47bb-0310-9956-ffa450edef68
Owen O'Malley, 17 years ago
parent commit de5f5f41c9
19 changed files with 3206 additions and 0 deletions
  1. CHANGES.txt (+2, -0)
  2. src/contrib/build.xml (+1, -0)
  3. src/contrib/fairscheduler/README (+238, -0)
  4. src/contrib/fairscheduler/build.xml (+28, -0)
  5. src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/AllocationConfigurationException.java (+30, -0)
  6. src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java (+54, -0)
  7. src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java (+76, -0)
  8. src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (+689, -0)
  9. src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java (+299, -0)
  10. src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java (+42, -0)
  11. src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java (+79, -0)
  12. src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java (+57, -0)
  13. src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java (+60, -0)
  14. src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java (+324, -0)
  15. src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java (+101, -0)
  16. src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskType.java (+26, -0)
  17. src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java (+33, -0)
  18. src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (+1064, -0)
  19. src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (+3, -0)

+ 2 - 0
CHANGES.txt

@@ -104,6 +104,8 @@ Trunk (unreleased changes)
     HADOOP-3759. Provides ability to run memory intensive jobs without 
     affecting other running tasks on the nodes. (Hemanth Yamijala via ddas)
 
+    HADOOP-3746. Add a fair share scheduler. (Matei Zaharia via omalley)
+
   IMPROVEMENTS
 
     HADOOP-3908. Fuse-dfs: better error message if llibhdfs.so doesn't exist.

+ 1 - 0
src/contrib/build.xml

@@ -47,6 +47,7 @@
   <target name="test">
     <subant target="test">
       <fileset dir="." includes="streaming/build.xml"/>
+      <fileset dir="." includes="fairscheduler/build.xml"/>
     </subant>
   </target>
   

+ 238 - 0
src/contrib/fairscheduler/README

@@ -0,0 +1,238 @@
+# Copyright 2008 The Apache Software Foundation. Licensed under the
+# Apache License, Version 2.0 (the "License"); you may not use this
+# file except in compliance with the License.  You may obtain a copy
+# of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+# required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.  See the License for the specific language governing
+# permissions and limitations under the License.
+
+This package implements fair scheduling for MapReduce jobs.
+
+Fair scheduling is a method of assigning resources to jobs such that all jobs
+get, on average, an equal share of resources over time. When there is a single
+job running, that job uses the entire cluster. When other jobs are submitted,
+task slots that free up are assigned to the new jobs, so that each job gets
+roughly the same amount of CPU time. Unlike the default Hadoop scheduler, which
+forms a queue of jobs, this lets short jobs finish in reasonable time while not
+starving long jobs. It is also a reasonable way to share a cluster between a
+number of users. Finally, fair sharing can also work with job priorities - the
+priorities are used as weights to determine the fraction of total compute time
+that each job should get.
+
+In addition to providing fair sharing, the Fair Scheduler allows assigning
+jobs to "pools" with guaranteed minimum shares. When a pool contains jobs,
+it gets at least its minimum share, but when a pool does not need its full
+capacity, the excess is shared between other running jobs. Thus pools are
+a way to guarantee capacity for particular user groups while utilizing the
+cluster efficiently when these users are not submitting any jobs. Within each
+pool, fair sharing is used to share capacity between the running jobs. By
+default the pool is set based on the queue.name property in the jobconf,
+which will be introduced with the Hadoop Resource Manager (JIRA 3445), but
+it's also possible to have a pool per user or per Unix user group.
+
+The fair scheduler lets all jobs run by default, but it is also possible to
+limit the number of running jobs per user and per pool through the config
+file. This can be useful when a user must submit hundreds of jobs at once,
+or in general to improve performance if running too many jobs at once would
+cause too much intermediate data to be created or too much context-switching.
+Limiting the jobs does not cause any subsequently submitted jobs to fail, only
+to wait in the scheduler's queue until some of the user's earlier jobs finish.
+Jobs to run from each user/pool are chosen in order of priority and then submit
+time, as in the default FIFO scheduler in Hadoop.
+
+Finally, the fair scheduler provides several extension points where the basic
+functionality can be extended. For example, the weight calculation can be
+modified to give a priority boost to new jobs, implementing a "shortest job
+first"-like policy that reduces response times for interactive jobs even
+further.
+
+--------------------------------------------------------------------------------
+
+BUILDING:
+
+In HADOOP_HOME, run ant package to build Hadoop and its contrib packages.
+
+--------------------------------------------------------------------------------
+
+INSTALLING:
+
+To run the fair scheduler in your Hadoop installation, you need to put it on
+the CLASSPATH. The easiest way is to copy the hadoop-*-fairscheduler.jar
+from HADOOP_HOME/build/contrib/fairscheduler to HADOOP_HOME/lib. Alternatively,
+you can modify HADOOP_CLASSPATH in conf/hadoop-env.sh to include this jar.
+
+You will also need to set the following property in the Hadoop config file
+(conf/hadoop-site.xml) to have Hadoop use the fair scheduler:
+
+<property>
+  <name>mapred.jobtracker.taskScheduler</name>
+  <value>org.apache.hadoop.mapred.FairScheduler</value>
+</property>
+
+Once you restart the cluster, you can check that the fair scheduler is running
+by going to http://<jobtracker URL>/scheduler on the JobTracker's web UI. A
+"job scheduler administration" page should be visible there. This page is
+described in the Administration section.
+
+--------------------------------------------------------------------------------
+
+CONFIGURING:
+
+The following properties can be set in hadoop-site.xml to configure the
+scheduler:
+
+mapred.fairscheduler.allocation.file:
+    Specifies an absolute path to an XML file which contains the allocations
+    for each pool, as well as the per-pool and per-user limits on number of
+    running jobs. If this property is not provided, allocations are not used.
+    This file must be in XML format, and can contain three types of elements:
+    - pool elements, which may contain elements for minMaps, minReduces and
+      maxRunningJobs (which limits how many of the pool's jobs run at once).
+    - user elements, which may contain a maxRunningJobs element to limit
+      that user's concurrently running jobs.
+    - A userMaxJobsDefault element, which sets the running job limit for any
+      users that do not have their own elements.
+    The following example file shows how to create each type of element:
+        <?xml version="1.0"?>
+        <allocations>
+          <pool name="sample_pool">
+            <minMaps>5</minMaps>
+            <minReduces>5</minReduces>
+          </pool>
+          <user name="sample_user">
+            <maxRunningJobs>6</maxRunningJobs>
+          </user>
+          <userMaxJobsDefault>3</userMaxJobsDefault>
+        </allocations>
+    This example creates a pool sample_pool with a guarantee of 5 map slots
+    and 5 reduce slots. It also limits the number of running jobs per user
+    to 3, except for sample_user, who can run 6 jobs concurrently.
+    Any pool not defined in the allocations file will have no guaranteed
+    capacity. Also, any pool or user with no max running jobs set in the file
+    will be allowed to run an unlimited number of jobs.
+
+mapred.fairscheduler.assignmultiple:
+    Allows the scheduler to assign both a map task and a reduce task on each
+    heartbeat, which improves cluster throughput when there are many small
+    tasks to run. Boolean value, default: false.
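+    For example, to enable it in hadoop-site.xml:
+        <property>
+          <name>mapred.fairscheduler.assignmultiple</name>
+          <value>true</value>
+        </property>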
+
+mapred.fairscheduler.sizebasedweight:
+    Take into account job sizes in calculating their weights for fair sharing.
+    By default, weights are only based on job priorities. Setting this flag to
+    true will make them based on the size of the job (number of tasks needed)
+    as well, though not linearly (the weight will be proportional to the log
+    of the number of tasks needed). This lets larger jobs get larger fair
+    shares while still providing enough of a share to small jobs to let them
+    finish fast. Boolean value, default: false.
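+    For example, with this flag enabled, a job with 1000 runnable tasks
+    weighs about log2(1001), i.e. roughly 10 times a one-task job rather
+    than 1000 times.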
+
+mapred.fairscheduler.poolnameproperty:
+    Specify which jobconf property is used to determine the pool that a job
+    belongs in. String, default: queue.name (the same property as the queue
+    name in the Hadoop Resource Manager, JIRA 3445). You can use user.name
+    or group.name to base it on the Unix user or Unix group for example.
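+    For example, to give each Unix user their own pool:
+        <property>
+          <name>mapred.fairscheduler.poolnameproperty</name>
+          <value>user.name</value>
+        </property>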
+
+mapred.fairscheduler.weightadjuster:
+    An extensibility point that lets you specify a class to adjust the weights
+    of running jobs. This class should implement the WeightAdjuster interface.
+    There is currently one example implementation - NewJobWeightBooster, which
+    increases the weight of jobs for the first 5 minutes of their lifetime
+    to let short jobs finish faster. To use it, set the weightadjuster property
+    to the full class name, org.apache.hadoop.mapred.NewJobWeightBooster.
+    NewJobWeightBooster itself provides two parameters for setting the duration
+    and boost factor - mapred.newjobweightbooster.factor (default 3) and
+    mapred.newjobweightbooster.duration (in milliseconds, default 300000 for 5
+    minutes).
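+    For example, to enable the booster with a 2x boost factor:
+        <property>
+          <name>mapred.fairscheduler.weightadjuster</name>
+          <value>org.apache.hadoop.mapred.NewJobWeightBooster</value>
+        </property>
+        <property>
+          <name>mapred.newjobweightbooster.factor</name>
+          <value>2</value>
+        </property>
+    Writing a custom adjuster is also straightforward. A minimal sketch
+    (illustrative only, not shipped with this package) that halves the weight
+    of jobs older than one hour, using the adjustWeight(job, taskType,
+    curWeight) signature that FairScheduler invokes:
+        public class OldJobWeightDamper implements WeightAdjuster {
+          public double adjustWeight(JobInProgress job, TaskType taskType,
+              double curWeight) {
+            // Hypothetical policy: dampen jobs running longer than an hour.
+            long age = System.currentTimeMillis() - job.getStartTime();
+            return (age > 3600000) ? curWeight / 2 : curWeight;
+          }
+        }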
+
+mapred.fairscheduler.loadmanager:
+    An extensibility point that lets you specify a class that determines
+    how many maps and reduces can run on a given TaskTracker. This class should
+    implement the LoadManager interface. By default the task caps in the Hadoop
+    config file are used, but this option could be used to base the load on
+    available memory and CPU utilization, for example.
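+    As an illustration (not shipped with this package), a load manager that
+    always keeps one slot of each type free on every node, assuming only the
+    two methods that CapBasedLoadManager (the default) overrides:
+        public class HeadroomLoadManager extends LoadManager {
+          @Override
+          public boolean canAssignMap(TaskTrackerStatus tracker,
+              int totalRunnableMaps) {
+            // Admit a new map only if a free slot would remain afterwards.
+            return tracker.countMapTasks() < tracker.getMaxMapTasks() - 1;
+          }
+          @Override
+          public boolean canAssignReduce(TaskTrackerStatus tracker,
+              int totalRunnableReduces) {
+            return tracker.countReduceTasks() <
+                tracker.getMaxReduceTasks() - 1;
+          }
+        }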
+
+mapred.fairscheduler.taskselector:
+    An extensibility point that lets you specify a class that determines
+    which task from within a job to launch on a given tracker. This can be
+    used to change either the locality policy (e.g. keep some jobs within
+    a particular rack) or the speculative execution algorithm (select when to
+    launch speculative tasks). The default implementation uses Hadoop's
+    default algorithms from JobInProgress. 
+
+--------------------------------------------------------------------------------
+
+ADMINISTRATION:
+
+The fair scheduler provides support for administration at runtime through
+two mechanisms. First, it is possible to modify pools' allocations and user
+and pool running job limits at runtime by editing the allocation config file.
+The scheduler will reload this file 10-15 seconds after it sees that it was
+modified. Second, current jobs, pools, and fair shares can be examined through
+the JobTracker's web interface, at http://<jobtracker URL>/scheduler. On this
+interface, it is also possible to modify jobs' priorities or move jobs from
+one pool to another and see the effects on the fair shares (this requires
+JavaScript). The following fields can be seen for each job on the web interface:
+
+Submitted - Date and time job was submitted.
+JobID, User, Name - Job identifiers as on the standard web UI.
+Pool - Current pool of job. Select another value to move job to another pool.
+Priority - Current priority. Select another value to change the job's priority.
+Maps/Reduces Finished - Number of tasks finished / total tasks.
+Maps/Reduces Running - Tasks currently running.
+Map/Reduce Fair Share - The average number of task slots that this job should
+    have at any given time according to fair sharing. The actual number of
+    tasks will go up and down depending on how much compute time the job has
+    had, but on average it will get its fair share amount.
+
+In addition, it is possible to turn on an "advanced" view for the web UI, by
+going to http://<jobtracker URL>/scheduler?advanced. This view shows four more
+columns used for calculations internally:
+
+Map/Reduce Weight - Weight of the job in the fair sharing calculations. This
+    depends on priority and potentially also on job size and job age if the
+    sizebasedweight and NewJobWeightBooster are enabled.
+Map/Reduce Deficit - The job's scheduling deficit in machine-seconds - the
+    amount of compute time it should have received according to its fair
+    share, minus what it actually received. A positive deficit means the job
+    will be scheduled again in the near future because it needs to catch up
+    to its fair share. The scheduler schedules jobs with higher deficit ahead
+    of others. See the Implementation section of this document for details.
+
+Finally, the web interface provides a button at the bottom of the page for
+switching to FIFO scheduling at runtime, in case this becomes necessary and
+restarting the MapReduce cluster is inconvenient.
+
+--------------------------------------------------------------------------------
+
+IMPLEMENTATION:
+
+There are two aspects to implementing fair scheduling: calculating each job's
+fair share, and choosing which job to run when a task slot becomes available.
+
+To select jobs to run, the scheduler keeps track of a "deficit" for
+each job - the difference between the amount of compute time it should have
+gotten on an ideal scheduler, and the amount of compute time it actually got.
+This is a measure of how "unfair" we've been to the job. Every few hundred
+milliseconds, the scheduler updates the deficit of each job by looking at
+how many tasks each job had running during this interval vs. its fair share.
+Whenever a task slot becomes available, it is assigned to the job with the
+highest deficit. There is one exception - if one or more jobs are not
+meeting their pool capacity guarantees, we choose only among these
+"needy" jobs (based again on their deficit), to ensure that the scheduler
+meets pool guarantees as soon as possible.
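+For example, a job whose map fair share is 10 slots but which held only 4
+running maps over a 500 millisecond update interval gains (10 - 4) * 500 =
+3000 slot-milliseconds of map deficit.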
+
+The fair shares are calculated by dividing the capacity of the cluster among
+runnable jobs according to a "weight" for each job. By default the weight is
+based on priority, with each level of priority having 2x higher weight than the
+next (for example, VERY_HIGH has 4x the weight of NORMAL). However, weights can
+also be based on job sizes and ages, as described in the Configuring section.
+For jobs that are in a pool, fair shares also take into account the minimum
+guarantee for that pool. This capacity is divided among the jobs in that pool
+according again to their weights.
+
+Finally, when limits on a user's running jobs or a pool's running jobs are in
+place, we choose which jobs get to run by sorting all jobs in order of priority
+and then submit time, as in the standard Hadoop scheduler. Any jobs that fall
+after the user/pool's limit in this ordering are queued up and wait until
+they can be run. During this time, they are excluded from the fair sharing
+calculations and do not gain or lose deficit (their fair share is set to zero).

+ 28 - 0
src/contrib/fairscheduler/build.xml

@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!-- 
+Before you can run these subtargets directly, you need 
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="fairscheduler" default="jar">
+
+  <import file="../build-contrib.xml"/>
+
+</project>

+ 30 - 0
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/AllocationConfigurationException.java

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * Thrown when the allocation file for {@link PoolManager} is malformed.  
+ */
+public class AllocationConfigurationException extends Exception {
+  private static final long serialVersionUID = 4046517047810854249L;
+  
+  public AllocationConfigurationException(String message) {
+    super(message);
+  }
+}

+ 54 - 0
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java

@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * A {@link LoadManager} for use by the {@link FairScheduler} that allocates
+ * tasks evenly across nodes up to their per-node maximum, using the default
+ * load management algorithm in Hadoop.
+ */
+public class CapBasedLoadManager extends LoadManager {
+  /**
+   * Determine how many tasks of a given type we want to run on a TaskTracker. 
+   * This cap is chosen based on how many tasks of that type are outstanding in
+   * total, so that when the cluster is used below capacity, tasks are spread
+   * out uniformly across the nodes rather than being clumped up on whichever
+   * machines sent out heartbeats earliest.
+   */
+  int getCap(TaskTrackerStatus tracker,
+      int totalRunnableTasks, int localMaxTasks) {
+    int numTaskTrackers = taskTrackerManager.taskTrackers().size();
+    return Math.min(localMaxTasks,
+        (int) Math.ceil((double) totalRunnableTasks / numTaskTrackers));
+  }
+
+  @Override
+  public boolean canAssignMap(TaskTrackerStatus tracker,
+      int totalRunnableMaps) {
+    return tracker.countMapTasks() < getCap(tracker, totalRunnableMaps,
+        tracker.getMaxMapTasks());
+  }
+
+  @Override
+  public boolean canAssignReduce(TaskTrackerStatus tracker,
+      int totalRunnableReduces) {
+    return tracker.countReduceTasks() < getCap(tracker, totalRunnableReduces,
+        tracker.getMaxReduceTasks());
+  }
+}

+ 76 - 0
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * A {@link TaskSelector} implementation that wraps around the default
+ * {@link JobInProgress#obtainNewMapTask(TaskTrackerStatus, int)} and
+ * {@link JobInProgress#obtainNewReduceTask(TaskTrackerStatus, int)} methods
+ * in {@link JobInProgress}, using the default Hadoop locality and speculative
+ * threshold algorithms.
+ */
+public class DefaultTaskSelector extends TaskSelector {
+
+  @Override
+  public int neededSpeculativeMaps(JobInProgress job) {
+    int count = 0;
+    long time = System.currentTimeMillis();
+    double avgProgress = job.getStatus().mapProgress();
+    for (TaskInProgress tip: job.maps) {
+      if (tip.isRunning() && tip.hasSpeculativeTask(time, avgProgress)) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  @Override
+  public int neededSpeculativeReduces(JobInProgress job) {
+    int count = 0;
+    long time = System.currentTimeMillis();
+    double avgProgress = job.getStatus().reduceProgress();
+    for (TaskInProgress tip: job.reduces) {
+      if (tip.isRunning() && tip.hasSpeculativeTask(time, avgProgress)) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  @Override
+  public Task obtainNewMapTask(TaskTrackerStatus taskTracker, JobInProgress job)
+      throws IOException {
+    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
+    int numTaskTrackers = clusterStatus.getTaskTrackers();
+    return job.obtainNewMapTask(taskTracker, numTaskTrackers,
+        taskTrackerManager.getNumberOfUniqueHosts());
+  }
+
+  @Override
+  public Task obtainNewReduceTask(TaskTrackerStatus taskTracker, JobInProgress job)
+      throws IOException {
+    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
+    int numTaskTrackers = clusterStatus.getTaskTrackers();
+    return job.obtainNewReduceTask(taskTracker, numTaskTrackers,
+        taskTrackerManager.getNumberOfUniqueHosts());
+  }
+
+}

+ 689 - 0
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java

@@ -0,0 +1,689 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A {@link TaskScheduler} that implements fair sharing.
+ */
+public class FairScheduler extends TaskScheduler {
+  /** How often fair shares are re-calculated */
+  public static final long UPDATE_INTERVAL = 500;
+  public static final Log LOG = LogFactory.getLog(
+      "org.apache.hadoop.mapred.FairScheduler");
+  
+  protected PoolManager poolMgr;
+  
+  protected LoadManager loadMgr;
+  protected TaskSelector taskSelector;
+  protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
+  protected Map<JobInProgress, JobInfo> infos = // per-job scheduling variables
+    new HashMap<JobInProgress, JobInfo>();
+  protected long lastUpdateTime;           // Time when we last updated infos
+  protected boolean initialized;  // Are we initialized?
+  protected boolean running;      // Are we running?
+  protected boolean useFifo;      // Set if we want to revert to FIFO behavior
+  protected boolean assignMultiple; // Simultaneously assign map and reduce?
+  protected boolean sizeBasedWeight; // Give larger weights to larger jobs
+  private Clock clock;
+  private boolean runBackgroundUpdates; // Can be set to false for testing
+  private EagerTaskInitializationListener eagerInitListener;
+  private JobListener jobListener;
+  
+  /**
+   * A class for holding per-job scheduler variables. These always contain the
+   * values of the variables at the last update(), and are used along with a
+   * time delta to update the map and reduce deficits before a new update().
+   */
+  static class JobInfo {
+    boolean runnable = false;   // Can the job run given user/pool limits?
+    double mapWeight = 0;       // Weight of job in calculation of map share
+    double reduceWeight = 0;    // Weight of job in calculation of reduce share
+    long mapDeficit = 0;        // Time deficit for maps
+    long reduceDeficit = 0;     // Time deficit for reduces
+    int runningMaps = 0;        // Maps running at last update
+    int runningReduces = 0;     // Reduces running at last update
+    int neededMaps;             // Maps needed at last update
+    int neededReduces;          // Reduces needed at last update
+    int minMaps = 0;            // Minimum maps as guaranteed by pool
+    int minReduces = 0;         // Minimum reduces as guaranteed by pool
+    double mapFairShare = 0;    // Fair share of map slots at last update
+    double reduceFairShare = 0; // Fair share of reduce slots at last update
+  }
+  
+  /**
+   * A clock class - can be mocked out for testing.
+   */
+  static class Clock {
+    long getTime() {
+      return System.currentTimeMillis();
+    }
+  }
+  
+  public FairScheduler() {
+    this(new Clock(), true);
+  }
+  
+  /**
+   * Constructor used for tests, which can change the clock and disable updates.
+   */
+  protected FairScheduler(Clock clock, boolean runBackgroundUpdates) {
+    this.clock = clock;
+    this.runBackgroundUpdates = runBackgroundUpdates;
+    this.eagerInitListener = new EagerTaskInitializationListener();
+    this.jobListener = new JobListener();
+  }
+
+  @Override
+  public void start() {
+    try {
+      Configuration conf = getConf();
+      eagerInitListener.start();
+      taskTrackerManager.addJobInProgressListener(eagerInitListener);
+      taskTrackerManager.addJobInProgressListener(jobListener);
+      poolMgr = new PoolManager(conf);
+      loadMgr = (LoadManager) ReflectionUtils.newInstance(
+          conf.getClass("mapred.fairscheduler.loadmanager", 
+              CapBasedLoadManager.class, LoadManager.class), conf);
+      loadMgr.setTaskTrackerManager(taskTrackerManager);
+      loadMgr.start();
+      taskSelector = (TaskSelector) ReflectionUtils.newInstance(
+          conf.getClass("mapred.fairscheduler.taskselector", 
+              DefaultTaskSelector.class, TaskSelector.class), conf);
+      taskSelector.setTaskTrackerManager(taskTrackerManager);
+      taskSelector.start();
+      Class<?> weightAdjClass = conf.getClass(
+          "mapred.fairscheduler.weightadjuster", null);
+      if (weightAdjClass != null) {
+        weightAdjuster = (WeightAdjuster) ReflectionUtils.newInstance(
+            weightAdjClass, conf);
+      }
+      assignMultiple = conf.getBoolean("mapred.fairscheduler.assignmultiple",
+          false);
+      sizeBasedWeight = conf.getBoolean("mapred.fairscheduler.sizebasedweight",
+          false);
+      initialized = true;
+      running = true;
+      lastUpdateTime = clock.getTime();
+      // Start a thread to update deficits every UPDATE_INTERVAL
+      if (runBackgroundUpdates)
+        new UpdateThread().start();
+      // Register servlet with JobTracker's Jetty server
+      if (taskTrackerManager instanceof JobTracker) {
+        JobTracker jobTracker = (JobTracker) taskTrackerManager;
+        StatusHttpServer infoServer = jobTracker.infoServer;
+        infoServer.setAttribute("scheduler", this);
+        infoServer.addServlet("scheduler", "/scheduler",
+            FairSchedulerServlet.class);
+      }
+    } catch (Exception e) {
+      // Can't load one of the managers - crash the JobTracker now while it is
+      // starting up so that the user notices.
+      throw new RuntimeException("Failed to start FairScheduler", e);
+    }
+    LOG.info("Successfully configured FairScheduler");
+  }
+
+  @Override
+  public void terminate() throws IOException {
+    running = false;
+    if (jobListener != null)
+      taskTrackerManager.removeJobInProgressListener(jobListener);
+    if (eagerInitListener != null)
+      taskTrackerManager.removeJobInProgressListener(eagerInitListener);
+  }
+  
+  /**
+   * Used to listen for jobs added/removed by our {@link TaskTrackerManager}.
+   */
+  private class JobListener extends JobInProgressListener {
+    @Override
+    public void jobAdded(JobInProgress job) {
+      synchronized (FairScheduler.this) {
+        poolMgr.addJob(job);
+        JobInfo info = new JobInfo();
+        infos.put(job, info);
+        update();
+      }
+    }
+    
+    @Override
+    public void jobRemoved(JobInProgress job) {
+      synchronized (FairScheduler.this) {
+        poolMgr.removeJob(job);
+        infos.remove(job);
+      }
+    }
+  
+    @Override
+    public void jobUpdated(JobInProgress job) {
+    }
+  }
+
+  /**
+   * A thread which calls {@link FairScheduler#update()} every
+   * <code>UPDATE_INTERVAL</code> milliseconds.
+   */
+  private class UpdateThread extends Thread {
+    private UpdateThread() {
+      super("FairScheduler update thread");
+    }
+
+    public void run() {
+      while (running) {
+        try {
+          Thread.sleep(UPDATE_INTERVAL);
+          update();
+        } catch (Exception e) {
+          LOG.error("Failed to update fair share calculations", e);
+        }
+      }
+    }
+  }
+  
+  @Override
+  public synchronized List<Task> assignTasks(TaskTrackerStatus tracker)
+      throws IOException {
+    if (!initialized) // Don't try to assign tasks if we haven't yet started up
+      return null;
+    
+    // Reload allocations file if it hasn't been loaded in a while
+    poolMgr.reloadAllocsIfNecessary();
+    
+    // Compute total runnable maps and reduces
+    int runnableMaps = 0;
+    int runnableReduces = 0;
+    for (JobInProgress job: infos.keySet()) {
+      runnableMaps += runnableTasks(job, TaskType.MAP);
+      runnableReduces += runnableTasks(job, TaskType.REDUCE);
+    }
+    
+    // Scan to see whether any job needs to run a map, then a reduce
+    ArrayList<Task> tasks = new ArrayList<Task>();
+    TaskType[] types = new TaskType[] {TaskType.MAP, TaskType.REDUCE};
+    for (TaskType taskType: types) {
+      boolean canAssign = (taskType == TaskType.MAP) ? 
+          loadMgr.canAssignMap(tracker, runnableMaps) :
+          loadMgr.canAssignReduce(tracker, runnableReduces);
+      if (canAssign) {
+        // Figure out the jobs that need this type of task
+        List<JobInProgress> candidates = new ArrayList<JobInProgress>();
+        for (JobInProgress job: infos.keySet()) {
+          if (job.getStatus().getRunState() == JobStatus.RUNNING && 
+              neededTasks(job, taskType) > 0) {
+            candidates.add(job);
+          }
+        }
+        // Sort jobs by deficit (for Fair Sharing) or submit time (for FIFO)
+        Comparator<JobInProgress> comparator = useFifo ?
+            new FifoJobComparator() : new DeficitComparator(taskType);
+        Collections.sort(candidates, comparator);
+        for (JobInProgress job: candidates) {
+          Task task = (taskType == TaskType.MAP ? 
+              taskSelector.obtainNewMapTask(tracker, job) :
+              taskSelector.obtainNewReduceTask(tracker, job));
+          if (task != null) {
+            // Update the JobInfo for this job so we account for the launched
+            // tasks during this update interval and don't try to launch more
+            // tasks than the job needed on future heartbeats
+            JobInfo info = infos.get(job);
+            if (taskType == TaskType.MAP) {
+              info.runningMaps++;
+              info.neededMaps--;
+            } else {
+              info.runningReduces++;
+              info.neededReduces--;
+            }
+            tasks.add(task);
+            if (!assignMultiple)
+              return tasks;
+            break;
+          }
+        }
+      }
+    }
+    
+    // If no tasks were found, return null
+    return tasks.isEmpty() ? null : tasks;
+  }
+
+  /**
+   * Compare jobs by deficit for a given task type, always putting jobs whose
+   * current allocation is below their minimum share ahead of others. This is
+   * the default job comparator used for Fair Sharing.
+   */
+  private class DeficitComparator implements Comparator<JobInProgress> {
+    private final TaskType taskType;
+
+    private DeficitComparator(TaskType taskType) {
+      this.taskType = taskType;
+    }
+
+    public int compare(JobInProgress j1, JobInProgress j2) {
+      // Put needy jobs ahead of non-needy jobs (where needy means must receive
+      // new tasks to meet slot minimum), comparing among jobs of the same type
+      // by deficit so as to put jobs with higher deficit ahead.
+      JobInfo j1Info = infos.get(j1);
+      JobInfo j2Info = infos.get(j2);
+      long deficitDif;
+      boolean j1Needy, j2Needy;
+      if (taskType == TaskType.MAP) {
+        j1Needy = j1.runningMaps() < Math.floor(j1Info.minMaps);
+        j2Needy = j2.runningMaps() < Math.floor(j2Info.minMaps);
+        deficitDif = j2Info.mapDeficit - j1Info.mapDeficit;
+      } else {
+        j1Needy = j1.runningReduces() < Math.floor(j1Info.minReduces);
+        j2Needy = j2.runningReduces() < Math.floor(j2Info.minReduces);
+        deficitDif = j2Info.reduceDeficit - j1Info.reduceDeficit;
+      }
+      if (j1Needy && !j2Needy)
+        return -1;
+      else if (j2Needy && !j1Needy)
+        return 1;
+      else // Both needy or both non-needy; compare by deficit
+        return (int) Math.signum(deficitDif);
+    }
+  }
+  
+  /**
+   * Recompute the internal variables used by the scheduler - per-job weights,
+   * fair shares, deficits, minimum slot allocations, and numbers of running
+   * and needed tasks of each type. 
+   */
+  protected synchronized void update() {
+    // Remove non-running jobs
+    List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
+    for (JobInProgress job: infos.keySet()) { 
+      int runState = job.getStatus().getRunState();
+      if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED) {
+        toRemove.add(job);
+      }
+    }
+    for (JobInProgress job: toRemove) {
+      infos.remove(job);
+      poolMgr.removeJob(job);
+    }
+    // Update running jobs with deficits since last update, and compute new
+    // slot allocations, weight, shares and task counts
+    long now = clock.getTime();
+    long timeDelta = now - lastUpdateTime;
+    updateDeficits(timeDelta);
+    updateRunnability();
+    updateTaskCounts();
+    updateWeights();
+    updateMinSlots();
+    updateFairShares();
+    lastUpdateTime = now;
+  }
+  
+  private void updateDeficits(long timeDelta) {
+    for (JobInfo info: infos.values()) {
+      info.mapDeficit +=
+        (info.mapFairShare - info.runningMaps) * timeDelta;
+      info.reduceDeficit +=
+        (info.reduceFairShare - info.runningReduces) * timeDelta;
+    }
+  }
+  
+  private void updateRunnability() {
+    // Start by marking everything as not runnable
+    for (JobInfo info: infos.values()) {
+      info.runnable = false;
+    }
+    // Create a list of sorted jobs in order of start time and priority
+    List<JobInProgress> jobs = new ArrayList<JobInProgress>(infos.keySet());
+    Collections.sort(jobs, new FifoJobComparator());
+    // Mark jobs as runnable in order of start time and priority, until
+    // user or pool limits have been reached.
+    Map<String, Integer> userJobs = new HashMap<String, Integer>();
+    Map<String, Integer> poolJobs = new HashMap<String, Integer>();
+    for (JobInProgress job: jobs) {
+      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+        String user = job.getJobConf().getUser();
+        String pool = poolMgr.getPoolName(job);
+        int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0;
+        int poolCount = poolJobs.containsKey(pool) ? poolJobs.get(pool) : 0;
+        if (userCount < poolMgr.getUserMaxJobs(user) && 
+            poolCount < poolMgr.getPoolMaxJobs(pool)) {
+          infos.get(job).runnable = true;
+          userJobs.put(user, userCount + 1);
+          poolJobs.put(pool, poolCount + 1);
+        }
+      }
+    }
+  }
+
+  private void updateTaskCounts() {
+    for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
+      JobInProgress job = entry.getKey();
+      JobInfo info = entry.getValue();
+      if (job.getStatus().getRunState() != JobStatus.RUNNING)
+        continue; // Job is still in PREP state and tasks aren't initialized
+      // Count maps
+      int totalMaps = job.numMapTasks;
+      int finishedMaps = 0;
+      int runningMaps = 0;
+      for (TaskInProgress tip: job.getMapTasks()) {
+        if (tip.isComplete()) {
+          finishedMaps += 1;
+        } else if (tip.isRunning()) {
+          runningMaps += tip.getActiveTasks().size();
+        }
+      }
+      info.runningMaps = runningMaps;
+      info.neededMaps = (totalMaps - runningMaps - finishedMaps
+          + taskSelector.neededSpeculativeMaps(job));
+      // Count reduces
+      int totalReduces = job.numReduceTasks;
+      int finishedReduces = 0;
+      int runningReduces = 0;
+      for (TaskInProgress tip: job.getReduceTasks()) {
+        if (tip.isComplete()) {
+          finishedReduces += 1;
+        } else if (tip.isRunning()) {
+          runningReduces += tip.getActiveTasks().size();
+        }
+      }
+      info.runningReduces = runningReduces;
+      info.neededReduces = (totalReduces - runningReduces - finishedReduces 
+                            + taskSelector.neededSpeculativeReduces(job));
+      // If the job was marked as not runnable due to its user or pool having
+      // too many active jobs, set the neededMaps/neededReduces to 0. We still
+      // count runningMaps/runningReduces however so we can give it a deficit.
+      if (!info.runnable) {
+        info.neededMaps = 0;
+        info.neededReduces = 0;
+      }
+    }
+  }
+
+  private void updateWeights() {
+    for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
+      JobInProgress job = entry.getKey();
+      JobInfo info = entry.getValue();
+      info.mapWeight = calculateWeight(job, TaskType.MAP);
+      info.reduceWeight = calculateWeight(job, TaskType.REDUCE);
+    }
+  }
+  
+  private void updateMinSlots() {
+    // Clear old minSlots
+    for (JobInfo info: infos.values()) {
+      info.minMaps = 0;
+      info.minReduces = 0;
+    }
+    // For each pool, distribute its task allocation among jobs in it that need
+    // slots. This is a little tricky since some jobs in the pool might not be
+    // able to use all the slots, e.g. they might have only a few tasks left.
+    // To deal with this, we repeatedly split up the available task slots
+    // between the jobs left, give each job min(its alloc, # of slots it needs),
+    // and redistribute any slots that are left over between jobs that still
+    // need slots on the next pass. If, in total, the jobs in our pool don't
+    // need all its allocation, we leave the leftover slots for general use.
+    PoolManager poolMgr = getPoolManager();
+    for (Pool pool: poolMgr.getPools()) {
+      for (final TaskType type: TaskType.values()) {
+        Set<JobInProgress> jobs = new HashSet<JobInProgress>(pool.getJobs());
+        int slotsLeft = poolMgr.getAllocation(pool.getName(), type);
+        // Keep assigning slots until none are left
+        while (slotsLeft > 0) {
+          // Figure out total weight of jobs that still need slots
+          double totalWeight = 0;
+          for (Iterator<JobInProgress> it = jobs.iterator(); it.hasNext();) {
+            JobInProgress job = it.next();
+            if (isRunnable(job) &&
+                runnableTasks(job, type) > minTasks(job, type)) {
+              totalWeight += weight(job, type);
+            } else {
+              it.remove();
+            }
+          }
+          if (totalWeight == 0) // No jobs that can use more slots are left 
+            break;
+          // Assign slots to jobs, using the floor of their weight divided by
+          // total weight. This ensures that all jobs get some chance to take
+          // a slot. Then, if no slots were assigned this way, we do another
+          // pass where we use ceil, in case some slots were still left over.
+          int oldSlots = slotsLeft; // Copy slotsLeft so we can modify it
+          for (JobInProgress job: jobs) {
+            double weight = weight(job, type);
+            int share = (int) Math.floor(oldSlots * weight / totalWeight);
+            slotsLeft = giveMinSlots(job, type, slotsLeft, share);
+          }
+          if (slotsLeft == oldSlots) {
+            // No tasks were assigned; do another pass using ceil, giving the
+            // extra slots to jobs in order of weight then deficit
+            List<JobInProgress> sortedJobs = new ArrayList<JobInProgress>(jobs);
+            Collections.sort(sortedJobs, new Comparator<JobInProgress>() {
+              public int compare(JobInProgress j1, JobInProgress j2) {
+                double dif = weight(j2, type) - weight(j1, type);
+                if (dif == 0) // Weights are equal, compare by deficit 
+                  dif = deficit(j2, type) - deficit(j1, type);
+                return (int) Math.signum(dif);
+              }
+            });
+            for (JobInProgress job: sortedJobs) {
+              double weight = weight(job, type);
+              int share = (int) Math.ceil(oldSlots * weight / totalWeight);
+              slotsLeft = giveMinSlots(job, type, slotsLeft, share);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Give up to <code>tasksToGive</code> min slots to a job (potentially fewer
+   * if either the job needs fewer slots or there aren't enough slots left).
+   * Returns the number of slots left over.
+   */
+  private int giveMinSlots(JobInProgress job, TaskType type,
+      int slotsLeft, int slotsToGive) {
+    int runnable = runnableTasks(job, type);
+    int curMin = minTasks(job, type);
+    slotsToGive = Math.min(Math.min(slotsLeft, runnable - curMin), slotsToGive);
+    slotsLeft -= slotsToGive;
+    JobInfo info = infos.get(job);
+    if (type == TaskType.MAP)
+      info.minMaps += slotsToGive;
+    else
+      info.minReduces += slotsToGive;
+    return slotsLeft;
+  }
+
+  private void updateFairShares() {
+    // Clear old fairShares
+    for (JobInfo info: infos.values()) {
+      info.mapFairShare = 0;
+      info.reduceFairShare = 0;
+    }
+    // Assign new shares, based on weight and minimum share. This is done
+    // as follows. First, we split up the available slots between all
+    // jobs according to weight. Then if there are any jobs whose minSlots is
+    // larger than their fair allocation, we give them their minSlots and
+    // remove them from the list, and start again with the amount of slots
+    // left over. This continues until all jobs' minSlots are less than their
+    // fair allocation, and at this point we know that we've met everyone's
+    // guarantee and we've split the excess capacity fairly among jobs left.
+    for (TaskType type: TaskType.values()) {
+      // Select only jobs that still need this type of task
+      HashSet<JobInfo> jobsLeft = new HashSet<JobInfo>();
+      for (Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
+        JobInProgress job = entry.getKey();
+        JobInfo info = entry.getValue();
+        if (isRunnable(job) && runnableTasks(job, type) > 0) {
+          jobsLeft.add(info);
+        }
+      }
+      double slotsLeft = getTotalSlots(type);
+      while (!jobsLeft.isEmpty()) {
+        double totalWeight = 0;
+        for (JobInfo info: jobsLeft) {
+          double weight = (type == TaskType.MAP ?
+              info.mapWeight : info.reduceWeight);
+          totalWeight += weight;
+        }
+        boolean recomputeSlots = false;
+        double oldSlots = slotsLeft; // Copy slotsLeft so we can modify it
+        for (Iterator<JobInfo> iter = jobsLeft.iterator(); iter.hasNext();) {
+          JobInfo info = iter.next();
+          double minSlots = (type == TaskType.MAP ?
+              info.minMaps : info.minReduces);
+          double weight = (type == TaskType.MAP ?
+              info.mapWeight : info.reduceWeight);
+          double fairShare = weight / totalWeight * oldSlots;
+          if (minSlots > fairShare) {
+            // Job needs more slots than its fair share; give it its minSlots,
+            // remove it from the list, and set recomputeSlots = true to 
+            // remember that we must loop again to redistribute unassigned slots
+            if (type == TaskType.MAP)
+              info.mapFairShare = minSlots;
+            else
+              info.reduceFairShare = minSlots;
+            slotsLeft -= minSlots;
+            iter.remove();
+            recomputeSlots = true;
+          }
+        }
+        if (!recomputeSlots) {
+          // All minimums are met. Give each job its fair share of excess slots.
+          for (JobInfo info: jobsLeft) {
+            double weight = (type == TaskType.MAP ?
+                info.mapWeight : info.reduceWeight);
+            double fairShare = weight / totalWeight * oldSlots;
+            if (type == TaskType.MAP)
+              info.mapFairShare = fairShare;
+            else
+              info.reduceFairShare = fairShare;
+          }
+          break;
+        }
+      }
+    }
+  }
+
+  private double calculateWeight(JobInProgress job, TaskType taskType) {
+    if (!isRunnable(job)) {
+      return 0;
+    } else {
+      double weight = 1.0;
+      if (sizeBasedWeight) {
+        // Set weight based on runnable tasks
+        weight = Math.log1p(runnableTasks(job, taskType)) / Math.log(2);
+      }
+      weight *= getPriorityFactor(job.getPriority());
+      if (weightAdjuster != null) {
+        // Run weight through the user-supplied weightAdjuster
+        weight = weightAdjuster.adjustWeight(job, taskType, weight);
+      }
+      return weight;
+    }
+  }
+
+  private double getPriorityFactor(JobPriority priority) {
+    switch (priority) {
+    case VERY_HIGH: return 4.0;
+    case HIGH:      return 2.0;
+    case NORMAL:    return 1.0;
+    case LOW:       return 0.5;
+    default:        return 0.25; // priority = VERY_LOW
+    }
+  }
+  
+  public PoolManager getPoolManager() {
+    return poolMgr;
+  }
+
+  public int getTotalSlots(TaskType type) {
+    int slots = 0;
+    for (TaskTrackerStatus tt: taskTrackerManager.taskTrackers()) {
+      slots += (type == TaskType.MAP ?
+          tt.getMaxMapTasks() : tt.getMaxReduceTasks());
+    }
+    return slots;
+  }
+
+  public boolean getUseFifo() {
+    return useFifo;
+  }
+  
+  public void setUseFifo(boolean useFifo) {
+    this.useFifo = useFifo;
+  }
+  
+  // Getter methods for reading JobInfo values based on TaskType, safely
+  // returning 0's for jobs with no JobInfo present.
+
+  protected int neededTasks(JobInProgress job, TaskType taskType) {
+    JobInfo info = infos.get(job);
+    if (info == null) return 0;
+    return taskType == TaskType.MAP ? info.neededMaps : info.neededReduces;
+  }
+  
+  protected int runningTasks(JobInProgress job, TaskType taskType) {
+    JobInfo info = infos.get(job);
+    if (info == null) return 0;
+    return taskType == TaskType.MAP ? info.runningMaps : info.runningReduces;
+  }
+
+  protected int runnableTasks(JobInProgress job, TaskType type) {
+    return neededTasks(job, type) + runningTasks(job, type);
+  }
+
+  protected int minTasks(JobInProgress job, TaskType type) {
+    JobInfo info = infos.get(job);
+    if (info == null) return 0;
+    return (type == TaskType.MAP) ? info.minMaps : info.minReduces;
+  }
+
+  protected double weight(JobInProgress job, TaskType taskType) {
+    JobInfo info = infos.get(job);
+    if (info == null) return 0;
+    return (taskType == TaskType.MAP ? info.mapWeight : info.reduceWeight);
+  }
+
+  protected double deficit(JobInProgress job, TaskType taskType) {
+    JobInfo info = infos.get(job);
+    if (info == null) return 0;
+    return taskType == TaskType.MAP ? info.mapDeficit : info.reduceDeficit;
+  }
+
+  protected boolean isRunnable(JobInProgress job) {
+    JobInfo info = infos.get(job);
+    if (info == null) return false;
+    return info.runnable;
+  }
+}

+ 299 - 0
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java

@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Servlet for displaying fair scheduler information, installed at
+ * [job tracker URL]/scheduler when the {@link FairScheduler} is in use.
+ * 
+ * The main features are viewing each job's task count and fair share, ability
+ * to change job priorities and pools from the UI, and ability to switch the
+ * scheduler to FIFO mode without restarting the JobTracker if this is required
+ * for any reason.
+ * 
+ * There is also an "advanced" view for debugging that can be turned on by
+ * going to [job tracker URL]/scheduler?advanced.
+ */
+public class FairSchedulerServlet extends HttpServlet {
+  private static final long serialVersionUID = 9104070533067306659L;
+  private static final DateFormat DATE_FORMAT = 
+    new SimpleDateFormat("MMM dd, HH:mm");
+  
+  private FairScheduler scheduler;
+  private JobTracker jobTracker;
+  private static long lastId = 0; // Used to generate unique element IDs
+
+  @Override
+  public void init() throws ServletException {
+    super.init();
+    ServletContext servletContext = this.getServletContext();
+    this.scheduler = (FairScheduler) servletContext.getAttribute("scheduler");
+    this.jobTracker = (JobTracker) scheduler.taskTrackerManager;
+  }
+  
+  @Override
+  protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException {
+    doGet(req, resp); // Same handler for both GET and POST
+  }
+  
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response)
+    throws ServletException, IOException {
+    // If the request has a set* param, handle that and redirect to the regular
+    // view page so that the user won't resubmit the data if they hit refresh.
+    boolean advancedView = request.getParameter("advanced") != null;
+    if (request.getParameter("setFifo") != null) {
+      scheduler.setUseFifo(request.getParameter("setFifo").equals("true"));
+      response.sendRedirect("/scheduler" + (advancedView ? "?advanced" : ""));
+      return;
+    }
+    if (request.getParameter("setPool") != null) {
+      PoolManager poolMgr = scheduler.getPoolManager();
+      synchronized (poolMgr) {
+        String pool = request.getParameter("setPool");
+        String jobId = request.getParameter("jobid");
+        Collection<JobInProgress> runningJobs = jobTracker.runningJobs();
+        for (JobInProgress job: runningJobs) {
+          if (job.getProfile().getJobID().toString().equals(jobId)) {
+            poolMgr.setPool(job, pool);
+            scheduler.update();
+            break;
+          }
+        }
+      }
+      response.sendRedirect("/scheduler" + (advancedView ? "?advanced" : ""));
+      return;
+    }
+    if (request.getParameter("setPriority") != null) {
+      PoolManager poolMgr = scheduler.getPoolManager();
+      synchronized (poolMgr) {
+        JobPriority priority = JobPriority.valueOf(request.getParameter(
+            "setPriority"));
+        String jobId = request.getParameter("jobid");
+        Collection<JobInProgress> runningJobs = jobTracker.runningJobs();
+        for (JobInProgress job: runningJobs) {
+          if (job.getProfile().getJobID().toString().equals(jobId)) {
+            job.setPriority(priority);
+            scheduler.update();
+            break;
+          }
+        }
+      }
+      response.sendRedirect("/scheduler" + (advancedView ? "?advanced" : ""));
+      return;
+    }
+    // Print out the normal response
+    response.setContentType("text/html");
+    PrintWriter out = new PrintWriter(response.getOutputStream());
+    String hostname = StringUtils.simpleHostname(
+        jobTracker.getJobTrackerMachine());
+    out.print("<html><head>");
+    out.printf("<title>%s Job Scheduler Admininstration</title>\n", hostname);
+    out.printf("<META http-equiv=\"refresh\" content=\"15;URL=/scheduler%s\">",
+        advancedView ? "?advanced" : "");
+    out.print("<link rel=\"stylesheet\" type=\"text/css\" " + 
+        "href=\"/static/hadoop.css\">\n");
+    out.print("</head><body>\n");
+    out.printf("<h1><a href=\"/jobtracker.jsp\">%s</a> " + 
+        "Job Scheduler Administration</h1>\n", hostname);
+    showPools(out, advancedView);
+    showJobs(out, advancedView);
+    showAdminForm(out, advancedView);
+    out.print("</body></html>\n");
+    out.close();
+  }
+
+  /**
+   * Print a view of pools to the given output writer.
+   */
+  private void showPools(PrintWriter out, boolean advancedView) {
+    PoolManager poolManager = scheduler.getPoolManager();
+    synchronized(poolManager) {
+      out.print("<h2>Pools</h2>\n");
+      out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
+      out.print("<tr><th>Pool</th><th>Running Jobs</th>" + 
+          "<th>Min Maps</th><th>Min Reduces</th>" + 
+          "<th>Running Maps</th><th>Running Reduces</th></tr>\n");
+      List<Pool> pools = new ArrayList<Pool>(poolManager.getPools());
+      Collections.sort(pools, new Comparator<Pool>() {
+        public int compare(Pool p1, Pool p2) {
+          if (p1.isDefaultPool())
+            return 1;
+          else if (p2.isDefaultPool())
+            return -1;
+          else return p1.getName().compareTo(p2.getName());
+        }});
+      for (Pool pool: pools) {
+        int runningMaps = 0;
+        int runningReduces = 0;
+        for (JobInProgress job: pool.getJobs()) {
+          JobInfo info = scheduler.infos.get(job);
+          if (info != null) {
+            runningMaps += info.runningMaps;
+            runningReduces += info.runningReduces;
+          }
+        }
+        out.print("<tr>\n");
+        out.printf("<td>%s</td>\n", pool.getName());
+        out.printf("<td>%s</td>\n", pool.getJobs().size());
+        out.printf("<td>%s</td>\n", poolManager.getAllocation(pool.getName(),
+            TaskType.MAP));
+        out.printf("<td>%s</td>\n", poolManager.getAllocation(pool.getName(), 
+            TaskType.REDUCE));
+        out.printf("<td>%s</td>\n", runningMaps);
+        out.printf("<td>%s</td>\n", runningReduces);
+        out.print("</tr>\n");
+      }
+      out.print("</table>\n");
+    }
+  }
+
+  /**
+   * Print a view of running jobs to the given output writer.
+   */
+  private void showJobs(PrintWriter out, boolean advancedView) {
+    out.print("<h2>Running Jobs</h2>\n");
+    out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
+    int colsPerTaskType = advancedView ? 5 : 3;
+    out.printf("<tr><th rowspan=2>Submitted</th>" + 
+        "<th rowspan=2>JobID</th>" +
+        "<th rowspan=2>User</th>" +
+        "<th rowspan=2>Name</th>" +
+        "<th rowspan=2>Pool</th>" +
+        "<th rowspan=2>Priority</th>" +
+        "<th colspan=%d>Maps</th>" +
+        "<th colspan=%d>Reduces</th>",
+        colsPerTaskType, colsPerTaskType);
+    out.print("</tr><tr>\n");
+    out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
+        (advancedView ? "<th>Weight</th><th>Deficit</th>" : ""));
+    out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
+        (advancedView ? "<th>Weight</th><th>Deficit</th>" : ""));
+    out.print("</tr>\n");
+    Collection<JobInProgress> runningJobs = jobTracker.runningJobs();
+    for (JobInProgress job: runningJobs) {
+      JobProfile profile = job.getProfile();
+      JobInfo info = scheduler.infos.get(job);
+      if (info == null) { // Job finished, but let's show 0's for info
+        info = new JobInfo();
+      }
+      out.print("<tr>\n");
+      out.printf("<td>%s</td>\n", DATE_FORMAT.format(
+          new Date(job.getStartTime())));
+      out.printf("<td><a href=\"jobdetails.jsp?jobid=%s\">%s</a></td>",
+          profile.getJobID(), profile.getJobID());
+      out.printf("<td>%s</td>\n", profile.getUser());
+      out.printf("<td>%s</td>\n", profile.getJobName());
+      out.printf("<td>%s</td>\n", generateSelect(
+          scheduler.getPoolManager().getPoolNames(),
+          scheduler.getPoolManager().getPoolName(job),
+          "/scheduler?setPool=<CHOICE>&jobid=" + profile.getJobID() +
+          (advancedView ? "&advanced" : "")));
+      out.printf("<td>%s</td>\n", generateSelect(
+          Arrays.asList(new String[]
+              {"VERY_LOW", "LOW", "NORMAL", "HIGH", "VERY_HIGH"}),
+          job.getPriority().toString(),
+          "/scheduler?setPriority=<CHOICE>&jobid=" + profile.getJobID() +
+          (advancedView ? "&advanced" : "")));
+      out.printf("<td>%d / %d</td><td>%d</td><td>%8.1f</td>\n",
+          job.finishedMaps(), job.desiredMaps(), info.runningMaps,
+          info.mapFairShare);
+      if (advancedView) {
+        out.printf("<td>%8.1f</td>\n", info.mapWeight);
+        out.printf("<td>%s</td>\n", info.neededMaps > 0 ?
+            (info.mapDeficit / 1000) + "s" : "--");
+      }
+      out.printf("<td>%d / %d</td><td>%d</td><td>%8.1f</td>\n",
+          job.finishedReduces(), job.desiredReduces(), info.runningReduces,
+          info.reduceFairShare);
+      if (advancedView) {
+        out.printf("<td>%8.1f</td>\n", info.reduceWeight);
+        out.printf("<td>%s</td>\n", info.neededReduces > 0 ?
+            (info.reduceDeficit / 1000) + "s" : "--");
+      }
+      out.print("</tr>\n");
+    }
+    out.print("</table>\n");
+  }
+
+  /**
+   * Generate a HTML select control with a given list of choices and a given
+   * option selected. When the selection is changed, take the user to the
+   * <code>submitUrl</code>. The <code>submitUrl</code> can be made to include
+   * the option selected -- the first occurrence of the substring
+   * <code>&lt;CHOICE&gt;</code> will be replaced by the option chosen.
+   */
+  private String generateSelect(Iterable<String> choices, 
+      String selectedChoice, String submitUrl) {
+    StringBuilder html = new StringBuilder();
+    String id = "select" + lastId++;
+    html.append("<select id=\"" + id + "\" name=\"" + id + "\" " + 
+        "onchange=\"window.location = '" + submitUrl + 
+        "'.replace('<CHOICE>', document.getElementById('" + id +
+        "').value);\">\n");
+    for (String choice: choices) {
+      html.append(String.format("<option value=\"%s\"%s>%s</option>\n",
+          choice, (choice.equals(selectedChoice) ? " selected" : ""), choice));
+    }
+    html.append("</select>\n");
+    return html.toString();
+  }
+
+  /**
+   * Print the administration form at the bottom of the page, which currently
+   * only includes the button for switching between FIFO and Fair Scheduling.
+   */
+  private void showAdminForm(PrintWriter out, boolean advancedView) {
+    out.print("<h2>Scheduling Mode</h2>\n");
+    String curMode = scheduler.getUseFifo() ? "FIFO" : "Fair Sharing";
+    String otherMode = scheduler.getUseFifo() ? "Fair Sharing" : "FIFO";
+    String advParam = advancedView ? "?advanced" : "";
+    out.printf("<form method=\"post\" action=\"/scheduler%s\">\n", advParam);
+    out.printf("<p>The scheduler is currently using <b>%s mode</b>. " +
+        "<input type=\"submit\" value=\"Switch to %s mode.\" " + 
+        "onclick=\"return confirm('Are you sure you want to change " +
+        "scheduling mode to %s?')\" />\n",
+        curMode, otherMode, otherMode);
+    out.printf("<input type=\"hidden\" name=\"setFifo\" value=\"%s\" />",
+        !scheduler.getUseFifo());
+    out.print("</form>\n");
+  }
+}
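
For illustration: the pool and priority columns above are rendered by
generateSelect(), and doGet() consumes the URLs it builds, such as
/scheduler?setPool=poolA&jobid=<id> or /scheduler?setPriority=HIGH&jobid=<id>.
For a hypothetical job id job_200808280001_0001 and pools "default" and
"poolA", the markup this method emits would look roughly like:

    <select id="select0" name="select0" onchange="window.location =
        '/scheduler?setPool=<CHOICE>&jobid=job_200808280001_0001'.replace(
        '<CHOICE>', document.getElementById('select0').value);">
    <option value="default" selected>default</option>
    <option value="poolA">poolA</option>
    </select>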

+ 42 - 0
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.Comparator;
+
+/**
+ * Order {@link JobInProgress} objects by priority and then by submit time, as
+ * in the default scheduler in Hadoop.
+ */
+public class FifoJobComparator implements Comparator<JobInProgress> {
+  public int compare(JobInProgress j1, JobInProgress j2) {
+    int res = j1.getPriority().compareTo(j2.getPriority());
+    if (res == 0) {
+      if (j1.getStartTime() < j2.getStartTime()) {
+        res = -1;
+      } else {
+        res = (j1.getStartTime() == j2.getStartTime() ? 0 : 1);
+      }
+    }
+    if (res == 0) {
+      // Tie-break on hash code, comparing rather than subtracting so that
+      // integer overflow cannot produce an inconsistent ordering.
+      res = (j1.hashCode() < j2.hashCode() ? -1 :
+             (j1.hashCode() == j2.hashCode() ? 0 : 1));
+    }
+    return res;
+  }
+}
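
A quick usage sketch (illustrative; assumes you already hold a list of
JobInProgress references): an ascending sort with this comparator puts
higher-priority jobs first and, within a priority level, earlier-submitted
jobs first.

    // Hypothetical snippet -- "pool" is any source of JobInProgress objects.
    List<JobInProgress> jobs = new ArrayList<JobInProgress>(pool.getJobs());
    Collections.sort(jobs, new FifoJobComparator());
    JobInProgress head = jobs.get(0); // the job FIFO policy would serve first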

+ 79 - 0
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java

@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A pluggable object that manages the load on each {@link TaskTracker}, telling
+ * the {@link TaskScheduler} when it can launch new tasks. 
+ */
+public abstract class LoadManager implements Configurable {
+  protected Configuration conf;
+  protected TaskTrackerManager taskTrackerManager;
+  
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public synchronized void setTaskTrackerManager(
+      TaskTrackerManager taskTrackerManager) {
+    this.taskTrackerManager = taskTrackerManager;
+  }
+  
+  /**
+   * Lifecycle method to allow the LoadManager to start any work in separate
+   * threads.
+   */
+  public void start() throws IOException {
+    // do nothing
+  }
+  
+  /**
+   * Lifecycle method to allow the LoadManager to stop any work it is doing.
+   */
+  public void terminate() throws IOException {
+    // do nothing
+  }
+  
+  /**
+   * Can a given {@link TaskTracker} run another map task?
+   * @param tracker The machine we wish to run a new map on
+   * @param totalRunnableMaps Total number of runnable map tasks in the cluster
+   * @return true if another map can be launched on <code>tracker</code>
+   */
+  public abstract boolean canAssignMap(TaskTrackerStatus tracker,
+      int totalRunnableMaps);
+
+  /**
+   * Can a given {@link TaskTracker} run another reduce task?
+   * @param tracker The machine we wish to run a new reduce on
+   * @param totalRunnableReduces Total number of runnable reduce tasks in the
+   *        cluster
+   * @return true if another reduce can be launched on <code>tracker</code>
+   */
+  public abstract boolean canAssignReduce(TaskTrackerStatus tracker,
+      int totalRunnableReduces);
+}
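
As a rough sketch of how this contract can be filled in (illustrative only;
the real implementation shipped in this patch is CapBasedLoadManager), a
minimal LoadManager could simply respect each tracker's configured slot
counts:

    // Illustrative sketch, not the patch's CapBasedLoadManager.
    public class SlotCapLoadManager extends LoadManager {
      @Override
      public boolean canAssignMap(TaskTrackerStatus tracker,
          int totalRunnableMaps) {
        return tracker.countMapTasks() < tracker.getMaxMapTasks();
      }
      @Override
      public boolean canAssignReduce(TaskTrackerStatus tracker,
          int totalRunnableReduces) {
        return tracker.countReduceTasks() < tracker.getMaxReduceTasks();
      }
    }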

+ 57 - 0
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java

@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+
+/**
+ * A {@link WeightAdjuster} implementation that gives a weight boost to new jobs
+ * for a certain amount of time -- by default, a 3x weight boost for their
+ * first five minutes. This can be used to make shorter jobs finish faster,
+ * emulating Shortest Job First scheduling while not starving long jobs. 
+ */
+public class NewJobWeightBooster extends Configured implements WeightAdjuster {
+  private static final float DEFAULT_FACTOR = 3;
+  private static final long DEFAULT_DURATION = 5 * 60 * 1000;
+
+  private float factor;
+  private long duration;
+
+  public void setConf(Configuration conf) {
+    if (conf != null) {
+      factor = conf.getFloat("mapred.newjobweightbooster.factor",
+          DEFAULT_FACTOR);
+      duration = conf.getLong("mapred.newjobweightbooster.duration",
+          DEFAULT_DURATION);
+    }
+    super.setConf(conf);
+  }
+  
+  public double adjustWeight(JobInProgress job, TaskType taskType,
+      double curWeight) {
+    long start = job.getStartTime();
+    long now = System.currentTimeMillis();
+    if (now - start < duration) {
+      return curWeight * factor;
+    } else {
+      return curWeight;
+    }
+  }
+}
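
The two configuration keys read above can be tuned per cluster. A hedged
example (values are illustrative; the scheduler must separately be configured
to load this class as its weight adjuster -- see FairScheduler.java in this
patch):

    // Illustrative: boost brand-new jobs 2x for their first two minutes,
    // instead of the defaults of 3x for five minutes.
    Configuration conf = new Configuration();
    conf.set("mapred.newjobweightbooster.factor", "2.0");
    conf.set("mapred.newjobweightbooster.duration", "120000"); // ms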

+ 60 - 0
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java

@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * A schedulable pool of jobs.
+ */
+public class Pool {
+  /** Name of the default pool, where jobs with no pool parameter go. */
+  public static final String DEFAULT_POOL_NAME = "default";
+  
+  /** Pool name. */
+  private String name;
+  
+  /** Jobs in this pool. */
+  private Collection<JobInProgress> jobs = new ArrayList<JobInProgress>();
+
+  public Pool(String name) {
+    this.name = name;
+  }
+  
+  public Collection<JobInProgress> getJobs() {
+    return jobs;
+  }
+  
+  public void addJob(JobInProgress job) {
+    jobs.add(job);
+  }
+  
+  public void removeJob(JobInProgress job) {
+    jobs.remove(job);
+  }
+  
+  public String getName() {
+    return name;
+  }
+
+  public boolean isDefaultPool() {
+    return Pool.DEFAULT_POOL_NAME.equals(name);
+  }
+}

+ 324 - 0
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java

@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+import org.xml.sax.SAXException;
+
+/**
+ * Maintains the set of pools used by the fair scheduler, along with their
+ * minimum allocations and running-job limits.
+ */
+public class PoolManager {
+  public static final Log LOG = LogFactory.getLog(
+    "org.apache.hadoop.mapred.PoolManager");
+
+  /** Time to wait between checks of the allocation file */
+  public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
+  
+  /**
+   * Time to wait after the allocation file has been modified before reloading
+   * (this is done to prevent loading a file that hasn't been fully written).
+   */
+  public static final long ALLOC_RELOAD_WAIT = 5 * 1000; 
+  
+  // Map and reduce minimum allocations for each pool
+  private Map<String, Integer> mapAllocs = new HashMap<String, Integer>();
+  private Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
+  
+  // Max concurrent running jobs for each pool and for each user; in addition,
+  // for users that have no max specified, we use the userMaxJobsDefault.
+  private Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
+  private Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
+  private int userMaxJobsDefault = Integer.MAX_VALUE;
+
+  private String allocFile; // Path to XML file containing allocations
+  private String poolNameProperty; // Jobconf property to use for determining a
+                                   // job's pool name (default: queue.name)
+  
+  private Map<String, Pool> pools = new HashMap<String, Pool>();
+  
+  private long lastReloadAttempt; // Last time we tried to reload the pools file
+  private long lastSuccessfulReload; // Last time we successfully reloaded pools
+  private boolean lastReloadAttemptFailed = false;
+
+  public PoolManager(Configuration conf) throws IOException, SAXException,
+      AllocationConfigurationException, ParserConfigurationException {
+    this.poolNameProperty = conf.get(
+        "mapred.fairscheduler.poolnameproperty", "queue.name");
+    this.allocFile = conf.get("mapred.fairscheduler.allocation.file");
+    if (allocFile == null) {
+      LOG.warn("No mapred.fairscheduler.allocation.file given in jobconf - " +
+          "the fair scheduler will not use any queues.");
+    }
+    reloadAllocs();
+    lastSuccessfulReload = System.currentTimeMillis();
+    lastReloadAttempt = System.currentTimeMillis();
+    // Create the default pool so that it shows up in the web UI
+    getPool(Pool.DEFAULT_POOL_NAME);
+  }
+  
+  /**
+   * Get a pool by name, creating it if necessary
+   */
+  public synchronized Pool getPool(String name) {
+    Pool pool = pools.get(name);
+    if (pool == null) {
+      pool = new Pool(name);
+      pools.put(name, pool);
+    }
+    return pool;
+  }
+
+  /**
+   * Reload the allocations file if it has changed on disk, checking at most
+   * once every ALLOC_RELOAD_INTERVAL milliseconds.
+   */
+  public void reloadAllocsIfNecessary() {
+    long time = System.currentTimeMillis();
+    if (time > lastReloadAttempt + ALLOC_RELOAD_INTERVAL) {
+      lastReloadAttempt = time;
+      try {
+        File file = new File(allocFile);
+        long lastModified = file.lastModified();
+        if (lastModified > lastSuccessfulReload &&
+            time > lastModified + ALLOC_RELOAD_WAIT) {
+          reloadAllocs();
+          lastSuccessfulReload = time;
+          lastReloadAttemptFailed = false;
+        }
+      } catch (Exception e) {
+        // Throwing the error further out here won't help - the RPC thread
+        // will catch it and report it in a loop. Instead, just log it and
+        // hope somebody will notice from the log.
+        // We log the error only on the first failure so we don't fill up the
+        // JobTracker's log with these messages.
+        if (!lastReloadAttemptFailed) {
+          LOG.error("Failed to reload allocations file - " +
+              "will use existing allocations.", e);
+        }
+        lastReloadAttemptFailed = true;
+      }
+    }
+  }
+  
+  /**
+   * Updates the allocation list from the allocation config file. The file is
+   * expected to be an XML document whose top-level element is
+   * &lt;allocations&gt;, containing &lt;pool&gt; elements (each with optional
+   * minMaps, minReduces and maxRunningJobs sub-elements), &lt;user&gt;
+   * elements (each with a maxRunningJobs sub-element), and an optional
+   * &lt;userMaxJobsDefault&gt; element. A full example appears after this
+   * class. Unrecognized elements are logged and skipped.
+   *  
+   * @throws IOException if the config file cannot be read.
+   * @throws AllocationConfigurationException if allocations are invalid.
+   * @throws ParserConfigurationException if XML parser is misconfigured.
+   * @throws SAXException if config file is malformed.
+   */
+  public void reloadAllocs() throws IOException, ParserConfigurationException, 
+      SAXException, AllocationConfigurationException {
+    if (allocFile == null) return;
+    // Create some temporary hashmaps to hold the new allocs, and we only save
+    // them in our fields if we have parsed the entire allocs file successfully.
+    Map<String, Integer> mapAllocs = new HashMap<String, Integer>();
+    Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
+    Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
+    Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
+    int userMaxJobsDefault = Integer.MAX_VALUE;
+    
+    // Remember all pool names so we can display them on web UI, etc.
+    List<String> poolNamesInAllocFile = new ArrayList<String>();
+    
+    // Read and parse the allocations file.
+    DocumentBuilderFactory docBuilderFactory =
+      DocumentBuilderFactory.newInstance();
+    docBuilderFactory.setIgnoringComments(true);
+    DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+    Document doc = builder.parse(new File(allocFile));
+    Element root = doc.getDocumentElement();
+    if (!"allocations".equals(root.getTagName()))
+      throw new AllocationConfigurationException("Bad allocations file: " + 
+          "top-level element not <allocations>");
+    NodeList elements = root.getChildNodes();
+    for (int i = 0; i < elements.getLength(); i++) {
+      Node node = elements.item(i);
+      if (!(node instanceof Element))
+        continue;
+      Element element = (Element)node;
+      if ("pool".equals(element.getTagName())) {
+        String poolName = element.getAttribute("name");
+        poolNamesInAllocFile.add(poolName);
+        NodeList fields = element.getChildNodes();
+        for (int j = 0; j < fields.getLength(); j++) {
+          Node fieldNode = fields.item(j);
+          if (!(fieldNode instanceof Element))
+            continue;
+          Element field = (Element) fieldNode;
+          if ("minMaps".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            int val = Integer.parseInt(text);
+            mapAllocs.put(poolName, val);
+          } else if ("minReduces".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            int val = Integer.parseInt(text);
+            reduceAllocs.put(poolName, val);
+          } else if ("maxRunningJobs".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            int val = Integer.parseInt(text);
+            poolMaxJobs.put(poolName, val);
+          }
+        }
+      } else if ("user".equals(element.getTagName())) {
+        String userName = element.getAttribute("name");
+        NodeList fields = element.getChildNodes();
+        for (int j = 0; j < fields.getLength(); j++) {
+          Node fieldNode = fields.item(j);
+          if (!(fieldNode instanceof Element))
+            continue;
+          Element field = (Element) fieldNode;
+          if ("maxRunningJobs".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            int val = Integer.parseInt(text);
+            userMaxJobs.put(userName, val);
+          }
+        }
+      } else if ("userMaxJobsDefault".equals(element.getTagName())) {
+        String text = ((Text)element.getFirstChild()).getData().trim();
+        int val = Integer.parseInt(text);
+        userMaxJobsDefault = val;
+      } else {
+        LOG.warn("Bad element in allocations file: " + element.getTagName());
+      }
+    }
+    
+    // Commit the reload; also create any pool defined in the alloc file
+    // if it does not already exist, so it can be displayed on the web UI.
+    synchronized(this) {
+      this.mapAllocs = mapAllocs;
+      this.reduceAllocs = reduceAllocs;
+      this.poolMaxJobs = poolMaxJobs;
+      this.userMaxJobs = userMaxJobs;
+      this.userMaxJobsDefault = userMaxJobsDefault;
+      for (String name: poolNamesInAllocFile) {
+        getPool(name);
+      }
+    }
+  }
+
+  /**
+   * Get the minimum slot allocation of the given type for a pool, or 0 if
+   * none has been set
+   */
+  public int getAllocation(String pool, TaskType taskType) {
+    Map<String, Integer> allocationMap = (taskType == TaskType.MAP ?
+        mapAllocs : reduceAllocs);
+    Integer alloc = allocationMap.get(pool);
+    return (alloc == null ? 0 : alloc);
+  }
+  
+  /**
+   * Add a job in the appropriate pool
+   */
+  public synchronized void addJob(JobInProgress job) {
+    getPool(getPoolName(job)).addJob(job);
+  }
+  
+  /**
+   * Remove a job
+   */
+  public synchronized void removeJob(JobInProgress job) {
+    getPool(getPoolName(job)).removeJob(job);
+  }
+  
+  /**
+   * Change the pool of a particular job
+   */
+  public synchronized void setPool(JobInProgress job, String pool) {
+    removeJob(job);
+    job.getJobConf().set(poolNameProperty, pool);
+    addJob(job);
+  }
+
+  /**
+   * Get a collection of all pools
+   */
+  public synchronized Collection<Pool> getPools() {
+    return pools.values();
+  }
+  
+  /**
+   * Get the pool name for a JobInProgress from its configuration. This uses
+   * the "project" property in the jobconf by default, or the property set with
+   * "mapred.fairscheduler.poolnameproperty".
+   */
+  public String getPoolName(JobInProgress job) {
+    JobConf conf = job.getJobConf();
+    return conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME).trim();
+  }
+
+  /**
+   * Get all pool names that have been seen either in the allocation file or in
+   * a MapReduce job.
+   */
+  public synchronized Collection<String> getPoolNames() {
+    List<String> list = new ArrayList<String>();
+    for (Pool pool: getPools()) {
+      list.add(pool.getName());
+    }
+    Collections.sort(list);
+    return list;
+  }
+
+  public int getUserMaxJobs(String user) {
+    if (userMaxJobs.containsKey(user)) {
+      return userMaxJobs.get(user);
+    } else {
+      return userMaxJobsDefault;
+    }
+  }
+
+  public int getPoolMaxJobs(String pool) {
+    if (poolMaxJobs.containsKey(pool)) {
+      return poolMaxJobs.get(pool);
+    } else {
+      return Integer.MAX_VALUE;
+    }
+  }
+}
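
For reference, a complete allocations file exercising every element this
parser understands could look like the following (pool and user names are
illustrative; compare the file written in TestFairScheduler below):

    <?xml version="1.0"?>
    <allocations>
      <pool name="poolA">
        <minMaps>1</minMaps>
        <minReduces>2</minReduces>
        <maxRunningJobs>3</maxRunningJobs>
      </pool>
      <user name="user1">
        <maxRunningJobs>10</maxRunningJobs>
      </user>
      <userMaxJobsDefault>5</userMaxJobsDefault>
    </allocations>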

+ 101 - 0
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java

@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A pluggable object for selecting tasks to run from a {@link JobInProgress} on
+ * a given {@link TaskTracker}, for use by the {@link TaskScheduler}. The
+ * <code>TaskSelector</code> is in charge of managing both locality and
+ * speculative execution. For the latter purpose, it must also provide counts
+ * of how many speculative tasks each job needs to launch, so that the scheduler
+ * can take this into account in its calculations.
+ */
+public abstract class TaskSelector implements Configurable {
+  protected Configuration conf;
+  protected TaskTrackerManager taskTrackerManager;
+  
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public synchronized void setTaskTrackerManager(
+      TaskTrackerManager taskTrackerManager) {
+    this.taskTrackerManager = taskTrackerManager;
+  }
+  
+  /**
+   * Lifecycle method to allow the TaskSelector to start any work in separate
+   * threads.
+   */
+  public void start() throws IOException {
+    // do nothing
+  }
+  
+  /**
+   * Lifecycle method to allow the TaskSelector to stop any work it is doing.
+   */
+  public void terminate() throws IOException {
+    // do nothing
+  }
+  
+  /**
+   * How many speculative map tasks does the given job want to launch?
+   * @param job The job to count speculative maps for
+   * @return Number of speculative maps that can be launched for job
+   */
+  public abstract int neededSpeculativeMaps(JobInProgress job);
+
+  /**
+   * How many speculative reduce tasks does the given job want to launch?
+   * @param job The job to count speculative reduces for
+   * @return Number of speculative reduces that can be launched for job
+   */
+  public abstract int neededSpeculativeReduces(JobInProgress job);
+  
+  /**
+   * Choose a map task to run from the given job on the given TaskTracker.
+   * @param taskTracker {@link TaskTrackerStatus} of machine to run on
+   * @param job Job to select a task for
+   * @return A {@link Task} to run on the machine, or <code>null</code> if
+   *         no map should be launched from this job on the task tracker.
+   * @throws IOException 
+   */
+  public abstract Task obtainNewMapTask(TaskTrackerStatus taskTracker,
+      JobInProgress job) throws IOException;
+
+  /**
+   * Choose a reduce task to run from the given job on the given TaskTracker.
+   * @param taskTracker {@link TaskTrackerStatus} of machine to run on
+   * @param job Job to select a task for
+   * @return A {@link Task} to run on the machine, or <code>null</code> if
+   *         no reduce should be launched from this job on the task tracker.
+   * @throws IOException 
+   */
+  public abstract Task obtainNewReduceTask(TaskTrackerStatus taskTracker,
+      JobInProgress job) throws IOException;
+}
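
To make the contract concrete, here is a minimal sketch of a selector that
defers entirely to the job's own locality logic and schedules nothing
speculatively (illustrative only; the baseline implementation shipped in this
patch is DefaultTaskSelector):

    public class PassthroughTaskSelector extends TaskSelector {
      @Override
      public int neededSpeculativeMaps(JobInProgress job) { return 0; }
      @Override
      public int neededSpeculativeReduces(JobInProgress job) { return 0; }
      @Override
      public Task obtainNewMapTask(TaskTrackerStatus taskTracker,
          JobInProgress job) throws IOException {
        ClusterStatus cluster = taskTrackerManager.getClusterStatus();
        return job.obtainNewMapTask(taskTracker, cluster.getTaskTrackers(),
            taskTrackerManager.getNumberOfUniqueHosts());
      }
      @Override
      public Task obtainNewReduceTask(TaskTrackerStatus taskTracker,
          JobInProgress job) throws IOException {
        ClusterStatus cluster = taskTrackerManager.getClusterStatus();
        return job.obtainNewReduceTask(taskTracker, cluster.getTaskTrackers(),
            taskTrackerManager.getNumberOfUniqueHosts());
      }
    }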

+ 26 - 0
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskType.java

@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * Utility enum for map and reduce task types.
+ */
+public enum TaskType {
+  MAP, REDUCE
+}

+ 33 - 0
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java

@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configurable;
+
+/**
+ * A pluggable object for altering the weights of jobs in the fair scheduler,
+ * which is used for example by {@link NewJobWeightBooster} to give higher
+ * weight to new jobs so that short jobs finish faster.
+ * 
+ * May implement {@link Configurable} to access configuration parameters.
+ */
+public interface WeightAdjuster {
+  public double adjustWeight(JobInProgress job, TaskType taskType,
+      double curWeight);
+}
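
A hedged example of an alternative adjuster (hypothetical; only the
NewJobWeightBooster above ships with this patch) that favors small jobs by
doubling the weight of any job with fewer than ten tasks of the given type:

    public class SmallJobBooster extends Configured implements WeightAdjuster {
      public double adjustWeight(JobInProgress job, TaskType taskType,
          double curWeight) {
        int size = (taskType == TaskType.MAP) ? job.desiredMaps()
                                              : job.desiredReduces();
        return (size < 10) ? curWeight * 2 : curWeight; // arbitrary threshold
      }
    }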

+ 1064 - 0
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

@@ -0,0 +1,1064 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+
+public class TestFairScheduler extends TestCase {
+  final static String TEST_DIR = new File(".").getAbsolutePath();
+  final static String ALLOC_FILE = new File("./test-pools").getAbsolutePath();
+  private static final String POOL_PROPERTY = "pool";
+  
+  private static int jobCounter;
+  private static int taskCounter;
+  
+  static class FakeJobInProgress extends JobInProgress {
+    
+    private FakeTaskTrackerManager taskTrackerManager;
+    
+    public FakeJobInProgress(JobConf jobConf,
+        FakeTaskTrackerManager taskTrackerManager) throws IOException {
+      super(new JobID("test", ++jobCounter), jobConf);
+      this.taskTrackerManager = taskTrackerManager;
+      this.startTime = System.currentTimeMillis();
+      this.status = new JobStatus();
+      this.status.setRunState(JobStatus.PREP);
+    }
+    
+    @Override
+    public synchronized void initTasks() throws IOException {
+      // do nothing
+    }
+
+    @Override
+    public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
+        int ignored) throws IOException {
+      TaskAttemptID attemptId = getTaskAttemptID(true);
+      Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
+        @Override
+        public String toString() {
+          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+        }
+      };
+      taskTrackerManager.startTask(tts.getTrackerName(), task);
+      runningMapTasks++;
+      return task;
+    }
+    
+    @Override
+    public Task obtainNewReduceTask(final TaskTrackerStatus tts,
+        int clusterSize, int ignored) throws IOException {
+      TaskAttemptID attemptId = getTaskAttemptID(false);
+      Task task = new ReduceTask("", attemptId, 0, 10) {
+        @Override
+        public String toString() {
+          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+        }
+      };
+      taskTrackerManager.startTask(tts.getTrackerName(), task);
+      runningReduceTasks++;
+      return task;
+    }
+    
+    private TaskAttemptID getTaskAttemptID(boolean isMap) {
+      JobID jobId = getJobID();
+      return new TaskAttemptID(jobId.getJtIdentifier(),
+          jobId.getId(), isMap, ++taskCounter, 0);
+    }
+  }
+  
+  static class FakeTaskTrackerManager implements TaskTrackerManager {
+    int maps = 0;
+    int reduces = 0;
+    int maxMapTasksPerTracker = 2;
+    int maxReduceTasksPerTracker = 2;
+    List<JobInProgressListener> listeners =
+      new ArrayList<JobInProgressListener>();
+    
+    private Map<String, TaskTrackerStatus> trackers =
+      new HashMap<String, TaskTrackerStatus>();
+    private Map<String, TaskStatus> taskStatuses = 
+      new HashMap<String, TaskStatus>();
+
+    public FakeTaskTrackerManager() {
+      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
+          new ArrayList<TaskStatus>(), 0,
+          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+      trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
+          new ArrayList<TaskStatus>(), 0,
+          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+    }
+    
+    @Override
+    public ClusterStatus getClusterStatus() {
+      int numTrackers = trackers.size();
+      return new ClusterStatus(numTrackers, maps, reduces,
+          numTrackers * maxMapTasksPerTracker,
+          numTrackers * maxReduceTasksPerTracker,
+          JobTracker.State.RUNNING);
+    }
+
+    @Override
+    public int getNumberOfUniqueHosts() {
+      return 0;
+    }
+
+    @Override
+    public Collection<TaskTrackerStatus> taskTrackers() {
+      return trackers.values();
+    }
+
+    @Override
+    public void addJobInProgressListener(JobInProgressListener listener) {
+      listeners.add(listener);
+    }
+
+    @Override
+    public void removeJobInProgressListener(JobInProgressListener listener) {
+      listeners.remove(listener);
+    }
+    
+    // Test methods
+    
+    public void submitJob(JobInProgress job) {
+      for (JobInProgressListener listener : listeners) {
+        listener.jobAdded(job);
+      }
+    }
+    
+    public TaskTrackerStatus getTaskTracker(String trackerID) {
+      return trackers.get(trackerID);
+    }
+    
+    public void startTask(String taskTrackerName, final Task t) {
+      if (t.isMapTask()) {
+        maps++;
+      } else {
+        reduces++;
+      }
+      TaskStatus status = new TaskStatus() {
+        @Override
+        public boolean getIsMap() {
+          return t.isMapTask();
+        }
+      };
+      taskStatuses.put(t.getTaskID().toString(), status);
+      status.setRunState(TaskStatus.State.RUNNING);
+      trackers.get(taskTrackerName).getTaskReports().add(status);
+    }
+    
+    public void finishTask(String taskTrackerName, String tipId) {
+      TaskStatus status = taskStatuses.get(tipId);
+      if (status.getIsMap()) {
+        maps--;
+      } else {
+        reduces--;
+      }
+      status.setRunState(TaskStatus.State.SUCCEEDED);
+    }
+  }
+  
+  protected class FakeClock extends FairScheduler.Clock {
+    private long time = 0;
+    
+    public void advance(long millis) {
+      time += millis;
+    }
+
+    @Override
+    long getTime() {
+      return time;
+    }
+  }
+  
+  protected JobConf conf;
+  protected FairScheduler scheduler;
+  private FakeTaskTrackerManager taskTrackerManager;
+  private FakeClock clock;
+
+  @Override
+  protected void setUp() throws Exception {
+    jobCounter = 0;
+    taskCounter = 0;
+    new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+    // Create an empty pools file (so we can add/remove pools later)
+    FileWriter fileWriter = new FileWriter(ALLOC_FILE);
+    fileWriter.write("<?xml version=\"1.0\"?>\n");
+    fileWriter.write("<allocations />\n");
+    fileWriter.close();
+    conf = new JobConf();
+    conf.set("mapred.fairscheduler.allocation.file", ALLOC_FILE);
+    conf.set("mapred.fairscheduler.poolnameproperty", POOL_PROPERTY);
+    taskTrackerManager = new FakeTaskTrackerManager();
+    clock = new FakeClock();
+    scheduler = new FairScheduler(clock, false);
+    scheduler.setConf(conf);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    scheduler.start();
+  }
+  
+  @Override
+  protected void tearDown() throws Exception {
+    if (scheduler != null) {
+      scheduler.terminate();
+    }
+  }
+  
+  private JobInProgress submitJob(int state, int maps, int reduces)
+      throws IOException {
+    return submitJob(state, maps, reduces, null);
+  }
+  
+  private JobInProgress submitJob(int state, int maps, int reduces, String pool)
+      throws IOException {
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setNumMapTasks(maps);
+    jobConf.setNumReduceTasks(reduces);
+    if (pool != null)
+      jobConf.set(POOL_PROPERTY, pool);
+    JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager);
+    job.getStatus().setRunState(state);
+    taskTrackerManager.submitJob(job);
+    job.startTime = clock.time;
+    return job;
+  }
+  
+  protected void submitJobs(int number, int state, int maps, int reduces)
+    throws IOException {
+    for (int i = 0; i < number; i++) {
+      submitJob(state, maps, reduces);
+    }
+  }
+
+  public void testAllocationFileParsing() throws Exception {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>"); 
+    // Give pool A a minimum of 1 map, 2 reduces
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>1</minMaps>");
+    out.println("<minReduces>2</minReduces>");
+    out.println("</pool>");
+    // Give pool B a minimum of 2 maps, 1 reduce
+    out.println("<pool name=\"poolB\">");
+    out.println("<minMaps>2</minMaps>");
+    out.println("<minReduces>1</minReduces>");
+    out.println("</pool>");
+    // Give pool C min maps but no min reduces
+    out.println("<pool name=\"poolC\">");
+    out.println("<minMaps>2</minMaps>");
+    out.println("</pool>");
+    // Give pool D a limit of 3 running jobs
+    out.println("<pool name=\"poolD\">");
+    out.println("<maxRunningJobs>3</maxRunningJobs>");
+    out.println("</pool>");
+    // Set default limit of jobs per user to 5
+    out.println("<userMaxJobsDefault>5</userMaxJobsDefault>");
+    // Give user1 a limit of 10 jobs
+    out.println("<user name=\"user1\">");
+    out.println("<maxRunningJobs>10</maxRunningJobs>");
+    out.println("</user>");
+    out.println("</allocations>"); 
+    out.close();
+    
+    PoolManager poolManager = scheduler.getPoolManager();
+    poolManager.reloadAllocs();
+    
+    assertEquals(5, poolManager.getPools().size()); // 4 in file + default pool
+    assertEquals(0, poolManager.getAllocation(Pool.DEFAULT_POOL_NAME,
+        TaskType.MAP));
+    assertEquals(0, poolManager.getAllocation(Pool.DEFAULT_POOL_NAME,
+        TaskType.REDUCE));
+    assertEquals(1, poolManager.getAllocation("poolA", TaskType.MAP));
+    assertEquals(2, poolManager.getAllocation("poolA", TaskType.REDUCE));
+    assertEquals(2, poolManager.getAllocation("poolB", TaskType.MAP));
+    assertEquals(1, poolManager.getAllocation("poolB", TaskType.REDUCE));
+    assertEquals(2, poolManager.getAllocation("poolC", TaskType.MAP));
+    assertEquals(0, poolManager.getAllocation("poolC", TaskType.REDUCE));
+    assertEquals(0, poolManager.getAllocation("poolD", TaskType.MAP));
+    assertEquals(0, poolManager.getAllocation("poolD", TaskType.REDUCE));
+    assertEquals(Integer.MAX_VALUE, poolManager.getPoolMaxJobs("poolA"));
+    assertEquals(3, poolManager.getPoolMaxJobs("poolD"));
+    assertEquals(10, poolManager.getUserMaxJobs("user1"));
+    assertEquals(5, poolManager.getUserMaxJobs("user2"));
+  }
+  
+  public void testTaskNotAssignedWhenNoJobsArePresent() throws IOException {
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+  }
+
+  public void testNonRunningJobsAreIgnored() throws IOException {
+    submitJobs(1, JobStatus.PREP, 10, 10);
+    submitJobs(1, JobStatus.SUCCEEDED, 10, 10);
+    submitJobs(1, JobStatus.FAILED, 10, 10);
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    advanceTime(100); // Check that we still don't assign jobs after an update
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+  }
+
+  /**
+   * This test contains two jobs with fewer required tasks than there are slots.
+   * We check that all tasks are assigned, but job 1 gets them first because it
+   * was submitted earlier.
+   */
+  public void testSmallJobs() throws IOException {
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1);
+    JobInfo info1 = scheduler.infos.get(job1);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.runningMaps);
+    assertEquals(0,    info1.runningReduces);
+    assertEquals(2,    info1.neededMaps);
+    assertEquals(1,    info1.neededReduces);
+    assertEquals(0,    info1.mapDeficit);
+    assertEquals(0,    info1.reduceDeficit);
+    assertEquals(4.0,  info1.mapFairShare);
+    assertEquals(4.0,  info1.reduceFairShare);
+    
+    // Advance time before submitting another job j2, to make j1 run before j2
+    // deterministically.
+    advanceTime(100);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 2);
+    JobInfo info2 = scheduler.infos.get(job2);
+    
+    // Check scheduler variables; the fair shares should now have been allocated
+    // equally between j1 and j2, but j1 should have (4 slots)*(100 ms) deficit
+    assertEquals(0,    info1.runningMaps);
+    assertEquals(0,    info1.runningReduces);
+    assertEquals(2,    info1.neededMaps);
+    assertEquals(1,    info1.neededReduces);
+    assertEquals(400,  info1.mapDeficit);
+    assertEquals(400,  info1.reduceDeficit);
+    assertEquals(2.0,  info1.mapFairShare);
+    assertEquals(2.0,  info1.reduceFairShare);
+    assertEquals(0,    info2.runningMaps);
+    assertEquals(0,    info2.runningReduces);
+    assertEquals(1,    info2.neededMaps);
+    assertEquals(2,    info2.neededReduces);
+    assertEquals(0,    info2.mapDeficit);
+    assertEquals(0,    info2.reduceDeficit);
+    assertEquals(2.0,  info2.mapFairShare);
+    assertEquals(2.0,  info2.reduceFairShare);
+    
+    // Assign tasks and check that all slots are filled with j1, then j2
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000006_0 on tt2");
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+    
+    // Check that the scheduler has started counting the tasks as running
+    // as soon as it launched them.
+    assertEquals(2,  info1.runningMaps);
+    assertEquals(1,  info1.runningReduces);
+    assertEquals(0,  info1.neededMaps);
+    assertEquals(0,  info1.neededReduces);
+    assertEquals(1,  info2.runningMaps);
+    assertEquals(2,  info2.runningReduces);
+    assertEquals(0, info2.neededMaps);
+    assertEquals(0, info2.neededReduces);
+  }
+  
+  /**
+   * This test begins by submitting two jobs with 10 maps and reduces each.
+   * The second job is submitted 100 ms after the first, during which time no
+   * tasks run. After this, we assign tasks to all slots, which should all be
+   * from job 1. These run for 200 ms, at which point job 2 has a deficit of
+   * 400 while job 1 is down to a deficit of 0. We then finish all tasks and
+   * assign new ones, which should all be from job 2. These run for 50 ms,
+   * which is not enough time for job 2 to make up its deficit (it works off
+   * only 100 of its 400). Finally, we assign a new round of tasks, which
+   * should all be from job 2 again.
+   */
+  public void testLargeJobs() throws IOException {
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.runningMaps);
+    assertEquals(0,    info1.runningReduces);
+    assertEquals(10,   info1.neededMaps);
+    assertEquals(10,   info1.neededReduces);
+    assertEquals(0,    info1.mapDeficit);
+    assertEquals(0,    info1.reduceDeficit);
+    assertEquals(4.0,  info1.mapFairShare);
+    assertEquals(4.0,  info1.reduceFairShare);
+    
+    // Advance time before submitting another job j2, to make j1 run before j2
+    // deterministically.
+    advanceTime(100);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info2 = scheduler.infos.get(job2);
+    
+    // Check scheduler variables; the fair shares should now have been allocated
+    // equally between j1 and j2, but j1 should have (4 slots)*(100 ms) deficit
+    assertEquals(0,    info1.runningMaps);
+    assertEquals(0,    info1.runningReduces);
+    assertEquals(10,   info1.neededMaps);
+    assertEquals(10,   info1.neededReduces);
+    assertEquals(400,  info1.mapDeficit);
+    assertEquals(400,  info1.reduceDeficit);
+    assertEquals(2.0,  info1.mapFairShare);
+    assertEquals(2.0,  info1.reduceFairShare);
+    assertEquals(0,    info2.runningMaps);
+    assertEquals(0,    info2.runningReduces);
+    assertEquals(10,   info2.neededMaps);
+    assertEquals(10,   info2.neededReduces);
+    assertEquals(0,    info2.mapDeficit);
+    assertEquals(0,    info2.reduceDeficit);
+    assertEquals(2.0,  info2.mapFairShare);
+    assertEquals(2.0,  info2.reduceFairShare);
+    
+    // Assign tasks and check that all slots are initially filled with job 1
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+    
+    // Check that the scheduler has started counting the tasks as running
+    // as soon as it launched them.
+    assertEquals(4,  info1.runningMaps);
+    assertEquals(4,  info1.runningReduces);
+    assertEquals(6,  info1.neededMaps);
+    assertEquals(6,  info1.neededReduces);
+    assertEquals(0,  info2.runningMaps);
+    assertEquals(0,  info2.runningReduces);
+    assertEquals(10, info2.neededMaps);
+    assertEquals(10, info2.neededReduces);
+    
+    // Finish up the tasks and advance time again. Note that we must finish
+    // the task since FakeJobInProgress does not properly maintain running
+    // tasks, so the scheduler will always get an empty task list from
+    // the JobInProgress's getMapTasks/getReduceTasks and think they finished.
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000003_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000004_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000005_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000006_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000007_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000008_0");
+    advanceTime(200);
+    assertEquals(0,   info1.runningMaps);
+    assertEquals(0,   info1.runningReduces);
+    assertEquals(0,   info1.mapDeficit);
+    assertEquals(0,   info1.reduceDeficit);
+    assertEquals(0,   info2.runningMaps);
+    assertEquals(0,   info2.runningReduces);
+    assertEquals(400, info2.mapDeficit);
+    assertEquals(400, info2.reduceDeficit);
+
+    // Assign tasks and check that all slots are now filled with job 2
+    checkAssignment("tt1", "attempt_test_0002_m_000009_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000010_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000011_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000012_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000013_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000014_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000015_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000016_0 on tt2");
+
+    // Finish up the tasks and advance time again, but give job 2 only 50ms.
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000009_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000010_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000011_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000012_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000013_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000014_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000015_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000016_0");
+    advanceTime(50);
+    assertEquals(0,   info1.runningMaps);
+    assertEquals(0,   info1.runningReduces);
+    assertEquals(100, info1.mapDeficit);
+    assertEquals(100, info1.reduceDeficit);
+    assertEquals(0,   info2.runningMaps);
+    assertEquals(0,   info2.runningReduces);
+    assertEquals(300, info2.mapDeficit);
+    assertEquals(300, info2.reduceDeficit);
+
+    // Assign tasks and check that all slots are still filled with job 2
+    checkAssignment("tt1", "attempt_test_0002_m_000017_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000018_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000019_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000020_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000021_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000022_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000023_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000024_0 on tt2");
+  }
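+
+  // Worked arithmetic for the deficits asserted in testLargeJobs above,
+  // added for clarity and consistent with every value the test asserts:
+  // a job's deficit changes by (fairShare - runningTasks) * elapsedMs.
+  // With both jobs at a map fair share of 2.0, running job 1 on all four
+  // map slots for 200 ms changes job 1's deficit by (2.0 - 4) * 200 = -400,
+  // cancelling its initial 400, while the idle job 2 gains
+  // (2.0 - 0) * 200 = +400.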
+  
+
+  /**
+   * We submit two jobs such that one has 2x the priority of the other, wait
+   * for 100 ms, and check that the weights/deficits are okay and that the
+   * tasks all go to the high-priority job.
+   */
+  public void testJobsWithPriorities() throws IOException {
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info2 = scheduler.infos.get(job2);
+    job2.setPriority(JobPriority.HIGH);
+    scheduler.update();
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.runningMaps);
+    assertEquals(0,    info1.runningReduces);
+    assertEquals(10,   info1.neededMaps);
+    assertEquals(10,   info1.neededReduces);
+    assertEquals(0,    info1.mapDeficit);
+    assertEquals(0,    info1.reduceDeficit);
+    assertEquals(1.33, info1.mapFairShare, 0.1);
+    assertEquals(1.33, info1.reduceFairShare, 0.1);
+    assertEquals(0,    info2.runningMaps);
+    assertEquals(0,    info2.runningReduces);
+    assertEquals(10,   info2.neededMaps);
+    assertEquals(10,   info2.neededReduces);
+    assertEquals(0,    info2.mapDeficit);
+    assertEquals(0,    info2.reduceDeficit);
+    assertEquals(2.66, info2.mapFairShare, 0.1);
+    assertEquals(2.66, info2.reduceFairShare, 0.1);
+    
+    // Advance time and check deficits
+    advanceTime(100);
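+    // With no tasks running, deficits grow at the fair-share rate, e.g.
+    // job 1 accrues roughly 1.33 * 100 ~= 133 slot-ms per task type.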
+    assertEquals(133,  info1.mapDeficit, 1.0);
+    assertEquals(133,  info1.reduceDeficit, 1.0);
+    assertEquals(266,  info2.mapDeficit, 1.0);
+    assertEquals(266,  info2.reduceDeficit, 1.0);
+    
+    // Assign tasks and check that all slots are filled with j1, then j2
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000008_0 on tt2");
+  }
+  
+  /**
+   * This test starts by submitting three large jobs:
+   * - job1 in the default pool, at time 0
+   * - job2 in poolA, with an allocation of 1 map / 2 reduces, at time 200
+   * - job3 in poolB, with an allocation of 2 maps / 1 reduce, at time 200
+   * 
+   * After this, we advance the clock 100ms, to time 300. At this point, job1
+   * has the highest map deficit, job3 the second, and job2 the third: job3 has
+   * more maps in its min share than job2, while job1, submitted 200ms earlier,
+   * has accumulated the largest deficit of all. The reduce deficits are
+   * ordered similarly, except job2 comes before job3 because it has the higher
+   * reduce minimum share.
+   * 
+   * Finally, assign tasks to all slots. The maps should be assigned in the
+   * order job3, job2, job1 because 3 and 2 both have guaranteed slots and 3
+   * has a higher deficit. The reduces should be assigned as job2, job3, job1.
+   */
+  public void testLargeJobsWithPools() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give pool A a minimum of 1 map, 2 reduces
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>1</minMaps>");
+    out.println("<minReduces>2</minReduces>");
+    out.println("</pool>");
+    // Give pool B a minimum of 2 maps, 1 reduce
+    out.println("<pool name=\"poolB\">");
+    out.println("<minMaps>2</minMaps>");
+    out.println("<minReduces>1</minReduces>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.runningMaps);
+    assertEquals(0,    info1.runningReduces);
+    assertEquals(10,   info1.neededMaps);
+    assertEquals(10,   info1.neededReduces);
+    assertEquals(0,    info1.mapDeficit);
+    assertEquals(0,    info1.reduceDeficit);
+    assertEquals(4.0,  info1.mapFairShare);
+    assertEquals(4.0,  info1.reduceFairShare);
+    
+    // Advance time 200ms and submit jobs 2 and 3
+    advanceTime(200);
+    assertEquals(800,  info1.mapDeficit);
+    assertEquals(800,  info1.reduceDeficit);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    JobInfo info2 = scheduler.infos.get(job2);
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
+    JobInfo info3 = scheduler.infos.get(job3);
+    
+    // Check that minimum and fair shares have been allocated
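+    // Of the 4 slots per task type, the pool minimums are carved out first
+    // (poolA: 1 map / 2 reduces, poolB: 2 maps / 1 reduce), leaving job 1 a
+    // fair share of 1 map and 1 reduce in the default pool.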
+    assertEquals(0,    info1.minMaps);
+    assertEquals(0,    info1.minReduces);
+    assertEquals(1.0,  info1.mapFairShare);
+    assertEquals(1.0,  info1.reduceFairShare);
+    assertEquals(1,    info2.minMaps);
+    assertEquals(2,    info2.minReduces);
+    assertEquals(1.0,  info2.mapFairShare);
+    assertEquals(2.0,  info2.reduceFairShare);
+    assertEquals(2,    info3.minMaps);
+    assertEquals(1,    info3.minReduces);
+    assertEquals(2.0,  info3.mapFairShare);
+    assertEquals(1.0,  info3.reduceFairShare);
+    
+    // Advance time 100ms and check deficits
+    advanceTime(100);
+    assertEquals(900,  info1.mapDeficit);
+    assertEquals(900,  info1.reduceDeficit);
+    assertEquals(100,  info2.mapDeficit);
+    assertEquals(200,  info2.reduceDeficit);
+    assertEquals(200,  info3.mapDeficit);
+    assertEquals(100,  info3.reduceDeficit);
+    
+    // Assign tasks and check that slots are first given to needy jobs
+    checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+  }
+
+  /**
+   * This test starts by submitting three large jobs:
+   * - job1 in the default pool, at time 0
+   * - job2 in poolA, with an allocation of 2 maps / 2 reduces, at time 200
+   * - job3 in poolA, with an allocation of 2 maps / 2 reduces, at time 300
+   * 
+   * After this, we advance the clock 100ms, to time 400. At this point, job1
+   * has the highest deficit, job2 the second, and job3 the third. The first
+   * two tasks of each type should be assigned to job2 and job3 since they are
+   * in a pool with an allocation guarantee, but the remaining slots should go
+   * to job 1, because once poolA's minimum share is met the pool is no longer
+   * needy and job 1 has by far the largest deficit.
+   */
+  public void testLargeJobsWithExcessCapacity() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give pool A a minimum of 2 maps, 2 reduces
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>2</minMaps>");
+    out.println("<minReduces>2</minReduces>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.runningMaps);
+    assertEquals(0,    info1.runningReduces);
+    assertEquals(10,   info1.neededMaps);
+    assertEquals(10,   info1.neededReduces);
+    assertEquals(0,    info1.mapDeficit);
+    assertEquals(0,    info1.reduceDeficit);
+    assertEquals(4.0,  info1.mapFairShare);
+    assertEquals(4.0,  info1.reduceFairShare);
+    
+    // Advance time 200ms and submit job 2
+    advanceTime(200);
+    assertEquals(800,  info1.mapDeficit);
+    assertEquals(800,  info1.reduceDeficit);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    JobInfo info2 = scheduler.infos.get(job2);
+    
+    // Check that minimum and fair shares have been allocated
+    assertEquals(0,    info1.minMaps);
+    assertEquals(0,    info1.minReduces);
+    assertEquals(2.0,  info1.mapFairShare);
+    assertEquals(2.0,  info1.reduceFairShare);
+    assertEquals(2,    info2.minMaps);
+    assertEquals(2,    info2.minReduces);
+    assertEquals(2.0,  info2.mapFairShare);
+    assertEquals(2.0,  info2.reduceFairShare);
+    
+    // Advance time 100ms and submit job 3
+    advanceTime(100);
+    assertEquals(1000, info1.mapDeficit);
+    assertEquals(1000, info1.reduceDeficit);
+    assertEquals(200,  info2.mapDeficit);
+    assertEquals(200,  info2.reduceDeficit);
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    JobInfo info3 = scheduler.infos.get(job3);
+    
+    // Check that minimum and fair shares have been allocated
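+    // PoolA's minimum of 2 is now split between jobs 2 and 3 (1 each), and
+    // the 4 slots divide evenly across the three runnable jobs: 4/3 ~= 1.33.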
+    assertEquals(0,    info1.minMaps);
+    assertEquals(0,    info1.minReduces);
+    assertEquals(1.33, info1.mapFairShare, 0.1);
+    assertEquals(1.33, info1.reduceFairShare, 0.1);
+    assertEquals(1,    info2.minMaps);
+    assertEquals(1,    info2.minReduces);
+    assertEquals(1.33, info2.mapFairShare, 0.1);
+    assertEquals(1.33, info2.reduceFairShare, 0.1);
+    assertEquals(1,    info3.minMaps);
+    assertEquals(1,    info3.minReduces);
+    assertEquals(1.33, info3.mapFairShare, 0.1);
+    assertEquals(1.33, info3.reduceFairShare, 0.1);
+    
+    // Advance time 100ms and check deficits
+    advanceTime(100);
+    assertEquals(1133, info1.mapDeficit, 1.0);
+    assertEquals(1133, info1.reduceDeficit, 1.0);
+    assertEquals(333,  info2.mapDeficit, 1.0);
+    assertEquals(333,  info2.reduceDeficit, 1.0);
+    assertEquals(133,  info3.mapDeficit, 1.0);
+    assertEquals(133,  info3.reduceDeficit, 1.0);
+    
+    // Assign tasks and check that slots first go to the needy jobs in poolA,
+    // but that job 1 then gets the remaining ones due to its larger deficit.
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_r_000004_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+  }
+  
+  /**
+   * This test starts by submitting two jobs at time 0:
+   * - job1 in the default pool
+   * - job2, with 1 map and 1 reduce, in poolA, which has an allocation of 4
+   *   maps and 4 reduces
+   * 
+   * When we assign the slots, job2 should only get 1 of each type of task.
+   * 
+   * However, job 2's fair share should still be 2.0: even though it is running
+   * only one task of each type, it keeps accumulating deficit in case it later
+   * has failures or needs speculative tasks. (TODO: This may not be a good
+   * policy.)
+   */
+  public void testSmallJobInLargePool() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give pool A a minimum of 4 maps, 4 reduces
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>4</minMaps>");
+    out.println("<minReduces>4</minReduces>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 1, "poolA");
+    JobInfo info2 = scheduler.infos.get(job2);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.runningMaps);
+    assertEquals(0,    info1.runningReduces);
+    assertEquals(10,   info1.neededMaps);
+    assertEquals(10,   info1.neededReduces);
+    assertEquals(0,    info1.mapDeficit);
+    assertEquals(0,    info1.reduceDeficit);
+    assertEquals(2.0,  info1.mapFairShare);
+    assertEquals(2.0,  info1.reduceFairShare);
+    assertEquals(0,    info2.runningMaps);
+    assertEquals(0,    info2.runningReduces);
+    assertEquals(1,    info2.neededMaps);
+    assertEquals(1,    info2.neededReduces);
+    assertEquals(0,    info2.mapDeficit);
+    assertEquals(0,    info2.reduceDeficit);
+    assertEquals(2.0,  info2.mapFairShare);
+    assertEquals(2.0,  info2.reduceFairShare);
+    
+    // Assign tasks and check that slots are first given to needy jobs
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+  }
+  
+  /**
+   * This test starts by submitting four jobs in the default pool. However, the
+   * maxRunningJobs limit for this pool has been set to two. We should see only
+   * the first two jobs get scheduled, each with half the total slots.
+   */
+  public void testPoolMaxJobs() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<pool name=\"default\">");
+    out.println("<maxRunningJobs>2</maxRunningJobs>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    
+    // Submit jobs, advancing time in-between to make sure that they are
+    // all submitted at distinct times.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    advanceTime(10);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info2 = scheduler.infos.get(job2);
+    advanceTime(10);
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info3 = scheduler.infos.get(job3);
+    advanceTime(10);
+    JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info4 = scheduler.infos.get(job4);
+    
+    // Check scheduler variables
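+    // With maxRunningJobs = 2, only jobs 1 and 2 are runnable, so they split
+    // the 4 slots while jobs 3 and 4 get fair shares of 0.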
+    assertEquals(2.0,  info1.mapFairShare);
+    assertEquals(2.0,  info1.reduceFairShare);
+    assertEquals(2.0,  info2.mapFairShare);
+    assertEquals(2.0,  info2.reduceFairShare);
+    assertEquals(0.0,  info3.mapFairShare);
+    assertEquals(0.0,  info3.reduceFairShare);
+    assertEquals(0.0,  info4.mapFairShare);
+    assertEquals(0.0,  info4.reduceFairShare);
+    
+    // Assign tasks and check that the slots go only to jobs 1 and 2
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    advanceTime(100);
+    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000008_0 on tt2");
+  }
+
+  /**
+   * This test starts by submitting two jobs by user "user1" to the default
+   * pool, and two jobs by "user2". We set user1's job limit to 1, so only one
+   * of user1's jobs and both of user2's jobs should run.
+   */
+  public void testUserMaxJobs() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<user name=\"user1\">");
+    out.println("<maxRunningJobs>1</maxRunningJobs>");
+    out.println("</user>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    
+    // Submit jobs, advancing time in-between to make sure that they are
+    // all submitted at distinct times.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    job1.getJobConf().set("user.name", "user1");
+    JobInfo info1 = scheduler.infos.get(job1);
+    advanceTime(10);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    job2.getJobConf().set("user.name", "user1");
+    JobInfo info2 = scheduler.infos.get(job2);
+    advanceTime(10);
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
+    job3.getJobConf().set("user.name", "user2");
+    JobInfo info3 = scheduler.infos.get(job3);
+    advanceTime(10);
+    JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
+    job4.getJobConf().set("user.name", "user2");
+    JobInfo info4 = scheduler.infos.get(job4);
+    
+    // Check scheduler variables
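+    // Job 2 exceeds user1's limit of 1, so the three runnable jobs (1, 3
+    // and 4) each get 4/3 ~= 1.33 of the slots.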
+    assertEquals(1.33,  info1.mapFairShare, 0.1);
+    assertEquals(1.33,  info1.reduceFairShare, 0.1);
+    assertEquals(0.0,   info2.mapFairShare);
+    assertEquals(0.0,   info2.reduceFairShare);
+    assertEquals(1.33,  info3.mapFairShare, 0.1);
+    assertEquals(1.33,  info3.reduceFairShare, 0.1);
+    assertEquals(1.33,  info4.mapFairShare, 0.1);
+    assertEquals(1.33,  info4.reduceFairShare, 0.1);
+    
+    // Assign tasks and check that the slots go only to jobs 1 and 3
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    advanceTime(100);
+    checkAssignment("tt2", "attempt_test_0003_m_000005_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_r_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_r_000008_0 on tt2");
+  }
+  
+  /**
+   * Test a combination of pool job limits and user job limits, the latter
+   * specified through both the userMaxJobsDefault element (for some users) and
+   * user-specific &lt;user&gt; elements in the allocations file. 
+   */
+  public void testComplexJobLimits() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<pool name=\"poolA\">");
+    out.println("<maxRunningJobs>1</maxRunningJobs>");
+    out.println("</pool>");
+    out.println("<user name=\"user1\">");
+    out.println("<maxRunningJobs>1</maxRunningJobs>");
+    out.println("</user>");
+    out.println("<user name=\"user2\">");
+    out.println("<maxRunningJobs>10</maxRunningJobs>");
+    out.println("</user>");
+    out.println("<userMaxJobsDefault>2</userMaxJobsDefault>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    
+    // Submit jobs, advancing time in-between to make sure that they are
+    // all submitted at distinct times.
+    
+    // Two jobs for user1; only one should get to run
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    job1.getJobConf().set("user.name", "user1");
+    JobInfo info1 = scheduler.infos.get(job1);
+    advanceTime(10);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    job2.getJobConf().set("user.name", "user1");
+    JobInfo info2 = scheduler.infos.get(job2);
+    advanceTime(10);
+    
+    // Three jobs for user2; all should get to run
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
+    job3.getJobConf().set("user.name", "user2");
+    JobInfo info3 = scheduler.infos.get(job3);
+    advanceTime(10);
+    JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
+    job4.getJobConf().set("user.name", "user2");
+    JobInfo info4 = scheduler.infos.get(job4);
+    advanceTime(10);
+    JobInProgress job5 = submitJob(JobStatus.RUNNING, 10, 10);
+    job5.getJobConf().set("user.name", "user2");
+    JobInfo info5 = scheduler.infos.get(job5);
+    advanceTime(10);
+    
+    // Three jobs for user3; only two should get to run
+    JobInProgress job6 = submitJob(JobStatus.RUNNING, 10, 10);
+    job6.getJobConf().set("user.name", "user3");
+    JobInfo info6 = scheduler.infos.get(job6);
+    advanceTime(10);
+    JobInProgress job7 = submitJob(JobStatus.RUNNING, 10, 10);
+    job7.getJobConf().set("user.name", "user3");
+    JobInfo info7 = scheduler.infos.get(job7);
+    advanceTime(10);
+    JobInProgress job8 = submitJob(JobStatus.RUNNING, 10, 10);
+    job8.getJobConf().set("user.name", "user3");
+    JobInfo info8 = scheduler.infos.get(job8);
+    advanceTime(10);
+    
+    // Two jobs for user4, in poolA; only one should get to run
+    JobInProgress job9 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    job9.getJobConf().set("user.name", "user4");
+    JobInfo info9 = scheduler.infos.get(job9);
+    advanceTime(10);
+    JobInProgress job10 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    job10.getJobConf().set("user.name", "user4");
+    JobInfo info10 = scheduler.infos.get(job10);
+    advanceTime(10);
+    
+    // Check scheduler variables
+    double SHARE = 4.0 / 7.0; // We have 4 slots and 7 runnable jobs
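+    // (Runnable: job1 for user1, jobs 3-5 for user2, jobs 6-7 for user3,
+    // and job9 in poolA.)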
+    assertEquals(SHARE,  info1.mapFairShare, 0.1);
+    assertEquals(SHARE,  info1.reduceFairShare, 0.1);
+    assertEquals(0.0,    info2.mapFairShare);
+    assertEquals(0.0,    info2.reduceFairShare);
+    assertEquals(SHARE,  info3.mapFairShare, 0.1);
+    assertEquals(SHARE,  info3.reduceFairShare, 0.1);
+    assertEquals(SHARE,  info4.mapFairShare, 0.1);
+    assertEquals(SHARE,  info4.reduceFairShare, 0.1);
+    assertEquals(SHARE,  info5.mapFairShare, 0.1);
+    assertEquals(SHARE,  info5.reduceFairShare, 0.1);
+    assertEquals(SHARE,  info6.mapFairShare, 0.1);
+    assertEquals(SHARE,  info6.reduceFairShare, 0.1);
+    assertEquals(SHARE,  info7.mapFairShare, 0.1);
+    assertEquals(SHARE,  info7.reduceFairShare, 0.1);
+    assertEquals(0.0,    info8.mapFairShare);
+    assertEquals(0.0,    info8.reduceFairShare);
+    assertEquals(SHARE,  info9.mapFairShare, 0.1);
+    assertEquals(SHARE,  info9.reduceFairShare, 0.1);
+    assertEquals(0.0,    info10.mapFairShare);
+    assertEquals(0.0,    info10.reduceFairShare);
+  }
+  
+  public void testSizeBasedWeight() throws Exception {
+    scheduler.sizeBasedWeight = true;
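+    // With size-based weight enabled, a job's weight for each task type
+    // grows with its task count, so the 20-map job should outweigh the 2-map
+    // job on maps, and the 10-reduce job should win on reduces.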
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 10);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 20, 1);
+    assertTrue(scheduler.infos.get(job2).mapFairShare >
+               scheduler.infos.get(job1).mapFairShare);
+    assertTrue(scheduler.infos.get(job1).reduceFairShare >
+               scheduler.infos.get(job2).reduceFairShare);
+  }
+  
+  private void advanceTime(long time) {
+    clock.advance(time);
+    scheduler.update();
+  }
+
+  protected TaskTrackerStatus tracker(String taskTrackerName) {
+    return taskTrackerManager.getTaskTracker(taskTrackerName);
+  }
+  
+  protected void checkAssignment(String taskTrackerName,
+      String expectedTaskString) throws IOException {
+    List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+    assertNotNull(expectedTaskString, tasks);
+    assertEquals(expectedTaskString, 1, tasks.size());
+    assertEquals(expectedTaskString, tasks.get(0).toString());
+  }
+  
+}

+ 3 - 0
src/mapred/org/apache/hadoop/mapred/TaskInProgress.java

@@ -832,4 +832,7 @@ class TaskInProgress {
     rawSplit.clearBytes();
   }
 
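+  /** Package-private accessor exposing this TIP's active task attempts. */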
+  TreeMap<TaskAttemptID, String> getActiveTasks() {
+    return activeTasks;
+  }
 }