Ver código fonte

MAPREDUCE-7020. Task timeout in uber mode can crash AM. Contributed by Peter Bacsko

Jason Lowe 7 anos atrás
pai
commit
1ef88c956e

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -18,6 +18,9 @@ Release 2.7.6 - UNRELEASED
     MAPREDUCE-7028. Concurrent task progress updates causing NPE in
     Application Master. (Gergo Repas via jlowe)
 
+    MAPREDUCE-7020. Task timeout in uber mode can crash AM. (Peter Bacsko
+    via jlowe)
+
 Release 2.7.5 - 2017-12-14
 
   INCOMPATIBLE CHANGES

+ 3 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java

@@ -343,8 +343,9 @@ public class TaskAttemptListenerImpl extends CompositeService
     AtomicReference<TaskAttemptStatus> lastStatusRef =
         attemptIdToStatus.get(yarnAttemptID);
     if (lastStatusRef == null) {
-      throw new IllegalStateException("Status update was called"
-          + " with illegal TaskAttemptId: " + yarnAttemptID);
+      LOG.error("Status update was called with illegal TaskAttemptId: "
+          + yarnAttemptID);
+      return false;
     }
 
     taskHeartbeatHandler.progressing(yarnAttemptID);

+ 4 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java

@@ -397,13 +397,15 @@ public class TestTaskAttemptListenerImpl {
     assertEquals(Phase.REDUCE, status.phase);
   }
 
-  @Test(expected = IllegalStateException.class)
+  @Test
   public void testStatusUpdateFromUnregisteredTask()
       throws IOException, InterruptedException{
     configureMocks();
     startListener(false);
 
-    listener.statusUpdate(attemptID, firstReduceStatus);
+    boolean taskFound = listener.statusUpdate(attemptID, firstReduceStatus);
+
+    assertFalse(taskFound);
   }
 
   private void configureMocks() {

+ 12 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java

@@ -736,6 +736,10 @@ abstract public class Task implements Writable, Configurable {
       int remainingRetries = MAX_RETRIES;
       // get current flag value and reset it as well
       boolean sendProgress = resetProgressFlag();
+
+      boolean uberized = conf.getBoolean("mapreduce.task.uberized",
+          false);
+
       while (!taskDone.get()) {
         synchronized (lock) {
           done = false;
@@ -770,9 +774,14 @@ abstract public class Task implements Writable, Configurable {
           // if Task Tracker is not aware of our task ID (probably because it died and 
           // came back up), kill ourselves
           if (!taskFound) {
-            LOG.warn("Parent died.  Exiting "+taskId);
-            resetDoneFlag();
-            System.exit(66);
+            if (uberized) {
+              taskDone.set(true);
+              break;
+            } else {
+              LOG.warn("Parent died.  Exiting "+taskId);
+              resetDoneFlag();
+              System.exit(66);
+            }
           }
 
           sendProgress = resetProgressFlag();