Pārlūkot izejas kodu

MAPREDUCE-5979. FairScheduler: zero weight can cause sort failures. (Anubhav Dhoot via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1612724 13f79535-47bb-0310-9956-ffa450edef68
Karthik Kambatla 10 gadi atpakaļ
vecāks
revīzija
c76dddbe1b

+ 3 - 0
CHANGES.txt

@@ -227,6 +227,9 @@ Release 1.3.0 - unreleased
     MAPREDUCE-5777. Support utf-8 text with BOM (byte order marker) 
     (Zhihai Xu via kasha)
 
+    MAPREDUCE-5979. FairScheduler: zero weight can cause sort failures. 
+    (Anubhav Dhoot via kasha)
+
 Release 1.2.2 - unreleased
 
   INCOMPATIBLE CHANGES

+ 14 - 2
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java

@@ -76,8 +76,10 @@ class SchedulingAlgorithms {
       boolean s2Needy = s2.getRunningTasks() < minShare2;
       minShareRatio1 = s1.getRunningTasks() / Math.max(minShare1, 1.0);
       minShareRatio2 = s2.getRunningTasks() / Math.max(minShare2, 1.0);
-      tasksToWeightRatio1 = s1.getRunningTasks() / s1.getWeight();
-      tasksToWeightRatio2 = s2.getRunningTasks() / s2.getWeight();
+      tasksToWeightRatio1 = fixTaskToWeightRatio(s1.getRunningTasks() / s1
+          .getWeight());
+      tasksToWeightRatio2 = fixTaskToWeightRatio(s2.getRunningTasks() / s2
+          .getWeight());
       int res = 0;
       if (s1Needy && !s2Needy)
         res = -1;
@@ -98,6 +100,16 @@ class SchedulingAlgorithms {
     }
   }
 
+  // A zero weight with zero running task can cause NaN will cannot compare
+  // correctly. Instead set to inifinty as we want it to be treated similar
+  // to zero weight schedulable with any number of running task.
+  private static double fixTaskToWeightRatio(double ratio) {
+    if (Double.isNaN(ratio)) {
+      return Double.POSITIVE_INFINITY;
+    }
+    return ratio;
+  }
+
   /** 
    * Number of iterations for the binary search in computeFairShares. This is 
    * equivalent to the number of bits of precision in the output. 25 iterations 

+ 96 - 0
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
@@ -3090,6 +3091,101 @@ public class TestFairScheduler extends TestCase {
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt2")));
   }
+
+
+
+  class TestJobSchedulableSort extends JobSchedulable {
+
+    private final double testFairShare;
+    private int demand;
+    private String name;
+    private int runningTasks;
+    private double testweight = 1.0;
+    private long startTime;
+
+    public TestJobSchedulableSort(String name, int demand, int runningTasks,
+                                  double fairShare) {
+      this.name = name;
+      this.demand = demand;
+      this.runningTasks = runningTasks;
+      this.testFairShare = fairShare;
+      this.startTime = System.currentTimeMillis();
+    }
+
+    public void updateWeight(double weight) {
+      this.testweight = weight;
+    }
+
+    public void updateRunningTasks(int runningTasks) {
+      this.runningTasks = runningTasks;
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public int getRunningTasks() {
+      return runningTasks;
+    }
+
+    @Override
+    public int getDemand() {
+      return demand;
+    }
+
+    @Override
+    public double getWeight() {
+      return testweight;
+    }
+
+
+    @Override
+    public double getFairShare() {
+      return testFairShare;
+    }
+
+    @Override
+    public long getStartTime() {
+      return startTime;
+    }
+  }
+
+  public void testFairShareComparator()
+  {
+    List<TestJobSchedulableSort> jobs = new ArrayList<TestJobSchedulableSort>();
+    final int iterations = 100;
+    int jobCount = 100;
+
+    Comparator<Schedulable> comparator = new
+        SchedulingAlgorithms.FairShareComparator();
+
+    TestJobSchedulableSort s1 = new TestJobSchedulableSort("job_Z", 100,
+        10, 4.0);
+    s1.updateWeight(1);
+    TestJobSchedulableSort s2 = new TestJobSchedulableSort("job_Y", 100,
+        0, 4.0);
+    s2.updateWeight(0.0);
+    TestJobSchedulableSort s3 = new TestJobSchedulableSort("job_X", 100,
+        1, 4.0);
+    s3.updateWeight(0.0);
+
+    // Verify that when weight is set to zero (s2 in this case),
+    // the ratio does not become NaN.
+    // because Math.signum(anyNumber - NaN) or Math.signum(NaN - anyNumber)
+    // is always zero. That would cause sorting to fail because it would use
+    // job names to sort when one number is NaN and not otherwise causing
+    // s1>s2 (by name) and s2>s3 (by name) but s1<s3 (by running task to
+    // weight)
+
+    // s2 has inifinite ratio and should compare larger
+    assertTrue(comparator.compare(s1, s2) < 0);
+    // s2 and s3 have infinite ratio and s2 should compare larger by name
+    assertTrue(comparator.compare(s2, s3) > 0);
+    // s3 has a higher running task to weight ratio (infinity)
+    assertTrue(comparator.compare(s1, s3) < 0);
+  }
   
   
   /**