Browse Source

MAPREDUCE-4727. Handle successful NM stop requests. (sseth)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-3902@1398523 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 13 năm trước cách đây
mục cha
commit
8ceb8160dc
30 tập tin đã thay đổi với 664 bổ sung và 305 xóa
  1. 2 0
      hadoop-mapreduce-project/CHANGES.txt.MR-3902
  2. 20 13
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/LocalContainerAllocator.java
  3. 34 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventContainerTerminated.java
  4. 36 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventContainerTerminating.java
  5. 36 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventNodeFailed.java
  6. 0 12
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventTerminated.java
  7. 3 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java
  8. 118 31
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java
  9. 1 9
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java
  10. 8 8
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/launcher/ContainerLauncherImpl.java
  11. 32 23
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java
  12. 41 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventTAEnded.java
  13. 1 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java
  14. 0 29
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTAStopRequestEvent.java
  15. 0 18
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTASucceededEvent.java
  16. 17 14
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
  17. 17 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEvent.java
  18. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventStopFailed.java
  19. 29 10
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventType.java
  20. 184 102
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java
  21. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java
  22. 20 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java
  23. 19 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventTaskAttemptEnded.java
  24. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java
  25. 2 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java
  26. 0 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
  27. 23 20
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
  28. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java
  29. 2 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java
  30. 15 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java

+ 2 - 0
hadoop-mapreduce-project/CHANGES.txt.MR-3902

@@ -26,3 +26,5 @@ Branch MR-3902
   MAPREDUCE-4664. ContainerHeartbeatHandler should be pinged on a getTask call (sseth)
 
   MAPREDUCE-4663. Container Launch should be independent of o.a.h.m.Task (sseth)
+
+  MAPREDUCE-4727. Handle successful NM stop requests. (sseth)

+ 20 - 13
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/LocalContainerAllocator.java

@@ -47,12 +47,11 @@ import org.apache.hadoop.mapreduce.v2.app2.job.Job;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventTerminated;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminated;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptRemoteStartEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTAStopRequestEvent;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTASucceededEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded;
 import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicator;
 import org.apache.hadoop.util.StringUtils;
@@ -290,7 +289,7 @@ public class LocalContainerAllocator extends AbstractService
 
       // CLEANUP event generated.
       appContext.getEventHandler().handle(
-          new TaskAttemptEventTerminated(attemptID));
+          new TaskAttemptEventContainerTerminated(attemptID, null));
 
     } catch (IOException ioe) {
       // if umbilical itself barfs (in error-handler of runSubMap()),
@@ -303,23 +302,23 @@ public class LocalContainerAllocator extends AbstractService
   }
   
   @SuppressWarnings("unchecked")
-  public void handleTaStopRequest(AMSchedulerTAStopRequestEvent sEvent) {
+  public void handleTaStopRequest(AMSchedulerEventTAEnded sEvent) {
     // Implies a failed or killed task.
     // This will trigger a CLEANUP event. UberAM is supposed to fail if there's
     // even a single failed attempt. Hence the CLEANUP is OK (otherwise delay
     // cleanup till end of job). TODO Enforce job failure on single task attempt
     // failure.
     appContext.getEventHandler().handle(
-        new TaskAttemptEventTerminated(sEvent.getAttemptID()));
+        new TaskAttemptEventContainerTerminated(sEvent.getAttemptID(), null));
     taskAttemptListenern.unregisterTaskAttempt(sEvent.getAttemptID());
   }
 
   @SuppressWarnings("unchecked")
-  public void handleTaSucceededRequest(AMSchedulerTASucceededEvent sEvent) {
+  public void handleTaSucceededRequest(AMSchedulerEventTAEnded sEvent) {
     // Successful taskAttempt.
     // Same CLEANUP comment as handleTaStopRequest
     appContext.getEventHandler().handle(
-        new TaskAttemptEventTerminated(sEvent.getAttemptID()));
+        new TaskAttemptEventContainerTerminated(sEvent.getAttemptID(), null));
     taskAttemptListenern.unregisterTaskAttempt(sEvent.getAttemptID());
   }
 
@@ -329,11 +328,19 @@ public class LocalContainerAllocator extends AbstractService
     case S_TA_LAUNCH_REQUEST:
       handleTaLaunchRequest((AMSchedulerTALaunchRequestEvent) sEvent);
       break;
-    case S_TA_STOP_REQUEST: // Effectively means a failure.
-      handleTaStopRequest((AMSchedulerTAStopRequestEvent) sEvent);
-      break;
-    case S_TA_SUCCEEDED:
-      handleTaSucceededRequest((AMSchedulerTASucceededEvent) sEvent);
+    case S_TA_ENDED: // Covers failed, killed and succeeded attempts; dispatched on state below.
+      AMSchedulerEventTAEnded event = (AMSchedulerEventTAEnded) sEvent;
+      switch(event.getState()) {
+      case FAILED:
+      case KILLED:
+        handleTaStopRequest(event);
+        break;
+      case SUCCEEDED:
+        handleTaSucceededRequest(event);
+        break;
+      default:
+        throw new YarnException("Unexpected TaskAttemptState: " + event.getState());
+      }
       break;
     default:
       LOG.warn("Invalid event type for LocalContainerAllocator: "

+ 34 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventContainerTerminated.java

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+public class TaskAttemptEventContainerTerminated extends TaskAttemptEvent {
+
+  private final String message;
+
+  public TaskAttemptEventContainerTerminated(TaskAttemptId id, String message) {
+    super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+    this.message = message;
+  }
+
+  public String getDiagnosticInfo() {
+    return message;
+  }
+}

+ 36 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventContainerTerminating.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+public class TaskAttemptEventContainerTerminating extends TaskAttemptEvent {
+
+  private final String message;
+
+  public TaskAttemptEventContainerTerminating(TaskAttemptId id,
+      String diagMessage) {
+    super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATING);
+    this.message = diagMessage;
+  }
+
+  public String getDiagnosticInfo() {
+    return this.message;
+  }
+
+}

+ 36 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventNodeFailed.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+public class TaskAttemptEventNodeFailed extends TaskAttemptEvent {
+
+  private final String message;
+
+  public TaskAttemptEventNodeFailed(TaskAttemptId id,
+      String diagMessage) {
+    super(id, TaskAttemptEventType.TA_NODE_FAILED);
+    this.message = diagMessage;
+  }
+
+  public String getDiagnosticInfo() {
+    return this.message;
+  }
+
+}

+ 0 - 12
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventTerminated.java

@@ -1,12 +0,0 @@
-package org.apache.hadoop.mapreduce.v2.app2.job.event;
-
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-
-public class TaskAttemptEventTerminated extends TaskAttemptEvent {
-
-  public TaskAttemptEventTerminated(TaskAttemptId id) {
-    super(id, TaskAttemptEventType.TA_TERMINATED);
-    // TODO Auto-generated constructor stub
-  }
-
-}

+ 3 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java

@@ -44,7 +44,9 @@ public enum TaskAttemptEventType {
   //Producer: Container / Scheduler.
   // Indicates that the RM considers the container to be complete. Implies the 
   // JVM is done, except in one case. TOOD: document the case.
-  TA_TERMINATED,
+  TA_CONTAINER_TERMINATING,
+  TA_CONTAINER_TERMINATED,
+  TA_NODE_FAILED,
   
   //Producer: Job
   TA_TOO_MANY_FETCH_FAILURES,

+ 118 - 31
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java

@@ -66,6 +66,9 @@ import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminated;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminating;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventNodeFailed;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptRemoteStartEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptScheduleEvent;
@@ -74,8 +77,7 @@ import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEven
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTAStopRequestEvent;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTASucceededEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded;
 import org.apache.hadoop.mapreduce.v2.app2.speculate.SpeculatorEvent;
 import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleanupEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@@ -174,7 +176,9 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
         .addTransition(TaskAttemptState.START_WAIT, TaskAttemptState.START_WAIT, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
         .addTransition(TaskAttemptState.START_WAIT, TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestBeforeRunningTransition())
         .addTransition(TaskAttemptState.START_WAIT, TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestBeforeRunningTransition())
-        .addTransition(TaskAttemptState.START_WAIT, TaskAttemptState.FAILED, TaskAttemptEventType.TA_TERMINATED, createContainerCompletedBeforeRunningTransition())
+        .addTransition(TaskAttemptState.START_WAIT, TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, createNodeFailedBeforeRunningTransition())
+        .addTransition(TaskAttemptState.START_WAIT, TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, createContainerTerminatingBeforeRunningTransition())
+        .addTransition(TaskAttemptState.START_WAIT, TaskAttemptState.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createContainerCompletedBeforeRunningTransition())
         
         .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING, TaskAttemptEventType.TA_STATUS_UPDATE, createStatusUpdateTransition())
         .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
@@ -184,7 +188,11 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
         .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, createFailRequestWhileRunningTransition())
         .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestWhileRunningTransition())
         .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestWhileRunningTransition())
