Browse Source

MAPREDUCE-5465. Tasks are often killed before they exit on their own. Contributed by Ming Ma

Jason Lowe 10 years ago
parent
commit
444836b3dc
22 changed files with 1060 additions and 150 deletions
  1. 3 0
      hadoop-mapreduce-project/CHANGES.txt
  2. 10 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
  3. 2 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
  4. 30 8
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
  5. 63 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java
  6. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
  7. 36 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java
  8. 3 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
  9. 359 86
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  10. 7 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
  11. 10 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
  12. 108 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
  13. 16 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
  14. 6 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
  15. 2 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
  16. 101 41
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
  17. 5 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
  18. 260 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
  19. 9 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
  20. 20 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
  21. 6 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
  22. 3 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -408,6 +408,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6353. Divide by zero error in MR AM when calculating available 
     containers. (Anubhav Dhoot via kasha)
 
+    MAPREDUCE-5465. Tasks are often killed before they exit on their own
+    (Ming Ma via jlowe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 10 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java

@@ -264,7 +264,8 @@ public class LocalContainerLauncher extends AbstractService implements
           context.getEventHandler().handle(
               new TaskAttemptEvent(taId,
                   TaskAttemptEventType.TA_CONTAINER_CLEANED));
-
+        } else if (event.getType() == EventType.CONTAINER_COMPLETED) {
+          LOG.debug("Container completed " + event.toString());
         } else {
           LOG.warn("Ignoring unexpected event " + event.toString());
         }
@@ -314,7 +315,14 @@ public class LocalContainerLauncher extends AbstractService implements
         }
         runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
                    (numReduceTasks > 0), localMapFiles);
