瀏覽代碼

MAPREDUCE-5692. Add explicit diagnostics when a task attempt is killed due to speculative execution (Gera Shegalov via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1552799 13f79535-47bb-0310-9956-ffa450edef68
Sanford Ryza 11 年之前
父節點
當前提交
d7e028de1a

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -44,6 +44,9 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5052. Job History UI and web services confusing job start time and
     job submit time (Chen He via jeagles)
 
+    MAPREDUCE-5692. Add explicit diagnostics when a task attempt is killed due
+    to speculative execution (Gera Shegalov via Sandy Ryza)
+
   OPTIMIZATIONS
 
     MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)

+ 18 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -1552,6 +1552,12 @@ public abstract class TaskAttemptImpl implements
         TaskAttemptEvent event) {
       //set the finish time
       taskAttempt.setFinishTime();
+
+      if (event instanceof TaskAttemptKillEvent) {
+        taskAttempt.addDiagnosticInfo(
+            ((TaskAttemptKillEvent) event).getMessage());
+      }
+
       //send the deallocate event to ContainerAllocator
       taskAttempt.eventHandler.handle(
           new ContainerAllocatorEvent(taskAttempt.attemptId,
@@ -1855,6 +1861,12 @@ public abstract class TaskAttemptImpl implements
         LOG.debug("Not generating HistoryFinish event since start event not " +
             "generated for taskAttempt: " + taskAttempt.getID());
       }
+
+      if (event instanceof TaskAttemptKillEvent) {
+        taskAttempt.addDiagnosticInfo(
+            ((TaskAttemptKillEvent) event).getMessage());
+      }
+
 //      taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
@@ -1872,6 +1884,12 @@ public abstract class TaskAttemptImpl implements
       // for it
       taskAttempt.taskAttemptListener.unregister(
           taskAttempt.attemptId, taskAttempt.jvmID);
+
+      if (event instanceof TaskAttemptKillEvent) {
+        taskAttempt.addDiagnosticInfo(
+            ((TaskAttemptKillEvent) event).getMessage());
+      }
+
       taskAttempt.reportedStatus.progress = 1.0f;
       taskAttempt.updateProgressSplits();
       //send the cleanup event to containerLauncher

+ 7 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java

@@ -69,6 +69,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobMapTaskRescheduledEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
@@ -100,6 +101,7 @@ import com.google.common.annotations.VisibleForTesting;
 public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   private static final Log LOG = LogFactory.getLog(TaskImpl.class);
+  private static final String SPECULATION = "Speculation: ";
 
   protected final JobConf conf;
   protected final Path jobFile;
@@ -906,8 +908,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         LOG.info(task.commitAttempt
             + " already given a go for committing the task output, so killing "
             + attemptID);
-        task.eventHandler.handle(new TaskAttemptEvent(
-            attemptID, TaskAttemptEventType.TA_KILL));
+        task.eventHandler.handle(new TaskAttemptKillEvent(attemptID,
+            SPECULATION + task.commitAttempt + " committed first!"));
       }
     }
   }
@@ -932,9 +934,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
             //  other reasons.
             !attempt.isFinished()) {
           LOG.info("Issuing kill to other attempt " + attempt.getID());
-          task.eventHandler.handle(
-              new TaskAttemptEvent(attempt.getID(), 
-                  TaskAttemptEventType.TA_KILL));
+          task.eventHandler.handle(new TaskAttemptKillEvent(attempt.getID(),
+              SPECULATION + task.successfulAttempt + " succeeded first!"));
         }
       }
       task.finished(TaskStateInternal.SUCCEEDED);
@@ -1199,8 +1200,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
   private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
     if (attempt != null && !attempt.isFinished()) {
       eventHandler.handle(
-          new TaskAttemptEvent(attempt.getID(),
-              TaskAttemptEventType.TA_KILL));
+          new TaskAttemptKillEvent(attempt.getID(), logMsg));  // logMsg now travels with the kill event instead of being dropped
     }
   }
 

+ 36 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2;
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
@@ -106,17 +107,21 @@ public class TestSpeculativeExecutionWithMRApp {
 
     int maxTimeWait = 10;
     boolean successfullySpeculated = false;
+    TaskAttempt[] ta = null;
     while (maxTimeWait > 0 && !successfullySpeculated) {
       if (taskToBeSpeculated.getAttempts().size() != 2) {
         Thread.sleep(1000);
         clock.setTime(System.currentTimeMillis() + 20000);
       } else {
         successfullySpeculated = true;
+        // finish 1st TA, 2nd will be killed
+        ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated);
       }
       maxTimeWait--;
     }
     Assert
       .assertTrue("Couldn't speculate successfully", successfullySpeculated);
+    verifySpeculationMessage(app, ta);
   }
 
   @Test(timeout = 60000)
@@ -197,16 +202,47 @@ public class TestSpeculativeExecutionWithMRApp {
 
     int maxTimeWait = 5;
     boolean successfullySpeculated = false;
+    TaskAttempt[] ta = null;
     while (maxTimeWait > 0 && !successfullySpeculated) {
       if (speculatedTask.getAttempts().size() != 2) {
         Thread.sleep(1000);
       } else {
         successfullySpeculated = true;
+        ta = makeFirstAttemptWin(appEventHandler, speculatedTask);
       }
       maxTimeWait--;
     }
     Assert
       .assertTrue("Couldn't speculate successfully", successfullySpeculated);
+    verifySpeculationMessage(app, ta);
+  }
+
+  private static TaskAttempt[] makeFirstAttemptWin(
+      EventHandler appEventHandler, Task speculatedTask) {
+
+    // Finish the first listed attempt (TA_DONE, then TA_CONTAINER_CLEANED); callers then expect the other attempt to be killed.
+    Collection<TaskAttempt> attempts = speculatedTask.getAttempts().values();
+    TaskAttempt[] ta = new TaskAttempt[attempts.size()];  // snapshot of the attempts, in the collection's iteration order
+    attempts.toArray(ta);
+    appEventHandler.handle(
+        new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE));
+    appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    return ta;  // ta[0] is the attempt that was just finished
+  }
+
+  private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta)
+      throws Exception {  // asserts that the killed attempt carries a speculation diagnostic
+    app.waitForState(ta[0], TaskAttemptState.SUCCEEDED);  // ta[0]: the attempt finished by makeFirstAttemptWin
+    app.waitForState(ta[1], TaskAttemptState.KILLED);  // ta[1]: the attempt expected to lose
+    boolean foundSpecMsg = false;
+    for (String msg : ta[1].getDiagnostics()) {
+      if (msg.contains("Speculation")) {  // substring of the "Speculation: " prefix added in TaskImpl
+        foundSpecMsg = true;
+        break;  // one matching diagnostic is enough
+      }
+    }
+    Assert.assertTrue("No speculation diagnostics!", foundSpecMsg);
   }
 
   private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id,