瀏覽代碼

MAPREDUCE-5079 from trunk. Changes job recovery to restore state directly from job history, instaed of simulating state machine events. Contributed by Jason Lowe and Robert Parker.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1466771 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 12 年之前
父節點
當前提交
21e21f7f7d
共有 21 個文件被更改,包括 1131 次插入752 次删除
  1. 4 0
      hadoop-mapreduce-project/CHANGES.txt
  2. 116 58
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
  3. 23 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java
  4. 1 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
  5. 50 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptRecoverEvent.java
  6. 1 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java
  7. 50 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskRecoverEvent.java
  8. 24 9
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
  9. 2 7
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
  10. 2 7
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
  11. 152 28
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  12. 182 85
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
  13. 0 39
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java
  14. 0 480
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
  15. 5 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
  16. 474 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
  17. 2 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
  18. 21 17
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
  19. 5 14
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
  20. 16 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
  21. 1 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

+ 4 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -37,6 +37,10 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-4875. coverage fixing for org.apache.hadoop.mapred
     (Aleksey Gorshkov via bobby)
 
+    MAPREDUCE-5079. Changes job recovery to restore state directly from job
+    history, instaed of simulating state machine events.
+    (Jason Lowe and Robert Parker via sseth)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the

+ 116 - 58
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -24,9 +24,12 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.IOUtils;
@@ -46,6 +49,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.EventReader;
@@ -54,6 +58,9 @@ import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -61,6 +68,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
@@ -74,6 +82,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
@@ -84,8 +93,6 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
 import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
-import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
-import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
@@ -94,6 +101,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -166,7 +174,6 @@ public class MRAppMaster extends CompositeService {
   private AppContext context;
   private Dispatcher dispatcher;
   private ClientService clientService;
-  private Recovery recoveryServ;
   private ContainerAllocator containerAllocator;
   private ContainerLauncher containerLauncher;
   private EventHandler<CommitterEvent> committerEventHandler;
@@ -179,7 +186,6 @@ public class MRAppMaster extends CompositeService {
   private OutputCommitter committer;
   private JobEventDispatcher jobEventDispatcher;
   private JobHistoryEventHandler jobHistoryEventHandler;
-  private boolean inRecovery = false;
   private SpeculatorEventDispatcher speculatorEventDispatcher;
 
   private Job job;
@@ -192,6 +198,8 @@ public class MRAppMaster extends CompositeService {
   private String shutDownMessage = null;
   JobStateInternal forcedState = null;
 
+  private long recoveredJobStartTime = 0;
+
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
       long appSubmitTime) {
@@ -337,34 +345,9 @@ public class MRAppMaster extends CompositeService {
       }
     } else {
       committer = createOutputCommitter(conf);
-      boolean recoveryEnabled = conf.getBoolean(
-          MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
-      boolean recoverySupportedByCommitter = committer.isRecoverySupported();
-
-      // If a shuffle secret was not provided by the job client then this app
-      // attempt will generate one.  However that disables recovery if there
-      // are reducers as the shuffle secret would be app attempt specific.
-      boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
-          TokenCache.getShuffleSecretKey(fsTokens) != null);
-
-      if (recoveryEnabled && recoverySupportedByCommitter
-          && shuffleKeyValidForRecovery && appAttemptID.getAttemptId() > 1) {
-        LOG.info("Recovery is enabled. "
-            + "Will try to recover from previous life on best effort basis.");
-        recoveryServ = createRecoveryService(context);
-        addIfService(recoveryServ);
-        dispatcher = recoveryServ.getDispatcher();
-        clock = recoveryServ.getClock();
-        inRecovery = true;
-      } else {
-        LOG.info("Not starting RecoveryService: recoveryEnabled: "
-            + recoveryEnabled + " recoverySupportedByCommitter: "
-            + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
-            + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
-            + appAttemptID.getAttemptId());
-        dispatcher = createDispatcher();
-        addIfService(dispatcher);
-      }
+
+      dispatcher = createDispatcher();
+      addIfService(dispatcher);
 
       //service to handle requests from JobClient
       clientService = createClientService(context);
@@ -586,15 +569,6 @@ public class MRAppMaster extends CompositeService {
     return new JobFinishEventHandler();
   }
 
-  /**
-   * Create the recovery service.
-   * @return an instance of the recovery service.
-   */
-  protected Recovery createRecoveryService(AppContext appContext) {
-    return new RecoveryService(appContext.getApplicationAttemptId(),
-        appContext.getClock(), getCommitter(), isNewApiCommitter());
-  }
-
   /** Create and initialize (but don't start) a single job. 
    * @param forcedState a state to force the job into or null for normal operation. 
    * @param diagnostic a diagnostic message to include with the job.
@@ -606,7 +580,8 @@ public class MRAppMaster extends CompositeService {
     Job newJob =
         new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
             taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
-            completedTasksFromPreviousRun, metrics, newApiCommitter,
+            completedTasksFromPreviousRun, metrics,
+            committer, newApiCommitter,
             currentUser.getUserName(), appSubmitTime, amInfos, context, 
             forcedState, diagnostic);
     ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
@@ -965,18 +940,8 @@ public class MRAppMaster extends CompositeService {
   public void start() {
 
     amInfos = new LinkedList<AMInfo>();
-
-    // Pull completedTasks etc from recovery
-    if (inRecovery) {
-      completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
-      amInfos = recoveryServ.getAMInfos();
-    } else {
-      // Get the amInfos anyways irrespective of whether recovery is enabled or
-      // not IF this is not the first AM generation
-      if (appAttemptID.getAttemptId() != 1) {
-        amInfos.addAll(readJustAMInfos());
-      }
-    }
+    completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>();
+    processRecovery();
 
     // Current an AMInfo for the current AM generation.
     AMInfo amInfo =
@@ -1038,13 +1003,105 @@ public class MRAppMaster extends CompositeService {
     startJobs();
   }
 
+  private void processRecovery() {
+    if (appAttemptID.getAttemptId() == 1) {
+      return;  // no need to recover on the first attempt
+    }
+
+    boolean recoveryEnabled = getConfig().getBoolean(
+        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
+        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
+    boolean recoverySupportedByCommitter =
+        committer != null && committer.isRecoverySupported();
+
+    // If a shuffle secret was not provided by the job client then this app
+    // attempt will generate one.  However that disables recovery if there
+    // are reducers as the shuffle secret would be app attempt specific.
+    int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
+    boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
+        TokenCache.getShuffleSecretKey(fsTokens) != null);
+
+    if (recoveryEnabled && recoverySupportedByCommitter
+        && shuffleKeyValidForRecovery) {
+      LOG.info("Recovery is enabled. "
+          + "Will try to recover from previous life on best effort basis.");
+      try {
+        parsePreviousJobHistory();
+      } catch (IOException e) {
+        LOG.warn("Unable to parse prior job history, aborting recovery", e);
+        // try to get just the AMInfos
+        amInfos.addAll(readJustAMInfos());
+      }
+    } else {
+      LOG.info("Will not try to recover. recoveryEnabled: "
+            + recoveryEnabled + " recoverySupportedByCommitter: "
+            + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
+            + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
+            + appAttemptID.getAttemptId());
+      // Get the amInfos anyways whether recovery is enabled or not
+      amInfos.addAll(readJustAMInfos());
+    }
+  }
+
+  private static FSDataInputStream getPreviousJobHistoryStream(
+      Configuration conf, ApplicationAttemptId appAttemptId)
+      throws IOException {
+    Path historyFile = JobHistoryUtils.getPreviousJobHistoryPath(conf,
+        appAttemptId);
+    LOG.info("Previous history file is at " + historyFile);
+    return historyFile.getFileSystem(conf).open(historyFile);
+  }
+
+  private void parsePreviousJobHistory() throws IOException {
+    FSDataInputStream in = getPreviousJobHistoryStream(getConfig(),
+        appAttemptID);
+    JobHistoryParser parser = new JobHistoryParser(in);
+    JobInfo jobInfo = parser.parse();
+    Exception parseException = parser.getParseException();
+    if (parseException != null) {
+      LOG.info("Got an error parsing job-history file" +
+          ", ignoring incomplete events.", parseException);
+    }
+    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
+        .getAllTasks();
+    for (TaskInfo taskInfo : taskInfos.values()) {
+      if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+        Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator =
+            taskInfo.getAllTaskAttempts().entrySet().iterator();
+        while (taskAttemptIterator.hasNext()) {
+          Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next();
+          if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) {
+            taskAttemptIterator.remove();
+          }
+        }
+        completedTasksFromPreviousRun
+            .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
+        LOG.info("Read from history task "
+            + TypeConverter.toYarn(taskInfo.getTaskId()));
+      }
+    }
+    LOG.info("Read completed tasks from history "
+        + completedTasksFromPreviousRun.size());
+    recoveredJobStartTime = jobInfo.getLaunchTime();
+
+    // recover AMInfos
+    List<JobHistoryParser.AMInfo> jhAmInfoList = jobInfo.getAMInfos();
+    if (jhAmInfoList != null) {
+      for (JobHistoryParser.AMInfo jhAmInfo : jhAmInfoList) {
+        AMInfo amInfo = MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
+            jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
+            jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
+            jhAmInfo.getNodeManagerHttpPort());
+        amInfos.add(amInfo);
+      }
+    }
+  }
+
   private List<AMInfo> readJustAMInfos() {
     List<AMInfo> amInfos = new ArrayList<AMInfo>();
     FSDataInputStream inputStream = null;
     try {
-      inputStream =
-          RecoveryService.getPreviousJobHistoryFileStream(getConfig(),
-            appAttemptID);
+      inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
       EventReader jobHistoryEventReader = new EventReader(inputStream);
 
       // All AMInfos are contiguous. Track when the first AMStartedEvent
@@ -1095,7 +1152,8 @@ public class MRAppMaster extends CompositeService {
   @SuppressWarnings("unchecked")
   protected void startJobs() {
     /** create a job-start event to get this ball rolling */
-    JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
+    JobEvent startJobEvent = new JobStartEvent(job.getID(),
+        recoveredJobStartTime);
     /** send the job-start event. this triggers the job execution. */
     dispatcher.getEventHandler().handle(startJobEvent);
   }

+ 23 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/package-info.java → hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java

@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,6 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@InterfaceAudience.Private
-package org.apache.hadoop.mapreduce.v2.app.recover;
-import org.apache.hadoop.classification.InterfaceAudience;
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
+public class JobStartEvent extends JobEvent {
+
+  long recoveredJobStartTime;
+
+  public JobStartEvent(JobId jobID) {
+    this(jobID, 0);
+  }
+
+  public JobStartEvent(JobId jobID, long recoveredJobStartTime) {
+    super(jobID, JobEventType.JOB_START);
+    this.recoveredJobStartTime = recoveredJobStartTime;
+  }
+
+  public long getRecoveredJobStartTime() {
+    return recoveredJobStartTime;
+  }
+}

+ 1 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java

@@ -26,6 +26,7 @@ public enum TaskAttemptEventType {
   //Producer:Task
   TA_SCHEDULE,
   TA_RESCHEDULE,
+  TA_RECOVER,
 
   //Producer:Client, Task
   TA_KILL,

+ 50 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptRecoverEvent.java

@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+public class TaskAttemptRecoverEvent extends TaskAttemptEvent {
+
+  private TaskAttemptInfo taInfo;
+  private OutputCommitter committer;
+  private boolean recoverAttemptOutput;
+
+  public TaskAttemptRecoverEvent(TaskAttemptId id, TaskAttemptInfo taInfo,
+      OutputCommitter committer, boolean recoverOutput) {
+    super(id, TaskAttemptEventType.TA_RECOVER);
+    this.taInfo = taInfo;
+    this.committer = committer;
+    this.recoverAttemptOutput = recoverOutput;
+  }
+
+  public TaskAttemptInfo getTaskAttemptInfo() {
+    return taInfo;
+  }
+
+  public OutputCommitter getCommitter() {
+    return committer;
+  }
+
+  public boolean getRecoverOutput() {
+    return recoverAttemptOutput;
+  }
+}

+ 1 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java

@@ -28,6 +28,7 @@ public enum TaskEventType {
 
   //Producer:Job
   T_SCHEDULE,
+  T_RECOVER,
 
   //Producer:Speculator
   T_ADD_SPEC_ATTEMPT,

+ 50 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskRecoverEvent.java

@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+
+public class TaskRecoverEvent extends TaskEvent {
+
+  private TaskInfo taskInfo;
+  private OutputCommitter committer;
+  private boolean recoverTaskOutput;
+
+  public TaskRecoverEvent(TaskId taskID, TaskInfo taskInfo,
+      OutputCommitter committer, boolean recoverTaskOutput) {
+    super(taskID, TaskEventType.T_RECOVER);
+    this.taskInfo = taskInfo;
+    this.committer = committer;
+    this.recoverTaskOutput = recoverTaskOutput;
+  }
+
+  public TaskInfo getTaskInfo() {
+    return taskInfo;
+  }
+
+  public OutputCommitter getOutputCommitter() {
+    return committer;
+  }
+
+  public boolean getRecoverTaskOutput() {
+    return recoverTaskOutput;
+  }
+}

+ 24 - 9
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -91,6 +92,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
@@ -98,6 +100,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@@ -153,6 +156,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private final Lock writeLock;
   private final JobId jobId;
   private final String jobName;
+  private final OutputCommitter committer;
   private final boolean newApiCommitter;
   private final org.apache.hadoop.mapreduce.JobID oldJobId;
   private final TaskAttemptListener taskAttemptListener;
@@ -523,7 +527,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       JobTokenSecretManager jobTokenSecretManager,
       Credentials fsTokenCredentials, Clock clock,
       Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
-      boolean newApiCommitter, String userName,
+      OutputCommitter committer, boolean newApiCommitter, String userName,
       long appSubmitTime, List<AMInfo> amInfos, AppContext appContext,
       JobStateInternal forcedState, String forcedDiagnostic) {
     this.applicationAttemptId = applicationAttemptId;
@@ -539,6 +543,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default");
     this.appSubmitTime = appSubmitTime;
     this.oldJobId = TypeConverter.fromYarn(jobId);
+    this.committer = committer;
     this.newApiCommitter = newApiCommitter;
 
     this.taskAttemptListener = taskAttemptListener;
@@ -809,10 +814,16 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     }
   }
 
-  protected void scheduleTasks(Set<TaskId> taskIDs) {
+  protected void scheduleTasks(Set<TaskId> taskIDs,
+      boolean recoverTaskOutput) {
     for (TaskId taskID : taskIDs) {
-      eventHandler.handle(new TaskEvent(taskID, 
-          TaskEventType.T_SCHEDULE));
+      TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID);
+      if (taskInfo != null) {
+        eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo,
+            committer, recoverTaskOutput));
+      } else {
+        eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE));
+      }
     }
   }
 
