
MAPREDUCE-5822. FairScheduler does not preempt due to fairshare-starvation when fairshare is 1. (Anubhav Dhoot via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1586784 13f79535-47bb-0310-9956-ffa450edef68
Karthik Kambatla, 11 years ago
parent commit 9dfd00465a

+ 3 - 0
CHANGES.txt

@@ -205,6 +205,9 @@ Release 1.3.0 - unreleased
     MAPREDUCE-5808. Port output replication factor configurable for terasort to
     Hadoop 1.x. (Chuan Liu via cnauroth)
 
+    MAPREDUCE-5822. FairScheduler does not preempt due to fairshare-starvation
+    when fairshare is 1. (Anubhav Dhoot via kasha)
+
 Release 1.2.2 - unreleased
 
   INCOMPATIBLE CHANGES

+ 5 - 3
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java

@@ -859,11 +859,13 @@ public class FairScheduler extends TaskScheduler {
   /**
    * Is a pool being starved for fair share for the given task type?
    * This is defined as being below half its fair share.
+   * Dividing the fair share by 2 and truncating to an integer makes the
+   * check always report "not starved" whenever half of the fair share
+   * truncates to zero. Instead, multiply the other terms by 2.
    */
   boolean isStarvedForFairShare(PoolSchedulable sched) {
-    int desiredFairShare = (int) Math.floor(Math.min(
-        sched.getFairShare() / 2, sched.getDemand()));
-    return (sched.getRunningTasks() < desiredFairShare);
+    return (sched.getRunningTasks() * 2 < Math.min(sched.getFairShare(),
+        sched.getDemand() * 2));
   }
 
   /**
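
The rewritten check is algebraically equivalent to comparing the running task count against min(fairShare / 2, demand), but it avoids truncating the halved fair share to an integer. Below is a minimal standalone sketch (not part of the commit; the class name and literal values are illustrative, mirroring the 4-slot, 3-pool scenario in the test below) showing why the floor-based form misses starvation when the fair share is below 2 slots.

// Illustrative only: compares the old floor-based check with the rescaled one.
public class StarvationCheckSketch {
  public static void main(String[] args) {
    double fairShare = 4.0 / 3; // ~1.33 slots per pool (4 slots, 3 pools)
    int demand = 10;            // the pool wants far more than its share
    int runningTasks = 0;       // the pool currently holds no slots

    // Old check: min(0.67, 10) floors to 0, so "0 < 0" is false and the
    // pool is never reported as starved.
    int desiredFairShare = (int) Math.floor(Math.min(fairShare / 2, demand));
    boolean oldStarved = runningTasks < desiredFairShare;

    // New check: scale the other terms by 2 instead of halving the fair
    // share; 0 * 2 < min(1.33, 20) is true, so starvation is detected.
    boolean newStarved = runningTasks * 2 < Math.min(fairShare, demand * 2);

    System.out.println("old check reports starved: " + oldStarved); // false
    System.out.println("new check reports starved: " + newStarved); // true
  }
}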

+ 99 - 1
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

@@ -2061,7 +2061,105 @@ public class TestFairScheduler extends TestCase {
     assertNull(scheduler.assignTasks(tracker("tt3")));
     assertNull(scheduler.assignTasks(tracker("tt4")));
   }
-  
+
+  /**
+   * This test verifies that starvation is detected even when half of a
+   * pool's fair share is greater than zero but less than 1. If that value
+   * were truncated to an integer it would become zero and the pool would
+   * never be considered starved. This test ensures that preemption due to
+   * fair-share starvation still occurs in that case.
+   *
+   * The test first starts job 1, which takes 2 map slots and 2 reduce slots,
+   * in pool 1. We then submit job 2 in pool 2, which gets the remaining 2
+   * map and 2 reduce slots. Finally we submit job 3 in pool 3, which gets no
+   * slots.
+   * At this point the fair share of each pool is 4/3 (about 1.33) slots, so
+   * half the fair share is between 0 and 1.
+   * Pools 1 and 2 are above their fair share; pool 3 is below half its fair
+   * share.
+   * Therefore pool 3 should preempt tasks (after a timeout).
+   */
+  public void testFairShareBoundaryPreemption() throws Exception {
+    // Create a cluster with 2 task trackers
+    setUpCluster(1, 2, false);
+    // Enable preemption in scheduler
+    scheduler.preemptionEnabled = true;
+    // Set up pools file with a fair share preemption timeout of 1 minute
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<fairSharePreemptionTimeout>60</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+
+    // Grab pools (they'll be created even though they're not in the alloc file)
+    Pool pool1 = scheduler.getPoolManager().getPool("pool1");
+    Pool pool2 = scheduler.getPoolManager().getPool("pool2");
+    Pool pool3 = scheduler.getPoolManager().getPool("pool3");
+
+    // Submit job 1. We advance time by 100 between each task tracker
+    // assignment stage to ensure that the tasks from job2 on tt2 are the ones
+    // that are deterministically preempted first (being the latest launched
+    // tasks in an over-allocated job).
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 2, "pool1");
+    advanceTime(100);
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    advanceTime(100);
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    advanceTime(100);
+
+    // Submit job 2.
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 6, 6, "pool2");
+    advanceTime(100);
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    advanceTime(100);
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+    advanceTime(100);
+
+    // Submit job 3.
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "pool3");
+
+    // Check that after 59 seconds, neither pool can preempt
+    advanceTime(59000);
+    assertEquals(0, scheduler.tasksToPreempt(pool2.getMapSchedulable(),
+        clock.getTime()));
+    assertEquals(0, scheduler.tasksToPreempt(pool2.getReduceSchedulable(),
+        clock.getTime()));
+    assertEquals(0, scheduler.tasksToPreempt(pool3.getMapSchedulable(),
+        clock.getTime()));
+    assertEquals(0, scheduler.tasksToPreempt(pool3.getReduceSchedulable(),
+        clock.getTime()));
+
+    // Wait 2 more seconds, so that job 3 has now been in the system for 61s.
+    // Now pool 3 should be able to preempt 1 map task and 1 reduce task,
+    // bringing it up to its fair share (rounded down to 1 slot).
+    advanceTime(2000);
+    assertEquals(0, scheduler.tasksToPreempt(pool2.getMapSchedulable(),
+        clock.getTime()));
+    assertEquals(0, scheduler.tasksToPreempt(pool2.getReduceSchedulable(),
+        clock.getTime()));
+    assertEquals(1, scheduler.tasksToPreempt(pool3.getMapSchedulable(),
+        clock.getTime()));
+    assertEquals(1, scheduler.tasksToPreempt(pool3.getReduceSchedulable(),
+        clock.getTime()));
+
+    // Test that the tasks actually get preempted and we can assign new ones
+    scheduler.preemptTasksIfNecessary();
+    scheduler.update();
+    assertEquals(2, job1.runningMaps());
+    assertEquals(2, job1.runningReduces());
+    assertEquals(1, job2.runningMaps());
+    assertEquals(1, job2.runningReduces());
+    checkAssignment("tt2", "attempt_test_0003_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_r_000000_0 on tt2");
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+  }
+
   /**
    * This test runs on a 3-node (6-slot) cluster to allow 3 pools with fair
    * shares equal 2 slots to coexist (which makes the half-fair-share