-        
+
+        // In non-uber mode, TA gets TA_CONTAINER_COMPLETED from MRAppMaster
+        // as part of NM -> RM -> AM notification route.
+        // In uber mode, given the task run inside the MRAppMaster container,
+        // we have to simulate the notification.
+        context.getEventHandler().handle(new TaskAttemptEvent(attemptID,
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+
       } catch (RuntimeException re) {
         JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
         jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java

@@ -67,4 +67,6 @@ public interface AppContext {
   boolean hasSuccessfullyUnregistered();
 
   String getNMHostname();
+
+  TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor();
 }

+ 30 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -207,6 +207,14 @@ public class MRAppMaster extends CompositeService {
   private SpeculatorEventDispatcher speculatorEventDispatcher;
   private AMPreemptionPolicy preemptionPolicy;
 
+  // After a task attempt completes from TaskUmbilicalProtocol's point of view,
+  // it will be transitioned to finishing state.
+  // taskAttemptFinishingMonitor is just a timer for attempts in finishing
+  // state. If the attempt stays in finishing state for too long,
+  // taskAttemptFinishingMonitor will notify the attempt via TA_TIMED_OUT
+  // event.
+  private TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
+
   private Job job;
   private Credentials jobCredentials = new Credentials(); // Filled during init
   protected UserGroupInformation currentUser; // Will be setup during init
@@ -249,6 +257,12 @@ public class MRAppMaster extends CompositeService {
     logSyncer = TaskLog.createLogSyncer();
     LOG.info("Created MRAppMaster for application " + applicationAttemptId);
   }
+  protected TaskAttemptFinishingMonitor createTaskAttemptFinishingMonitor(
+      EventHandler eventHandler) {
+    TaskAttemptFinishingMonitor monitor =
+        new TaskAttemptFinishingMonitor(eventHandler);
+    return monitor;
+  }
 
   @Override
   protected void serviceInit(final Configuration conf) throws Exception {
@@ -259,7 +273,11 @@ public class MRAppMaster extends CompositeService {
 
     initJobCredentialsAndUGI(conf);
 
-    context = new RunningAppContext(conf);
+    dispatcher = createDispatcher();
+    addIfService(dispatcher);
+    taskAttemptFinishingMonitor = createTaskAttemptFinishingMonitor(dispatcher.getEventHandler());
+    addIfService(taskAttemptFinishingMonitor);
+    context = new RunningAppContext(conf, taskAttemptFinishingMonitor);
 
     // Job name is the same as the app name util we support DAG of jobs
     // for an app later
@@ -326,9 +344,6 @@ public class MRAppMaster extends CompositeService {
     }
     
     if (errorHappenedShutDown) {
-      dispatcher = createDispatcher();
-      addIfService(dispatcher);
-      
       NoopEventHandler eater = new NoopEventHandler();
       //We do not have a JobEventDispatcher in this path
       dispatcher.register(JobEventType.class, eater);
@@ -375,9 +390,6 @@ public class MRAppMaster extends CompositeService {
     } else {
       committer = createOutputCommitter(conf);
 
-      dispatcher = createDispatcher();
-      addIfService(dispatcher);
-
       //service to handle requests from JobClient
       clientService = createClientService(context);
       // Init ClientService separately so that we stop it separately, since this
@@ -965,10 +977,14 @@ public class MRAppMaster extends CompositeService {
     private final ClusterInfo clusterInfo = new ClusterInfo();
     private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
 
-    public RunningAppContext(Configuration config) {
+    private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
+
+    public RunningAppContext(Configuration config,
+        TaskAttemptFinishingMonitor taskAttemptFinishingMonitor) {
       this.conf = config;
       this.clientToAMTokenSecretManager =
           new ClientToAMTokenSecretManager(appAttemptID, null);
+      this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor;
     }
 
     @Override
@@ -1053,6 +1069,12 @@ public class MRAppMaster extends CompositeService {
     public String getNMHostname() {
       return nmHost;
     }
+
+    @Override
+    public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
+      return taskAttemptFinishingMonitor;
+    }
+
   }
 
   @SuppressWarnings("unchecked")

+ 63 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java

@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+/**
+ * This class generates TA_TIMED_OUT if the task attempt stays in FINISHING
+ * state for too long.
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class TaskAttemptFinishingMonitor extends
+    AbstractLivelinessMonitor<TaskAttemptId> {
+
+  private EventHandler eventHandler;
+
+  public TaskAttemptFinishingMonitor(EventHandler eventHandler) {
+    super("TaskAttemptFinishingMonitor", new SystemClock());
+    this.eventHandler = eventHandler;
+  }
+
+  public void init(Configuration conf) {
+    super.init(conf);
+    int expireIntvl = conf.getInt(MRJobConfig.TASK_EXIT_TIMEOUT,
+        MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
+    int checkIntvl = conf.getInt(
+        MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS,
+        MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT);
+
+    setExpireInterval(expireIntvl);
+    setMonitorInterval(checkIntvl);
+  }
+
+  @Override
+  protected void expire(TaskAttemptId id) {
+    eventHandler.handle(
+        new TaskAttemptEvent(id,
+        TaskAttemptEventType.TA_TIMED_OUT));
+  }
+}

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java

@@ -370,7 +370,7 @@ public class MRClientService extends AbstractService implements ClientService {
           new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
       appContext.getEventHandler().handle(
           new TaskAttemptEvent(taskAttemptId, 
-              TaskAttemptEventType.TA_FAILMSG));
+              TaskAttemptEventType.TA_FAILMSG_BY_CLIENT));
       FailTaskAttemptResponse response = recordFactory.
         newRecordInstance(FailTaskAttemptResponse.class);
       return response;

+ 36 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java

@@ -30,9 +30,42 @@ public enum TaskAttemptStateInternal {
   UNASSIGNED, 
   ASSIGNED, 
   RUNNING, 
-  COMMIT_PENDING, 
-  SUCCESS_CONTAINER_CLEANUP, 
-  SUCCEEDED, 
+  COMMIT_PENDING,
+
+  // Transition into SUCCESS_FINISHING_CONTAINER
+  // After the attempt finishes successfully from
+  // TaskUmbilicalProtocol's point of view, it will transition to
+  // SUCCESS_FINISHING_CONTAINER state. That will give a chance for the
+  // container to exit by itself. In the transition,
+  // the attempt will notify the task via T_ATTEMPT_SUCCEEDED so that
+  // from job point of view, the task is considered succeeded.
+
+  // Transition out of SUCCESS_FINISHING_CONTAINER
+  // The attempt will transition from SUCCESS_FINISHING_CONTAINER to
+  // SUCCESS_CONTAINER_CLEANUP if it doesn't receive container exit
+  // notification within TASK_EXIT_TIMEOUT;
+  // Or it will transition to SUCCEEDED if it receives container exit
+  // notification from YARN.
+  SUCCESS_FINISHING_CONTAINER,
+
+  // Transition into FAIL_FINISHING_CONTAINER
+  // After the attempt fails from
+  // TaskUmbilicalProtocol's point of view, it will transition to
+  // FAIL_FINISHING_CONTAINER state. That will give a chance for the container
+  // to exit by itself. In the transition,
+  // the attempt will notify the task via T_ATTEMPT_FAILED so that
+  // from job point of view, the task is considered failed.
+
+  // Transition out of FAIL_FINISHING_CONTAINER
+  // The attempt will transition from FAIL_FINISHING_CONTAINER to
+  // FAIL_CONTAINER_CLEANUP if it doesn't receive container exit
+  // notification within TASK_EXIT_TIMEOUT;
+  // Or it will transition to FAILED if it receives container exit
+  // notification from YARN.
+  FAIL_FINISHING_CONTAINER,
+
+  SUCCESS_CONTAINER_CLEANUP,
+  SUCCEEDED,
   FAIL_CONTAINER_CLEANUP, 
   FAIL_TASK_CLEANUP, 
   FAILED, 

+ 3 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java

@@ -49,6 +49,9 @@ public enum TaskAttemptEventType {
   TA_TIMED_OUT,
   TA_PREEMPTED,
 
+  //Producer:Client
+  TA_FAILMSG_BY_CLIENT,
+
   //Producer:TaskCleaner
   TA_CLEANUP_DONE,
 

+ 359 - 86
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -184,8 +184,20 @@ public abstract class TaskAttemptImpl implements
   private Locality locality;
   private Avataar avataar;
 
-  private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
-    new CleanupContainerTransition();
+  private static final CleanupContainerTransition
+      CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition();
+  private static final MoveContainerToSucceededFinishingTransition
+      SUCCEEDED_FINISHING_TRANSITION =
+          new MoveContainerToSucceededFinishingTransition();
+  private static final MoveContainerToFailedFinishingTransition
+      FAILED_FINISHING_TRANSITION =
+          new MoveContainerToFailedFinishingTransition();
+  private static final ExitFinishingOnTimeoutTransition
+      FINISHING_ON_TIMEOUT_TRANSITION =
+          new ExitFinishingOnTimeoutTransition();
+
+  private static final FinalizeFailedTransition FINALIZE_FAILED_TRANSITION =
+      new FinalizeFailedTransition();
 
   private static final DiagnosticInformationUpdater 
     DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION 
@@ -204,6 +216,8 @@ public abstract class TaskAttemptImpl implements
       TaskAttemptEventType.TA_COMMIT_PENDING,
       TaskAttemptEventType.TA_DONE,
       TaskAttemptEventType.TA_FAILMSG,
+      TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
+      TaskAttemptEventType.TA_TIMED_OUT,
       TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE);
 
   private static final StateMachineFactory
@@ -221,16 +235,16 @@ public abstract class TaskAttemptImpl implements
      .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_KILL, new KilledTransition())
      .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
-         TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
+         TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, new FailedTransition())
      .addTransition(TaskAttemptStateInternal.NEW,
          EnumSet.of(TaskAttemptStateInternal.FAILED,
              TaskAttemptStateInternal.KILLED,
              TaskAttemptStateInternal.SUCCEEDED),
          TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
      .addTransition(TaskAttemptStateInternal.NEW,
-          TaskAttemptStateInternal.NEW,
-          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
-          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+         TaskAttemptStateInternal.NEW,
+         TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+         DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
 
      // Transitions from the UNASSIGNED state.
      .addTransition(TaskAttemptStateInternal.UNASSIGNED,
@@ -238,14 +252,14 @@ public abstract class TaskAttemptImpl implements
          new ContainerAssignedTransition())
      .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
-             TaskAttemptStateInternal.KILLED, true))
+         TaskAttemptStateInternal.KILLED, true))
      .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
-         TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
+         TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, new DeallocateContainerTransition(
              TaskAttemptStateInternal.FAILED, true))
      .addTransition(TaskAttemptStateInternal.UNASSIGNED,
-          TaskAttemptStateInternal.UNASSIGNED,
-          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
-          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+         TaskAttemptStateInternal.UNASSIGNED,
+         TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+         DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
 
      // Transitions from the ASSIGNED state.
      .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,
@@ -258,15 +272,19 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
          new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false))
      .addTransition(TaskAttemptStateInternal.ASSIGNED,
-         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED,
-         CLEANUP_CONTAINER_TRANSITION)
+         FINALIZE_FAILED_TRANSITION)
      .addTransition(TaskAttemptStateInternal.ASSIGNED, 
          TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
-     .addTransition(TaskAttemptStateInternal.ASSIGNED, 
+     .addTransition(TaskAttemptStateInternal.ASSIGNED,
+         TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+         TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION)
+     .addTransition(TaskAttemptStateInternal.ASSIGNED,
          TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
-         TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
+         TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
+             CLEANUP_CONTAINER_TRANSITION)
 
      // Transitions from RUNNING state.
      .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
@@ -274,23 +292,27 @@ public abstract class TaskAttemptImpl implements
      .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-     // If no commit is required, task directly goes to success
+     // If no commit is required, task goes to finishing state
+     // This will give a chance for the container to exit by itself
      .addTransition(TaskAttemptStateInternal.RUNNING,
-         TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
-         TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
+         TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptEventType.TA_DONE, SUCCEEDED_FINISHING_TRANSITION)
      // If commit is required, task goes through commit pending state.
      .addTransition(TaskAttemptStateInternal.RUNNING,
          TaskAttemptStateInternal.COMMIT_PENDING,
          TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
      // Failure handling while RUNNING
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+         TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION)
      .addTransition(TaskAttemptStateInternal.RUNNING,
          TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
-         TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
+         TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, CLEANUP_CONTAINER_TRANSITION)
       //for handling container exit without sending the done or fail msg
      .addTransition(TaskAttemptStateInternal.RUNNING,
-         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED,
-         CLEANUP_CONTAINER_TRANSITION)
+         FINALIZE_FAILED_TRANSITION)
      // Timeout handling while RUNNING
      .addTransition(TaskAttemptStateInternal.RUNNING,
          TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
@@ -301,12 +323,97 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
      // Kill handling
      .addTransition(TaskAttemptStateInternal.RUNNING,
-         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
+         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_KILL,
          CLEANUP_CONTAINER_TRANSITION)
      .addTransition(TaskAttemptStateInternal.RUNNING,
          TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_PREEMPTED, new PreemptedTransition())
 
+     // Transitions from SUCCESS_FINISHING_CONTAINER state
+     // When the container exits by itself, the notification of container
+     // completed event will be routed via NM -> RM -> AM.
+     // After MRAppMaster gets notification from RM, it will generate
+     // TA_CONTAINER_COMPLETED event.
+     .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptStateInternal.SUCCEEDED,
+         TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+         new ExitFinishingOnContainerCompletedTransition())
+     // Given TA notifies task T_ATTEMPT_SUCCEEDED when it transitions to
+     // SUCCESS_FINISHING_CONTAINER, it is possible to receive the event
+     // TA_CONTAINER_CLEANED in the following scenario.
+     // 1. It is the last task for the job.
+     // 2. After the task receives T_ATTEMPT_SUCCEEDED, it will notify job.
+     // 3. Job will be marked completed.
+     // 4. As part of MRAppMaster's shutdown, all containers will be killed.
+     .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptStateInternal.SUCCEEDED,
+         TaskAttemptEventType.TA_CONTAINER_CLEANED,
+         new ExitFinishingOnContainerCleanedupTransition())
+     // The client wants to kill the task. Given the task is in finishing
+     // state, it could go to succeeded state or killed state. If it is a
+     // reducer, it will go to succeeded state;
+     // otherwise, it goes to killed state.
+     .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         EnumSet.of(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+             TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP),
+         TaskAttemptEventType.TA_KILL,
+         new KilledAfterSucceededFinishingTransition())
+     // The attempt stays in finishing state for too long
+     // Let us clean up the container
+     .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_TIMED_OUT, FINISHING_ON_TIMEOUT_TRANSITION)
+     .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+         DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+     // ignore-able events
+     .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         EnumSet.of(TaskAttemptEventType.TA_UPDATE,
+             TaskAttemptEventType.TA_DONE,
+             TaskAttemptEventType.TA_COMMIT_PENDING,
+             TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_FAILMSG_BY_CLIENT))
+
+     // Transitions from FAIL_FINISHING_CONTAINER state
+     // When the container exits by itself, the notification of container
+     // completed event will be routed via NM -> RM -> AM.
+     // After MRAppMaster gets notification from RM, it will generate
+     // TA_CONTAINER_COMPLETED event.
+    .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        TaskAttemptStateInternal.FAILED,
+        TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+        new ExitFinishingOnContainerCompletedTransition())
+     // Given TA notifies task T_ATTEMPT_FAILED when it transitions to
+     // FAIL_FINISHING_CONTAINER, it is possible to receive the event
+     // TA_CONTAINER_CLEANED in the following scenario.
+     // 1. It is the last task attempt for the task.
+     // 2. After the task receives T_ATTEMPT_FAILED, it will notify job.
+     // 3. Job will be marked failed.
+     // 4. As part of MRAppMaster's shutdown, all containers will be killed.
+    .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        TaskAttemptStateInternal.FAILED,
+        TaskAttemptEventType.TA_CONTAINER_CLEANED,
+        new ExitFinishingOnContainerCleanedupTransition())
+    .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+        TaskAttemptEventType.TA_TIMED_OUT, FINISHING_ON_TIMEOUT_TRANSITION)
+    .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+        DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+        // ignore-able events
+    .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        EnumSet.of(TaskAttemptEventType.TA_KILL,
+            TaskAttemptEventType.TA_UPDATE,
+            TaskAttemptEventType.TA_DONE,
+            TaskAttemptEventType.TA_COMMIT_PENDING,
+            TaskAttemptEventType.TA_FAILMSG,
+            TaskAttemptEventType.TA_FAILMSG_BY_CLIENT))
+
      // Transitions from COMMIT_PENDING state
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
          TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
@@ -316,22 +423,27 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
-         TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
-         TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
+         TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptEventType.TA_DONE, SUCCEEDED_FINISHING_TRANSITION)
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
-         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
+         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_KILL,
          CLEANUP_CONTAINER_TRANSITION)
      // if container killed by AM shutting down
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
          TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
-         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
-         TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
+         TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+         TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION)
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
          TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
+             CLEANUP_CONTAINER_TRANSITION)
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED,
-         CLEANUP_CONTAINER_TRANSITION)
+         FINALIZE_FAILED_TRANSITION)
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
          TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
@@ -348,8 +460,8 @@ public abstract class TaskAttemptImpl implements
      // Transitions from SUCCESS_CONTAINER_CLEANUP state
      // kill and cleanup the container
      .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
-         TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
-         new SucceededTransition())
+         TaskAttemptStateInternal.SUCCEEDED,
+         TaskAttemptEventType.TA_CONTAINER_CLEANED)
      .addTransition(
           TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
           TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
@@ -360,6 +472,7 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
              TaskAttemptEventType.TA_TIMED_OUT,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED))
 
@@ -383,6 +496,7 @@ public abstract class TaskAttemptImpl implements
              TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
              TaskAttemptEventType.TA_TIMED_OUT))
 
       // Transitions from KILL_CONTAINER_CLEANUP
@@ -405,6 +519,7 @@ public abstract class TaskAttemptImpl implements
              TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
              TaskAttemptEventType.TA_TIMED_OUT))
 
      // Transitions from FAIL_TASK_CLEANUP
@@ -425,6 +540,7 @@ public abstract class TaskAttemptImpl implements
              TaskAttemptEventType.TA_COMMIT_PENDING,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
              TaskAttemptEventType.TA_CONTAINER_CLEANED,
              // Container launch events can arrive late
              TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
@@ -447,6 +563,7 @@ public abstract class TaskAttemptImpl implements
              TaskAttemptEventType.TA_COMMIT_PENDING,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
              TaskAttemptEventType.TA_CONTAINER_CLEANED,
              TaskAttemptEventType.TA_PREEMPTED,
              // Container launch events can arrive late
@@ -460,7 +577,7 @@ public abstract class TaskAttemptImpl implements
          new TooManyFetchFailureTransition())
       .addTransition(TaskAttemptStateInternal.SUCCEEDED,
           EnumSet.of(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED),
-          TaskAttemptEventType.TA_KILL, 
+          TaskAttemptEventType.TA_KILL,
           new KilledAfterSuccessTransition())
      .addTransition(
          TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED,
@@ -470,6 +587,10 @@ public abstract class TaskAttemptImpl implements
      .addTransition(TaskAttemptStateInternal.SUCCEEDED,
          TaskAttemptStateInternal.SUCCEEDED,
          EnumSet.of(TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
+             // TaskAttemptFinishingMonitor might time out the attempt right
+             // after the attempt receives TA_CONTAINER_COMPLETED.
+             TaskAttemptEventType.TA_TIMED_OUT,
              TaskAttemptEventType.TA_CONTAINER_CLEANED,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED))
 
@@ -1213,21 +1334,21 @@ public abstract class TaskAttemptImpl implements
       return TaskAttemptState.STARTING;
     case COMMIT_PENDING:
       return TaskAttemptState.COMMIT_PENDING;
-    case FAILED:
-      return TaskAttemptState.FAILED;
-    case KILLED:
-      return TaskAttemptState.KILLED;
-      // All CLEANUP states considered as RUNNING since events have not gone out
-      // to the Task yet. May be possible to consider them as a Finished state.
     case FAIL_CONTAINER_CLEANUP:
     case FAIL_TASK_CLEANUP:
+    case FAIL_FINISHING_CONTAINER:
+    case FAILED:
+      return TaskAttemptState.FAILED;
     case KILL_CONTAINER_CLEANUP:
     case KILL_TASK_CLEANUP:
-    case SUCCESS_CONTAINER_CLEANUP:
+    case KILLED:
+      return TaskAttemptState.KILLED;
     case RUNNING:
       return TaskAttemptState.RUNNING;
     case NEW:
       return TaskAttemptState.NEW;
+    case SUCCESS_CONTAINER_CLEANUP:
+    case SUCCESS_FINISHING_CONTAINER:
     case SUCCEEDED:
       return TaskAttemptState.SUCCEEDED;
     default:
@@ -1429,6 +1550,15 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
+  private static void finalizeProgress(TaskAttemptImpl taskAttempt) {
+    // unregister it to TaskAttemptListener so that it stops listening
+    taskAttempt.taskAttemptListener.unregister(
+        taskAttempt.attemptId, taskAttempt.jvmID);
+    taskAttempt.reportedStatus.progress = 1.0f;
+    taskAttempt.updateProgressSplits();
+  }
+
+
   static class RequestContainerTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     private final boolean rescheduled;
@@ -1661,53 +1791,66 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
-  private static class SucceededTransition implements
+  /**
+   * Transition from SUCCESS_FINISHING_CONTAINER or FAIL_FINISHING_CONTAINER
+   * state upon receiving TA_CONTAINER_COMPLETED event
+   */
+  private static class ExitFinishingOnContainerCompletedTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @SuppressWarnings("unchecked")
     @Override
-    public void transition(TaskAttemptImpl taskAttempt, 
+    public void transition(TaskAttemptImpl taskAttempt,
+       TaskAttemptEvent event) {
+      taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
+          taskAttempt.attemptId);
+      sendContainerCompleted(taskAttempt);
+    }
+  }
+
+  private static class ExitFinishingOnContainerCleanedupTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt,
         TaskAttemptEvent event) {
-      //set the finish time
-      taskAttempt.setFinishTime();
-      taskAttempt.eventHandler.handle(
-          createJobCounterUpdateEventTASucceeded(taskAttempt));
-      taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
-      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
-          taskAttempt.attemptId,
-          TaskEventType.T_ATTEMPT_SUCCEEDED));
-      taskAttempt.eventHandler.handle
-      (new SpeculatorEvent
-          (taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
-   }
+      taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
+          taskAttempt.attemptId);
+    }
   }
 
   private static class FailedTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @SuppressWarnings("unchecked")
     @Override
-    public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
+    public void transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
       // set the finish time
       taskAttempt.setFinishTime();
-      
-      if (taskAttempt.getLaunchTime() != 0) {
-        taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
-        TaskAttemptUnsuccessfulCompletionEvent tauce =
-            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                TaskAttemptStateInternal.FAILED);
-        taskAttempt.eventHandler.handle(new JobHistoryEvent(
-            taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-        // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
-        // handling failed map/reduce events.
-      }else {
-        LOG.debug("Not generating HistoryFinish event since start event not " +
-            "generated for taskAttempt: " + taskAttempt.getID());
-      }
-      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
-          taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
+      notifyTaskAttemptFailed(taskAttempt);
     }
   }
 
+  private static class FinalizeFailedTransition extends FailedTransition {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
+      finalizeProgress(taskAttempt);
+      sendContainerCompleted(taskAttempt);
+      super.transition(taskAttempt, event);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static void sendContainerCompleted(TaskAttemptImpl taskAttempt) {
+    taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
+        taskAttempt.attemptId,
+        taskAttempt.container.getId(), StringInterner
+        .weakIntern(taskAttempt.container.getNodeId().toString()),
+        taskAttempt.container.getContainerToken(),
+        ContainerLauncher.EventType.CONTAINER_COMPLETED));
+  }
+
   private static class RecoverTransition implements
       MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
 
@@ -1832,6 +1975,35 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
+  private static class KilledAfterSucceededFinishingTransition
+      implements MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent,
+      TaskAttemptStateInternal> {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
+      taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
+          taskAttempt.attemptId);
+      sendContainerCleanup(taskAttempt, event);
+      if(taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) {
+        // after a reduce task has succeeded, its outputs are in safe in HDFS.
+        // logically such a task should not be killed. we only come here when
+        // there is a race condition in the event queue. E.g. some logic sends
+        // a kill request to this attempt when the successful completion event
+        // for this task is already in the event queue. so the kill event will
+        // get executed immediately after the attempt is marked successful and
+        // result in this transition being exercised.
+        // ignore this for reduce tasks
+        LOG.info("Ignoring killed event for successful reduce task attempt" +
+            taskAttempt.getID().toString());
+        return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP;
+      } else {
+        return TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP;
+      }
+    }
+  }
+
   private static class KilledTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
 