@@ -1263,7 +1274,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
                 job.conf, splits[i], 
                 job.taskAttemptListener, 
                 job.jobToken, job.fsTokens,
-                job.clock, job.completedTasksFromPreviousRun, 
+                job.clock,
                 job.applicationAttemptId.getAttemptId(),
                 job.metrics, job.appContext);
         job.addTask(task);
@@ -1281,7 +1292,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
                 job.conf, job.numMapTasks, 
                 job.taskAttemptListener, job.jobToken,
                 job.fsTokens, job.clock,
-                job.completedTasksFromPreviousRun, 
                 job.applicationAttemptId.getAttemptId(),
                 job.metrics, job.appContext);
         job.addTask(task);
@@ -1317,8 +1327,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     @Override
     public void transition(JobImpl job, JobEvent event) {
       job.setupProgress = 1.0f;
-      job.scheduleTasks(job.mapTasks);  // schedule (i.e., start) the maps
-      job.scheduleTasks(job.reduceTasks);
+      job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
+      job.scheduleTasks(job.reduceTasks, true);
 
       // If we have no tasks, just transition to job completed
       if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
@@ -1349,7 +1359,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
      */
     @Override
     public void transition(JobImpl job, JobEvent event) {
-      job.startTime = job.clock.getTime();
+      JobStartEvent jse = (JobStartEvent) event;
+      if (jse.getRecoveredJobStartTime() != 0) {
+        job.startTime = jse.getRecoveredJobStartTime();
+      } else {
+        job.startTime = job.clock.getTime();
+      }
       JobInitedEvent jie =
         new JobInitedEvent(job.oldJobId,
              job.startTime,

+ 2 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java

@@ -18,17 +18,13 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
-import java.util.Map;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -49,11 +45,10 @@ public class MapTaskImpl extends TaskImpl {
       TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
-      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
-      MRAppMetrics metrics, AppContext appContext) {
+      int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
     super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
         conf, taskAttemptListener, jobToken, credentials, clock,
-        completedTasksFromPreviousRun, startCount, metrics, appContext);
+        appAttemptId, metrics, appContext);
     this.taskSplitMetaInfo = taskSplitMetaInfo;
   }
 

+ 2 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java

@@ -18,16 +18,12 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
-import java.util.Map;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -47,11 +43,10 @@ public class ReduceTaskImpl extends TaskImpl {
       int numMapTasks, TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
-      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
-      MRAppMetrics metrics, AppContext appContext) {
+      int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
     super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
         taskAttemptListener, jobToken, credentials, clock,
-        completedTasksFromPreviousRun, startCount, metrics, appContext);
+        appAttemptId, metrics, appContext);
     this.numMapTasks = numMapTasks;
   }
 

+ 152 - 28
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -56,10 +56,12 @@ import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
@@ -86,6 +88,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
@@ -121,6 +124,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -196,6 +200,11 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptEventType.TA_KILL, new KilledTransition())
      .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
