Ver código fonte

Zero map split input length combine with none zero map split input length will cause MR1 job hung. (zxu via rkanter)

Robert Kanter 10 anos atrás
pai
commit
5f5138e5b3

+ 3 - 0
CHANGES.txt

@@ -282,6 +282,9 @@ Release 1.3.0 - unreleased
 
     HDFS-6649. Documentation for setrep is wrong. (aajisaka)
 
+    Zero map split input length combine with none zero map split input
+    length will cause MR1 job hung. (zxu via rkanter)
+
 Release 1.2.2 - unreleased
 
   INCOMPATIBLE CHANGES

+ 5 - 1
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -938,7 +938,11 @@ public class JobInProgress {
   long getInputLength() {
     return inputLength;
   }
- 
+
+  void setInputLength(long length) {
+    inputLength = length;
+  }
+
   boolean isCleanupLaunched() {
     return launchedCleanup;
   }

+ 10 - 2
src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java

@@ -52,8 +52,16 @@ class ResourceEstimator {
     //-1 indicates error, which we don't average in.
     if(tip.isMapTask() &&  ts.getOutputSize() != -1)  {
       completedMapsUpdates++;
-
-      completedMapsInputSize+=(tip.getMapInputSize()+1);
+      long inputSize = tip.getMapInputSize();
+      if (inputSize == 0) {
+        // if map input size is 0, use map output size as input size
+        // to avoid job hung.
+        inputSize = ts.getOutputSize();
+        // map input size is changed, update JobInProgress.inputLength.
+        long length = job.getInputLength() + inputSize;
+        job.setInputLength(length);
+      }
+      completedMapsInputSize+=(inputSize+1);
       completedMapsOutputSize+=ts.getOutputSize();
 
       if(LOG.isDebugEnabled()) {

+ 10 - 8
src/test/org/apache/hadoop/mapred/TestResourceEstimation.java

@@ -55,12 +55,13 @@ public class TestResourceEstimation {
     //unfortunately, we can't set job input size from here.
     ResourceEstimator re = new ResourceEstimator(jip);
     
-    for(int i = 0; i < maps / 10 ; ++i) {
-
-      long estOutSize = re.getEstimatedMapOutputSize();
-      System.out.println(estOutSize);
-      assertEquals(0, estOutSize);
-      
+    for(int i = 0; i < maps; ++i) {
+      if (i < maps / 10) {
+        // re.thresholdToUse is maps / 10
+        long estOutSize = re.getEstimatedMapOutputSize();
+        System.out.println(estOutSize);
+        assertEquals(0, estOutSize);
+      }
       TaskStatus ts = new MapTaskStatus();
       ts.setOutputSize(singleMapOutputSize);
       JobSplit.TaskSplitMetaInfo split =
@@ -120,9 +121,10 @@ public class TestResourceEstimation {
     TaskInProgress tip = 
       new TaskInProgress(jid, "", split, jip.jobtracker, jc, jip, 0, 1);
     re.updateWithCompletedTask(ts, tip);
-    
+    // for 0 input size, use output size as input size for calculation
     long expectedTotalMapOutSize = (singleMapOutputSize*11) * 
-      ((maps*singleMapInputSize)+maps)/((singleMapInputSize+1)*10+1);
+      ((maps*singleMapInputSize)+maps)/((singleMapInputSize+1)*10+
+      singleMapOutputSize+1);
     assertEquals(2* expectedTotalMapOutSize/maps, re.getEstimatedMapOutputSize());
   }