-        .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.FAILED, TaskAttemptEventType.TA_TERMINATED, createContainerCompletedWhileRunningTransition()) 
+        .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, createNodeFailedWhileRunningTransition())
+        .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING, TaskAttemptEventType.TA_CONTAINER_TERMINATING, createContainerTerminatingWhileRunningTransition())
+        .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createContainerCompletedWhileRunningTransition())
+        
+        // XXX Maybe move getMessage / getDiagnosticInfo into the base TaskAttemptEvent ?
         
         .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.COMMIT_PENDING, TaskAttemptEventType.TA_STATUS_UPDATE, createStatusUpdateTransition())
         .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.COMMIT_PENDING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
@@ -194,28 +202,31 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
         .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, createFailRequestWhileRunningTransition())
         .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestWhileRunningTransition())
         .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestWhileRunningTransition())
-        .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.FAILED, TaskAttemptEventType.TA_TERMINATED, createContainerCompletedWhileRunningTransition())
+        .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, createNodeFailedWhileRunningTransition())
+        .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.RUNNING, TaskAttemptEventType.TA_CONTAINER_TERMINATING, createContainerTerminatingWhileRunningTransition())
+        .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createContainerCompletedWhileRunningTransition())
 
-        .addTransition(TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptState.KILLED, TaskAttemptEventType.TA_TERMINATED, createTerminatedTransition())
+        .addTransition(TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptState.KILLED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createTerminatedTransition())
         .addTransition(TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptState.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST))
+        .addTransition(TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptState.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING))
         
-        .addTransition(TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptState.FAILED, TaskAttemptEventType.TA_TERMINATED, createTerminatedTransition())
+        .addTransition(TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptState.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createTerminatedTransition())
         .addTransition(TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptState.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST))
+        .addTransition(TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptState.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING))
         
         .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_TERMINATED))
+        .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED))
 
         .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_TERMINATED))
+        .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED))
         
         // TODO XXX: FailRequest / KillRequest at SUCCEEDED need to consider Map / Reduce task.
         .addTransition(TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
         .addTransition(TaskAttemptState.SUCCEEDED, TaskAttemptState.FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestAfterSuccessTransition())
         .addTransition(TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestAfterSuccessTransition())
+        .addTransition(TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED, TaskAttemptEventType.TA_NODE_FAILED, createNodeFailedAfterSuccessTransition())
         .addTransition(TaskAttemptState.SUCCEEDED, TaskAttemptState.FAILED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, createTooManyFetchFailuresTransition())
-        .addTransition(TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_TERMINATED, TaskAttemptEventType.TA_TIMED_OUT))
+        .addTransition(TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_CONTAINER_TERMINATING))
         
         
         .installTopology();
@@ -839,6 +850,7 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
       ta.sendEvent(createJobCounterUpdateEventTATerminated(ta, false,
           TaskAttemptState.FAILED));
       if (ta.getLaunchTime() != 0) {
+        // TODO XXX: For cases like this, recovery goes for a toss, since the attempt will not exist in the history file.
         ta.sendEvent(new JobHistoryEvent(ta.jobId,
             createTaskAttemptUnsuccessfulCompletionEvent(ta,
                 TaskAttemptState.FAILED)));
@@ -851,9 +863,6 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
       // Send out events to the Task - indicating TaskAttemptFailure.
       ta.sendEvent(new TaskTAttemptEvent(ta.attemptId,
           TaskEventType.T_ATTEMPT_FAILED));
-
-      // TODO Informing the scheduler is only required if the event came in
-      // after the scheduler was asked to launch the task. Likely in a subclass.
     }
   }
 
@@ -886,9 +895,6 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
       // Send out events to the Task - indicating TaskAttemptFailure.
       ta.sendEvent(new TaskTAttemptEvent(ta.attemptId,
           TaskEventType.T_ATTEMPT_KILLED));
-
-      // TODO Informing the scheduler is only required if the event came in
-      // after the scheduler was asked to launch the task. Likely in a subclass.
     }
   }
   
@@ -960,10 +966,10 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
   protected static class FailRequestBeforeRunning extends FailRequest {
     @Override
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
-      // XXX Remove Comment: Takes care of finish time, history, TaskEvent.
       super.transition(ta, event);
       // Inform the scheduler
-      ta.sendEvent(new AMSchedulerTAStopRequestEvent(ta.attemptId, true));
+      ta.sendEvent(new AMSchedulerEventTAEnded(ta.attemptId,
+          TaskAttemptState.FAILED));
       // Decrement speculator container request.
       ta.maybeSendSpeculatorContainerRelease();
       
@@ -983,7 +989,8 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
       // XXX Remove Comment: Takes care of finish time, history, TaskEvent.
       super.transition(ta, event);
       // Inform the scheduler
-      ta.sendEvent(new AMSchedulerTAStopRequestEvent(ta.attemptId, false));
+      ta.sendEvent(new AMSchedulerEventTAEnded(ta.attemptId,
+          TaskAttemptState.KILLED));
       // Decrement speculator container request.
       ta.maybeSendSpeculatorContainerRelease();
       
@@ -991,21 +998,53 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
     }
   }
 
+  protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
+      createNodeFailedBeforeRunningTransition() {
+    return new NodeFailedBeforeRunning();
+  }
+
+  protected static class NodeFailedBeforeRunning extends
+      KillRequestBeforeRunning {
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
+      ta.addDiagnosticInfo(nfEvent.getDiagnosticInfo());
+    }
+  }
+
+  protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
+      createContainerTerminatingBeforeRunningTransition() {
+    return new ContainerTerminatingBeforeRunning();
+  }
+
+  protected static class ContainerTerminatingBeforeRunning extends
+      FailRequestBeforeRunning {
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      super.transition(ta, event);
+      TaskAttemptEventContainerTerminating tEvent = 
+          (TaskAttemptEventContainerTerminating) event;
+      ta.addDiagnosticInfo(tEvent.getDiagnosticInfo());
+    }
+  }
+
   protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> 
       createContainerCompletedBeforeRunningTransition() {
     return new ContainerCompletedBeforeRunning();
   }
 
-  protected static class ContainerCompletedBeforeRunning extends FailRequest {
+  protected static class ContainerCompletedBeforeRunning extends
+      FailRequestBeforeRunning {
     @Override
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
       super.transition(ta, event);
-      // History, Inform Task, finishTime handled by FailRequest
-      // Decrement speculator container request.
-      ta.maybeSendSpeculatorContainerRelease();
-      
+      ta.sendTaskAttemptCleanupEvent();
+
+      TaskAttemptEventContainerTerminated tEvent =
+          (TaskAttemptEventContainerTerminated) event;
+      ta.addDiagnosticInfo(tEvent.getDiagnosticInfo());
+
       // TODO XXX: Maybe other counters: Failed, Killed, etc.
-      // TODO XXX XXX: May need to inform the scheduler.
     }
   }
 
@@ -1073,7 +1112,8 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
       ta.sendEvent(createJobCounterUpdateEventSlotMillis(ta));
 
       // Inform the Scheduler.
-      ta.sendEvent(new AMSchedulerTASucceededEvent(ta.attemptId));
+      ta.sendEvent(new AMSchedulerEventTAEnded(ta.attemptId,
+          TaskAttemptState.SUCCEEDED));
       
       // Inform the task.
       ta.sendEvent(new TaskTAttemptEvent(ta.attemptId,
@@ -1118,19 +1158,50 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
       // TODO Speculator does not need to go out. maybeSend... will take care of this for now.
     }
   }
