
HADOOP-5075. Potential infinite loop in updateMinSlots.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@736917 13f79535-47bb-0310-9956-ffa450edef68
Matei Alexandru Zaharia, 16 years ago
commit 40b30c2e7a

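The root cause: in Java floating-point arithmetic, dividing a positive pool weight by a zero weight sum yields Infinity, and multiplying a zero job weight by Infinity yields NaN. The first hunk below also fixes a copy-paste error (the reduce weight sums were being populated with the map weight sums). A minimal standalone sketch (illustrative names, not the scheduler's code) of the behavior the new zero-sum guard prevents:

    public class NaNWeightDemo {
      public static void main(String[] args) {
        double poolWeight = 2.0;   // the pool's configured weight
        double mapWeightSum = 0.0; // no job in the pool has any maps
        double jobMapWeight = 0.0; // a map-less job's raw map weight

        // Without the guard: 2.0 / 0.0 == Infinity, and 0.0 * Infinity == NaN.
        double normalized = jobMapWeight * (poolWeight / mapWeightSum);
        System.out.println(normalized);      // NaN
        System.out.println(normalized > 0);  // false
        System.out.println(normalized <= 0); // false: NaN fails every comparison,
                                             // so downstream threshold tests break
      }
    }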
+ 17 - 3
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java

@@ -480,7 +480,7 @@ public class FairScheduler extends TaskScheduler {
         }
       }
       mapWeightSums.put(pool.getName(), mapWeightSum);
-      reduceWeightSums.put(pool.getName(), mapWeightSum);
+      reduceWeightSums.put(pool.getName(), reduceWeightSum);
     }
     // And normalize the weights based on pool sums and pool weights
     // to share fairly across pools (proportional to their weights)
@@ -489,8 +489,16 @@ public class FairScheduler extends TaskScheduler {
       JobInfo info = entry.getValue();
       String pool = poolMgr.getPoolName(job);
       double poolWeight = poolMgr.getPoolWeight(pool);
-      info.mapWeight *= (poolWeight / mapWeightSums.get(pool)); 
-      info.reduceWeight *= (poolWeight / reduceWeightSums.get(pool));
+      double mapWeightSum = mapWeightSums.get(pool);
+      double reduceWeightSum = reduceWeightSums.get(pool);
+      if (mapWeightSum == 0)
+        info.mapWeight = 0;
+      else
+        info.mapWeight *= (poolWeight / mapWeightSum); 
+      if (reduceWeightSum == 0)
+        info.reduceWeight = 0;
+      else
+        info.reduceWeight *= (poolWeight / reduceWeightSum); 
     }
   }
   
@@ -555,6 +563,12 @@ public class FairScheduler extends TaskScheduler {
               int share = (int) Math.ceil(oldSlots * weight / totalWeight);
               slotsLeft = giveMinSlots(job, type, slotsLeft, share);
             }
+            if (slotsLeft > 0) {
+              LOG.warn("Had slotsLeft = " + slotsLeft + " after the final "
+                  + "loop in updateMinSlots. This probably means some fair "
+                  + "scheduler weights are being set to NaN or Infinity.");
+            }
+            break;
           }
         }
       }

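The updateMinSlots hunk above addresses what happens once a NaN weight leaks that far: Math.ceil of NaN is NaN, and Java's narrowing conversion of NaN to int yields 0, so each pass computes a share of zero slots, slotsLeft never shrinks, and the surrounding loop spins forever. The patch logs the leftover slots and breaks out after the final pass instead. A small sketch (plain Java, hypothetical values, not the scheduler's actual loop) of the conversion involved:

    public class NaNShareDemo {
      public static void main(String[] args) {
        double weight = Double.NaN;      // poisoned by a 0/0 normalization
        double totalWeight = Double.NaN; // any sum containing NaN is NaN
        int oldSlots = 100;

        // Math.ceil(NaN) is NaN; casting NaN to int gives 0 in Java.
        int share = (int) Math.ceil(oldSlots * weight / totalWeight);
        System.out.println(share);       // 0 -- no slots are ever handed out
      }
    }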
+ 45 - 0
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

@@ -1151,6 +1151,51 @@ public class TestFairScheduler extends TestCase {
     assertEquals(0.28,  info4.reduceFairShare, 0.01);
   }
 
+  /**
+   * This test submits jobs in two pools, poolA and poolB. None of the
+   * jobs in poolA have maps, but this should not affect their reduce
+   * share.
+   */
+  public void testPoolWeightsWhenNoMaps() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<pool name=\"poolA\">");
+    out.println("<weight>2.0</weight>");
+    out.println("</pool>");
+    out.println("<pool name=\"poolB\">");
+    out.println("<weight>1.0</weight>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    
+    // Submit jobs, advancing time in-between to make sure that they are
+    // all submitted at distinct times.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
+    JobInfo info1 = scheduler.infos.get(job1);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
+    JobInfo info2 = scheduler.infos.get(job2);
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
+    JobInfo info3 = scheduler.infos.get(job3);
+    advanceTime(10);
+    
+    assertEquals(0,     info1.mapWeight, 0.01);
+    assertEquals(1.0,   info1.reduceWeight, 0.01);
+    assertEquals(0,     info2.mapWeight, 0.01);
+    assertEquals(1.0,   info2.reduceWeight, 0.01);
+    assertEquals(1.0,   info3.mapWeight, 0.01);
+    assertEquals(1.0,   info3.reduceWeight, 0.01);
+    
+    assertEquals(0,     info1.mapFairShare, 0.01);
+    assertEquals(1.33,  info1.reduceFairShare, 0.01);
+    assertEquals(0,     info2.mapFairShare, 0.01);
+    assertEquals(1.33,  info2.reduceFairShare, 0.01);
+    assertEquals(4,     info3.mapFairShare, 0.01);
+    assertEquals(1.33,  info3.reduceFairShare, 0.01);
+  }
+
   /**
    * Tests that max-running-tasks per node are set by assigning load
   * equally across the cluster in CapBasedLoadManager.
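The expected values in testPoolWeightsWhenNoMaps follow from the fix: within poolA (weight 2.0), the two reduce-only jobs each normalize to reduceWeight 1.0, while the zero-sum guard sets their mapWeight to 0; job3 in poolB (weight 1.0) keeps weight 1.0 on both sides. Assuming the suite's usual test cluster of four map and four reduce slots, the four reduce slots split evenly across the three equal-weight jobs (4/3 ≈ 1.33 each), and job3, the only job with maps, receives all four map slots, matching the assertions above.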