Browse Source

MAPREDUCE-5288. ResourceEstimator#getEstimatedTotalMapOutputSize suffers from divide by zero issues. (kkambatl via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1506756 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur 12 years ago
parent
commit
465d88fce7

+ 3 - 0
CHANGES.txt

@@ -97,6 +97,9 @@ Release 1.3.0 - unreleased
     MAPREDUCE-5405. Job recovery can fail if task log directory symlink from
     prior run still exists. (cnauroth)
 
+    MAPREDUCE-5288. ResourceEstimator#getEstimatedTotalMapOutputSize suffers 
+    from divide by zero issues. (kkambatl via tucu)
+
 Release 1.2.1 - 2013.07.06
 
   INCOMPATIBLE CHANGES

+ 1 - 2
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -47,7 +47,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.Counters.CountersExceededException;
 import org.apache.hadoop.mapred.JobHistory.Values;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
@@ -779,7 +778,7 @@ public class JobInProgress {
            numMapTasks));
     
     // ... use the same for estimating the total output of all maps
-    resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);
+    resourceEstimator.setThreshold(completedMapsForReduceSlowstart);
     
     // create cleanup two cleanup tips, one map and one reduce.
     cleanup = new TaskInProgress[2];

+ 10 - 6
src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java

@@ -39,11 +39,11 @@ class ResourceEstimator {
 
   private int completedMapsUpdates;
   final private JobInProgress job;
-  private int threshholdToUse;
+  int thresholdToUse;
 
   public ResourceEstimator(JobInProgress job) {
     this.job = job;
-    threshholdToUse = job.desiredMaps()/ 10;
+    thresholdToUse = Math.max(1, job.desiredMaps()/ 10);
   }
 
   protected synchronized void updateWithCompletedTask(TaskStatus ts, 
@@ -68,14 +68,14 @@ class ResourceEstimator {
    * @return estimated length of this job's total map output
    */
   protected synchronized long getEstimatedTotalMapOutputSize()  {
-    if(completedMapsUpdates < threshholdToUse) {
+    if(completedMapsUpdates < thresholdToUse) {
       return 0;
     } else {
       long inputSize = job.getInputLength() + job.desiredMaps(); 
       //add desiredMaps() so that randomwriter case doesn't blow up
       //the multiplication might lead to overflow, casting it with
       //double prevents it
-      long estimate = Math.round(((double)inputSize * 
+      long estimate = Math.round(((double)inputSize *
           completedMapsOutputSize * 2.0)/completedMapsInputSize);
       if (LOG.isDebugEnabled()) {
         LOG.debug("estimate total map output will be " + estimate);
@@ -115,7 +115,11 @@ class ResourceEstimator {
    * that we can get right estimates before we reach these number
    * of maps.
    */
-  void setThreshhold(int numMaps) {
-    threshholdToUse = Math.min(threshholdToUse, numMaps);
+  void setThreshold(int numMaps) {
+    if (numMaps == 0) {
+      thresholdToUse = 1;
+    } else {
+      thresholdToUse = Math.min(thresholdToUse, numMaps);
+    }
   }
 }

+ 23 - 4
src/test/org/apache/hadoop/mapred/TestResourceEstimation.java

@@ -17,12 +17,30 @@
  */
 package org.apache.hadoop.mapred;
 
-import junit.framework.TestCase;
+import static org.junit.Assert.*;
+
 import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.junit.Test;
+import org.mockito.Mockito;
 
-public class TestResourceEstimation extends TestCase {
-  
+public class TestResourceEstimation {
+
+  @Test (timeout = 1000)
+  public void testThresholdToUse() {
+    JobInProgress jip = Mockito.mock(JobInProgress.class);
+    Mockito.when(jip.desiredMaps()).thenReturn(4);
+
+    ResourceEstimator re = new ResourceEstimator(jip);
+    assertEquals("Incorrect thresholdToUse", 1, re.thresholdToUse);
 
+    re.setThreshold(10);
+    assertEquals("Incorrect thresholdToUse", 1, re.thresholdToUse);
+
+    re.setThreshold(0);
+    assertEquals("Incorrect thresholdToUse", 1, re.thresholdToUse);
+  }
+  
+  @Test(timeout = 10000)
   public void testResourceEstimator() throws Exception {
     final int maps = 100;
     final int reduces = 2;
@@ -55,7 +73,8 @@ public class TestResourceEstimation extends TestCase {
     assertEquals(2* singleMapOutputSize * maps / reduces, re.getEstimatedReduceInputSize());
     
   }
-  
+
+  @Test (timeout = 10000)
   public void testWithNonZeroInput() throws Exception {
     final int maps = 100;
     final int reduces = 2;