-  
+
+  /**
+   * Factory for the node-failed-while-running transition. Protected and
+   * non-final, so a subclass can return a different transition.
+   *
+   * @return a new {@code NodeFailedWhileRunning} instance
+   */
+  protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
+      createNodeFailedWhileRunningTransition() {
+    return new NodeFailedWhileRunning();
+  }
+
+  /**
+   * Transition handed out by {@code createNodeFailedWhileRunningTransition()}.
+   * Runs the inherited {@code FailRequestWhileRunning} handling and then
+   * appends the diagnostic info carried by the node-failed event.
+   */
+  protected static class NodeFailedWhileRunning extends FailRequestWhileRunning {
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      // Without this call the override would replace the parent transition
+      // entirely and only the diagnostics update below would happen.
+      super.transition(ta, event);
+      // The cast assumes this transition is only registered for
+      // TaskAttemptEventNodeFailed events - TODO confirm at the registration.
+      TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
+      ta.addDiagnosticInfo(nfEvent.getDiagnosticInfo());
+    }
+  }
+
+  /**
+   * Factory for the container-terminating-while-running transition.
+   * Protected and non-final, so a subclass can return a different transition.
+   *
+   * @return a new {@code ContainerTerminatingWhileRunning} instance
+   */
+  protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
+      createContainerTerminatingWhileRunningTransition() {
+    return new ContainerTerminatingWhileRunning();
+  }
+
+  /**
+   * Transition handed out by
+   * {@code createContainerTerminatingWhileRunningTransition()}. Delegates to
+   * the inherited {@code FailRequestWhileRunning} transition, then appends
+   * the diagnostic info carried by the container-terminating event.
+   */
+  protected static class ContainerTerminatingWhileRunning extends
+      FailRequestWhileRunning {
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      super.transition(ta, event);
+      // The cast assumes this transition is only registered for
+      // TaskAttemptEventContainerTerminating events - TODO confirm at the
+      // state-machine registration.
+      TaskAttemptEventContainerTerminating tEvent =
+          (TaskAttemptEventContainerTerminating) event;
+      ta.addDiagnosticInfo(tEvent.getDiagnosticInfo());
+    }
+  }
+
   protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
       createContainerCompletedWhileRunningTransition() {
+    // Factory hook: hands back a fresh transition on every call.
+    // NOTE(review): the class name misspells "Container"; a rename would
+    // have to change this call and the class declaration together.
     return new ContaienrCompletedWhileRunning();
   }
 
   protected static class ContaienrCompletedWhileRunning extends
-      ContainerCompletedBeforeRunning {
+      FailRequestWhileRunning {
     @Override
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      // Order: inherited FailRequestWhileRunning handling, then the
+      // task-attempt cleanup event, then the diagnostics from the event.
       super.transition(ta, event);
       ta.sendTaskAttemptCleanupEvent();
-      ta.taskHeartbeatHandler.unregister(ta.attemptId);
+      // The cast assumes this transition is only registered for
+      // TaskAttemptEventContainerTerminated events - TODO confirm at the
+      // state-machine registration.
+      TaskAttemptEventContainerTerminated tEvent =
+          (TaskAttemptEventContainerTerminated) event;
+      ta.addDiagnosticInfo(tEvent.getDiagnosticInfo());
     }
   }
 
@@ -1145,6 +1216,9 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
     @Override
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      // Send the task-attempt cleanup event, then record the diagnostics
+      // carried by the event. The cast assumes only
+      // TaskAttemptEventContainerTerminated events reach this transition -
+      // TODO confirm at the state-machine registration.
       ta.sendTaskAttemptCleanupEvent();
+      TaskAttemptEventContainerTerminated tEvent =
+          (TaskAttemptEventContainerTerminated) event;
+      ta.addDiagnosticInfo(tEvent.getDiagnosticInfo());
     }
 
   }
@@ -1184,6 +1258,19 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
       ta.sendTaskAttemptCleanupEvent();
     }
   }