+     .addTransition(TaskAttemptStateInternal.NEW,
+         EnumSet.of(TaskAttemptStateInternal.FAILED,
+             TaskAttemptStateInternal.KILLED,
+             TaskAttemptStateInternal.SUCCEEDED),
+         TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
      .addTransition(TaskAttemptStateInternal.NEW,
           TaskAttemptStateInternal.NEW,
           TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
@@ -1019,6 +1028,103 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
+  @SuppressWarnings("unchecked")
+  public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
+      OutputCommitter committer, boolean recoverOutput) {
+    containerID = taInfo.getContainerId();
+    containerNodeId = ConverterUtils.toNodeId(taInfo.getHostname() + ":"
+        + taInfo.getPort());
+    containerMgrAddress = StringInterner.weakIntern(
+        containerNodeId.toString());
+    nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
+        + taInfo.getHttpPort());
+    nodeRackName = RackResolver.resolve(
+        containerNodeId.getHost()).getNetworkLocation();
+    launchTime = taInfo.getStartTime();
+    finishTime = (taInfo.getFinishTime() != -1) ?
+        taInfo.getFinishTime() : clock.getTime();
+    shufflePort = taInfo.getShufflePort();
+    trackerName = taInfo.getHostname();
+    httpPort = taInfo.getHttpPort();
+    sendLaunchedEvents();
+
+    reportedStatus.id = attemptId;
+    reportedStatus.progress = 1.0f;
+    reportedStatus.counters = taInfo.getCounters();
+    reportedStatus.stateString = taInfo.getState();
+    reportedStatus.phase = Phase.CLEANUP;
+    reportedStatus.mapFinishTime = taInfo.getMapFinishTime();
+    reportedStatus.shuffleFinishTime = taInfo.getShuffleFinishTime();
+    reportedStatus.sortFinishTime = taInfo.getSortFinishTime();
+    addDiagnosticInfo(taInfo.getError());
+
+    boolean needToClean = false;
+    String recoveredState = taInfo.getTaskStatus();
+    if (recoverOutput
+        && TaskAttemptState.SUCCEEDED.toString().equals(recoveredState)) {
+      TaskAttemptContext tac = new TaskAttemptContextImpl(conf,
+          TypeConverter.fromYarn(attemptId));
+      try {
+        committer.recoverTask(tac);
+        LOG.info("Recovered output from task attempt " + attemptId);
+      } catch (Exception e) {
+        LOG.error("Unable to recover task attempt " + attemptId, e);
+        LOG.info("Task attempt " + attemptId + " will be recovered as KILLED");
+        recoveredState = TaskAttemptState.KILLED.toString();
+        needToClean = true;
+      }
+    }
+
+    TaskAttemptStateInternal attemptState;
+    if (TaskAttemptState.SUCCEEDED.toString().equals(recoveredState)) {
+      attemptState = TaskAttemptStateInternal.SUCCEEDED;
+      reportedStatus.taskState = TaskAttemptState.SUCCEEDED;
+      eventHandler.handle(createJobCounterUpdateEventTASucceeded(this));
+      logAttemptFinishedEvent(attemptState);
+    } else if (TaskAttemptState.FAILED.toString().equals(recoveredState)) {
+      attemptState = TaskAttemptStateInternal.FAILED;
+      reportedStatus.taskState = TaskAttemptState.FAILED;
+      eventHandler.handle(createJobCounterUpdateEventTAFailed(this));
+      TaskAttemptUnsuccessfulCompletionEvent tauce =
+          createTaskAttemptUnsuccessfulCompletionEvent(this,
+              TaskAttemptStateInternal.FAILED);
+      eventHandler.handle(
+          new JobHistoryEvent(attemptId.getTaskId().getJobId(), tauce));
+    } else {
+      if (!TaskAttemptState.KILLED.toString().equals(recoveredState)) {
+        if (String.valueOf(recoveredState).isEmpty()) {
+          LOG.info("TaskAttempt" + attemptId
+              + " had not completed, recovering as KILLED");
+        } else {
+          LOG.warn("TaskAttempt " + attemptId + " found in unexpected state "
+              + recoveredState + ", recovering as KILLED");
+        }
+        addDiagnosticInfo("Killed during application recovery");
+        needToClean = true;
+      }
+      attemptState = TaskAttemptStateInternal.KILLED;
+      reportedStatus.taskState = TaskAttemptState.KILLED;
+      eventHandler.handle(createJobCounterUpdateEventTAKilled(this));
+      TaskAttemptUnsuccessfulCompletionEvent tauce =
+          createTaskAttemptUnsuccessfulCompletionEvent(this,
+              TaskAttemptStateInternal.KILLED);
+      eventHandler.handle(
+          new JobHistoryEvent(attemptId.getTaskId().getJobId(), tauce));
+    }
+
+    if (needToClean) {
+      TaskAttemptContext tac = new TaskAttemptContextImpl(conf,
+          TypeConverter.fromYarn(attemptId));
+      try {
+        committer.abortTask(tac);
+      } catch (Exception e) {
+        LOG.warn("Task cleanup failed for attempt " + attemptId, e);
+      }
+    }
+
+    return attemptState;
+  }
+
   private static TaskAttemptState getExternalState(
       TaskAttemptStateInternal smState) {
     switch (smState) {
@@ -1078,6 +1184,18 @@ public abstract class TaskAttemptImpl implements
     return slotMillisIncrement;
   }
 
+  private static JobCounterUpdateEvent createJobCounterUpdateEventTASucceeded(
+      TaskAttemptImpl taskAttempt) {
+    long slotMillis = computeSlotMillis(taskAttempt);
+    TaskId taskId = taskAttempt.attemptId.getTaskId();
+    JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
+    jce.addCounterUpdate(
+      taskId.getTaskType() == TaskType.MAP ?
+        JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
+        slotMillis);
+    return jce;
+  }
+
   private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
       TaskAttemptImpl taskAttempt) {
     TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
@@ -1134,6 +1252,25 @@ public abstract class TaskAttemptImpl implements
     return tauce;
   }
 
+  @SuppressWarnings("unchecked")
+  private void sendLaunchedEvents() {
+    JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId()
+        .getJobId());
+    jce.addCounterUpdate(attemptId.getTaskId().getTaskType() == TaskType.MAP ?
+        JobCounter.TOTAL_LAUNCHED_MAPS : JobCounter.TOTAL_LAUNCHED_REDUCES, 1);
+    eventHandler.handle(jce);
+
+    LOG.info("TaskAttempt: [" + attemptId
+        + "] using containerId: [" + containerID + " on NM: ["
+        + containerMgrAddress + "]");
+    TaskAttemptStartedEvent tase =
+      new TaskAttemptStartedEvent(TypeConverter.fromYarn(attemptId),
+          TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
+          launchTime, trackerName, httpPort, shufflePort, containerID);
+    eventHandler.handle(
+        new JobHistoryEvent(attemptId.getTaskId().getJobId(), tase));
+  }
+
   private WrappedProgressSplitsBlock getProgressSplitBlock() {
     readLock.lock();
     try {
@@ -1377,26 +1514,7 @@ public abstract class TaskAttemptImpl implements
                                                                   // Costly?
       taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
       taskAttempt.httpPort = nodeHttpInetAddr.getPort();
-      JobCounterUpdateEvent jce =
-          new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
-              .getJobId());
-      jce.addCounterUpdate(
-          taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ? 
-              JobCounter.TOTAL_LAUNCHED_MAPS: JobCounter.TOTAL_LAUNCHED_REDUCES
-              , 1);
-      taskAttempt.eventHandler.handle(jce);
-      
-      LOG.info("TaskAttempt: [" + taskAttempt.attemptId
-          + "] using containerId: [" + taskAttempt.containerID + " on NM: ["
-          + taskAttempt.containerMgrAddress + "]");
-      TaskAttemptStartedEvent tase =
-        new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
-            TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
-            taskAttempt.launchTime,
-            nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
-            taskAttempt.shufflePort, taskAttempt.containerID);
-      taskAttempt.eventHandler.handle
-          (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
+      taskAttempt.sendLaunchedEvents();
       taskAttempt.eventHandler.handle
           (new SpeculatorEvent
               (taskAttempt.attemptId, true, taskAttempt.clock.getTime()));
@@ -1445,14 +1563,8 @@ public abstract class TaskAttemptImpl implements
         TaskAttemptEvent event) {
       //set the finish time
       taskAttempt.setFinishTime();
-      long slotMillis = computeSlotMillis(taskAttempt);
-      TaskId taskId = taskAttempt.attemptId.getTaskId();
-      JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
-      jce.addCounterUpdate(
-        taskId.getTaskType() == TaskType.MAP ? 
-          JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
-          slotMillis);
-      taskAttempt.eventHandler.handle(jce);
+      taskAttempt.eventHandler.handle(
+          createJobCounterUpdateEventTASucceeded(taskAttempt));
       taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
@@ -1490,6 +1602,18 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
+  private static class RecoverTransition implements
+      MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
+
+    @Override
+    public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
+      TaskAttemptRecoverEvent tare = (TaskAttemptRecoverEvent) event;
+      return taskAttempt.recover(tare.getTaskAttemptInfo(),
+          tare.getCommitter(), tare.getRecoverOutput());
+    }
+  }
+
   @SuppressWarnings({ "unchecked" })
   private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
     //Log finished events only if an attempt started.