@@ -1887,6 +2059,31 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
+  /**
+   * Transition from SUCCESS_FINISHING_CONTAINER or FAIL_FINISHING_CONTAINER
+   * state upon receiving TA_TIMED_OUT event
+   */
+  private static class ExitFinishingOnTimeoutTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
+      taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
+          taskAttempt.attemptId);
+      // The attempt stays in finishing state for too long
+      String msg = "Task attempt " + taskAttempt.getID() + " is done from " +
+          "TaskUmbilicalProtocol's point of view. However, it stays in " +
+          "finishing state for too long";
+      LOG.warn(msg);
+      taskAttempt.addDiagnosticInfo(msg);
+      sendContainerCleanup(taskAttempt, event);
+    }
+  }
+
+  /**
+   * Finish and clean up the container
+   */
   private static class CleanupContainerTransition implements
        SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @SuppressWarnings("unchecked")
@@ -1894,27 +2091,103 @@ public abstract class TaskAttemptImpl implements
     public void transition(TaskAttemptImpl taskAttempt, 
         TaskAttemptEvent event) {
       // unregister it to TaskAttemptListener so that it stops listening
-      // for it
-      taskAttempt.taskAttemptListener.unregister(
-          taskAttempt.attemptId, taskAttempt.jvmID);
+      // for it.
+      finalizeProgress(taskAttempt);
+      sendContainerCleanup(taskAttempt, event);
+    }
+  }
 