+  
+  /**
+   * Factory for the node-failed-after-success transition. Protected and
+   * non-final, so a subclass can return a different transition.
+   *
+   * @return a new {@code NodeFailedAfterSuccess} instance
+   */
+  protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
+      createNodeFailedAfterSuccessTransition() {
+    return new NodeFailedAfterSuccess();
+  }
+  
+  /**
+   * Transition handed out by {@code createNodeFailedAfterSuccessTransition()}.
+   * Runs the inherited {@code KillRequestAfterSuccess} handling and then
+   * appends the diagnostic info carried by the node-failed event.
+   */
+  protected static class NodeFailedAfterSuccess extends KillRequestAfterSuccess {
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      // Without this call the override would replace the parent transition
+      // entirely and only the diagnostics update below would happen.
+      super.transition(ta, event);
+      // The cast assumes this transition is only registered for
+      // TaskAttemptEventNodeFailed events - TODO confirm at the registration.
+      TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
+      ta.addDiagnosticInfo(nfEvent.getDiagnosticInfo());
+    }
+  }
 
   protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> 
       createTooManyFetchFailuresTransition() {

+ 1 - 9
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java

@@ -44,7 +44,6 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
-import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
@@ -73,7 +72,6 @@ import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app2.metrics.MRAppMetrics;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTAStopRequestEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -859,15 +857,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       if (attempt.getAssignedContainerMgrAddress() != null) {
         //container was assigned
         // TODO XXX: What else changes other than this one transition.
-        
         // This can originate from TOO_MANY_FETCH_FAILURES -> the Container may still be running. Ask the scheduler to KILL it.
-        // TODO XXX: Send out a TA_STOP_REQUEST. or the Task sends this out directly, considering the TaskAttempt may already have completed.
-//        task.eventHandler.handle(new ContainerFailedEvent(attempt.getID(), 
-//            attempt.getAssignedContainerMgrAddress()));
-        // TODO XXX: This is not required here. TaskAttempt should be sending out the STOP_REQUEST
-        task.eventHandler.handle(new AMSchedulerTAStopRequestEvent(castEvent.getTaskAttemptID(), true));
       }
-      
+
       task.finishedAttempts++;
       if (task.failedAttempts < task.maxAttempts) {
         task.handleTaskAttemptCompletion(

+ 8 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/launcher/ContainerLauncherImpl.java

@@ -38,9 +38,11 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.app2.AppContext;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunchFailed;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunched;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventStopFailed;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -65,7 +67,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 // TODO XXX: See what part of this lifecycle and state management can be simplified.
 // Ideally, no state - only sendStart / sendStop.
 
-// TODO XXX: ShufflePort needs to make it over to the TaskAttempt.
 // TODO XXX: Review this entire code and clean it up.
 
 /**
@@ -210,14 +211,18 @@ public class ContainerLauncherImpl extends AbstractService implements
               .newRecord(StopContainerRequest.class);
             stopRequest.setContainerId(this.containerID);
             proxy.stopContainer(stopRequest);
-
+            // If stopContainer returns without an error, assuming the stop made
+            // it over to the NodeManager.
+          context.getEventHandler().handle(
+              new AMContainerEvent(containerID, AMContainerEventType.C_NM_STOP_SENT));
         } catch (Throwable t) {
 
           // ignore the cleanup failure
           String message = "cleanup failed for container "
             + this.containerID + " : "
             + StringUtils.stringifyException(t);
-          context.getEventHandler().handle(new AMContainerEventStopFailed(containerID, message));
+          context.getEventHandler().handle(
+              new AMContainerEventStopFailed(containerID, message));
           LOG.warn(message);
           this.state = ContainerState.DONE;
           return;
@@ -228,11 +233,6 @@ public class ContainerLauncherImpl extends AbstractService implements
         }
         this.state = ContainerState.DONE;
       }
-      // TODO XXX: NO STOPPED event. Waiting for the RM to get back.
-      // after killing, send killed event to task attempt
-//      context.getEventHandler().handle(
-//          new TaskAttemptEvent(this.taskAttemptID,
-//              TaskAttemptEventType.TA_CONTAINER_CLEANED));
     }
   }
 

+ 32 - 23
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java

@@ -66,8 +66,8 @@ import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventContainerCompleted;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventType;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTAStopRequestEvent;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTASucceededEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorContainerDeAllocateRequestEvent;
@@ -85,6 +85,7 @@ import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleanupEvent;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -468,32 +469,40 @@ public class RecoveryService extends CompositeService implements Recovery {
       } 
       
       // Handle Events which may be sent to the scheduler.
-      else if (event.getType() == AMSchedulerEventType.S_TA_SUCCEEDED) {
-        // Inform the container that the task attempt succeeded.
-        AMSchedulerTASucceededEvent sEvent = (AMSchedulerTASucceededEvent)event;
-        
-        // Leaving the event in the map - for TA failure after success.
-        ContainerId containerId = attemptToContainerMap.get(sEvent.getAttemptID());
-        actualHandler.handle(new AMContainerTASucceededEvent(containerId,
-            sEvent.getAttemptID()));
-        return;
-        // XXX (Post-3902)tal.unregister happens here. Ensure THH handles it
-        // correctly in case of recovery.
-      }
-      else if (event.getType() == AMSchedulerEventType.S_TA_STOP_REQUEST) {
+      else if (event.getType() == AMSchedulerEventType.S_TA_ENDED) {
         // Tell the container to stop.
-        AMSchedulerTAStopRequestEvent sEvent = (AMSchedulerTAStopRequestEvent) event;
-        ContainerId containerId = attemptToContainerMap.get(sEvent.getAttemptID());
-        actualHandler.handle(new AMContainerEvent(containerId,
-            AMContainerEventType.C_STOP_REQUEST));
-        return;
-        // XXX (Post-3902)chh.unregister happens here. Ensure THH handles it
-        // correctly in case of recovery.
+        AMSchedulerEventTAEnded sEvent = (AMSchedulerEventTAEnded) event;
+        ContainerId containerId = attemptToContainerMap.get(sEvent
+            .getAttemptID());
+        switch (sEvent.getState()) {
+        case FAILED: 
+        case KILLED:
+          actualHandler.handle(new AMContainerEvent(containerId,
+              AMContainerEventType.C_STOP_REQUEST));
+          return;
+          // XXX (Post-3902)chh.unregister happens here. Ensure THH handles it
+          // correctly in case of recovery.
+        case SUCCEEDED:
+          // Inform the container that the task attempt succeeded.
+          // Leaving the event in the map - for TA failure after success.
+          actualHandler.handle(new AMContainerTASucceededEvent(containerId,
+              sEvent.getAttemptID()));
+          return;
+          // XXX (Post-3902)tal.unregister happens here. Ensure THH handles it
+          // correctly in case of recovery.
+        default:
+            throw new YarnException("Invalid state " + sEvent.getState());
+        }
       }
       
-      // Ignore de-allocate requests for the container.
+      // De-allocate containers used by previous attempts immediately.
       else if (event.getType() == NMCommunicatorEventType.CONTAINER_STOP_REQUEST) {
         // No longer ignored: report the container as COMPLETE to the actual
         // handler right away.
+        NMCommunicatorEvent nEvent = (NMCommunicatorEvent)event;
+        ContainerId cId = nEvent.getContainerId();
+        ContainerStatus cs = BuilderUtils.newContainerStatus(cId,
+            ContainerState.COMPLETE, "", 0);
+        actualHandler.handle(new AMContainerEventCompleted(cs));
         return;
       }
       

+ 41 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventTAEnded.java

@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+
+/**
+ * Scheduler event of type {@code S_TA_ENDED}. Carries the id of a task attempt
+ * together with the {@link TaskAttemptState} it ended in. Instances are
+ * immutable: both fields are assigned once in the constructor.
+ */
+public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
+
+  private final TaskAttemptId attemptId;
+  // Never reassigned after construction, so final like attemptId.
+  private final TaskAttemptState state;
+
+  /**
+   * @param attemptId id of the task attempt that ended
+   * @param state state the attempt ended in
+   */
+  public AMSchedulerEventTAEnded(TaskAttemptId attemptId, TaskAttemptState state) {
+    super(AMSchedulerEventType.S_TA_ENDED);
+    this.attemptId = attemptId;
+    this.state = state;
+  }
+
+  /** @return id of the task attempt that ended */
+  public TaskAttemptId getAttemptID() {
+    return this.attemptId;
+  }
+
+  /** @return state the attempt ended in */
+  public TaskAttemptState getState() {
+    return this.state;
+  }
+}

+ 1 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java

@@ -3,9 +3,7 @@ package org.apache.hadoop.mapreduce.v2.app2.rm;
 public enum AMSchedulerEventType {
   //Producer: TaskAttempt
   S_TA_LAUNCH_REQUEST,
-  S_TA_STOP_REQUEST, // Maybe renamed to S_TA_END / S_TA_ABNORMAL_END
-  S_TA_SUCCEEDED,
-  S_TA_ENDED,
+  S_TA_ENDED, // Annotated with FAILED/KILLED/SUCCEEDED.
 
   //Producer: RMCommunicator
   S_CONTAINERS_ALLOCATED,

+ 0 - 29
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTAStopRequestEvent.java

@@ -1,29 +0,0 @@
-package org.apache.hadoop.mapreduce.v2.app2.rm;
-
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-
-public class AMSchedulerTAStopRequestEvent extends AMSchedulerEvent {
-
-  // TODO XXX: Maybe include the ContainerId along with this -> for TOO_MANY_FETCH_FAILURES.
-  private final TaskAttemptId attemptId;
-  private final boolean failed;
-
-  public AMSchedulerTAStopRequestEvent(TaskAttemptId attemptId, boolean failed) {
-    super(AMSchedulerEventType.S_TA_STOP_REQUEST);
-    this.attemptId = attemptId;
-    this.failed = failed;
-  }
-
-  public TaskAttemptId getAttemptID() {
-    return this.attemptId;
-  }
-
-  // TODO XXX: Rename
-  public boolean failed() {
-    return failed;
-  }
-
-  public boolean killed() {
-    return !failed;
-  }
-}

+ 0 - 18
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTASucceededEvent.java

@@ -1,18 +0,0 @@
-package org.apache.hadoop.mapreduce.v2.app2.rm;
-
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-
-public class AMSchedulerTASucceededEvent extends AMSchedulerEvent {
-
-  private final TaskAttemptId attemptId;
-
-  public AMSchedulerTASucceededEvent(TaskAttemptId attemptId) {
-    super(AMSchedulerEventType.S_TA_SUCCEEDED);
-    this.attemptId = attemptId;
-  }
-
-  public TaskAttemptId getAttemptID() {
-    return this.attemptId;
-  }
-
-}

+ 17 - 14
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app2.AppContext;
 import org.apache.hadoop.mapreduce.v2.app2.job.Job;
@@ -332,18 +333,20 @@ public class RMContainerAllocator extends AbstractService
       recalculateReduceSchedule = true;
       handleTaLaunchRequest((AMSchedulerTALaunchRequestEvent) sEvent);
       break;
-    case S_TA_STOP_REQUEST: // Effectively means a failure.
+    case S_TA_ENDED: // Attempt ended; dispatch on its final state.
       recalculateReduceSchedule = true;
-      handleTaStopRequest((AMSchedulerTAStopRequestEvent) sEvent);
-      break;
-    case S_TA_SUCCEEDED:
-      recalculateReduceSchedule = true;
-      handleTaSucceededRequest((AMSchedulerTASucceededEvent) sEvent);
-      break;
-    case S_TA_ENDED:
-      recalculateReduceSchedule = true;
-      // TODO XXX XXX: Not generated yet. Depends on E05 etc. Also look at
-      // TaskAttempt transitions.
+      AMSchedulerEventTAEnded event = (AMSchedulerEventTAEnded)sEvent;
+      switch(event.getState()) {
+      case FAILED:
+      case KILLED:
+        handleTaStopRequest((AMSchedulerEventTAEnded) sEvent);
+        break;
+      case SUCCEEDED:
+        handleTaSucceededRequest(event);
+        break;
+      default:
+        throw new YarnException("Unexecpted TA_ENDED state: " + event.getState()); 
+      }
       break;
     case S_CONTAINERS_ALLOCATED:
       // Conditional recalculateReduceSchedule
@@ -391,7 +394,7 @@ public class RMContainerAllocator extends AbstractService
     }
   }
 
-  private void handleTaStopRequest(AMSchedulerTAStopRequestEvent event) {
+  private void handleTaStopRequest(AMSchedulerEventTAEnded event) {
     TaskAttemptId aId = event.getAttemptID();
     attemptToLaunchRequestMap.remove(aId);
     // TODO XXX: This remove may need to be deferred. Possible for a SUCCESSFUL taskAttempt to fail,
@@ -410,7 +413,7 @@ public class RMContainerAllocator extends AbstractService
           // stopped.
           sendEvent(new AMNodeEventTaskAttemptEnded(containerMap
               .get(containerId).getContainer().getNodeId(), containerId,
-              event.getAttemptID(), event.failed()));
+              event.getAttemptID(), event.getState() == TaskAttemptState.FAILED));
         } else {
           LOG.warn("Received a STOP request for absent taskAttempt: "
               + event.getAttemptID());
@@ -422,7 +425,7 @@ public class RMContainerAllocator extends AbstractService
     }
   }
   