+ 182 - 85
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
@@ -36,7 +37,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
@@ -67,8 +68,10 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
@@ -149,6 +152,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
     .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED, 
         TaskEventType.T_KILL, new KillNewTransition())
+    .addTransition(TaskStateInternal.NEW,
+        EnumSet.of(TaskStateInternal.FAILED,
+                   TaskStateInternal.KILLED,
+                   TaskStateInternal.RUNNING,
+                   TaskStateInternal.SUCCEEDED),
+        TaskEventType.T_RECOVER, new RecoverTransition())
 
     // Transitions from SCHEDULED state
       //when the first attempt is launched, the task state is set to RUNNING
@@ -245,20 +254,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   // By default, the next TaskAttempt number is zero. Changes during recovery  
   protected int nextAttemptNumber = 0;
-  private List<TaskAttemptInfo> taskAttemptsFromPreviousGeneration =
-      new ArrayList<TaskAttemptInfo>();
 
-  private static final class RecoverdAttemptsComparator implements
-      Comparator<TaskAttemptInfo> {
-    @Override
-    public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) {
-      long diff = attempt1.getStartTime() - attempt2.getStartTime();
-      return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
-    }
-  }
-
-  private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
-      new RecoverdAttemptsComparator();
+  // For sorting task attempts by completion time
+  private static final Comparator<TaskAttemptInfo> TA_INFO_COMPARATOR =
+      new Comparator<TaskAttemptInfo>() {
+        @Override
+        public int compare(TaskAttemptInfo a, TaskAttemptInfo b) {
+          long diff = a.getFinishTime() - b.getFinishTime();
+          return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
+        }
+      };
 
   @Override
   public TaskState getState() {
@@ -275,8 +280,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
-      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
-      MRAppMetrics metrics, AppContext appContext) {
+      int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
     this.conf = conf;
     this.clock = clock;
     this.jobFile = remoteJobConfFile;
@@ -300,41 +304,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     this.metrics = metrics;
     this.appContext = appContext;
 
-    // See if this is from a previous generation.
-    if (completedTasksFromPreviousRun != null
-        && completedTasksFromPreviousRun.containsKey(taskId)) {
-      // This task has TaskAttempts from previous generation. We have to replay
-      // them.
-      LOG.info("Task is from previous run " + taskId);
-      TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId);
-      Map<TaskAttemptID, TaskAttemptInfo> allAttempts =
-          taskInfo.getAllTaskAttempts();
-      taskAttemptsFromPreviousGeneration = new ArrayList<TaskAttemptInfo>();
-      taskAttemptsFromPreviousGeneration.addAll(allAttempts.values());
-      Collections.sort(taskAttemptsFromPreviousGeneration,
-        RECOVERED_ATTEMPTS_COMPARATOR);
-    }
-
-    if (taskAttemptsFromPreviousGeneration.isEmpty()) {
-      // All the previous attempts are exhausted, now start with a new
-      // generation.
-
-      // All the new TaskAttemptIDs are generated based on MR
-      // ApplicationAttemptID so that attempts from previous lives don't
-      // over-step the current one. This assumes that a task won't have more
-      // than 1000 attempts in its single generation, which is very reasonable.
-      // Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts
-      // and requires serious medical attention.
-      nextAttemptNumber = (startCount - 1) * 1000;
-    } else {
-      // There are still some TaskAttempts from previous generation, use them
-      nextAttemptNumber =
-          taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
-    }
-
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
+
+    // All the new TaskAttemptIDs are generated based on MR
+    // ApplicationAttemptID so that attempts from previous lives don't
+    // over-step the current one. This assumes that a task won't have more
+    // than 1000 attempts in its single generation, which is very reasonable.
+    nextAttemptNumber = (appAttemptId - 1) * 1000;
   }
 
   @Override
@@ -593,13 +571,27 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   // This is always called in the Write Lock
   private void addAndScheduleAttempt() {
-    TaskAttempt attempt = createAttempt();
+    TaskAttempt attempt = addAttempt();
+    inProgressAttempts.add(attempt.getID());
+    //schedule the nextAttemptNumber
+    if (failedAttempts.size() > 0) {
+      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
+          TaskAttemptEventType.TA_RESCHEDULE));
+    } else {
+      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
+          TaskAttemptEventType.TA_SCHEDULE));
+    }
+  }
+
+  private TaskAttemptImpl addAttempt() {
+    TaskAttemptImpl attempt = createAttempt();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Created attempt " + attempt.getID());
     }
     switch (attempts.size()) {
       case 0:
-        attempts = Collections.singletonMap(attempt.getID(), attempt);
+        attempts = Collections.singletonMap(attempt.getID(),
+            (TaskAttempt) attempt);
         break;
         
       case 1:
@@ -615,24 +607,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         break;
     }
 
-    // Update nextATtemptNumber
-    if (taskAttemptsFromPreviousGeneration.isEmpty()) {
-      ++nextAttemptNumber;
-    } else {
-      // There are still some TaskAttempts from previous generation, use them
-      nextAttemptNumber =
-          taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
-    }
-
-    inProgressAttempts.add(attempt.getID());
-    //schedule the nextAttemptNumber
-    if (failedAttempts.size() > 0) {
-      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
-        TaskAttemptEventType.TA_RESCHEDULE));
-    } else {
-      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
-          TaskAttemptEventType.TA_SCHEDULE));
-    }
+    ++nextAttemptNumber;
+    return attempt;
   }
 
   @Override
@@ -696,6 +672,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
+  private void sendTaskStartedEvent() {
+    TaskStartedEvent tse = new TaskStartedEvent(
+        TypeConverter.fromYarn(taskId), getLaunchTime(),
+        TypeConverter.fromYarn(taskId.getTaskType()),
+        getSplitsAsString());
+    eventHandler
+        .handle(new JobHistoryEvent(taskId.getJobId(), tse));
+    historyTaskStartGenerated = true;
+  }
+
   private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
     TaskFinishedEvent tfe =
       new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
@@ -730,6 +716,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     task.successfulAttempt = null;
   }
 