-      if (event instanceof TaskAttemptKillEvent) {
-        taskAttempt.addDiagnosticInfo(
-            ((TaskAttemptKillEvent) event).getMessage());
-      }
+  @SuppressWarnings("unchecked")
+  private static void sendContainerCleanup(TaskAttemptImpl taskAttempt,
+      TaskAttemptEvent event) {
+    if (event instanceof TaskAttemptKillEvent) {
+      taskAttempt.addDiagnosticInfo(
+          ((TaskAttemptKillEvent) event).getMessage());
+    }
+    //send the cleanup event to containerLauncher
+    taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
+        taskAttempt.attemptId,
+        taskAttempt.container.getId(), StringInterner
+        .weakIntern(taskAttempt.container.getNodeId().toString()),
+        taskAttempt.container.getContainerToken(),
+        ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
+  }
+
+  /**
+   * Transition to SUCCESS_FINISHING_CONTAINER upon receiving TA_DONE event
+   */
+  private static class MoveContainerToSucceededFinishingTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
+      finalizeProgress(taskAttempt);
+
+      // register it to finishing state
+      taskAttempt.appContext.getTaskAttemptFinishingMonitor().register(
+          taskAttempt.attemptId);
+
+      // set the finish time
+      taskAttempt.setFinishTime();
+
+      // notify job history
+      taskAttempt.eventHandler.handle(
+          createJobCounterUpdateEventTASucceeded(taskAttempt));
+      taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
+
+      //notify the task even though the container might not have exited yet.
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+          taskAttempt.attemptId,
+          TaskEventType.T_ATTEMPT_SUCCEEDED));
+      taskAttempt.eventHandler.handle
+          (new SpeculatorEvent
+              (taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
 
-      taskAttempt.reportedStatus.progress = 1.0f;
-      taskAttempt.updateProgressSplits();
-      //send the cleanup event to containerLauncher
-      taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
-          taskAttempt.attemptId, 
-          taskAttempt.container.getId(), StringInterner
-              .weakIntern(taskAttempt.container.getNodeId().toString()),
-          taskAttempt.container.getContainerToken(),
-          ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
     }
   }
 