-  private void handleTaSucceededRequest(AMSchedulerTASucceededEvent event) {
+  private void handleTaSucceededRequest(AMSchedulerEventTAEnded event) {
     // TODO XXX Remember the assigned containerId even after task success.
     // Required for TOO_MANY_FETCH_FAILURES
     attemptToLaunchRequestMap.remove(event.getAttemptID());

+ 17 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEvent.java

@@ -1,13 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
 package org.apache.hadoop.mapreduce.v2.app2.rm.container;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 
-// TODO: Implement.
-
 public class AMContainerEvent extends AbstractEvent<AMContainerEventType> {
 
-
   private final ContainerId containerId;
   
   public AMContainerEvent(ContainerId containerId, AMContainerEventType type) {

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventStopFailed.java

@@ -10,7 +10,7 @@ public class AMContainerEventStopFailed extends AMContainerEvent {
   private final String message;
 
   public AMContainerEventStopFailed(ContainerId containerId, String message) {
-    super(containerId, AMContainerEventType.C_STOP_FAILED);
+    super(containerId, AMContainerEventType.C_NM_STOP_FAILED);
     this.message = message;
   }
 

+ 29 - 10
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventType.java

@@ -1,34 +1,53 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 package org.apache.hadoop.mapreduce.v2.app2.rm.container;
 
 public enum AMContainerEventType {
 
-  // TODO Merge START/LAUNCH, STOP/HALT
-  
   //Producer: Scheduler
-  C_START_REQUEST,
+  C_LAUNCH_REQUEST,
   C_ASSIGN_TA,
   
   //Producer: NMCommunicator
   C_LAUNCHED,
   C_LAUNCH_FAILED, // TODO XXX: Send a diagnostic update message to the TaskAttempts assigned to this container ?
-  
+
   //Producer: TAL: PULL_TA is a sync call.
   C_PULL_TA,
-  
+
   //Producer: Scheduler via TA
-  C_TA_SUCCEEDED,
-  
-  //Producer:RMCommunicator
+  C_TA_SUCCEEDED, // maybe change this to C_TA_FINISHED with a status.
+
+  //Producer: RMCommunicator
   C_COMPLETED,
+  
+  //Producer: RMCommunicator, AMNode
   C_NODE_FAILED,
   
   //Producer: TA-> Scheduler -> Container (in case of failure etc)
   //          Scheduler -> Container (in case of pre-emption etc)
-  //          Node -> Container (in case of Node unhealthy etc)
+  //          Node -> Container (in case of Node blacklisted etc)
   C_STOP_REQUEST,
   
   //Producer: NMCommunicator
-  C_STOP_FAILED,
+  C_NM_STOP_FAILED,
+  C_NM_STOP_SENT,
   
   //Producer: ContainerHeartbeatHandler
   C_TIMED_OUT,

+ 184 - 102
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java

@@ -39,9 +39,9 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app2.AppContext;
 import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventTerminated;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminated;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminating;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventNodeFailed;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventContainerCompleted;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorStopRequestEvent;
@@ -61,10 +61,9 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
 public class AMContainerImpl implements AMContainer {
 
   private static final Log LOG = LogFactory.getLog(AMContainerImpl.class);
-
+  
   private final ReadLock readLock;
   private final WriteLock writeLock;
-  // TODO Use ContainerId or a custom JvmId.
   private final ContainerId containerId;
   // Container to be used for getters on capability, locality etc.
   private final Container container;
@@ -88,7 +87,7 @@ public class AMContainerImpl implements AMContainer {
   
   private TaskAttemptId pendingAttempt;
   private TaskAttemptId runningAttempt;
-  private TaskAttemptId interruptedEvent;
+  private List<TaskAttemptId> failedAssignments;
   private TaskAttemptId pullAttempt;
   
   private boolean inError = false;
@@ -109,53 +108,59 @@ public class AMContainerImpl implements AMContainer {
   private void initStateMachineFactory() {
     stateMachineFactory = 
     stateMachineFactory
-        .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING, AMContainerEventType.C_START_REQUEST, createLaunchRequestTransition())
+        .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING, AMContainerEventType.C_LAUNCH_REQUEST, createLaunchRequestTransition())
         .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtAllocatedTransition())
         .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtAllocatedTransition())
         .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_STOP_REQUEST, createStopRequestTransition())
         .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtAllocatedTransition())
-        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), createGenericErrorTransition())
+        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), createGenericErrorTransition())
         
         
-        .addTransition(AMContainerState.LAUNCHING, EnumSet.of(AMContainerState.LAUNCHING, AMContainerState.STOPPING), AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptTransition())
+        .addTransition(AMContainerState.LAUNCHING, EnumSet.of(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptTransition())
         .addTransition(AMContainerState.LAUNCHING, AMContainerState.IDLE, AMContainerEventType.C_LAUNCHED, createLaunchedTransition())
         .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_FAILED, createLaunchFailedTransition())
         .addTransition(AMContainerState.LAUNCHING, AMContainerState.LAUNCHING, AMContainerEventType.C_PULL_TA) // Is assuming the pullAttempt will be null.
         .addTransition(AMContainerState.LAUNCHING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtLaunchingTransition())
-        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtLaunchingTransition())
+        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtLaunchingTransition())
         .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtLaunchingTransition())
-        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_START_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), createGenericErrorAtLaunchingTransition())
-        
-        
-        .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.IDLE, AMContainerState.STOPPING), AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtIdleTransition())
+        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), createGenericErrorAtLaunchingTransition())
+
+        .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtIdleTransition())
         .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.RUNNING, AMContainerState.IDLE), AMContainerEventType.C_PULL_TA, createPullTAAtIdleTransition())
         .addTransition(AMContainerState.IDLE, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtIdleTransition())
-        .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtIdleTransition())
-        .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_TIMED_OUT, createTimedOutAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, createTimedOutAtIdleTransition())
         .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtIdleTransition())
-        .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_START_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_FAILED), createGenericErrorAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), createGenericErrorAtIdleTransition())
         
-        .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtRunningTransition())
         .addTransition(AMContainerState.RUNNING, AMContainerState.RUNNING, AMContainerEventType.C_PULL_TA)
         .addTransition(AMContainerState.RUNNING, AMContainerState.IDLE, AMContainerEventType.C_TA_SUCCEEDED, createTASucceededAtRunningTransition())
         .addTransition(AMContainerState.RUNNING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtRunningTransition())
-        .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtRunningTransition())
-        .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_TIMED_OUT, createTimedOutAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, createTimedOutAtRunningTransition())
         .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtRunningTransition())
-        .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_START_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_STOP_FAILED), createGenericErrorAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), createGenericErrorAtRunningTransition())
         
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, createAssignTAAtStoppingTransition())
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtStoppingTransition())
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_SENT)
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_FAILED, createStopFailedAtNMStopRequested()) // TODO XXX: Rename these.
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtNMStopRequestedTransition())
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TIMED_OUT))
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_LAUNCH_REQUEST, createGenericErrorAtStoppingTransition())
         
         .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, createAssignTAAtStoppingTransition())
         .addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtStoppingTransition())
-        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedBaseTransition())
-        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
-        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_START_REQUEST, createGenericErrorAtStoppingTransition())
+        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtStoppingTransition())
+        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
+        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_REQUEST, createGenericErrorAtStoppingTransition()) // STOPPING needs its own C_LAUNCH_REQUEST arc; STOP_REQUESTED already registers one above.
         
         .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, createAssignTAAtCompletedTransition())
-        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, createNodeFailedBaseTransition())
-        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_START_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_STOP_FAILED), createGenericErrorAtStoppingTransition())
-        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_COMPLETED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_COMPLETED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TIMED_OUT))
- 
+        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtCompletedTransition())
+        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT)) // Ignored once COMPLETED; C_LAUNCH_REQUEST is registered separately on the next line.
+        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST), createGenericErrorAtStoppingTransition())
+
         .installTopology();
   }
 
@@ -335,8 +340,8 @@ public class AMContainerImpl implements AMContainer {
       AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
       container.inError = true;
       container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
-          "AMScheduler Error: TaskAttempt should not be" +
-          " allocated before a launch request.");
+          "AMScheduler Error: TaskAttempt allocated to unlaunched container: "
+              + container.getContainerId());
       container.sendCompletedToScheduler();
       container.deAllocate();
       LOG.warn("Unexpected TA Assignment: TAId: " + event.getTaskAttemptId()
@@ -387,6 +392,10 @@ public class AMContainerImpl implements AMContainer {
     }
   }
   
+  protected void registerFailedTAAssignment(TaskAttemptId taId) {
+    failedAssignments.add(taId);
+  }
+  
   protected void deAllocate() {
     sendEvent(new RMCommunicatorContainerDeAllocateRequestEvent(containerId));
   }