+  private void sendTaskSucceededEvents() {
+    eventHandler.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
+    LOG.info("Task succeeded with attempt " + successfulAttempt);
+    if (historyTaskStartGenerated) {
+      TaskFinishedEvent tfe = createTaskFinishedEvent(this,
+          TaskStateInternal.SUCCEEDED);
+      eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
+    }
+  }
+
   /**
   * @return a String representation of the splits.
   *
@@ -741,6 +737,122 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 	  return "";
   }
 
+  /**
+   * Recover a completed task from a previous application attempt
+   * @param taskInfo recovered info about the task
+   * @param recoverTaskOutput whether to recover task outputs
+   * @return state of the task after recovery
+   */
+  private TaskStateInternal recover(TaskInfo taskInfo,
+      OutputCommitter committer, boolean recoverTaskOutput) {
+    LOG.info("Recovering task " + taskId
+        + " from prior app attempt, status was " + taskInfo.getTaskStatus());
+
+    scheduledTime = taskInfo.getStartTime();
+    sendTaskStartedEvent();
+    Collection<TaskAttemptInfo> attemptInfos =
+        taskInfo.getAllTaskAttempts().values();
+
+    if (attemptInfos.size() > 0) {
+      metrics.launchedTask(this);
+    }
+
+    // recover the attempts for this task in the order they finished
+    // so task attempt completion events are ordered properly
+    int savedNextAttemptNumber = nextAttemptNumber;
+    ArrayList<TaskAttemptInfo> taInfos =
+        new ArrayList<TaskAttemptInfo>(taskInfo.getAllTaskAttempts().values());
+    Collections.sort(taInfos, TA_INFO_COMPARATOR);
+    for (TaskAttemptInfo taInfo : taInfos) {
+      nextAttemptNumber = taInfo.getAttemptId().getId();
+      TaskAttemptImpl attempt = addAttempt();
+      // handle the recovery inline so attempts complete before task does
+      attempt.handle(new TaskAttemptRecoverEvent(attempt.getID(), taInfo,
+          committer, recoverTaskOutput));
+      finishedAttempts.add(attempt.getID());
+      TaskAttemptCompletionEventStatus taces = null;
+      TaskAttemptState attemptState = attempt.getState();
+      switch (attemptState) {
+      case FAILED:
+        taces = TaskAttemptCompletionEventStatus.FAILED;
+        break;
+      case KILLED:
+        taces = TaskAttemptCompletionEventStatus.KILLED;
+        break;
+      case SUCCEEDED:
+        taces = TaskAttemptCompletionEventStatus.SUCCEEDED;
+        break;
+      default:
+        throw new IllegalStateException(
+            "Unexpected attempt state during recovery: " + attemptState);
+      }
+      if (attemptState == TaskAttemptState.FAILED) {
+        failedAttempts.add(attempt.getID());
+        if (failedAttempts.size() >= maxAttempts) {
+          taces = TaskAttemptCompletionEventStatus.TIPFAILED;
+        }
+      }
+
+      // don't clobber the successful attempt completion event
+      // TODO: this shouldn't be necessary after MAPREDUCE-4330
+      if (successfulAttempt == null) {
+        handleTaskAttemptCompletion(attempt.getID(), taces);
+        if (attemptState == TaskAttemptState.SUCCEEDED) {
+          successfulAttempt = attempt.getID();
+        }
+      }
+    }
+    nextAttemptNumber = savedNextAttemptNumber;
+
+    TaskStateInternal taskState = TaskStateInternal.valueOf(
+        taskInfo.getTaskStatus());
+    switch (taskState) {
+    case SUCCEEDED:
+      if (successfulAttempt != null) {
+        sendTaskSucceededEvents();
+      } else {
+        LOG.info("Missing successful attempt for task " + taskId
+            + ", recovering as RUNNING");
+        // there must have been a fetch failure and the retry wasn't complete
+        taskState = TaskStateInternal.RUNNING;
+        metrics.runningTask(this);
+        addAndScheduleAttempt();
+      }
+      break;
+    case FAILED:
+    case KILLED:
+    {
+      if (taskState == TaskStateInternal.KILLED && attemptInfos.size() == 0) {
+        metrics.endWaitingTask(this);
+      }
+      TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(),
+          taskInfo.getFinishTime(), taskInfo.getTaskType(),
+          taskInfo.getError(), taskInfo.getTaskStatus(),
+          taskInfo.getFailedDueToAttemptId());
+      eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
+      eventHandler.handle(
+          new JobTaskEvent(taskId, getExternalState(taskState)));
+      break;
+    }
+    default:
+      throw new java.lang.AssertionError("Unexpected recovered task state: "
+          + taskState);
+    }
+
+    return taskState;
+  }
+
+  private static class RecoverTransition
+    implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
+
+    @Override
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
+      TaskRecoverEvent tre = (TaskRecoverEvent) event;
+      return task.recover(tre.getTaskInfo(), tre.getOutputCommitter(),
+          tre.getRecoverTaskOutput());
+    }
+  }
+
   private static class InitialScheduleTransition
     implements SingleArcTransition<TaskImpl, TaskEvent> {
 
@@ -748,13 +860,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     public void transition(TaskImpl task, TaskEvent event) {
       task.addAndScheduleAttempt();
       task.scheduledTime = task.clock.getTime();
-      TaskStartedEvent tse = new TaskStartedEvent(
-          TypeConverter.fromYarn(task.taskId), task.getLaunchTime(),
-          TypeConverter.fromYarn(task.taskId.getTaskType()),
-          task.getSplitsAsString());
-      task.eventHandler
-          .handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
-      task.historyTaskStartGenerated = true;
+      task.sendTaskStartedEvent();
     }
   }
 
@@ -808,16 +914,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.finishedAttempts.add(taskAttemptId);
       task.inProgressAttempts.remove(taskAttemptId);
       task.successfulAttempt = taskAttemptId;
