
Merge -r 732868:732869 from trunk to branch-0.20 for HADOOP-4789.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@732870 13f79535-47bb-0310-9956-ffa450edef68
Author: Matei Alexandru Zaharia
Commit: 0dabffaef6

src/contrib/fairscheduler/README  (+40 -25)

@@ -8,7 +8,8 @@
 # implied.  See the License for the specific language governing
 # permissions and limitations under the License.
 
-This package implements fair scheduling for MapReduce jobs.
+This package implements a fair scheduler for MapReduce jobs with additional
+support for guaranteed shares and job limits.
 
 Fair scheduling is a method of assigning resources to jobs such that all jobs
 get, on average, an equal share of resources over time. When there is a single
@@ -21,18 +22,24 @@ number of users. Finally, fair sharing can also work with job priorities - the
 priorities are used as weights to determine the fraction of total compute time
 that each job should get.
 
+The scheduler actually organizes jobs further into "pools", and shares resources
+fairly between these pools. By default, there is a separate pool for each user,
+so that each user gets the same share of the cluster no matter how many jobs
+they submit. However, it is also possible to set a job's pool based on the
+user's Unix group or any other jobconf property, such as the queue name
+property used by the Capacity Scheduler (JIRA HADOOP-3445). Within each pool,
+fair sharing is used to share capacity between the running jobs. Pools can also
+be given weights to share the cluster non-proportionally in the config file.
+
 In addition to providing fair sharing, the Fair Scheduler allows assigning
-jobs to "pools" with guaranteed minimum shares. When a pool contains jobs,
-it gets at least its minimum share, but when a pool does not need its full
-capacity, the excess is shared between other running jobs. Thus pools are
-a way to guarantee capacity for particular user groups while utilizing the
-cluster efficiently when these users are not submitting any jobs. Within each
-pool, fair sharing is used to share capacity between the running jobs. By
-default the pool is set based on the queue.name property in the jobconf which
-will be introduced with the Hadoop Resource Manager (JIRA 3445), but it's
-possible to also have a pool per user or per Unix user group.
-
-The fair scheduler lets all jobs run by default, but it is also possible to
+guaranteed minimum shares to pools, which is useful for ensuring that certain
+users, groups or production applications always get sufficient resources.
+When a pool contains jobs, it gets at least its minimum share, but when the pool
+does not need its full guaranteed share, the excess is split between other
+running jobs. This lets the scheduler guarantee capacity for pools while
+utilizing resources efficiently when these pools don't contain jobs.
+
+The Fair Scheduler lets all jobs run by default, but it is also possible to
 limit the number of running jobs per user and per pool through the config
 file. This can be useful when a user must submit hundreds of jobs at once,
 or in general to improve performance if running too many jobs at once would
@@ -45,8 +52,7 @@ time, as in the default FIFO scheduler in Hadoop.
 Finally, the fair scheduler provides several extension points where the basic
 functionality can be extended. For example, the weight calculation can be
 modified to give a priority boost to new jobs, implementing a "shortest job
-first" like policy which will reduce response times for interactive jobs even
-further. 
+first" policy which reduces response times for interactive jobs even further. 
 
 --------------------------------------------------------------------------------
 
@@ -88,17 +94,22 @@ mapred.fairscheduler.allocation.file:
     for each pool, as well as the per-pool and per-user limits on number of
     running jobs. If this property is not provided, allocations are not used.
     This file must be in XML format, and can contain three types of elements:
-    - pool elements, which may contain elements for minMaps, minReduces and
-      maxRunningJobs (limit the number of jobs from the pool to run at once).
-    - user elements, which may contain a maxRunningJobs to limit jobs.
-    - A userMaxJobsDefault element, which sets the running job limit for any
-      users that do not have their own elements.
+    - pool elements, which may contain elements for minMaps, minReduces,
+      maxRunningJobs (limit the number of jobs from the pool to run at once),
+      and weight (to share the cluster non-proportionally with other pools).
+    - user elements, which may contain a maxRunningJobs to limit jobs. Note
+      that by default, there is a separate pool for each user, so these may not
+      be necessary; they are useful, however, if you create a pool per user
+      group or manually assign jobs to pools.
+    - A userMaxJobsDefault element, which sets the default running job limit
+      for any users whose limit is not specified.
     The following example file shows how to create each type of element:
         <?xml version="1.0"?>
         <allocations>
           <pool name="sample_pool">
             <minMaps>5</minMaps>
             <minReduces>5</minReduces>
+            <weight>2.0</weight>
           </pool>
           <user name="sample_user">
             <maxRunningJobs>6</maxRunningJobs>
@@ -106,11 +117,13 @@ mapred.fairscheduler.allocation.file:
           <userMaxJobsDefault>3</userMaxJobsDefault>
         </allocations>
     This example creates a pool sample_pool with a guarantee of 5 map slots
-    and 5 reduce slots. It also limits the number of running jobs per user
+    and 5 reduce slots. The pool also has a weight of 2.0, meaning it has a 2x
+    higher share of the cluster than other pools (the default weight is 1).
+    Finally, the example limits the number of running jobs per user
     to 3, except for sample_user, who can run 6 jobs concurrently.
     Any pool not defined in the allocations file will have no guaranteed
-    capacity. Also, any pool or user with no max running jobs set in the file
-    will be allowed to run an unlimited number of jobs.
+    capacity and a weight of 1.0. Also, any pool or user with no max running
+    jobs set in the file will be allowed to run an unlimited number of jobs.
 
 mapred.fairscheduler.assignmultiple:
     Allows the scheduler to assign both a map task and a reduce task on each
@@ -128,9 +141,11 @@ mapred.fairscheduler.sizebasedweight:
 
 mapred.fairscheduler.poolnameproperty:
     Specify which jobconf property is used to determine the pool that a job
-    belongs in. String, default: queue.name (the same property as the queue
-    name in the Hadoop Resource Manager, JIRA 3445). You can use user.name
-    or group.name to base it on the Unix user or Unix group for example.
+    belongs in. String, default: user.name (i.e. one pool for each user).
+    Some other useful values to set this to are:
+    - group.name (to create a pool per Unix group).
+    - mapred.job.queue.name (the same property as the queue name in the
+    Capacity Scheduler, JIRA HADOOP-3445).
 
 mapred.fairscheduler.weightadjuster:
     An extensibility point that lets you specify a class to adjust the weights
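
The following is a hypothetical usage sketch, not part of this change: it shows
how a job could be directed into the example "sample_pool", assuming the cluster
sets mapred.fairscheduler.poolnameproperty to mapred.job.queue.name as discussed
above. The class name and job name are made up for illustration.

    import org.apache.hadoop.mapred.JobConf;

    // Illustrative only: mapper/reducer classes and input/output paths would
    // still need to be configured before submitting the job via JobClient.
    public class SamplePoolJobConf {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.setJobName("sample-pool-job");  // hypothetical job name
        // The Fair Scheduler reads the pool name from the jobconf property
        // named by mapred.fairscheduler.poolnameproperty; here that property
        // is assumed to be mapred.job.queue.name.
        conf.set("mapred.job.queue.name", "sample_pool");
        System.out.println("Pool property: " + conf.get("mapred.job.queue.name"));
      }
    }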

src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java  (+33 -3)

@@ -450,11 +450,41 @@ public class FairScheduler extends TaskScheduler {
   }
 
   private void updateWeights() {
+    // First, calculate raw weights for each job
     for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
       JobInProgress job = entry.getKey();
       JobInfo info = entry.getValue();
-      info.mapWeight = calculateWeight(job, TaskType.MAP);
-      info.reduceWeight = calculateWeight(job, TaskType.REDUCE);
+      info.mapWeight = calculateRawWeight(job, TaskType.MAP);
+      info.reduceWeight = calculateRawWeight(job, TaskType.REDUCE);
+    }
+    // Now calculate job weight sums for each pool
+    Map<String, Double> mapWeightSums = new HashMap<String, Double>();
+    Map<String, Double> reduceWeightSums = new HashMap<String, Double>();
+    for (Pool pool: poolMgr.getPools()) {
+      double mapWeightSum = 0;
+      double reduceWeightSum = 0;
+      for (JobInProgress job: pool.getJobs()) {
+        if (isRunnable(job)) {
+          if (runnableTasks(job, TaskType.MAP) > 0) {
+            mapWeightSum += infos.get(job).mapWeight;
+          }
+          if (runnableTasks(job, TaskType.REDUCE) > 0) {
+            reduceWeightSum += infos.get(job).reduceWeight;
+          }
+        }
+      }
+      mapWeightSums.put(pool.getName(), mapWeightSum);
+      reduceWeightSums.put(pool.getName(), reduceWeightSum);
+    }
+    // And normalize the weights based on pool sums and pool weights
+    // to share fairly across pools (proportional to their weights)
+    for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
+      JobInProgress job = entry.getKey();
+      JobInfo info = entry.getValue();
+      String pool = poolMgr.getPoolName(job);
+      double poolWeight = poolMgr.getPoolWeight(pool);
+      info.mapWeight *= (poolWeight / mapWeightSums.get(pool)); 
+      info.reduceWeight *= (poolWeight / reduceWeightSums.get(pool));
     }
   }
   
@@ -615,7 +645,7 @@ public class FairScheduler extends TaskScheduler {
     }
   }
 
-  private double calculateWeight(JobInProgress job, TaskType taskType) {
+  private double calculateRawWeight(JobInProgress job, TaskType taskType) {
     if (!isRunnable(job)) {
       return 0;
     } else {

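To make the two-pass weight calculation in updateWeights easier to follow, here
is a minimal standalone sketch with simplified types (it is not the real
FairScheduler code): raw per-job weights are summed per pool, then each job's
weight is scaled by poolWeight / poolSum so that pools divide the cluster in
proportion to their configured weights.

    import java.util.HashMap;
    import java.util.Map;

    // Simplified illustration of the normalization; jobs and pools are
    // represented by plain strings rather than JobInProgress/Pool objects.
    public class WeightNormalizationSketch {
      static Map<String, Double> normalize(Map<String, Double> rawJobWeights,
                                           Map<String, String> jobToPool,
                                           Map<String, Double> poolWeights) {
        // Pass 1: sum the raw weights of the jobs in each pool.
        Map<String, Double> poolSums = new HashMap<String, Double>();
        for (Map.Entry<String, Double> e : rawJobWeights.entrySet()) {
          String pool = jobToPool.get(e.getKey());
          Double sum = poolSums.get(pool);
          poolSums.put(pool, (sum == null ? 0.0 : sum) + e.getValue());
        }
        // Pass 2: scale each job by poolWeight / poolSum (default weight 1.0).
        Map<String, Double> normalized = new HashMap<String, Double>();
        for (Map.Entry<String, Double> e : rawJobWeights.entrySet()) {
          String pool = jobToPool.get(e.getKey());
          double poolWeight =
              poolWeights.containsKey(pool) ? poolWeights.get(pool) : 1.0;
          normalized.put(e.getKey(),
              e.getValue() * poolWeight / poolSums.get(pool));
        }
        return normalized;
      }

      public static void main(String[] args) {
        Map<String, Double> raw = new HashMap<String, Double>();
        raw.put("job1", 1.0);
        raw.put("job2", 1.0);
        Map<String, String> pools = new HashMap<String, String>();
        pools.put("job1", "default");
        pools.put("job2", "poolA");
        Map<String, Double> weights = new HashMap<String, Double>();
        weights.put("poolA", 2.0);  // poolA has twice the default weight
        // Prints {job1=1.0, job2=2.0} (exact ordering may vary by JVM).
        System.out.println(normalize(raw, pools, weights));
      }
    }
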
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java  (+18 -1)

@@ -60,6 +60,9 @@ public class PoolManager {
   // Map and reduce minimum allocations for each pool
   private Map<String, Integer> mapAllocs = new HashMap<String, Integer>();
   private Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
+
+  // Sharing weights for each pool
+  private Map<String, Double> poolWeights = new HashMap<String, Double>();
   
   // Max concurrent running jobs for each pool and for each user; in addition,
   // for users that have no max specified, we use the userMaxJobsDefault.
@@ -80,7 +83,7 @@ public class PoolManager {
   public PoolManager(Configuration conf) throws IOException, SAXException,
       AllocationConfigurationException, ParserConfigurationException {
     this.poolNameProperty = conf.get(
-        "mapred.fairscheduler.poolnameproperty", "mapred.job.queue.name");
+        "mapred.fairscheduler.poolnameproperty", "user.name");
     this.allocFile = conf.get("mapred.fairscheduler.allocation.file");
     if (allocFile == null) {
       LOG.warn("No mapred.fairscheduler.allocation.file given in jobconf - " +
@@ -162,6 +165,7 @@ public class PoolManager {
     Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
     Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
     Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
+    Map<String, Double> poolWeights = new HashMap<String, Double>();
     int userMaxJobsDefault = Integer.MAX_VALUE;
     
     // Remember all pool names so we can display them on web UI, etc.
@@ -204,6 +208,10 @@ public class PoolManager {
             String text = ((Text)field.getFirstChild()).getData().trim();
             int val = Integer.parseInt(text);
             poolMaxJobs.put(poolName, val);
+          } else if ("weight".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            double val = Double.parseDouble(text);
+            poolWeights.put(poolName, val);
           }
         }
       } else if ("user".equals(element.getTagName())) {
@@ -237,6 +245,7 @@ public class PoolManager {
       this.poolMaxJobs = poolMaxJobs;
       this.userMaxJobs = userMaxJobs;
       this.userMaxJobsDefault = userMaxJobsDefault;
+      this.poolWeights = poolWeights;
       for (String name: poolNamesInAllocFile) {
         getPool(name);
       }
@@ -321,4 +330,12 @@ public class PoolManager {
       return Integer.MAX_VALUE;
     }
   }
+
+  public double getPoolWeight(String pool) {
+    if (poolWeights.containsKey(pool)) {
+      return poolWeights.get(pool);
+    } else {
+      return 1.0;
+    }
+  }
 }
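
For reference, here is a standalone sketch, separate from PoolManager, of how a
<weight> element like the one handled above can be read with the standard DOM
API; as in getPoolWeight, a pool with no <weight> element falls back to 1.0.
The class name is hypothetical.

    import java.io.StringReader;

    import javax.xml.parsers.DocumentBuilderFactory;

    import org.w3c.dom.Document;
    import org.w3c.dom.Element;
    import org.xml.sax.InputSource;

    public class WeightParseSketch {
      public static void main(String[] args) throws Exception {
        String xml = "<pool name=\"poolA\"><weight>2.0</weight></pool>";
        Document doc = DocumentBuilderFactory.newInstance()
            .newDocumentBuilder()
            .parse(new InputSource(new StringReader(xml)));
        Element pool = doc.getDocumentElement();
        Element weight = (Element) pool.getElementsByTagName("weight").item(0);
        double val = (weight != null)
            ? Double.parseDouble(weight.getTextContent().trim())
            : 1.0;  // default weight when no <weight> element is present
        System.out.println("poolA weight = " + val);  // prints 2.0
      }
    }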

src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java  (+85 -28)

@@ -759,25 +759,25 @@ public class TestFairScheduler extends TestCase {
     // Check that minimum and fair shares have been allocated
     assertEquals(0,    info1.minMaps);
     assertEquals(0,    info1.minReduces);
-    assertEquals(1.33, info1.mapFairShare, 0.1);
-    assertEquals(1.33, info1.reduceFairShare, 0.1);
+    assertEquals(2,    info1.mapFairShare, 0.1);
+    assertEquals(2,    info1.reduceFairShare, 0.1);
     assertEquals(1,    info2.minMaps);
     assertEquals(1,    info2.minReduces);
-    assertEquals(1.33, info2.mapFairShare, 0.1);
-    assertEquals(1.33, info2.reduceFairShare, 0.1);
+    assertEquals(1,    info2.mapFairShare, 0.1);
+    assertEquals(1,    info2.reduceFairShare, 0.1);
     assertEquals(1,    info3.minMaps);
     assertEquals(1,    info3.minReduces);
-    assertEquals(1.33, info3.mapFairShare, 0.1);
-    assertEquals(1.33, info3.reduceFairShare, 0.1);
+    assertEquals(1,    info3.mapFairShare, 0.1);
+    assertEquals(1,    info3.reduceFairShare, 0.1);
     
     // Advance time 100ms and check deficits
     advanceTime(100);
-    assertEquals(1133, info1.mapDeficit, 1.0);
-    assertEquals(1133, info1.reduceDeficit, 1.0);
-    assertEquals(333,  info2.mapDeficit, 1.0);
-    assertEquals(333,  info2.reduceDeficit, 1.0);
-    assertEquals(133,  info3.mapDeficit, 1.0);
-    assertEquals(133,  info3.reduceDeficit, 1.0);
+    assertEquals(1200, info1.mapDeficit, 1.0);
+    assertEquals(1200, info1.reduceDeficit, 1.0);
+    assertEquals(300,  info2.mapDeficit, 1.0);
+    assertEquals(300,  info2.reduceDeficit, 1.0);
+    assertEquals(100,  info3.mapDeficit, 1.0);
+    assertEquals(100,  info3.reduceDeficit, 1.0);
     
     // Assign tasks and check that slots are first given to needy jobs, but
     // that job 1 gets two tasks after due to having a larger deficit.
@@ -1036,26 +1036,29 @@ public class TestFairScheduler extends TestCase {
     JobInfo info10 = scheduler.infos.get(job10);
     advanceTime(10);
     
-    // Check scheduler variables
-    double SHARE = 4.0 / 7.0; // We have 4 slots and 7 runnable jobs
-    assertEquals(SHARE,  info1.mapFairShare, 0.1);
-    assertEquals(SHARE,  info1.reduceFairShare, 0.1);
+    // Check scheduler variables. The jobs in poolA should get half
+    // the total share, while those in the default pool should get
+    // the other half. This works out to 2 slots for the one runnable job
+    // in poolA and 1/3 each for the jobs in the default pool, because
+    // there is 1 runnable job in poolA and 6 jobs in the default pool.
+    assertEquals(0.33,   info1.mapFairShare, 0.1);
+    assertEquals(0.33,   info1.reduceFairShare, 0.1);
     assertEquals(0.0,    info2.mapFairShare);
     assertEquals(0.0,    info2.reduceFairShare);
-    assertEquals(SHARE,  info3.mapFairShare, 0.1);
-    assertEquals(SHARE,  info3.reduceFairShare, 0.1);
-    assertEquals(SHARE,  info4.mapFairShare, 0.1);
-    assertEquals(SHARE,  info4.reduceFairShare, 0.1);
-    assertEquals(SHARE,  info5.mapFairShare, 0.1);
-    assertEquals(SHARE,  info5.reduceFairShare, 0.1);
-    assertEquals(SHARE,  info6.mapFairShare, 0.1);
-    assertEquals(SHARE,  info6.reduceFairShare, 0.1);
-    assertEquals(SHARE,  info7.mapFairShare, 0.1);
-    assertEquals(SHARE,  info7.reduceFairShare, 0.1);
+    assertEquals(0.33,   info3.mapFairShare, 0.1);
+    assertEquals(0.33,   info3.reduceFairShare, 0.1);
+    assertEquals(0.33,   info4.mapFairShare, 0.1);
+    assertEquals(0.33,   info4.reduceFairShare, 0.1);
+    assertEquals(0.33,   info5.mapFairShare, 0.1);
+    assertEquals(0.33,   info5.reduceFairShare, 0.1);
+    assertEquals(0.33,   info6.mapFairShare, 0.1);
+    assertEquals(0.33,   info6.reduceFairShare, 0.1);
+    assertEquals(0.33,   info7.mapFairShare, 0.1);
+    assertEquals(0.33,   info7.reduceFairShare, 0.1);
     assertEquals(0.0,    info8.mapFairShare);
     assertEquals(0.0,    info8.reduceFairShare);
-    assertEquals(SHARE,  info9.mapFairShare, 0.1);
-    assertEquals(SHARE,  info9.reduceFairShare, 0.1);
+    assertEquals(2.0,    info9.mapFairShare, 0.1);
+    assertEquals(2.0,    info9.reduceFairShare, 0.1);
     assertEquals(0.0,    info10.mapFairShare);
     assertEquals(0.0,    info10.reduceFairShare);
   }
@@ -1094,6 +1097,60 @@ public class TestFairScheduler extends TestCase {
     assertTrue(scheduler.enoughMapsFinishedToRunReduces(1, 1));
   }
   
+
+  /**
+   * This test submits jobs in three pools: poolA, which has a weight
+   * of 2.0; poolB, which has a weight of 0.5; and the default pool, which
+   * should have a weight of 1.0. It then checks that the map and reduce
+   * fair shares are given out accordingly. We then submit a second job to
+   * pool B and check that each gets half of the pool (weight of 0.25).
+   */
+  public void testPoolWeights() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<pool name=\"poolA\">");
+    out.println("<weight>2.0</weight>");
+    out.println("</pool>");
+    out.println("<pool name=\"poolB\">");
+    out.println("<weight>0.5</weight>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    
+    // Submit jobs, advancing time in-between to make sure that they are
+    // all submitted at distinct times.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    JobInfo info2 = scheduler.infos.get(job2);
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
+    JobInfo info3 = scheduler.infos.get(job3);
+    advanceTime(10);
+    
+    assertEquals(1.14,  info1.mapFairShare, 0.01);
+    assertEquals(1.14,  info1.reduceFairShare, 0.01);
+    assertEquals(2.28,  info2.mapFairShare, 0.01);
+    assertEquals(2.28,  info2.reduceFairShare, 0.01);
+    assertEquals(0.57,  info3.mapFairShare, 0.01);
+    assertEquals(0.57,  info3.reduceFairShare, 0.01);
+    
+    JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
+    JobInfo info4 = scheduler.infos.get(job4);
+    advanceTime(10);
+    
+    assertEquals(1.14,  info1.mapFairShare, 0.01);
+    assertEquals(1.14,  info1.reduceFairShare, 0.01);
+    assertEquals(2.28,  info2.mapFairShare, 0.01);
+    assertEquals(2.28,  info2.reduceFairShare, 0.01);
+    assertEquals(0.28,  info3.mapFairShare, 0.01);
+    assertEquals(0.28,  info3.reduceFairShare, 0.01);
+    assertEquals(0.28,  info4.mapFairShare, 0.01);
+    assertEquals(0.28,  info4.reduceFairShare, 0.01);
+  }
+  
   private void advanceTime(long time) {
     clock.advance(time);
     scheduler.update();
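
For reference, the expected values in testPoolWeights follow directly from the
weight normalization above, assuming the same 4-slot test cluster used by the
other tests in this class: the total weight across poolA (2.0), poolB (0.5) and
the default pool (1.0) is 3.5, so the three single-job pools get 4 * 2.0/3.5 ≈
2.28, 4 * 0.5/3.5 ≈ 0.57 and 4 * 1.0/3.5 ≈ 1.14 slots respectively. Once a
second job is submitted to poolB, the two jobs split that pool's 0.57-slot
share, giving roughly 0.28 each.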