@@ -396,15 +405,17 @@ public class AMContainerImpl implements AMContainer {
   }
 
   protected void sendTerminatedToTaskAttempt(TaskAttemptId taId, String message) {
-    if (message != null) {
-      sendEvent(new TaskAttemptDiagnosticsUpdateEvent(taId, message));
-    }
-    sendEvent(new TaskAttemptEventTerminated(taId));
+    sendEvent(new TaskAttemptEventContainerTerminated(taId, message));
   }
 
-  protected void sendKillRequestToTaskAttempt(TaskAttemptId taId) {
-    sendEvent(new TaskAttemptEventKillRequest(taId,
-        "Node running the contianer failed"));
+  protected void sendTerminatingToTA(TaskAttemptId taId, String message) {
+    sendEvent(new TaskAttemptEventContainerTerminating(taId, message));
+  }
+  
+  protected void sendNodeFailureToTA(AMContainerEvent event,
+      TaskAttemptId taId, String message) {
+    sendEvent(new TaskAttemptEventNodeFailed(taId, message));
+    // TODO XXX: Diag message from the node. Otherwise include the nodeId
   }
 
   protected void sendStopRequestToNM() {
@@ -439,11 +450,14 @@ public class AMContainerImpl implements AMContainer {
         container.inError = true;
         String errorMessage = "AMScheduler Error: Multiple simultaneous " +
         		"taskAttempt allocations to: " + container.getContainerId();
-        container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
-            errorMessage);
-        container.deAllocate();
+        container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+        container.registerFailedTAAssignment(event.getTaskAttemptId());
+        // TODO XXX: Verify that it's ok to send in a NM_STOP_REQUEST. The
+        // NMCommunicator should be able to handle this. The STOP_REQUEST would
+        // only go out after the START_REQUEST.
         LOG.warn(errorMessage);
-        return AMContainerState.STOPPING;
+        container.sendStopRequestToNM();
+        return AMContainerState.STOP_REQUESTED;
       }
       container.pendingAttempt = event.getTaskAttemptId();
       container.remoteTaskMap.put(event.getTaskAttemptId(),
@@ -490,7 +504,7 @@ public class AMContainerImpl implements AMContainer {
         container.pendingAttempt = null;
         if (container.lastTaskFinishTime != 0) {
           long idleTimeDiff = System.currentTimeMillis() - container.lastTaskFinishTime;
-          LOG.info("Computing idle time for container: " + container.getContainerId() + ", lastFinishTime: " + container.lastTaskFinishTime + ", Incremented by: " + idleTimeDiff);
+          LOG.info("XXX: Computing idle time for container: " + container.getContainerId() + ", lastFinishTime: " + container.lastTaskFinishTime + ", Incremented by: " + idleTimeDiff);
           container.idleTimeBetweenTasks += System.currentTimeMillis() - container.lastTaskFinishTime;
         }
         LOG.info("XXX: Assigned task + [" + container.runningAttempt + "] to container: [" + container.getContainerId() + "]");
@@ -512,8 +526,8 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       if (container.pendingAttempt != null) {
         AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent;
-        container.sendEvent(new TaskAttemptDiagnosticsUpdateEvent(
-            container.pendingAttempt, event.getMessage()));
+        container.sendTerminatingToTA(container.pendingAttempt,
+            event.getMessage());
       }
       container.deAllocate();
     }
@@ -531,7 +545,8 @@ public class AMContainerImpl implements AMContainer {
       if (container.pendingAttempt != null) {
         String errorMessage = "Container" + container.getContainerId()
             + " failed. Received COMPLETED event while trying to launch";
-        container.sendTerminatedToTaskAttempt(container.pendingAttempt,errorMessage);
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+            errorMessage);
         LOG.warn(errorMessage);    
         // TODO XXX Maybe nullify pendingAttempt.
       }
@@ -548,11 +563,14 @@ public class AMContainerImpl implements AMContainer {
       SingleArcTransition<AMContainerImpl, AMContainerEvent> {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      if (container.pendingAttempt != null) {
+        container.sendTerminatingToTA(container.pendingAttempt,
+            " Container" + container.getContainerId() + " received a STOP_REQUEST");
+      }
       container.sendStopRequestToNM();
-      container.deAllocate();
     }
   }
-  
+
   protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
       createNodeFailedAtLaunchingTransition() {
     return new NodeFailedAtLaunching();
@@ -563,7 +581,10 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       if (container.pendingAttempt != null) {
-        container.sendKillRequestToTaskAttempt(container.pendingAttempt);
+        container.sendNodeFailureToTA(cEvent, container.pendingAttempt, null);
+        // TODO XXX: Maybe include a diagnostic message along with the incoming
+        // Node failure event.
+        container.sendTerminatingToTA(container.pendingAttempt, "Node failure");
       }
       container.sendStopRequestToNM();
       container.deAllocate();
@@ -575,7 +596,7 @@ public class AMContainerImpl implements AMContainer {
     return new AssignTaskAttemptAtIdle();
   }
 
-  // TODO Make this the base for all assignRequests. Some more error checking in
+  // TODO XXX Make this the base for all assignRequests. Some more error checking in
   // that case.
   protected static class AssignTaskAttemptAtIdle
       implements
@@ -588,17 +609,16 @@ public class AMContainerImpl implements AMContainer {
         container.inError = true;
         String errorMessage = "AMScheduler Error: Multiple simultaneous "
             + "taskAttempt allocations to: " + container.getContainerId();
-        container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
-            errorMessage);
+        container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+        container.registerFailedTAAssignment(event.getTaskAttemptId());
         LOG.warn(errorMessage);
         container.sendStopRequestToNM();
-        container.deAllocate();
         container.containerHeartbeatHandler.unregister(container.containerId);
         
-        return AMContainerState.STOPPING;
+        return AMContainerState.STOP_REQUESTED;
       }
       container.pendingAttempt = event.getTaskAttemptId();
-      // TODO LATER. Cleanup the remoteTaskMap.
+      // TODO XXX LATER. Cleanup the remoteTaskMap.
       container.remoteTaskMap.put(event.getTaskAttemptId(),
           event.getRemoteTask());
       return AMContainerState.IDLE;
@@ -617,10 +637,12 @@ public class AMContainerImpl implements AMContainer {
       LOG.info("Cotnainer with id: " + container.getContainerId()
           + " Completed." + " Previous state was: " + container.getState());
       if (container.pendingAttempt != null) {
-        container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+            "Container " + container.getContainerId() + " FINISHED.");
       }
       container.sendCompletedToScheduler();
       container.containerHeartbeatHandler.unregister(container.containerId);
+      container.unregisterJvmFromListener(container.jvmId);
     }
   }
   
@@ -629,16 +651,13 @@ public class AMContainerImpl implements AMContainer {
     return new StopRequestAtIdle();
   }
   
-  protected static class StopRequestAtIdle implements
-      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+  protected static class StopRequestAtIdle extends StopRequestAtLaunching {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
       LOG.info("XXX: IdleTimeBetweenTasks: " + container.idleTimeBetweenTasks);
-      container.sendStopRequestToNM();
-      container.deAllocate();
       container.containerHeartbeatHandler.unregister(container.containerId);
       container.unregisterJvmFromListener(container.jvmId);
-      // TODO XXXXXXXXX: Unregister from TAL so that the Container kills itself (via a kill task assignment)
     }
   }
 
@@ -648,6 +667,7 @@ public class AMContainerImpl implements AMContainer {
   }
 
   protected static class TimedOutAtIdle extends StopRequestAtIdle {
+    // TODO XXX: Override to change the diagnostic message that goes to the TaskAttempt. Functionality is the same.
   }
   
   protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
@@ -675,15 +695,13 @@ public class AMContainerImpl implements AMContainer {
       SingleArcTransition<AMContainerImpl, AMContainerEvent> {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
-      container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
+      container.sendTerminatedToTaskAttempt(container.runningAttempt,
+          "Container " + container.getContainerId()
+              + " FINISHED while task was running");
       container.sendCompletedToScheduler();
       container.containerHeartbeatHandler.unregister(container.containerId);
       container.unregisterAttemptFromListener(container.runningAttempt);
       container.unregisterJvmFromListener(container.jvmId);
-      container.interruptedEvent = container.runningAttempt;
-      container.runningAttempt = null;
-      
-      
     }
   }
 
@@ -696,10 +714,9 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
       container.unregisterAttemptFromListener(container.runningAttempt);