-      task.eventHandler.handle(new JobTaskEvent(
-          task.taskId, TaskState.SUCCEEDED));
-      LOG.info("Task succeeded with attempt " + task.successfulAttempt);
-      // issue kill to all other attempts
-      if (task.historyTaskStartGenerated) {
-        TaskFinishedEvent tfe = createTaskFinishedEvent(task,
-            TaskStateInternal.SUCCEEDED);
-        task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
-            tfe));
-      }
+      task.sendTaskSucceededEvents();
       for (TaskAttempt attempt : task.attempts.values()) {
         if (attempt.getID() != task.successfulAttempt &&
             // This is okay because it can only talk us out of sending a

+ 0 - 39
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java

@@ -1,39 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.mapreduce.v2.app.recover;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
-import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.event.Dispatcher;
-
-public interface Recovery {
-
-  Dispatcher getDispatcher();
-
-  Clock getClock();
-  
-  Map<TaskId, TaskInfo> getCompletedTasks();
-  
-  List<AMInfo> getAMInfos();
-}

+ 0 - 480
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java

@@ -1,480 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.mapreduce.v2.app.recover;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
-import org.apache.hadoop.mapreduce.v2.api.records.Phase;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
-import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
-import org.apache.hadoop.mapreduce.v2.app.commit.CommitterTaskAbortEvent;
-import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
-import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
-import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
-import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
-import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
-import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
-import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.service.CompositeService;
-import org.apache.hadoop.yarn.service.Service;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-
-/*
- * Recovers the completed tasks from the previous life of Application Master.
- * The completed tasks are deciphered from the history file of the previous life.
- * Recovery service intercepts and replay the events for completed tasks.
- * While recovery is in progress, the scheduling of new tasks are delayed by 
- * buffering the task schedule events.
- * The recovery service controls the clock while recovery is in progress.
- */
-
-//TODO:
-//task cleanup for all non completed tasks
-public class RecoveryService extends CompositeService implements Recovery {
-
-  private static final Log LOG = LogFactory.getLog(RecoveryService.class);
-
-  private final ApplicationAttemptId applicationAttemptId;
-  private final OutputCommitter committer;
-  private final boolean newApiCommitter;
-  private final Dispatcher dispatcher;
-  private final ControlledClock clock;
-
-  private JobInfo jobInfo = null;
-  private final Map<TaskId, TaskInfo> completedTasks =
-    new HashMap<TaskId, TaskInfo>();
-  
-  private final List<TaskEvent> pendingTaskScheduleEvents =
-    new ArrayList<TaskEvent>();
-
-  private volatile boolean recoveryMode = false;
-
-  public RecoveryService(ApplicationAttemptId applicationAttemptId, 
-      Clock clock, OutputCommitter committer, boolean newApiCommitter) {
-    super("RecoveringDispatcher");
-    this.applicationAttemptId = applicationAttemptId;
-    this.committer = committer;
-    this.newApiCommitter = newApiCommitter;
-    this.dispatcher = createRecoveryDispatcher();
-    this.clock = new ControlledClock(clock);
-      addService((Service) dispatcher);
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    super.init(conf);
-    // parse the history file
-    try {
-      parse();
-    } catch (Exception e) {
-      LOG.warn(e);
-      LOG.warn("Could not parse the old history file. Aborting recovery. "
-          + "Starting afresh.", e);
-    }
-    if (completedTasks.size() > 0) {
-      recoveryMode = true;
-      LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS "
-          + "TO RECOVER " + completedTasks.size());
-      LOG.info("Job launch time " + jobInfo.getLaunchTime());
-      clock.setTime(jobInfo.getLaunchTime());
-    }
-  }
-
-  @Override
-  public Dispatcher getDispatcher() {
-    return dispatcher;
-  }
-
-  @Override
-  public Clock getClock() {
-    return clock;
-  }
-
-  @Override
-  public Map<TaskId, TaskInfo> getCompletedTasks() {
-    return completedTasks;
-  }
-
-  @Override
-  public List<AMInfo> getAMInfos() {
-    if (jobInfo == null || jobInfo.getAMInfos() == null) {
-      return new LinkedList<AMInfo>();
-    }
-    List<AMInfo> amInfos = new LinkedList<AMInfo>();
-    for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo
-        .getAMInfos()) {
-      AMInfo amInfo =
-          MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
-              jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
-              jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
-              jhAmInfo.getNodeManagerHttpPort());
-
-      amInfos.add(amInfo);
-    }
-    return amInfos;
-  }
-
-  private void parse() throws IOException {
-    FSDataInputStream in =
-        getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId);
-    JobHistoryParser parser = new JobHistoryParser(in);
-    jobInfo = parser.parse();
-    Exception parseException = parser.getParseException();
-    if (parseException != null) {
-      LOG.info("Got an error parsing job-history file" + 
-          ", ignoring incomplete events.", parseException);
-    }
-    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
-        .getAllTasks();
-    for (TaskInfo taskInfo : taskInfos.values()) {
-      if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
-        Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator = 
-            taskInfo.getAllTaskAttempts().entrySet().iterator();
-        while (taskAttemptIterator.hasNext()) {
-          Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next();
-          if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) {
-            taskAttemptIterator.remove();
-          }
-        }
-        completedTasks
-            .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
-        LOG.info("Read from history task "
-            + TypeConverter.toYarn(taskInfo.getTaskId()));
-      }
-    }
-    LOG.info("Read completed tasks from history "
-        + completedTasks.size());
-  }
-
-  public static FSDataInputStream getPreviousJobHistoryFileStream(
-      Configuration conf, ApplicationAttemptId applicationAttemptId)
-      throws IOException {
-    FSDataInputStream in = null;
-    Path historyFile = null;
-    String jobId =
-        TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
-          .toString();
-    String jobhistoryDir =
-        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
-    Path histDirPath =
-        FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
-    LOG.info("Trying file " + histDirPath.toString());
-    FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
-    // read the previous history file
-    historyFile =
-        fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
-          jobId, (applicationAttemptId.getAttemptId() - 1)));
-    LOG.info("History file is at " + historyFile);
-    in = fc.open(historyFile);
-    return in;
-  }
-  
-  protected Dispatcher createRecoveryDispatcher() {
-    return new RecoveryDispatcher();
-  }
-
-  @SuppressWarnings("rawtypes")
-  class RecoveryDispatcher extends AsyncDispatcher {
-    private final EventHandler actualHandler;
-    private final EventHandler handler;
-
-    RecoveryDispatcher() {
-      super();
-      actualHandler = super.getEventHandler();
-      handler = new InterceptingEventHandler(actualHandler);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void dispatch(Event event) {
-      if (recoveryMode) {
-        if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
-          TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
-              .getTaskAttemptID());
-          LOG.info("Recovered Attempt start time " + attInfo.getStartTime());
-          clock.setTime(attInfo.getStartTime());
-
-        } else if (event.getType() == TaskAttemptEventType.TA_DONE
-            || event.getType() == TaskAttemptEventType.TA_FAILMSG
-            || event.getType() == TaskAttemptEventType.TA_KILL) {
-          TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
-              .getTaskAttemptID());
-          LOG.info("Recovered Attempt finish time " + attInfo.getFinishTime());
-          clock.setTime(attInfo.getFinishTime());
-        }
-
-        else if (event.getType() == TaskEventType.T_ATTEMPT_FAILED
-            || event.getType() == TaskEventType.T_ATTEMPT_KILLED
-            || event.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED) {
-          TaskTAttemptEvent tEvent = (TaskTAttemptEvent) event;
-          LOG.info("Recovered Task attempt " + tEvent.getTaskAttemptID());
-          TaskInfo taskInfo = completedTasks.get(tEvent.getTaskAttemptID()
-              .getTaskId());
-          taskInfo.getAllTaskAttempts().remove(
-              TypeConverter.fromYarn(tEvent.getTaskAttemptID()));
-          // remove the task info from completed tasks if all attempts are
-          // recovered
-          if (taskInfo.getAllTaskAttempts().size() == 0) {
-            completedTasks.remove(tEvent.getTaskAttemptID().getTaskId());
-            // checkForRecoveryComplete
-            LOG.info("CompletedTasks() " + completedTasks.size());
-            if (completedTasks.size() == 0) {
-              recoveryMode = false;
-              clock.reset();
-              LOG.info("Setting the recovery mode to false. " +
-                 "Recovery is complete!");
-
-              // send all pending tasks schedule events
-              for (TaskEvent tEv : pendingTaskScheduleEvents) {
-                actualHandler.handle(tEv);
-              }
-
-            }
-          }
-        }
-      }
-      realDispatch(event);
-    }
-    
-    public void realDispatch(Event event) {
-      super.dispatch(event);
-    }
-
-    @Override
-    public EventHandler getEventHandler() {
-      return handler;
-    }
-  }
-
-  private TaskAttemptInfo getTaskAttemptInfo(TaskAttemptId id) {
-    TaskInfo taskInfo = completedTasks.get(id.getTaskId());
-    return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
-  }
-
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  private class InterceptingEventHandler implements EventHandler {
-    EventHandler actualHandler;
-
-    InterceptingEventHandler(EventHandler actualHandler) {
-      this.actualHandler = actualHandler;
-    }
-
-    @Override
-    public void handle(Event event) {
-      if (!recoveryMode) {
-        // delegate to the dispatcher one
-        actualHandler.handle(event);
-        return;
-      }
-
-      else if (event.getType() == TaskEventType.T_SCHEDULE) {
-        TaskEvent taskEvent = (TaskEvent) event;
-        // delay the scheduling of new tasks till previous ones are recovered
-        if (completedTasks.get(taskEvent.getTaskID()) == null) {
-          LOG.debug("Adding to pending task events "
-              + taskEvent.getTaskID());
-          pendingTaskScheduleEvents.add(taskEvent);
-          return;
-        }
-      }
-
-      else if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
-        TaskAttemptId aId = ((ContainerAllocatorEvent) event).getAttemptID();
-        TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
-        LOG.debug("CONTAINER_REQ " + aId);
-        sendAssignedEvent(aId, attInfo);
-        return;
-      }
-
-      else if (event.getType() == CommitterEventType.TASK_ABORT) {
-        TaskAttemptId aId = ((CommitterTaskAbortEvent) event).getAttemptID();
-        LOG.debug("TASK_CLEAN");
-        actualHandler.handle(new TaskAttemptEvent(aId,
-            TaskAttemptEventType.TA_CLEANUP_DONE));
-        return;
-      }
-
-      else if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH) {
-        TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
-            .getTaskAttemptID();
-        TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
-        actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId,
-            attInfo.getShufflePort()));
-        // send the status update event
-        sendStatusUpdateEvent(aId, attInfo);
-
-        TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
-        switch (state) {
-        case SUCCEEDED:
-          //recover the task output
-          
-          // check the committer type and construct corresponding context
-          TaskAttemptContext taskContext = null;
-          if(newApiCommitter) {
-            taskContext = new TaskAttemptContextImpl(getConfig(),
-                attInfo.getAttemptId());
-          } else {
-            taskContext = new org.apache.hadoop.mapred.TaskAttemptContextImpl(new JobConf(getConfig()),
-                TypeConverter.fromYarn(aId));
-          }
-          
-          try { 
-            TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
-            int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1); 
-            if(type == TaskType.REDUCE || (type == TaskType.MAP && numReducers <= 0)) {
-              committer.recoverTask(taskContext);
-              LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
-            } else {
-              LOG.info("Will not try to recover output for "
-                  + taskContext.getTaskAttemptID());
-            }
-          } catch (IOException e) {
-            LOG.error("Caught an exception while trying to recover task "+aId, e);
-            actualHandler.handle(new JobDiagnosticsUpdateEvent(
-                aId.getTaskId().getJobId(), "Error in recovering task output " + 
-                e.getMessage()));
-            actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
-                JobEventType.INTERNAL_ERROR));
-          }
-          
-          // send the done event
-          LOG.info("Sending done event to recovered attempt " + aId);
-          actualHandler.handle(new TaskAttemptEvent(aId,
-              TaskAttemptEventType.TA_DONE));
-          break;
-        case KILLED:
-          LOG.info("Sending kill event to recovered attempt " + aId);
-          actualHandler.handle(new TaskAttemptEvent(aId,
-              TaskAttemptEventType.TA_KILL));
-          break;
-        default:
-          LOG.info("Sending fail event to recovered attempt " + aId);
-          actualHandler.handle(new TaskAttemptEvent(aId,
-              TaskAttemptEventType.TA_FAILMSG));
-          break;
-        }
-        return;
-      }
-
-      else if (event.getType() == 
-        ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP) {
-        TaskAttemptId aId = ((ContainerLauncherEvent) event)
-          .getTaskAttemptID();
-        actualHandler.handle(
-           new TaskAttemptEvent(aId,
-                TaskAttemptEventType.TA_CONTAINER_CLEANED));
-        return;
-      }
-
-      // delegate to the actual handler
-      actualHandler.handle(event);
-    }
-
-    private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID,
-        TaskAttemptInfo attemptInfo) {
-      LOG.info("Sending status update event to " + yarnAttemptID);
-      TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
-      taskAttemptStatus.id = yarnAttemptID;
-      taskAttemptStatus.progress = 1.0f;
-      taskAttemptStatus.stateString = attemptInfo.getTaskStatus(); 
-      // taskAttemptStatus.outputSize = attemptInfo.getOutputSize();
-      taskAttemptStatus.phase = Phase.CLEANUP;
-      org.apache.hadoop.mapreduce.Counters cntrs = attemptInfo.getCounters();
-      if (cntrs == null) {
-        taskAttemptStatus.counters = null;
-      } else {
-        taskAttemptStatus.counters = cntrs;
-      }
-      actualHandler.handle(new TaskAttemptStatusUpdateEvent(
-          taskAttemptStatus.id, taskAttemptStatus));
-    }
-
-    private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
-        TaskAttemptInfo attemptInfo) {
-      LOG.info("Sending assigned event to " + yarnAttemptID);
-      ContainerId cId = attemptInfo.getContainerId();
-
-      NodeId nodeId =
-          ConverterUtils.toNodeId(attemptInfo.getHostname() + ":"
-              + attemptInfo.getPort());
-      // Resource/Priority/ApplicationACLs are only needed while launching the
-      // container on an NM, these are already completed tasks, so setting them
-      // to null
-      Container container = BuilderUtils.newContainer(cId, nodeId,
-          attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(),
-          null, null, null);
-      actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
-          container, null));
-    }
-  }
-
-}