+  /**
+   * Transition to FAIL_FINISHING_CONTAINER upon receiving TA_FAILMSG event
+   */
+  private static class MoveContainerToFailedFinishingTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
+      finalizeProgress(taskAttempt);
+      // register it to finishing state
+      taskAttempt.appContext.getTaskAttemptFinishingMonitor().register(
+          taskAttempt.attemptId);
+      notifyTaskAttemptFailed(taskAttempt);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) {
+    // set the finish time
+    taskAttempt.setFinishTime();
+
+    if (taskAttempt.getLaunchTime() != 0) {
+      taskAttempt.eventHandler
+          .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
+      TaskAttemptUnsuccessfulCompletionEvent tauce =
+          createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+              TaskAttemptStateInternal.FAILED);
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(
+          taskAttempt.attemptId.getTaskId().getJobId(), tauce));
+      // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
+      // handling failed map/reduce events.
+    }else {
+      LOG.debug("Not generating HistoryFinish event since start event not " +
+          "generated for taskAttempt: " + taskAttempt.getID());
+    }
+    taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+        taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
+
+  }
+
   private void addDiagnosticInfo(String diag) {
     if (diag != null && !diag.equals("")) {
       diagnostics.add(diag);

+ 7 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java

@@ -27,7 +27,13 @@ public interface ContainerLauncher
 
   enum EventType {
     CONTAINER_REMOTE_LAUNCH,
-    CONTAINER_REMOTE_CLEANUP
+    CONTAINER_REMOTE_CLEANUP,
+    // When TaskAttempt receives TA_CONTAINER_COMPLETED,
+    // it will notify ContainerLauncher so that the container can be removed
+    // from ContainerLauncher's launched containers list
+    // Otherwise, ContainerLauncher will try to stop the containers as part of
+    // serviceStop.
+    CONTAINER_COMPLETED
   }
 
 }

+ 10 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java

@@ -121,7 +121,11 @@ public class ContainerLauncherImpl extends AbstractService implements
     public synchronized boolean isCompletelyDone() {
       return state == ContainerState.DONE || state == ContainerState.FAILED;
     }
-    
+
+    public synchronized void done() {
+      state = ContainerState.DONE;
+    }
+
     @SuppressWarnings("unchecked")
     public synchronized void launch(ContainerRemoteLaunchEvent event) {
       LOG.info("Launching " + taskAttemptID);
@@ -378,6 +382,11 @@ public class ContainerLauncherImpl extends AbstractService implements
       case CONTAINER_REMOTE_CLEANUP:
         c.kill();
         break;
+
+      case CONTAINER_COMPLETED:
+        c.done();
+        break;
+
       }
       removeContainerIfDone(containerID);
     }