-//      container.unregisterJvmFromListener(container.jvmId);
+      container.sendTerminatingToTA(container.runningAttempt,
+          " Container" + container.getContainerId() + " received a STOP_REQUEST");
       // TODO XXX: All running transition. verify whether runningAttempt should be null.
-      container.interruptedEvent = container.runningAttempt;
-      container.runningAttempt = null;
     }
   }
 
@@ -709,6 +726,7 @@ public class AMContainerImpl implements AMContainer {
   }
 
   protected static class TimedOutAtRunning extends StopRequestAtRunning {
+    // TODO XXX: Change the error message.
   }
 
   protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
@@ -721,12 +739,10 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
-      container.sendKillRequestToTaskAttempt(container.runningAttempt);
+      container.sendNodeFailureToTA(cEvent, container.runningAttempt, null);
+      container.sendTerminatingToTA(container.runningAttempt, "Node failure");
+
       container.unregisterAttemptFromListener(container.runningAttempt);
-      container.unregisterJvmFromListener(container.jvmId);
-      container.interruptedEvent = container.runningAttempt;
-      container.runningAttempt = null;
-      
     }
   }
  
@@ -744,9 +760,9 @@ public class AMContainerImpl implements AMContainer {
       container.inError = true;
       String errorMessage = "AttemptId: " + event.getTaskAttemptId()
           + " cannot be allocated to container: " + container.getContainerId()
-          + " in STOPPING state";
-      container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
-          errorMessage);
+          + " in " + container.getState() + " state";
+      container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+      container.registerFailedTAAssignment(event.getTaskAttemptId());
     }
   }
 
@@ -761,6 +777,7 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       container.inError = true;
+      // TODO XXX: Anything else required in the error transitions ?
     }
   }
 
@@ -791,22 +808,32 @@ public class AMContainerImpl implements AMContainer {
       SingleArcTransition<AMContainerImpl, AMContainerEvent> {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
-      // XXX: Would some of these events not have gone out when entering the STOPPING state. Fix errorMessages
+      // TODO XXX: Set everything to null after sending these out.
       if (container.pendingAttempt != null) {
         container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
       }
       if (container.runningAttempt != null) {
         container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
       }
-      if (container.interruptedEvent != null) {
-        container.sendTerminatedToTaskAttempt(container.interruptedEvent, null);
-      }
       container.sendCompletedToScheduler();
     }
   }
 
+  protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
+      createStopFailedAtNMStopRequested() {
+    return new StopFailedAtNMStopRequested();
+  }
+
+  protected static class StopFailedAtNMStopRequested implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      container.deAllocate();
+    }
+  }
 
-  protected SingleArcTransition<AMContainerImpl, AMContainerEvent> createNodeFailedBaseTransition() {
+  protected SingleArcTransition<AMContainerImpl, AMContainerEvent> 
+      createNodeFailedBaseTransition() {
     return new NodeFailedBase();
   }
   
@@ -820,43 +847,96 @@ public class AMContainerImpl implements AMContainer {
       // let multiple events go out and the TA should be able to handle them.
       // Kill_TA going out in this case.
       if (container.runningAttempt != null) {
-        container.killTaskAttempt(container.runningAttempt);
+        container.sendNodeFailureToTA(cEvent, container.runningAttempt, null);
+        container.sendTerminatingToTA(container.runningAttempt, "Node Failure");
       }
       if (container.pendingAttempt != null) {
-        container.killTaskAttempt(container.pendingAttempt);
+        container.sendNodeFailureToTA(cEvent, container.pendingAttempt, null);
       }
       for (TaskAttemptId attemptId : container.completedAttempts) {
         // TODO XXX: Make sure TaskAttempt knows how to handle kills to REDUCEs.
-//        if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
-          container.killTaskAttempt(attemptId);
-//        }s
+        container.sendNodeFailureToTA(cEvent, attemptId, null);
       }
-      
     }
   }
   
-  private void killTaskAttempt(TaskAttemptId attemptId) {
-    sendEvent(new TaskAttemptEventKillRequest(attemptId, "The node running the task attempt was marked as bad"));
+  protected SingleArcTransition<AMContainerImpl, AMContainerEvent> 
+      createNodeFailedAtStoppingTransition() {
+    return new NodeFailedAtSopping();
   }
   
+  protected static class NodeFailedAtSopping extends NodeFailedBase {
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      if (container.runningAttempt != null) { 
+        container.sendTerminatingToTA(container.runningAttempt, "Node Failure");
+      }
+    }
+  }
+
+  protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
+      createNodeFailedAtCompletedTransition() {
+    return new NodeFailedAtCompleted();
+  }
+
+  protected static class NodeFailedAtCompleted extends NodeFailedBase {
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      if (container.runningAttempt != null) {
+        container.sendTerminatedToTaskAttempt(container.runningAttempt,
+            "Node Failure");
+      }
+    }
+  }
+
+  protected SingleArcTransition<AMContainerImpl, AMContainerEvent> createNodeFailedAtNMStopRequestedTransition() {
+    return new NodeFailedAtNMStopRequested();
+  }
+
+  protected static class NodeFailedAtNMStopRequested implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      if (container.runningAttempt != null) {
+        container.sendNodeFailureToTA(cEvent, container.runningAttempt,
+            null);
+        container.sendTerminatingToTA(container.runningAttempt, "Node Failure");
+      }
+      if (container.pendingAttempt != null) {
+        container.sendNodeFailureToTA(cEvent, container.pendingAttempt,
+            null);
+      }
+      for (TaskAttemptId attemptId : container.completedAttempts) {
+        // TODO XXX: Make sure TaskAttempt knows how to handle kills to REDUCEs.
+        container.sendNodeFailureToTA(cEvent, attemptId, null);
+      }
+      for (TaskAttemptId attemptId : container.failedAssignments) {
+        container.sendNodeFailureToTA(cEvent, attemptId, null);
+      }
+      container.deAllocate();
+    }
+  }
+
   protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
       createNodeFailedAtIdleTransition() {
     return new NodeFailedAtIdle();
   }
-  
-  protected static class NodeFailedAtIdle implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
-    
+
+  protected static class NodeFailedAtIdle implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       container.sendStopRequestToNM();
       container.deAllocate();
       if (container.pendingAttempt != null) {
-        container.sendKillRequestToTaskAttempt(container.pendingAttempt);
+        container.sendNodeFailureToTA(cEvent, container.pendingAttempt, null);
+        container.sendTerminatingToTA(container.pendingAttempt, "Node Failure");
       }
       for (TaskAttemptId taId : container.completedAttempts) {
-        container.sendKillRequestToTaskAttempt(taId);
+        container.sendNodeFailureToTA(cEvent, taId, null);
       }
       container.containerHeartbeatHandler.unregister(container.containerId);
+      container.unregisterJvmFromListener(container.jvmId);
     }
   }
 
@@ -873,16 +953,18 @@ public class AMContainerImpl implements AMContainer {
       container.inError = true;
       String errorMessage = "AttemptId: " + event.getTaskAttemptId()
           + " cannot be allocated to container: " + container.getContainerId()
-          + " in RUNNING state";
-      container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(), errorMessage);
+          + " in RUNNING state. Already executing TaskAttempt: "
+          + container.runningAttempt;
+      container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+      container.registerFailedTAAssignment(event.getTaskAttemptId());
+      
+      container.sendTerminatingToTA(container.runningAttempt, errorMessage);
+      
       container.sendStopRequestToNM();
-      container.deAllocate();
       container.unregisterAttemptFromListener(container.runningAttempt);
       container.unregisterJvmFromListener(container.jvmId);
       container.containerHeartbeatHandler.unregister(container.containerId);
-      container.interruptedEvent = container.runningAttempt;
-      container.runningAttempt = null;
-      // TODO XXX: Is the TAL unregister required ?
+
     }
   }
 
@@ -926,6 +1008,7 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
       container.containerHeartbeatHandler.unregister(container.containerId);
+      container.unregisterJvmFromListener(container.jvmId);
     }
   }
   
@@ -939,12 +1022,11 @@ public class AMContainerImpl implements AMContainer {
       super.transition(container, cEvent);
       container.unregisterAttemptFromListener(container.runningAttempt);
       container.unregisterJvmFromListener(container.jvmId);
-      container.interruptedEvent = container.runningAttempt;
-      container.runningAttempt = null;
     }
   }
 
   // TODO Create a generic ERROR state. Container tries informing relevant components in this case.
+  // TODO XXX: Rename all generic error transitions.
   
 
 }

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java

