Переглянути джерело

MAPREDUCE-2129. Job may hang if mapreduce.job.committer.setup.cleanup.needed=false and mapreduce.map/reduce.failures.maxpercent>0 (Subroto Sanyal via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1358233 13f79535-47bb-0310-9956-ffa450edef68
Thomas Graves 13 роки тому
батько
коміт
3c8cedf814

+ 4 - 0
CHANGES.txt

@@ -58,6 +58,10 @@ Release 1.2.0 - unreleased
 
     MAPREDUCE-4359. Potential deadlock in Counters. (tomwhite)
 
+    MAPREDUCE-2129. Job may hang if 
+    mapreduce.job.committer.setup.cleanup.needed=false and
+    mapreduce.map/reduce.failures.maxpercent>0 (Subroto Sanyal via tgraves)
+
 Release 1.1.0 - unreleased
 
   INCOMPATIBLE CHANGES

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -1619,7 +1619,7 @@ public class JobInProgress {
   }
   
   public synchronized boolean scheduleReduces() {
-    return finishedMapTasks >= completedMapsForReduceSlowstart;
+    return finishedMapTasks + failedMapTIPs >= completedMapsForReduceSlowstart;
   }
   
   /**

+ 32 - 1
src/test/org/apache/hadoop/mapred/TestJobInProgress.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.StaticMapping;
+import org.mockito.Mockito;
 
 import static org.junit.Assert.*;
 import org.junit.Test;
@@ -113,6 +114,16 @@ public class TestJobInProgress {
     launchTask(FailMapTaskJob.class, IdentityReducer.class);
     checkTaskCounts();
   }
+
+  /** 
+   * Test to ensure that the job works when slow start is used and 
+   * some tasks are allowed to fail  
+   */
+  @Test
+  public void testSlowStartAndFailurePercent() throws Exception {
+    launchTaskSlowStart(FailMapTaskJob.class, IdentityReducer.class);
+    checkTaskCounts();
+  }
   
   @Test
   public void testPendingReduceTaskCount() throws Exception {
@@ -246,6 +257,16 @@ public class TestJobInProgress {
     dfsCluster.shutdown();
   }
   
+  void launchTaskSlowStart(Class MapClass,Class ReduceClass) throws Exception{
+    JobConf job = configure(MapClass, ReduceClass, 5, 10, true);
+    // set it so no reducers start until all maps finished
+    job.setFloat("mapred.reduce.slowstart.completed.maps", 1.0f);
+    // allow all maps to fail
+    job.setInt("mapred.max.map.failures.percent", 100);
+    try {
+      JobClient.runJob(job);
+    } catch (IOException ioe) {}
+  }
 
   void launchTask(Class MapClass,Class ReduceClass) throws Exception{
     JobConf job = configure(MapClass, ReduceClass, 5, 10, true);
@@ -306,5 +327,15 @@ public class TestJobInProgress {
       }
     }
   }
-  
+
+  @Test
+  public void testScheduleReducesConsiderFailedMapTips() throws Exception {
+    JobInProgress jip = Mockito.mock(JobInProgress.class);
+    Mockito.when(jip.scheduleReduces()).thenCallRealMethod();
+    jip.failedMapTIPs = 10;
+    jip.finishedMapTasks = 50;
+    jip.completedMapsForReduceSlowstart = 60;
+    assertTrue("The Reduce is not scheduled", jip.scheduleReduces());
+  }
+
 }