+ 108 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java

@@ -0,0 +1,108 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestTaskAttemptFinishingMonitor {
+
+  @Test
+  public void testFinshingAttemptTimeout()
+      throws IOException, InterruptedException {
+    SystemClock clock = new SystemClock();
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 100);
+    conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10);
+
+    AppContext appCtx = mock(AppContext.class);
+    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+    RMHeartbeatHandler rmHeartbeatHandler =
+        mock(RMHeartbeatHandler.class);
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptFinishingMonitor taskAttemptFinishingMonitor =
+        new TaskAttemptFinishingMonitor(eventHandler);
+    taskAttemptFinishingMonitor.init(conf);
+    taskAttemptFinishingMonitor.start();
+
+    when(appCtx.getEventHandler()).thenReturn(eventHandler);
+    when(appCtx.getNMHostname()).thenReturn("0.0.0.0");
+    when(appCtx.getTaskAttemptFinishingMonitor()).thenReturn(
+        taskAttemptFinishingMonitor);
+    when(appCtx.getClock()).thenReturn(clock);
+
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
+    TaskAttemptListenerImpl listener =
+        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, policy);
+
+    listener.init(conf);
+    listener.start();
+
+    JobId jid = MRBuilderUtils.newJobId(12345, 1, 1);
+    TaskId tid = MRBuilderUtils.newTaskId(jid, 0,
+        org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
+    appCtx.getTaskAttemptFinishingMonitor().register(attemptId);
+    int check = 0;
+    while ( !eventHandler.timedOut &&  check++ < 10 ) {
+      Thread.sleep(100);
+    }
+    taskAttemptFinishingMonitor.stop();
+
+    assertTrue("Finishing attempt didn't time out.", eventHandler.timedOut);
+
+  }
+
+  public static class MockEventHandler implements EventHandler {
+    public boolean timedOut = false;
+
+    @Override
+    public void handle(Event event) {
+      if (event instanceof TaskAttemptEvent) {
+        TaskAttemptEvent attemptEvent = ((TaskAttemptEvent) event);
+        if (TaskAttemptEventType.TA_TIMED_OUT == attemptEvent.getType()) {
+          timedOut = true;
+        }
+      }
+    }
+  };
+
+}

+ 16 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -481,6 +481,20 @@ public class MRApp extends MRAppMaster {
     return newJob;
   }
 
+  @Override
+  protected TaskAttemptFinishingMonitor
+      createTaskAttemptFinishingMonitor(
+      EventHandler eventHandler) {
+    return new TaskAttemptFinishingMonitor(eventHandler) {
+      @Override
+      public synchronized void register(TaskAttemptId attemptID) {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID,
+                TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+      }
+    };
+  }
+
   @Override
   protected TaskAttemptListener createTaskAttemptListener(
       AppContext context, AMPreemptionPolicy policy) {
@@ -541,6 +555,8 @@ public class MRApp extends MRAppMaster {
             new TaskAttemptEvent(event.getTaskAttemptID(),
                 TaskAttemptEventType.TA_CONTAINER_CLEANED));
         break;
+      case CONTAINER_COMPLETED:
+        break;
       }
     }
   }

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java

@@ -148,4 +148,10 @@ public class MockAppContext implements AppContext {
     // bogus - Not Required
     return null;
   }
+
+  @Override
+  public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
+      return null;
+  }
+
 }

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java

@@ -223,6 +223,8 @@ public class TestFail {
                 new TaskAttemptEvent(event.getTaskAttemptID(),
                     TaskAttemptEventType.TA_CONTAINER_CLEANED));
             break;
+          case CONTAINER_COMPLETED:
+            super.handle(event);
           }
         }
 

+ 101 - 41
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java

@@ -159,7 +159,7 @@ public class TestKill {
               super.dispatch(new TaskAttemptEvent(taID,
                 TaskAttemptEventType.TA_DONE));
               super.dispatch(new TaskAttemptEvent(taID,
-                TaskAttemptEventType.TA_CONTAINER_CLEANED));
+                TaskAttemptEventType.TA_CONTAINER_COMPLETED));
               super.dispatch(new TaskTAttemptEvent(taID,
                 TaskEventType.T_ATTEMPT_SUCCEEDED));
               this.cachedKillEvent = killEvent;
@@ -211,40 +211,9 @@ public class TestKill {
     app.getContext().getEventHandler()
       .handle(new JobEvent(jobId, JobEventType.JOB_KILL));
 
-    app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
-  }
-
-  static class MyAsyncDispatch extends AsyncDispatcher {
-    private CountDownLatch latch;
-    private TaskAttemptEventType attemptEventTypeToWait;
-    MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType attemptEventTypeToWait) {
-      super();
-      this.latch = latch;
-      this.attemptEventTypeToWait = attemptEventTypeToWait;
-    }
-
-    @Override
-    protected void dispatch(Event event) {
-      if (event instanceof TaskAttemptEvent) {
-        TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event;
-        TaskAttemptId attemptID = ((TaskAttemptEvent) event).getTaskAttemptID();
-        if (attemptEvent.getType() == this.attemptEventTypeToWait
-            && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) {
-          try {
-            latch.await();
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-          }
-        }
-      }
-      super.dispatch(event);
-    }
+    app.waitForInternalState((JobImpl) job, JobStateInternal.KILLED);
   }
 