@@ -38,7 +38,7 @@ public class AMContainerLaunchRequestEvent extends AMContainerEvent {
   public AMContainerLaunchRequestEvent(ContainerId containerId, JobId jobId,
       TaskType taskType, Token<JobTokenIdentifier> jobToken,
       Credentials credentials, boolean shouldProfile, JobConf jobConf) {
-    super(containerId, AMContainerEventType.C_START_REQUEST);
+    super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
     this.jobId = jobId;
     this.taskTypeForContainer = taskType;
     this.jobToken = jobToken;

+ 20 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java

@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
 package org.apache.hadoop.mapreduce.v2.app2.rm.container;
 
 public enum AMContainerState {
@@ -5,6 +22,9 @@ public enum AMContainerState {
   LAUNCHING,
   IDLE,
   RUNNING,
+  // Indicates that an NM stop request has been attempted. This request could
+  // fail, in which case an RM stop request needs to be sent.
+  STOP_REQUESTED, 
   STOPPING,
   COMPLETED,
 }

+ 19 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventTaskAttemptEnded.java

@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
 package org.apache.hadoop.mapreduce.v2.app2.rm.node;
 
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -11,7 +28,8 @@ public class AMNodeEventTaskAttemptEnded extends AMNodeEvent {
   private final ContainerId containerId;
   private final TaskAttemptId taskAttemptId;
   
-  public AMNodeEventTaskAttemptEnded(NodeId nodeId, ContainerId containerId, TaskAttemptId taskAttemptId, boolean failed) {
+  public AMNodeEventTaskAttemptEnded(NodeId nodeId, ContainerId containerId,
+      TaskAttemptId taskAttemptId, boolean failed) {
     super(nodeId, AMNodeEventType.N_TA_ENDED);
     this.failed = failed;
     this.containerId = containerId;

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java

@@ -29,7 +29,7 @@ public enum AMNodeEventType {
   //Producer: RMCommunicator
   N_TURNED_UNHEALTHY,
   N_TURNED_HEALTHY,
-  N_NODE_COUNT_UPDATED,
+  N_NODE_COUNT_UPDATED, // for blacklisting.
   
   //Producer: AMNodeManager
   N_BLACKLISTING_ENABLED,

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java

@@ -259,6 +259,7 @@ public class AMNodeImpl implements AMNode {
               AMNodeEventType.N_NODE_WAS_BLACKLISTED));
           return AMNodeState.BLACKLISTED;
           // TODO XXX: An event likely needs to go out to the scheduler.
+          // XXX: Someone needs to update the scheduler tables by sending a zeroed request to the scheduler. Who is responsible for that?
         }
       }
       return AMNodeState.ACTIVE;
@@ -378,6 +379,7 @@ public class AMNodeImpl implements AMNode {
       LOG.info("Node: " + node.getNodeId()
           + " got allocated a contaienr with id: " + event.getContainerId()
           + " while in UNHEALTHY state. Releasing it.");
+      // TODO XXX: Consider including diagnostics with this event (e.g. that the RM reported the node as unhealthy), which would then be included in the diagnostics from the container.
       node.sendEvent(new AMContainerEvent(event.getContainerId(),
           AMContainerEventType.C_NODE_FAILED));
     }

+ 0 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import static junit.framework.Assert.assertFalse;
 import static junit.framework.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -51,7 +50,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.mockito.verification.VerificationMode;
 
 public class TestJobHistoryEventHandler {
 

+ 23 - 20
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java

@@ -72,8 +72,7 @@ import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTAStopRequestEvent;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTASucceededEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded;
 import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
@@ -261,7 +260,8 @@ public class MRApp extends MRAppMaster {
     TaskAttemptReport report = attempt.getReport();
     while (!finalState.equals(report.getTaskAttemptState()) &&
         timeoutSecs++ < 20) {
-      System.out.println("TaskAttempt State is : " + report.getTaskAttemptState() +
+      System.out.println("TaskAttempt State for " + attempt.getID() + " is : " + 
+          report.getTaskAttemptState() +
           " Waiting for state : " + finalState +
           "   progress : " + report.getProgress());
       report = attempt.getReport();
@@ -651,24 +651,27 @@ public class MRApp extends MRAppMaster {
                 .getRemoteTask()));
 
         break;
-      case S_TA_STOP_REQUEST:
+      case S_TA_ENDED:
         // Send out a Container_stop_request.
-        AMSchedulerTAStopRequestEvent stEvent = (AMSchedulerTAStopRequestEvent) rawEvent;
-        LOG.info("XXX: Handling S_TA_STOP_REQUEST for attemptId:" + stEvent.getAttemptID());
-        getContext().getEventHandler().handle(
-            new AMContainerEvent(attemptToContainerIdMap.get(stEvent
-                .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
-
-        break;
-      case S_TA_SUCCEEDED:
-        // No re-use in MRApp. Stop the container.
-        AMSchedulerTASucceededEvent suEvent = (AMSchedulerTASucceededEvent) rawEvent;
-        LOG.info("XXX: Handling S_TA_SUCCEEDED for attemptId: "
-            + suEvent.getAttemptID());
-        getContext().getEventHandler().handle(
-            new AMContainerEvent(attemptToContainerIdMap.get(suEvent
-                .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
-        break;
+        AMSchedulerEventTAEnded sEvent = (AMSchedulerEventTAEnded) rawEvent;
+        LOG.info("XXX: Handling S_TA_ENDED for attemptId:"
+            + sEvent.getAttemptID() + " with state: " + sEvent.getState());
+        switch (sEvent.getState()) {
+        case FAILED:
+        case KILLED:
+          getContext().getEventHandler().handle(
+              new AMContainerEvent(attemptToContainerIdMap.get(sEvent
+                  .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+          break;
+        case SUCCEEDED:
+          // No re-use in MRApp. Stop the container.
+          getContext().getEventHandler().handle(
+              new AMContainerEvent(attemptToContainerIdMap.get(sEvent
+                  .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+          break;
+        default:
+          throw new YarnException("Unexpected state: " + sEvent.getState());
+        }
       case S_CONTAINERS_ALLOCATED:
         break;
       case S_CONTAINER_COMPLETED:

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java

@@ -195,7 +195,7 @@ public class TestFail {
     // TODO XXX: This may not be a valid test.
     app.getDispatcher().getEventHandler().handle(
         new TaskAttemptEvent(attempt.getID(),
-            TaskAttemptEventType.TA_TERMINATED));
+            TaskAttemptEventType.TA_CONTAINER_TERMINATED));
     app.waitForState(job, JobState.FAILED);
   }
 

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java

@@ -58,7 +58,8 @@ public class TestMapReduceChildJVM {
       " -Dhadoop.root.logger=INFO,CLA" +
       " org.apache.hadoop.mapred.YarnChild2 127.0.0.1" +
       " 54321" +
-      " attempt_0_0000_m_000000_0" +
+      " job_0_0000" +
+      " MAP" +
       " 0" +
       " 1><LOG_DIR>/stdout" +
       " 2><LOG_DIR>/stderr ]", app.myCommandLine);

+ 15 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java

@@ -45,6 +45,7 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
@@ -815,6 +816,12 @@ public class TestRMContainerAllocator {
       super.handleEvent(event);
     }
     
+    @Override
+    protected boolean shouldProfileTaskAttempt(JobConf conf,
+        org.apache.hadoop.mapred.Task remoteTask) {
+      return false;
+    }
+    
     static Priority getMapPriority() {
       return BuilderUtils.newPriority(PRIORITY_MAP.getPriority());
     }
@@ -845,6 +852,12 @@ public class TestRMContainerAllocator {
         int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
       recalculatedReduceSchedule = true;
     }
+    
+    @Override
+    protected boolean shouldProfileTaskAttempt(JobConf conf,
+        org.apache.hadoop.mapred.Task remoteTask) {
+      return false;
+    }
   }
 
   class TrackingAMContainerRequestor extends RMContainerRequestor {
@@ -928,7 +941,7 @@ public class TestRMContainerAllocator {
     
     @Override
     public void handle(Event event) {
-      if (event.getType() == AMContainerEventType.C_START_REQUEST) {
+      if (event.getType() == AMContainerEventType.C_LAUNCH_REQUEST) {
         launchRequests.add((AMContainerLaunchRequestEvent)event);
       } else if (event.getType() == AMContainerEventType.C_ASSIGN_TA) {
         assignEvents.add((AMContainerAssignTAEvent)event);
@@ -960,6 +973,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getID()).thenReturn(jobId);
     when(mockJob.getProgress()).thenReturn(0.0f);
+    when(mockJob.getConf()).thenReturn(conf);
 
     Clock clock = new ControlledClock(new SystemClock());