+ 5 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -414,7 +414,8 @@ public class MRApp extends MRAppMaster {
     Job newJob = new TestJob(getJobId(), getAttemptID(), conf, 
     		getDispatcher().getEventHandler(),
             getTaskAttemptListener(), getContext().getClock(),
-            isNewApiCommitter(), currentUser.getUserName(), getContext(),
+            getCommitter(), isNewApiCommitter(),
+            currentUser.getUserName(), getContext(),
             forcedState, diagnostic);
     ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
@@ -648,12 +649,13 @@ public class MRApp extends MRAppMaster {
     public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
         Configuration conf, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Clock clock,
-        boolean newApiCommitter, String user, AppContext appContext, 
+        OutputCommitter committer, boolean newApiCommitter,
+        String user, AppContext appContext,
         JobStateInternal forcedState, String diagnostic) {
       super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
           conf, eventHandler, taskAttemptListener,
           new JobTokenSecretManager(), new Credentials(), clock,
-          getCompletedTaskFromPreviousRun(), metrics,
+          getCompletedTaskFromPreviousRun(), metrics, committer,
           newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
           appContext, forcedState, diagnostic);
 

+ 474 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java

@@ -18,10 +18,21 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import junit.framework.Assert;
 
@@ -31,36 +42,66 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.Event;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.MapTaskImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class TestRecovery {
@@ -75,6 +116,7 @@ public class TestRecovery {
   private Text val1 = new Text("val1");
   private Text val2 = new Text("val2");
 
+
   /**
    * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
    * completely disappears because of failed launch, one attempt gets killed and
@@ -1011,6 +1053,427 @@ public class TestRecovery {
     app.verifyCompleted();
   }
 
+  @Test
+  public void testRecoverySuccessAttempt() {
+    LOG.info("--- START: testRecoverySuccessAttempt ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.SUCCEEDED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock (OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo,mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler,atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
+    finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    jobHistoryEvents.add(EventType.TASK_FINISHED);
+    recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 1L);
+  }
+
+  @Test
+  public void testRecoveryAllFailAttempts() {
+    LOG.info("--- START: testRecoveryAllFailAttempts ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock (OutputCommitter.class);
+
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("FAILED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+    when(mockTaskInfo.getError()).thenReturn("error string");
+    when(mockTaskInfo.getTaskType()).thenReturn(TaskType.MAP);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler,atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
+    finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    jobHistoryEvents.add(EventType.TASK_FAILED);
+    recoveryChecker(recoverMapTask, TaskState.FAILED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 2L);
+  }
+
+  @Test
+  public void testRecoveryTaskSuccessAllAttemptsFail() {
+    LOG.info("--- START:  testRecoveryTaskSuccessAllAttemptsFail ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock (OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler,atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    TaskAttemptID taId3 = new TaskAttemptID(taskID, 2000);
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
+    finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+    finalAttemptStates.put(taId3, TaskAttemptState.NEW);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    recoveryChecker(recoverMapTask, TaskState.RUNNING, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 2L);
+  }
+
+  @Test
+  public void testRecoveryTaskSuccessAllAttemptsSucceed() {
+    LOG.info("--- START:  testRecoveryTaskSuccessAllAttemptsFail ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.SUCCEEDED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.SUCCEEDED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock (OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler,atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
+    finalAttemptStates.put(taId2, TaskAttemptState.SUCCEEDED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+    jobHistoryEvents.add(EventType.TASK_FINISHED);
+    recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 0L);
+  }
+
+  @Test
+  public void testRecoveryAllAttemptsKilled() {
+    LOG.info("--- START:  testRecoveryAllAttemptsKilled ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.KILLED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.KILLED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock (OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("KILLED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+    when(mockTaskInfo.getError()).thenReturn("");
+    when(mockTaskInfo.getTaskType()).thenReturn(TaskType.MAP);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler,atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.KILLED);
+    finalAttemptStates.put(taId2, TaskAttemptState.KILLED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
+    jobHistoryEvents.add(EventType.TASK_FAILED);
+    recoveryChecker(recoverMapTask, TaskState.KILLED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 0L);
+  }
+
+  private void recoveryChecker(MapTaskImpl checkTask, TaskState finalState,
+      Map<TaskAttemptID, TaskAttemptState> finalAttemptStates,
+      ArgumentCaptor<Event> arg, List<EventType> expectedJobHistoryEvents,
+      long expectedMapLaunches, long expectedFailedMaps) {
+
+    assertEquals("Final State of Task", finalState, checkTask.getState());
+
+    Map<TaskAttemptId, TaskAttempt> recoveredAttempts =
+        checkTask.getAttempts();
+    assertEquals("Expected Number of Task Attempts",
+        finalAttemptStates.size(), recoveredAttempts.size());
+    for (TaskAttemptID taID : finalAttemptStates.keySet()) {
+      assertEquals("Expected Task Attempt State",
+          finalAttemptStates.get(taID),
+          recoveredAttempts.get(TypeConverter.toYarn(taID)).getState());
+    }
+
+    Iterator<Event> ie = arg.getAllValues().iterator();
+    int eventNum = 0;
+    long totalLaunchedMaps = 0;
+    long totalFailedMaps = 0;
+    boolean jobTaskEventReceived = false;
+
+    while (ie.hasNext()) {
+      Object current = ie.next();
+      ++eventNum;
+      LOG.info(eventNum + " " + current.getClass().getName());
+      String className = current.getClass().getName();
+      if (className.equals("org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent")) {
+        JobHistoryEvent jhe = (JobHistoryEvent) current;
+        LOG.info(expectedJobHistoryEvents.get(0).toString() + " " +
+            jhe.getHistoryEvent().getEventType().toString() + " " +
+            jhe.getJobID());
+        assertEquals(expectedJobHistoryEvents.get(0),
+            jhe.getHistoryEvent().getEventType());
+        expectedJobHistoryEvents.remove(0);
+      }  else if (className.equals("org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent")) {
+        JobCounterUpdateEvent jcue = (JobCounterUpdateEvent) current;
+
+        LOG.info("JobCounterUpdateEvent "
+            + jcue.getCounterUpdates().get(0).getCounterKey()
+            + " " + jcue.getCounterUpdates().get(0).getIncrementValue());
+        if (jcue.getCounterUpdates().get(0).getCounterKey() ==
+            JobCounter.NUM_FAILED_MAPS) {
+          totalFailedMaps += jcue.getCounterUpdates().get(0)
+              .getIncrementValue();
+        } else if (jcue.getCounterUpdates().get(0).getCounterKey() ==
+            JobCounter.TOTAL_LAUNCHED_MAPS) {
+          totalLaunchedMaps += jcue.getCounterUpdates().get(0)
+              .getIncrementValue();
+        }
+      } else if (className.equals("org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent")) {
+        JobTaskEvent jte = (JobTaskEvent) current;
+        assertEquals(jte.getState(), finalState);
+        jobTaskEventReceived = true;
+      }
+    }
+    assertTrue(jobTaskEventReceived || (finalState == TaskState.RUNNING));
+    assertEquals("Did not process all expected JobHistoryEvents",
+        0, expectedJobHistoryEvents.size());
+    assertEquals("Expected Map Launches",
+        expectedMapLaunches, totalLaunchedMaps);
+    assertEquals("Expected Failed Maps",
+        expectedFailedMaps, totalFailedMaps);
+  }
+
+  private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) {
+
+    ApplicationId appId = BuilderUtils.newApplicationId(clusterTimestamp, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+
+    int partitions = 2;
+
+    Path remoteJobConfFile = mock(Path.class);
+    JobConf conf = new JobConf();
+    TaskAttemptListener taskAttemptListener = mock(TaskAttemptListener.class);
+    Token<JobTokenIdentifier> jobToken =
+        (Token<JobTokenIdentifier>) mock(Token.class);
+    Credentials credentials = null;
+    Clock clock = new SystemClock();
+    int appAttemptId = 3;
+    MRAppMetrics metrics = mock(MRAppMetrics.class);
+    Resource minContainerRequirements = mock(Resource.class);
+    when(minContainerRequirements.getMemory()).thenReturn(1000);
+
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    when(clusterInfo.getMinContainerCapability()).thenReturn(
+        minContainerRequirements);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getClusterInfo()).thenReturn(clusterInfo);
+
+    TaskSplitMetaInfo taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
+    MapTaskImpl mapTask = new MapTaskImpl(jobId, partitions,
+        eh, remoteJobConfFile, conf,
+        taskSplitMetaInfo, taskAttemptListener, jobToken, credentials, clock,
+        appAttemptId, metrics, appContext);
+    return mapTask;
+  }
+
+  private TaskAttemptInfo getMockTaskAttemptInfo(TaskAttemptID tai,
+      TaskAttemptState tas) {
+
+    ContainerId ci = mock(ContainerId.class);
+    Counters counters = mock(Counters.class);
+    TaskType tt = TaskType.MAP;
+
+    long finishTime = System.currentTimeMillis();
+
+    TaskAttemptInfo mockTAinfo = mock(TaskAttemptInfo.class);
+
+    when(mockTAinfo.getAttemptId()).thenReturn(tai);
+    when(mockTAinfo.getContainerId()).thenReturn(ci);
+    when(mockTAinfo.getCounters()).thenReturn(counters);
+    when(mockTAinfo.getError()).thenReturn("");
+    when(mockTAinfo.getFinishTime()).thenReturn(finishTime);
+    when(mockTAinfo.getHostname()).thenReturn("localhost");
+    when(mockTAinfo.getHttpPort()).thenReturn(23);
+    when(mockTAinfo.getMapFinishTime()).thenReturn(finishTime - 1000L);
+    when(mockTAinfo.getPort()).thenReturn(24);
+    when(mockTAinfo.getRackname()).thenReturn("defaultRack");
+    when(mockTAinfo.getShuffleFinishTime()).thenReturn(finishTime - 2000L);
+    when(mockTAinfo.getShufflePort()).thenReturn(25);
+    when(mockTAinfo.getSortFinishTime()).thenReturn(finishTime - 3000L);
+    when(mockTAinfo.getStartTime()).thenReturn(finishTime -10000);
+    when(mockTAinfo.getState()).thenReturn("task in progress");
+    when(mockTAinfo.getTaskStatus()).thenReturn(tas.toString());
+    when(mockTAinfo.getTaskType()).thenReturn(tt);
+    when(mockTAinfo.getTrackerName()).thenReturn("TrackerName");
+    return mockTAinfo;
+  }
+
   private void writeBadOutput(TaskAttempt attempt, Configuration conf)
   throws Exception {
   TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 
@@ -1145,5 +1608,16 @@ public class TestRecovery {
   public static void main(String[] arg) throws Exception {
     TestRecovery test = new TestRecovery();
     test.testCrashed();
+    test.testMultipleCrashes();
+    test.testOutputRecovery();
+    test.testOutputRecoveryMapsOnly();
+    test.testRecoveryWithOldCommiter();
+    test.testSpeculative();
+    test.testRecoveryWithoutShuffleSecret();
+    test.testRecoverySuccessAttempt();
+    test.testRecoveryAllFailAttempts();
+    test.testRecoveryTaskSuccessAllAttemptsFail();
+    test.testRecoveryTaskSuccessAllAttemptsSucceed();
+    test.testRecoveryAllAttemptsKilled();
   }
 }

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java

@@ -223,7 +223,8 @@ import org.junit.Test;
       Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
           getDispatcher().getEventHandler(),
           getTaskAttemptListener(), getContext().getClock(),
-          isNewApiCommitter(), currentUser.getUserName(), getContext(),
+          getCommitter(), isNewApiCommitter(),
+          currentUser.getUserName(), getContext(),
           forcedState, diagnostic);
       ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 

+ 21 - 17
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
@@ -33,6 +34,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.MRConfig;
@@ -44,6 +46,7 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -54,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
@@ -117,7 +121,7 @@ public class TestJobImpl {
     JobImpl job = createStubbedJob(conf, dispatcher, 0);
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+    job.handle(new JobStartEvent(job.getID()));
     assertJobState(job, JobStateInternal.SUCCEEDED);
     dispatcher.stop();
     commitHandler.stop();
@@ -201,7 +205,7 @@ public class TestJobImpl {
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.SETUP);
 
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
@@ -268,7 +272,7 @@ public class TestJobImpl {
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.FAIL_ABORT);
 
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
@@ -305,7 +309,7 @@ public class TestJobImpl {
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.SETUP);
 
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
@@ -345,7 +349,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -356,7 +360,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -367,7 +371,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -378,7 +382,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -389,7 +393,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job5.checkAccess(ugi1, null));
     Assert.assertTrue(job5.checkAccess(ugi2, null));
   }
@@ -407,7 +411,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, true, null, 0, null, null, null, null);
+        mrAppMetrics, null, true, null, 0, null, null, null, null);
     job.handle(diagUpdateEvent);
     String diagnostics = job.getReport().getDiagnostics();
     Assert.assertNotNull(diagnostics);
@@ -418,7 +422,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, true, null, 0, null, null, null, null);
+        mrAppMetrics, null, true, null, 0, null, null, null, null);
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
     job.handle(diagUpdateEvent);
     diagnostics = job.getReport().getDiagnostics();
@@ -473,7 +477,7 @@ public class TestJobImpl {
     JobImpl job = new JobImpl(jobId, Records
         .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
         null, new JobTokenSecretManager(), new Credentials(), null, null,
-        mrAppMetrics, true, null, 0, null, null, null, null);
+        mrAppMetrics, null, true, null, 0, null, null, null, null);
     InitTransition initTransition = getInitTransition(2);
     JobEvent mockJobEvent = mock(JobEvent.class);
     initTransition.transition(job, mockJobEvent);
@@ -514,7 +518,7 @@ public class TestJobImpl {
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.FAILED);
 
     job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
@@ -578,7 +582,7 @@ public class TestJobImpl {
     StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+    job.handle(new JobStartEvent(job.getID()));
     assertJobState(job, JobStateInternal.RUNNING);
     return job;
   }
@@ -633,9 +637,9 @@ public class TestJobImpl {
         boolean newApiCommitter, String user, int numSplits) {
       super(jobId, applicationAttemptId, conf, eventHandler,
           null, new JobTokenSecretManager(), new Credentials(),
-          new SystemClock(), null, MRAppMetrics.create(),
-          newApiCommitter, user, System.currentTimeMillis(), null, null, null,
-          null);
+          new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(),
+          MRAppMetrics.create(), null, newApiCommitter, user,
+          System.currentTimeMillis(), null, null, null, null);
 
       initTransition = getInitTransition(numSplits);
       localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,

+ 5 - 14
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

@@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -78,7 +76,6 @@ public class TestTaskImpl {
   private Path remoteJobConfFile;
   private Credentials credentials;
   private Clock clock;
-  private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
   private MRAppMetrics metrics;
   private TaskImpl mockTask;
   private ApplicationId appId;
@@ -102,13 +99,12 @@ public class TestTaskImpl {
         EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
         TaskAttemptListener taskAttemptListener,
         Token<JobTokenIdentifier> jobToken,
-        Credentials credentials, Clock clock,
-        Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
+        Credentials credentials, Clock clock, int startCount,
         MRAppMetrics metrics, AppContext appContext) {
       super(jobId, taskType , partition, eventHandler,
           remoteJobConfFile, conf, taskAttemptListener,
           jobToken, credentials, clock,
-          completedTasksFromPreviousRun, startCount, metrics, appContext);
+          startCount, metrics, appContext);
     }
 
     @Override
@@ -235,8 +231,7 @@ public class TestTaskImpl {
     mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
         remoteJobConfFile, conf, taskAttemptListener, jobToken,
         credentials, clock,
-        completedTasksFromPreviousRun, startCount,
-        metrics, appContext);        
+        startCount, metrics, appContext);        
     
   }
 
@@ -589,9 +584,7 @@ public class TestTaskImpl {
   public void testFailedTransitions() {
     mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
         remoteJobConfFile, conf, taskAttemptListener, jobToken,
-        credentials, clock,
-        completedTasksFromPreviousRun, startCount,
-        metrics, appContext) {
+        credentials, clock, startCount, metrics, appContext) {
           @Override
           protected int getMaxAttempts() {
             return 1;
@@ -658,9 +651,7 @@ public class TestTaskImpl {
   public void testCountersWithSpeculation() {
     mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
         remoteJobConfFile, conf, taskAttemptListener, jobToken,
-        credentials, clock,
-        completedTasksFromPreviousRun, startCount,
-        metrics, appContext) {
+        credentials, clock, startCount, metrics, appContext) {
           @Override
           protected int getMaxAttempts() {
             return 1;

+ 16 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 import com.google.common.base.Joiner;
@@ -524,4 +525,19 @@ public class JobHistoryUtils {
     sb.append(jobId.toString());
     return sb.toString();
   }
+
+  public static Path getPreviousJobHistoryPath(
+      Configuration conf, ApplicationAttemptId applicationAttemptId)
+      throws IOException {
+    String jobId =
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
+          .toString();
+    String jobhistoryDir =
+        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
+    Path histDirPath = FileContext.getFileContext(conf).makeQualified(
+            new Path(jobhistoryDir));
+    FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
+    return fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
+        histDirPath,jobId, (applicationAttemptId.getAttemptId() - 1)));
+  }
 }

+ 1 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -410,6 +410,7 @@ public interface MRJobConfig {
   /** Enable job recovery.*/
   public static final String MR_AM_JOB_RECOVERY_ENABLE = 
     MR_AM_PREFIX + "job.recovery.enable";
+  public static final boolean MR_AM_JOB_RECOVERY_ENABLE_DEFAULT = true;
 
   /** 
    * Limit on the number of reducers that can be preempted to ensure that at