-  // This is to test a race condition where JobEventType.JOB_KILL is generated
-  // right after TaskAttemptEventType.TA_DONE is generated.
-  // TaskImpl's state machine might receive both T_ATTEMPT_SUCCEEDED
-  // and T_ATTEMPT_KILLED from the same attempt.
   @Test
   public void testKillTaskWaitKillJobAfterTA_DONE() throws Exception {
     CountDownLatch latch = new CountDownLatch(1);
@@ -269,15 +238,12 @@ public class TestKill {
     TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
     app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
 
-    // The order in the dispatch event queue, from the oldest to the newest
+    // The order in the dispatch event queue, from first to last
     // TA_DONE
-    // JOB_KILL
-    // CONTAINER_REMOTE_CLEANUP ( from TA_DONE's handling )
-    // T_KILL ( from JOB_KILL's handling )
-    // TA_CONTAINER_CLEANED ( from CONTAINER_REMOTE_CLEANUP's handling )
-    // TA_KILL ( from T_KILL's handling )
-    // T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_CLEANED's handling )
-    // T_ATTEMPT_KILLED ( from TA_KILL's handling )
+    // JobEventType.JOB_KILL
+    // TaskAttemptEventType.TA_CONTAINER_COMPLETED ( from TA_DONE handling )
+    // TaskEventType.T_KILL ( from JobEventType.JOB_KILL handling )
+    // TaskEventType.T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_COMPLETED handling )
 
     // Finish map
     app.getContext().getEventHandler().handle(
@@ -295,6 +261,100 @@ public class TestKill {
     app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
   }
 
+
+  @Test
+  public void testKillTaskWaitKillJobBeforeTA_DONE() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    final Dispatcher dispatcher = new MyAsyncDispatch(latch, JobEventType.JOB_KILL);
+    MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
+      @Override
+      public Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    Job job = app.submit(new Configuration());
+    JobId jobId = app.getJobId();
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+    app.waitForState(mapTask, TaskState.RUNNING);
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+    TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+    // The order in the dispatch event queue, from first to last
+    // JobEventType.JOB_KILL
+    // TA_DONE
+    // TaskEventType.T_KILL ( from JobEventType.JOB_KILL handling )
+    // TaskAttemptEventType.TA_CONTAINER_COMPLETED ( from TA_DONE handling )
+    // TaskAttemptEventType.TA_KILL ( from TaskEventType.T_KILL handling )
+    // TaskEventType.T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_COMPLETED handling )
+    // TaskEventType.T_ATTEMPT_KILLED ( from TA_KILL handling )
+
+    // Now kill the job
+    app.getContext().getEventHandler()
+        .handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+
+    // Finish map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //unblock
+    latch.countDown();
+
+    app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
+  }
+
+  static class MyAsyncDispatch extends AsyncDispatcher {
+    private CountDownLatch latch;
+    private TaskAttemptEventType attemptEventTypeToWait;
+    private JobEventType jobEventTypeToWait;
+    MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType attemptEventTypeToWait) {
+      super();
+      this.latch = latch;
+      this.attemptEventTypeToWait = attemptEventTypeToWait;
+    }
+
+    MyAsyncDispatch(CountDownLatch latch, JobEventType jobEventTypeToWait) {
+      super();
+      this.latch = latch;
+      this.jobEventTypeToWait = jobEventTypeToWait;
+    }
+
+    @Override
+    protected void dispatch(Event event) {
+      if (event instanceof TaskAttemptEvent) {
+        TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event;
+        TaskAttemptId attemptID = ((TaskAttemptEvent) event).getTaskAttemptID();
+        if (attemptEvent.getType() == this.attemptEventTypeToWait
+            && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) {
+          try {
+            latch.await();
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+      } else if ( event instanceof JobEvent) {
+        JobEvent jobEvent = (JobEvent) event;
+        if (jobEvent.getType() == this.jobEventTypeToWait) {
+          try {
+            latch.await();
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+      }
+
+      super.dispatch(event);
+    }
+  }
+
   @Test
   public void testKillTaskAttempt() throws Exception {
     final CountDownLatch latch = new CountDownLatch(1);

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java

@@ -884,5 +884,10 @@ public class TestRuntimeEstimators {
       // bogus - Not Required
       return null;
     }
+
+    @Override
+    public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
+      return null;
+    }
   }
 }

+ 260 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -407,6 +408,7 @@ public class TestTaskAttempt{
     Resource resource = mock(Resource.class);
     when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
     when(resource.getMemory()).thenReturn(1024);
+    setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
 
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@@ -464,6 +466,7 @@ public class TestTaskAttempt{
     Resource resource = mock(Resource.class);
     when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
     when(resource.getMemory()).thenReturn(1024);
+    setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
 
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@@ -524,6 +527,7 @@ public class TestTaskAttempt{
     Resource resource = mock(Resource.class);
     when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
     when(resource.getMemory()).thenReturn(1024);
+    setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
 
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@@ -546,7 +550,7 @@ public class TestTaskAttempt{
     taImpl.handle(new TaskAttemptEvent(attemptId,
         TaskAttemptEventType.TA_DONE));
     taImpl.handle(new TaskAttemptEvent(attemptId,
-        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+        TaskAttemptEventType.TA_CONTAINER_COMPLETED));
 
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
@@ -593,6 +597,7 @@ public class TestTaskAttempt{
     Resource resource = mock(Resource.class);
     when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
     when(resource.getMemory()).thenReturn(1024);
+    setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
 
     TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
         jobFile, 1, splits, jobConf, taListener,
@@ -641,6 +646,7 @@ public class TestTaskAttempt{
     Resource resource = mock(Resource.class);
     when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
     when(resource.getMemory()).thenReturn(1024);
+    setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
 
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@@ -663,7 +669,7 @@ public class TestTaskAttempt{
     taImpl.handle(new TaskAttemptEvent(attemptId,
       TaskAttemptEventType.TA_DONE));
     taImpl.handle(new TaskAttemptEvent(attemptId,
-      TaskAttemptEventType.TA_CONTAINER_CLEANED));
+      TaskAttemptEventType.TA_CONTAINER_COMPLETED));
 
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
       TaskAttemptState.SUCCEEDED);
@@ -708,6 +714,7 @@ public class TestTaskAttempt{
     Resource resource = mock(Resource.class);
     when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
     when(resource.getMemory()).thenReturn(1024);
+    setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
 
     TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
         jobFile, 1, splits, jobConf, taListener,
@@ -753,6 +760,7 @@ public class TestTaskAttempt{
 	AppContext appCtx = mock(AppContext.class);
 	ClusterInfo clusterInfo = mock(ClusterInfo.class);
 	when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
 
 	TaskAttemptImpl taImpl =
 	  new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@@ -774,7 +782,7 @@ public class TestTaskAttempt{
 	taImpl.handle(new TaskAttemptEvent(attemptId,
 	    TaskAttemptEventType.TA_DONE));
 	taImpl.handle(new TaskAttemptEvent(attemptId,
-	    TaskAttemptEventType.TA_CONTAINER_CLEANED));
+	    TaskAttemptEventType.TA_CONTAINER_COMPLETED));
 	    
 	assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
 		      TaskAttemptState.SUCCEEDED);
@@ -967,6 +975,255 @@ public class TestTaskAttempt{
         taImpl.getInternalState());
   }
 
+
+  @Test
+  public void testKillMapTaskWhileSuccessFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in SUCCEEDED state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(),
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER);
+
+    // If the map task is killed when it is in SUCCESS_FINISHING_CONTAINER
+    // state, the state will move to KILL_CONTAINER_CLEANUP
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_KILL));
+    assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
+        TaskAttemptState.KILLED);
+    assertEquals("Task attempt's internal state is not KILL_CONTAINER_CLEANUP",
+        taImpl.getInternalState(),
+        TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    assertEquals("Task attempt's internal state is not KILL_TASK_CLEANUP",
+        taImpl.getInternalState(),
+        TaskAttemptStateInternal.KILL_TASK_CLEANUP);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CLEANUP_DONE));
+
+    assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
+        TaskAttemptState.KILLED);
+
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  @Test
+  public void testKillMapTaskWhileFailFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_FAILMSG));
+
+    assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+    assertEquals("Task attempt's internal state is not " +
+        "FAIL_FINISHING_CONTAINER", taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER);
+
+    // If the map task is killed when it is in FAIL_FINISHING_CONTAINER state,
+    // the state will stay in FAIL_FINISHING_CONTAINER.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_KILL));
+    assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+    assertEquals("Task attempt's internal state is not " +
+        "FAIL_FINISHING_CONTAINER", taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_TIMED_OUT));
+    assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP",
+        taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    assertEquals("Task attempt's internal state is not FAIL_TASK_CLEANUP",
+        taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_TASK_CLEANUP);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CLEANUP_DONE));
+
+    assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  @Test
+  public void testFailMapTaskByClient() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_FAILMSG_BY_CLIENT));
+
+    assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+    assertEquals("Task attempt's internal state is not " +
+        "FAIL_CONTAINER_CLEANUP", taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    assertEquals("Task attempt's internal state is not FAIL_TASK_CLEANUP",
+        taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_TASK_CLEANUP);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CLEANUP_DONE));
+
+    assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  @Test
+  public void testTaskAttemptDiagnosticEventOnFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(),
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER);
+
+    // TA_DIAGNOSTICS_UPDATE doesn't change state
+    taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(taImpl.getID(),
+        "Task got updated"));
+    assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(),
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER);
+
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  @Test
+  public void testTimeoutWhileSuccessFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(),
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER);
+
+    // If the task stays in SUCCESS_FINISHING_CONTAINER for too long,
+    // TaskAttemptListenerImpl will time out the attempt.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_TIMED_OUT));
+    assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_CONTAINER_CLEANUP", taImpl.getInternalState(),
+        TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP);
+
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  @Test
+  public void testTimeoutWhileFailFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_FAILMSG));
+
+    assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+    assertEquals("Task attempt's internal state is not " +
+        "FAIL_FINISHING_CONTAINER", taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER);
+
+    // If the task stays in FAIL_FINISHING_CONTAINER for too long,
+    // TaskAttemptListenerImpl will time out the attempt.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_TIMED_OUT));
+    assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP",
+        taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP);
+
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  private void setupTaskAttemptFinishingMonitor(
+      EventHandler eventHandler, JobConf jobConf, AppContext appCtx) {
+    TaskAttemptFinishingMonitor taskAttemptFinishingMonitor =
+        new TaskAttemptFinishingMonitor(eventHandler);
+    taskAttemptFinishingMonitor.init(jobConf);
+    when(appCtx.getTaskAttemptFinishingMonitor()).
+        thenReturn(taskAttemptFinishingMonitor);
+  }
+
+  private TaskAttemptImpl createTaskAttemptImpl(
+      MockEventHandler eventHandler) {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
+
+    TaskAttemptImpl taImpl =
+        new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+            splits, jobConf, taListener,
+            mock(Token.class), new Credentials(),
+            new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+        container, mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    return taImpl;
+  }
+
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;
 

