Forráskód Böngészése

MAPREDUCE-7020. Task timeout in uber mode can crash AM. Contributed by Peter Bacsko

(cherry picked from commit 6eef3d7f1a1e5e3f27fb3bf7596663640d786181)
Jason Lowe 7 éve
szülő
commit
d7acc45ce4

+ 5 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java

@@ -368,14 +368,16 @@ public class TaskAttemptListenerImpl extends CompositeService
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
         TypeConverter.toYarn(taskAttemptID);
 
+    AMFeedback feedback = new AMFeedback();
     AtomicReference<TaskAttemptStatus> lastStatusRef =
         attemptIdToStatus.get(yarnAttemptID);
     if (lastStatusRef == null) {
-      throw new IllegalStateException("Status update was called"
-          + " with illegal TaskAttemptId: " + yarnAttemptID);
+      LOG.error("Status update was called with illegal TaskAttemptId: "
+          + yarnAttemptID);
+      feedback.setTaskFound(false);
+      return feedback;
     }
 
-    AMFeedback feedback = new AMFeedback();
     feedback.setTaskFound(true);
 
     // Propagating preemption to the task if TASK_PREEMPTION is enabled

+ 4 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java

@@ -487,13 +487,15 @@ public class TestTaskAttemptListenerImpl {
     assertEquals(Phase.REDUCE, status.phase);
   }
 
-  @Test(expected = IllegalStateException.class)
+  @Test
   public void testStatusUpdateFromUnregisteredTask()
       throws IOException, InterruptedException{
     configureMocks();
     startListener(false);
 
-    listener.statusUpdate(attemptID, firstReduceStatus);
+    AMFeedback feedback = listener.statusUpdate(attemptID, firstReduceStatus);
+
+    assertFalse(feedback.getTaskFound());
   }
 
   private void configureMocks() {

+ 11 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java

@@ -790,6 +790,9 @@ abstract public class Task implements Writable, Configurable {
       long taskProgressInterval = MRJobConfUtil.
           getTaskProgressReportInterval(conf);
 
+      boolean uberized = conf.getBoolean("mapreduce.task.uberized",
+          false);
+
       while (!taskDone.get()) {
         synchronized (lock) {
           done = false;
@@ -828,9 +831,14 @@ abstract public class Task implements Writable, Configurable {
           // if Task Tracker is not aware of our task ID (probably because it died and 
           // came back up), kill ourselves
           if (!taskFound) {
-            LOG.warn("Parent died.  Exiting "+taskId);
-            resetDoneFlag();
-            System.exit(66);
+            if (uberized) {
+              taskDone.set(true);
+              break;
+            } else {
+              LOG.warn("Parent died.  Exiting "+taskId);
+              resetDoneFlag();
+              System.exit(66);
+            }
           }
 
           // Set a flag that says we should preempt this is read by