+ 9 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -235,7 +235,15 @@ public interface MRJobConfig {
   public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
 
   public static final String TASK_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.timeout.check-interval-ms";
-  
+
+  public static final String TASK_EXIT_TIMEOUT = "mapreduce.task.exit.timeout";
+
+  public static final int TASK_EXIT_TIMEOUT_DEFAULT = 60 * 1000;
+
+  public static final String TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.exit.timeout.check-interval-ms";
+
+  public static final int TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT = 20 * 1000;
+
   public static final String TASK_ID = "mapreduce.task.id";
 
   public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";

+ 20 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -1671,4 +1671,24 @@
     app master.
   </description>
 </property>
+
+<property>
+  <name>mapreduce.task.exit.timeout</name>
+  <value>60000</value>
+  <description>The number of milliseconds before a task will be
+  terminated if it stays in finishing state for too long.
+  After a task attempt completes from TaskUmbilicalProtocol's point of view,
+  it will be transitioned to finishing state. That will give a chance for the
+  task to exit by itself.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.task.exit.timeout.check-interval-ms</name>
+  <value>20000</value>
+  <description>The interval in milliseconds between which the MR framework
+  checks if task attempts stay in finishing state for too long.
+  </description>
+</property>
+
 </configuration>

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor;
 import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
@@ -399,4 +400,9 @@ public class JobHistory extends AbstractService implements HistoryContext {
     // bogus - Not Required
     return null;
   }
+
+  @Override
+  public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
+    return null;
+  }
 }

+ 3 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java

@@ -102,7 +102,7 @@ public class TestSpeculativeExecutionWithMRApp {
           appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
             TaskAttemptEventType.TA_DONE));
           appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
-            TaskAttemptEventType.TA_CONTAINER_CLEANED));
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED));
           app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED);
         }
       }
@@ -170,7 +170,7 @@ public class TestSpeculativeExecutionWithMRApp {
           appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
             TaskAttemptEventType.TA_DONE));
           appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
-            TaskAttemptEventType.TA_CONTAINER_CLEANED));
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED));
           numTasksToFinish--;
           app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED);
         } else {
@@ -228,7 +228,7 @@ public class TestSpeculativeExecutionWithMRApp {
     appEventHandler.handle(
         new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE));
     appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(),
-        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+        TaskAttemptEventType.TA_CONTAINER_COMPLETED));
     return ta;
   }