فهرست منبع

MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol for Job, Task and TaskAttempt. Contributed by Siddarth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1399978 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 12 سال پیش
والد
کامیت
7b5f02425e
20فایلهای تغییر یافته به همراه546 افزوده شده و 342 حذف شده
  1. 3 0
      hadoop-mapreduce-project/CHANGES.txt
  2. 30 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
  3. 42 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java
  4. 23 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskStateInternal.java
  5. 108 89
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
  6. 159 114
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  7. 85 63
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
  8. 2 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
  9. 26 5
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
  10. 4 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
  11. 6 5
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
  12. 15 11
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
  13. 2 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
  14. 4 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
  15. 19 9
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
  16. 0 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java
  17. 6 12
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptState.java
  18. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskState.java
  19. 2 7
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
  20. 9 17
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -8,6 +8,9 @@ Release 0.23.5 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol
+    for Job, Task and TaskAttempt. (Siddarth Seth via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 30 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job;
+
+public enum JobStateInternal {
+  NEW,
+  INITED,
+  RUNNING,
+  SUCCEEDED,
+  FAILED,
+  KILL_WAIT,
+  KILLED,
+  ERROR
+}

+ 42 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java

@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+* TaskAttemptImpl internal state machine states.
+*
+*/
+@Private
+public enum TaskAttemptStateInternal {
+  NEW, 
+  UNASSIGNED, 
+  ASSIGNED, 
+  RUNNING, 
+  COMMIT_PENDING, 
+  SUCCESS_CONTAINER_CLEANUP, 
+  SUCCEEDED, 
+  FAIL_CONTAINER_CLEANUP, 
+  FAIL_TASK_CLEANUP, 
+  FAILED, 
+  KILL_CONTAINER_CLEANUP, 
+  KILL_TASK_CLEANUP, 
+  KILLED,
+}

+ 23 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskStateInternal.java

@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job;
+
+public enum TaskStateInternal {
+  NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED
+}

+ 108 - 89
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -76,6 +76,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
@@ -196,150 +197,150 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       new CounterUpdateTransition();
 
   protected static final
-    StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> 
+    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> 
        stateMachineFactory
-     = new StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent>
-              (JobState.NEW)
+     = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
+              (JobStateInternal.NEW)
 
           // Transitions from NEW state
-          .addTransition(JobState.NEW, JobState.NEW,
+          .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.NEW, JobState.NEW,
+          .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition
-              (JobState.NEW,
-              EnumSet.of(JobState.INITED, JobState.FAILED),
+              (JobStateInternal.NEW,
+              EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
               JobEventType.JOB_INIT,
               new InitTransition())
-          .addTransition(JobState.NEW, JobState.KILLED,
+          .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED,
               JobEventType.JOB_KILL,
               new KillNewJobTransition())
-          .addTransition(JobState.NEW, JobState.ERROR,
+          .addTransition(JobStateInternal.NEW, JobStateInternal.ERROR,
               JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
           // Transitions from INITED state
-          .addTransition(JobState.INITED, JobState.INITED,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.INITED, JobState.INITED,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
-          .addTransition(JobState.INITED, JobState.RUNNING,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.RUNNING,
               JobEventType.JOB_START,
               new StartTransition())
-          .addTransition(JobState.INITED, JobState.KILLED,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.KILLED,
               JobEventType.JOB_KILL,
               new KillInitedJobTransition())
-          .addTransition(JobState.INITED, JobState.ERROR,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.ERROR,
               JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
           // Transitions from RUNNING state
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
               TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
           .addTransition
-              (JobState.RUNNING,
-              EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED),
+              (JobStateInternal.RUNNING,
+              EnumSet.of(JobStateInternal.RUNNING, JobStateInternal.SUCCEEDED, JobStateInternal.FAILED),
               JobEventType.JOB_TASK_COMPLETED,
               new TaskCompletedTransition())
           .addTransition
-              (JobState.RUNNING,
-              EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED),
+              (JobStateInternal.RUNNING,
+              EnumSet.of(JobStateInternal.RUNNING, JobStateInternal.SUCCEEDED, JobStateInternal.FAILED),
               JobEventType.JOB_COMPLETED,
               new JobNoTasksCompletedTransition())
-          .addTransition(JobState.RUNNING, JobState.KILL_WAIT,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.KILL_WAIT,
               JobEventType.JOB_KILL, new KillTasksTransition())
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_MAP_TASK_RESCHEDULED,
               new MapTaskRescheduledTransition())
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
               new TaskAttemptFetchFailureTransition())
           .addTransition(
-              JobState.RUNNING,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR,
+              JobStateInternal.RUNNING,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
           // Transitions from KILL_WAIT state.
           .addTransition
-              (JobState.KILL_WAIT,
-              EnumSet.of(JobState.KILL_WAIT, JobState.KILLED),
+              (JobStateInternal.KILL_WAIT,
+              EnumSet.of(JobStateInternal.KILL_WAIT, JobStateInternal.KILLED),
               JobEventType.JOB_TASK_COMPLETED,
               new KillWaitTaskCompletedTransition())
-          .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+          .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
               JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
               TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
-          .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+          .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+          .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
-              JobState.KILL_WAIT,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR,
+              JobStateInternal.KILL_WAIT,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+          .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
               EnumSet.of(JobEventType.JOB_KILL,
                          JobEventType.JOB_MAP_TASK_RESCHEDULED,
                          JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // Transitions from SUCCEEDED state
-          .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
+          .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
+          .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
-              JobState.SUCCEEDED,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR,
+              JobStateInternal.SUCCEEDED,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
+          .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
               EnumSet.of(JobEventType.JOB_KILL,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // Transitions from FAILED state
-          .addTransition(JobState.FAILED, JobState.FAILED,
+          .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.FAILED, JobState.FAILED,
+          .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
-              JobState.FAILED,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR,
+              JobStateInternal.FAILED,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(JobState.FAILED, JobState.FAILED,
+          .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
               EnumSet.of(JobEventType.JOB_KILL,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // Transitions from KILLED state
-          .addTransition(JobState.KILLED, JobState.KILLED,
+          .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.KILLED, JobState.KILLED,
+          .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
-              JobState.KILLED,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR,
+              JobStateInternal.KILLED,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(JobState.KILLED, JobState.KILLED,
+          .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
               EnumSet.of(JobEventType.JOB_KILL,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // No transitions from INTERNAL_ERROR state. Ignore all.
           .addTransition(
-              JobState.ERROR,
-              JobState.ERROR,
+              JobStateInternal.ERROR,
+              JobStateInternal.ERROR,
               EnumSet.of(JobEventType.JOB_INIT,
                   JobEventType.JOB_KILL,
                   JobEventType.JOB_TASK_COMPLETED,
@@ -348,12 +349,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
                   JobEventType.JOB_DIAGNOSTIC_UPDATE,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                   JobEventType.INTERNAL_ERROR))
-          .addTransition(JobState.ERROR, JobState.ERROR,
+          .addTransition(JobStateInternal.ERROR, JobStateInternal.ERROR,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           // create the topology tables
           .installTopology();
  
-  private final StateMachine<JobState, JobEventType, JobEvent> stateMachine;
+  private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;
 
   //changing fields while the job is running
   private int numMapTasks;
@@ -418,7 +419,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     stateMachine = stateMachineFactory.make(this);
   }
 
-  protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
+  protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
     return stateMachine;
   }
 
@@ -492,9 +493,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     readLock.lock();
 
     try {
-      JobState state = getState();
-      if (state == JobState.ERROR || state == JobState.FAILED
-          || state == JobState.KILLED || state == JobState.SUCCEEDED) {
+      JobStateInternal state = getInternalState();
+      if (state == JobStateInternal.ERROR || state == JobStateInternal.FAILED
+          || state == JobStateInternal.KILLED || state == JobStateInternal.SUCCEEDED) {
         this.mayBeConstructFinalFullCounters();
         return fullCounters;
       }
@@ -559,7 +560,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         diagsb.append(s).append("\n");
       }
 
-      if (getState() == JobState.NEW) {
+      if (getInternalState() == JobStateInternal.NEW) {
         return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
             appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
             cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
@@ -646,7 +647,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   public JobState getState() {
     readLock.lock();
     try {
-     return getStateMachine().getCurrentState();
+      return getExternalState(getStateMachine().getCurrentState());
     } finally {
       readLock.unlock();
     }
@@ -667,7 +668,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
     try {
       writeLock.lock();
-      JobState oldState = getState();
+      JobStateInternal oldState = getInternalState();
       try {
          getStateMachine().doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
@@ -678,9 +679,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
             JobEventType.INTERNAL_ERROR));
       }
       //notify the eventhandler of state change
-      if (oldState != getState()) {
+      if (oldState != getInternalState()) {
         LOG.info(jobId + "Job Transitioned from " + oldState + " to "
-                 + getState());
+                 + getInternalState());
       }
     }
     
@@ -689,6 +690,24 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     }
   }
 
+  @Private
+  public JobStateInternal getInternalState() {
+    readLock.lock();
+    try {
+      return getStateMachine().getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private static JobState getExternalState(JobStateInternal smState) {
+    if (smState == JobStateInternal.KILL_WAIT) {
+      return JobState.KILLED;
+    } else {
+      return JobState.valueOf(smState.name());
+    }
+  }
+
   //helpful in testing
   protected void addTask(Task task) {
     synchronized (tasksSyncHandle) {
@@ -729,7 +748,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     return FileSystem.get(conf);
   }
   
-  static JobState checkJobCompleteSuccess(JobImpl job) {
+  static JobStateInternal checkJobCompleteSuccess(JobImpl job) {
     // check for Job success
     if (job.completedTaskCount == job.tasks.size()) {
       try {
@@ -739,16 +758,16 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         LOG.error("Could not do commit for Job", e);
         job.addDiagnostic("Job commit failed: " + e.getMessage());
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
-        return job.finished(JobState.FAILED);
+        return job.finished(JobStateInternal.FAILED);
       }
       job.logJobHistoryFinishedEvent();
-      return job.finished(JobState.SUCCEEDED);
+      return job.finished(JobStateInternal.SUCCEEDED);
     }
     return null;
   }
 
-  JobState finished(JobState finalState) {
-    if (getState() == JobState.RUNNING) {
+  JobStateInternal finished(JobStateInternal finalState) {
+    if (getInternalState() == JobStateInternal.RUNNING) {
       metrics.endRunningJob(this);
     }
     if (finishTime == 0) setFinishTime();
@@ -943,7 +962,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   */
 
   public static class InitTransition 
-      implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
+      implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
 
     /**
      * Note that this transition method is called directly (and synchronously)
@@ -953,7 +972,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
      * way; MR version is).
      */
     @Override
-    public JobState transition(JobImpl job, JobEvent event) {
+    public JobStateInternal transition(JobImpl job, JobEvent event) {
       job.metrics.submittedJob(job);
       job.metrics.preparingJob(job);
       try {
@@ -1019,7 +1038,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         createReduceTasks(job);
 
         job.metrics.endPreparingJob(job);
-        return JobState.INITED;
+        return JobStateInternal.INITED;
         //TODO XXX Should JobInitedEvent be generated here (instead of in StartTransition)
 
       } catch (IOException e) {
@@ -1028,7 +1047,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
             + StringUtils.stringifyException(e));
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
         job.metrics.endPreparingJob(job);
-        return job.finished(JobState.FAILED);
+        return job.finished(JobStateInternal.FAILED);
       }
     }
 
@@ -1236,9 +1255,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       JobUnsuccessfulCompletionEvent failedEvent =
           new JobUnsuccessfulCompletionEvent(job.oldJobId,
               job.finishTime, 0, 0,
-              JobState.KILLED.toString());
+              JobStateInternal.KILLED.toString());
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
-      job.finished(JobState.KILLED);
+      job.finished(JobStateInternal.KILLED);
     }
   }
 
@@ -1248,7 +1267,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     public void transition(JobImpl job, JobEvent event) {
       job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
       job.addDiagnostic("Job received Kill in INITED state.");
-      job.finished(JobState.KILLED);
+      job.finished(JobStateInternal.KILLED);
     }
   }
 
@@ -1329,10 +1348,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   }
 
   private static class TaskCompletedTransition implements
-      MultipleArcTransition<JobImpl, JobEvent, JobState> {
+      MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
 
     @Override
-    public JobState transition(JobImpl job, JobEvent event) {
+    public JobStateInternal transition(JobImpl job, JobEvent event) {
       job.completedTaskCount++;
       LOG.info("Num completed Tasks: " + job.completedTaskCount);
       JobTaskEvent taskEvent = (JobTaskEvent) event;
@@ -1348,7 +1367,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       return checkJobForCompletion(job);
     }
 
-    protected JobState checkJobForCompletion(JobImpl job) {
+    protected JobStateInternal checkJobForCompletion(JobImpl job) {
       //check for Job failure
       if (job.failedMapTaskCount*100 > 
         job.allowedMapFailuresPercent*job.numMapTasks ||
@@ -1362,16 +1381,16 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         LOG.info(diagnosticMsg);
         job.addDiagnostic(diagnosticMsg);
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
-        return job.finished(JobState.FAILED);
+        return job.finished(JobStateInternal.FAILED);
       }
       
-      JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
+      JobStateInternal jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
       if (jobCompleteSuccess != null) {
         return jobCompleteSuccess;
       }
       
       //return the current state, Job not finished yet
-      return job.getState();
+      return job.getInternalState();
     }
 
     private void taskSucceeded(JobImpl job, Task task) {
@@ -1405,17 +1424,17 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 
   // Transition class for handling jobs with no tasks
   static class JobNoTasksCompletedTransition implements
-  MultipleArcTransition<JobImpl, JobEvent, JobState> {
+  MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
 
     @Override
-    public JobState transition(JobImpl job, JobEvent event) {
-      JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
+    public JobStateInternal transition(JobImpl job, JobEvent event) {
+      JobStateInternal jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
       if (jobCompleteSuccess != null) {
         return jobCompleteSuccess;
       }
       
       // Return the current state, Job not finished yet
-      return job.getState();
+      return job.getInternalState();
     }
   }
 
@@ -1432,14 +1451,14 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private static class KillWaitTaskCompletedTransition extends  
       TaskCompletedTransition {
     @Override
-    protected JobState checkJobForCompletion(JobImpl job) {
+    protected JobStateInternal checkJobForCompletion(JobImpl job) {
       if (job.completedTaskCount == job.tasks.size()) {
         job.setFinishTime();
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
-        return job.finished(JobState.KILLED);
+        return job.finished(JobStateInternal.KILLED);
       }
       //return the current state, Job not finished yet
-      return job.getState();
+      return job.getInternalState();
     }
   }
 
@@ -1478,9 +1497,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       JobUnsuccessfulCompletionEvent failedEvent =
           new JobUnsuccessfulCompletionEvent(job.oldJobId,
               job.finishTime, 0, 0,
-              JobState.ERROR.toString());
+              JobStateInternal.ERROR.toString());
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
-      job.finished(JobState.ERROR);
+      job.finished(JobStateInternal.ERROR);
     }
   }
 

+ 159 - 114
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -75,6 +75,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -128,6 +129,8 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Implementation of TaskAttempt interface.
  */
@@ -178,149 +181,149 @@ public abstract class TaskAttemptImpl implements
       = new DiagnosticInformationUpdater();
 
   private static final StateMachineFactory
-        <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+        <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
         stateMachineFactory
     = new StateMachineFactory
-             <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
-           (TaskAttemptState.NEW)
+             <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
+           (TaskAttemptStateInternal.NEW)
 
      // Transitions from the NEW state.
-     .addTransition(TaskAttemptState.NEW, TaskAttemptState.UNASSIGNED,
+     .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
          TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false))
-     .addTransition(TaskAttemptState.NEW, TaskAttemptState.UNASSIGNED,
+     .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
          TaskAttemptEventType.TA_RESCHEDULE, new RequestContainerTransition(true))
-     .addTransition(TaskAttemptState.NEW, TaskAttemptState.KILLED,
+     .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_KILL, new KilledTransition())
-     .addTransition(TaskAttemptState.NEW, TaskAttemptState.FAILED,
+     .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
 
      // Transitions from the UNASSIGNED state.
-     .addTransition(TaskAttemptState.UNASSIGNED,
-         TaskAttemptState.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
+     .addTransition(TaskAttemptStateInternal.UNASSIGNED,
+         TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
          new ContainerAssignedTransition())
-     .addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.KILLED,
+     .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
-             TaskAttemptState.KILLED, true))
-     .addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.FAILED,
+             TaskAttemptStateInternal.KILLED, true))
+     .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
-             TaskAttemptState.FAILED, true))
+             TaskAttemptStateInternal.FAILED, true))
 
      // Transitions from the ASSIGNED state.
-     .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.RUNNING,
+     .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,
          TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
          new LaunchedContainerTransition())
-     .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.ASSIGNED,
+     .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.ASSIGNED,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-     .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.FAILED,
+     .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
-         new DeallocateContainerTransition(TaskAttemptState.FAILED, false))
-     .addTransition(TaskAttemptState.ASSIGNED,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+         new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false))
+     .addTransition(TaskAttemptStateInternal.ASSIGNED,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED,
          CLEANUP_CONTAINER_TRANSITION)
       // ^ If RM kills the container due to expiry, preemption etc. 
-     .addTransition(TaskAttemptState.ASSIGNED, 
-         TaskAttemptState.KILL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.ASSIGNED, 
+         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
-     .addTransition(TaskAttemptState.ASSIGNED, 
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.ASSIGNED, 
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
 
      // Transitions from RUNNING state.
-     .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING,
+     .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
          TaskAttemptEventType.TA_UPDATE, new StatusUpdater())
-     .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING,
+     .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      // If no commit is required, task directly goes to success
-     .addTransition(TaskAttemptState.RUNNING,
-         TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
      // If commit is required, task goes through commit pending state.
-     .addTransition(TaskAttemptState.RUNNING,
-         TaskAttemptState.COMMIT_PENDING,
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.COMMIT_PENDING,
          TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
      // Failure handling while RUNNING
-     .addTransition(TaskAttemptState.RUNNING,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
       //for handling container exit without sending the done or fail msg
-     .addTransition(TaskAttemptState.RUNNING,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED,
          CLEANUP_CONTAINER_TRANSITION)
      // Timeout handling while RUNNING
-     .addTransition(TaskAttemptState.RUNNING,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
      // if container killed by AM shutting down
-     .addTransition(TaskAttemptState.RUNNING,
-         TaskAttemptState.KILLED,
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
      // Kill handling
-     .addTransition(TaskAttemptState.RUNNING,
-         TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
          CLEANUP_CONTAINER_TRANSITION)
 
      // Transitions from COMMIT_PENDING state
-     .addTransition(TaskAttemptState.COMMIT_PENDING,
-         TaskAttemptState.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
          new StatusUpdater())
-     .addTransition(TaskAttemptState.COMMIT_PENDING,
-         TaskAttemptState.COMMIT_PENDING,
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.COMMIT_PENDING,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-     .addTransition(TaskAttemptState.COMMIT_PENDING,
-         TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
-     .addTransition(TaskAttemptState.COMMIT_PENDING,
-         TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
          CLEANUP_CONTAINER_TRANSITION)
      // if container killed by AM shutting down
-     .addTransition(TaskAttemptState.COMMIT_PENDING,
-         TaskAttemptState.KILLED,
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
-     .addTransition(TaskAttemptState.COMMIT_PENDING,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
-     .addTransition(TaskAttemptState.COMMIT_PENDING,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED,
          CLEANUP_CONTAINER_TRANSITION)
-     .addTransition(TaskAttemptState.COMMIT_PENDING,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
 
      // Transitions from SUCCESS_CONTAINER_CLEANUP state
      // kill and cleanup the container
-     .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
-         TaskAttemptState.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
+     .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
          new SucceededTransition())
      .addTransition(
-          TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
-          TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+          TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+          TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
           TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
           DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
       // Ignore-able events
-     .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
-         TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_FAILMSG,
              TaskAttemptEventType.TA_TIMED_OUT,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED))
 
      // Transitions from FAIL_CONTAINER_CLEANUP state.
-     .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
-         TaskAttemptState.FAIL_TASK_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
          TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
-     .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
       // Ignore-able events
-     .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED,
              TaskAttemptEventType.TA_UPDATE,
@@ -333,17 +336,17 @@ public abstract class TaskAttemptImpl implements
              TaskAttemptEventType.TA_TIMED_OUT))
 
       // Transitions from KILL_CONTAINER_CLEANUP
-     .addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP,
-         TaskAttemptState.KILL_TASK_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.KILL_TASK_CLEANUP,
          TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
-     .addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP,
-         TaskAttemptState.KILL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      // Ignore-able events
      .addTransition(
-         TaskAttemptState.KILL_CONTAINER_CLEANUP,
-         TaskAttemptState.KILL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED,
              TaskAttemptEventType.TA_UPDATE,
@@ -356,16 +359,16 @@ public abstract class TaskAttemptImpl implements
 
      // Transitions from FAIL_TASK_CLEANUP
      // run the task cleanup
-     .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP,
-         TaskAttemptState.FAILED, TaskAttemptEventType.TA_CLEANUP_DONE,
+     .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
+         TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CLEANUP_DONE,
          new FailedTransition())
-     .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP,
-         TaskAttemptState.FAIL_TASK_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
+         TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
       // Ignore-able events
-     .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP,
-         TaskAttemptState.FAIL_TASK_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
+         TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED,
              TaskAttemptEventType.TA_UPDATE,
@@ -378,16 +381,16 @@ public abstract class TaskAttemptImpl implements
              TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
 
      // Transitions from KILL_TASK_CLEANUP
-     .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
-         TaskAttemptState.KILLED, TaskAttemptEventType.TA_CLEANUP_DONE,
+     .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
+         TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CLEANUP_DONE,
          new KilledTransition())
-     .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
-         TaskAttemptState.KILL_TASK_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
+         TaskAttemptStateInternal.KILL_TASK_CLEANUP,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      // Ignore-able events
-     .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
-         TaskAttemptState.KILL_TASK_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
+         TaskAttemptStateInternal.KILL_TASK_CLEANUP,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED,
              TaskAttemptEventType.TA_UPDATE,
@@ -400,28 +403,28 @@ public abstract class TaskAttemptImpl implements
              TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
 
       // Transitions from SUCCEEDED
-     .addTransition(TaskAttemptState.SUCCEEDED, //only possible for map attempts
-         TaskAttemptState.FAILED,
+     .addTransition(TaskAttemptStateInternal.SUCCEEDED, //only possible for map attempts
+         TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
          new TooManyFetchFailureTransition())
      .addTransition(
-         TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED,
+         TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      // Ignore-able events for SUCCEEDED state
-     .addTransition(TaskAttemptState.SUCCEEDED,
-         TaskAttemptState.SUCCEEDED,
+     .addTransition(TaskAttemptStateInternal.SUCCEEDED,
+         TaskAttemptStateInternal.SUCCEEDED,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_FAILMSG,
              TaskAttemptEventType.TA_CONTAINER_CLEANED,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED))
 
      // Transitions from FAILED state
-     .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED,
+     .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED,
        TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
        DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      // Ignore-able events for FAILED state
-     .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED,
+     .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_ASSIGNED,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED,
@@ -436,11 +439,11 @@ public abstract class TaskAttemptImpl implements
              TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE))
 
      // Transitions from KILLED state
-     .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED,
+     .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      // Ignore-able events for KILLED state
-     .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED,
+     .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_ASSIGNED,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED,
@@ -457,7 +460,7 @@ public abstract class TaskAttemptImpl implements
      .installTopology();
 
   private final StateMachine
-         <TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+         <TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
     stateMachine;
 
   private ContainerId containerID;
@@ -855,9 +858,9 @@ public abstract class TaskAttemptImpl implements
     readLock.lock();
     try {
       // TODO: Use stateMachine level method?
-      return (getState() == TaskAttemptState.SUCCEEDED || 
-          getState() == TaskAttemptState.FAILED ||
-          getState() == TaskAttemptState.KILLED);
+      return (getInternalState() == TaskAttemptStateInternal.SUCCEEDED || 
+              getInternalState() == TaskAttemptStateInternal.FAILED ||
+              getInternalState() == TaskAttemptStateInternal.KILLED);
     } finally {
       readLock.unlock();
     }
@@ -934,7 +937,7 @@ public abstract class TaskAttemptImpl implements
   public TaskAttemptState getState() {
     readLock.lock();
     try {
-      return stateMachine.getCurrentState();
+      return getExternalState(stateMachine.getCurrentState());
     } finally {
       readLock.unlock();
     }
@@ -949,7 +952,7 @@ public abstract class TaskAttemptImpl implements
     }
     writeLock.lock();
     try {
-      final TaskAttemptState oldState = getState();
+      final TaskAttemptStateInternal oldState = getInternalState();
       try {
         stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
@@ -961,16 +964,58 @@ public abstract class TaskAttemptImpl implements
         eventHandler.handle(new JobEvent(this.attemptId.getTaskId().getJobId(),
             JobEventType.INTERNAL_ERROR));
       }
-      if (oldState != getState()) {
+      if (oldState != getInternalState()) {
           LOG.info(attemptId + " TaskAttempt Transitioned from " 
            + oldState + " to "
-           + getState());
+           + getInternalState());
       }
     } finally {
       writeLock.unlock();
     }
   }
 
+  @VisibleForTesting
+  public TaskAttemptStateInternal getInternalState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private static TaskAttemptState getExternalState(
+      TaskAttemptStateInternal smState) {
+    switch (smState) {
+    case ASSIGNED:
+    case UNASSIGNED:
+      return TaskAttemptState.STARTING;
+    case COMMIT_PENDING:
+      return TaskAttemptState.COMMIT_PENDING;
+    case FAILED:
+      return TaskAttemptState.FAILED;
+    case KILLED:
+      return TaskAttemptState.KILLED;
+      // All CLEANUP states considered as RUNNING since events have not gone out
+      // to the Task yet. May be possible to consider them as a Finished state.
+    case FAIL_CONTAINER_CLEANUP:
+    case FAIL_TASK_CLEANUP:
+    case KILL_CONTAINER_CLEANUP:
+    case KILL_TASK_CLEANUP:
+    case SUCCESS_CONTAINER_CLEANUP:
+    case RUNNING:
+      return TaskAttemptState.RUNNING;
+    case NEW:
+      return TaskAttemptState.NEW;
+    case SUCCEEDED:
+      return TaskAttemptState.SUCCEEDED;
+    default:
+      throw new YarnException("Attempt to convert invalid "
+          + "stateMachineTaskAttemptState to externalTaskAttemptState: "
+          + smState);
+    }
+  }
+
   //always called in write lock
   private void setFinishTime() {
     //set the finish time only if launch time is set
@@ -1035,7 +1080,7 @@ public abstract class TaskAttemptImpl implements
   private static
       TaskAttemptUnsuccessfulCompletionEvent
       createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
-          TaskAttemptState attemptState) {
+          TaskAttemptStateInternal attemptState) {
     TaskAttemptUnsuccessfulCompletionEvent tauce =
         new TaskAttemptUnsuccessfulCompletionEvent(
             TypeConverter.fromYarn(taskAttempt.attemptId),
@@ -1216,10 +1261,10 @@ public abstract class TaskAttemptImpl implements
 
   private static class DeallocateContainerTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
-    private final TaskAttemptState finalState;
+    private final TaskAttemptStateInternal finalState;
     private final boolean withdrawsContainerRequest;
     DeallocateContainerTransition
-        (TaskAttemptState finalState, boolean withdrawsContainerRequest) {
+        (TaskAttemptStateInternal finalState, boolean withdrawsContainerRequest) {
       this.finalState = finalState;
       this.withdrawsContainerRequest = withdrawsContainerRequest;
     }
@@ -1257,10 +1302,10 @@ public abstract class TaskAttemptImpl implements
         TaskAttemptUnsuccessfulCompletionEvent tauce =
             createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
                 finalState);
-        if(finalState == TaskAttemptState.FAILED) {
+        if(finalState == TaskAttemptStateInternal.FAILED) {
           taskAttempt.eventHandler
             .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
-        } else if(finalState == TaskAttemptState.KILLED) {
+        } else if(finalState == TaskAttemptStateInternal.KILLED) {
           taskAttempt.eventHandler
           .handle(createJobCounterUpdateEventTAKilled(taskAttempt));
         }
@@ -1374,7 +1419,7 @@ public abstract class TaskAttemptImpl implements
           JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
           slotMillis);
       taskAttempt.eventHandler.handle(jce);
-      taskAttempt.logAttemptFinishedEvent(TaskAttemptState.SUCCEEDED);
+      taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
           TaskEventType.T_ATTEMPT_SUCCEEDED));
@@ -1397,10 +1442,10 @@ public abstract class TaskAttemptImpl implements
             .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
         TaskAttemptUnsuccessfulCompletionEvent tauce =
             createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                TaskAttemptState.FAILED);
+                TaskAttemptStateInternal.FAILED);
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-        // taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED); Not
+        // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
         // handling failed map/reduce events.
       }else {
         LOG.debug("Not generating HistoryFinish event since start event not " +
@@ -1412,7 +1457,7 @@ public abstract class TaskAttemptImpl implements
   }
 
   @SuppressWarnings({ "unchecked" })
-  private void logAttemptFinishedEvent(TaskAttemptState state) {
+  private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
     //Log finished events only if an attempt started.
     if (getLaunchTime() == 0) return; 
     if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
@@ -1466,7 +1511,7 @@ public abstract class TaskAttemptImpl implements
             .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
         TaskAttemptUnsuccessfulCompletionEvent tauce =
             createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                TaskAttemptState.FAILED);
+                TaskAttemptStateInternal.FAILED);
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       }else {
@@ -1492,14 +1537,14 @@ public abstract class TaskAttemptImpl implements
             .handle(createJobCounterUpdateEventTAKilled(taskAttempt));
         TaskAttemptUnsuccessfulCompletionEvent tauce =
             createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                TaskAttemptState.KILLED);
+                TaskAttemptStateInternal.KILLED);
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       }else {
         LOG.debug("Not generating HistoryFinish event since start event not " +
         		"generated for taskAttempt: " + taskAttempt.getID());
       }
-//      taskAttempt.logAttemptFinishedEvent(TaskAttemptState.KILLED); Not logging Map/Reduce attempts in case of failure.
+//      taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
           TaskEventType.T_ATTEMPT_KILLED));

+ 85 - 63
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
@@ -83,6 +84,8 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Implementation of Task interface.
  */
@@ -124,62 +127,62 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
      KILL_TRANSITION = new KillTransition();
 
   private static final StateMachineFactory
-               <TaskImpl, TaskState, TaskEventType, TaskEvent> 
+               <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent> 
             stateMachineFactory 
-           = new StateMachineFactory<TaskImpl, TaskState, TaskEventType, TaskEvent>
-               (TaskState.NEW)
+           = new StateMachineFactory<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
+               (TaskStateInternal.NEW)
 
     // define the state machine of Task
 
     // Transitions from NEW state
-    .addTransition(TaskState.NEW, TaskState.SCHEDULED, 
+    .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED, 
         TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
-    .addTransition(TaskState.NEW, TaskState.KILLED, 
+    .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED, 
         TaskEventType.T_KILL, new KillNewTransition())
 
     // Transitions from SCHEDULED state
       //when the first attempt is launched, the task state is set to RUNNING
-     .addTransition(TaskState.SCHEDULED, TaskState.RUNNING, 
+     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.RUNNING, 
          TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
-     .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT, 
+     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT, 
          TaskEventType.T_KILL, KILL_TRANSITION)
-     .addTransition(TaskState.SCHEDULED, TaskState.SCHEDULED, 
+     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED, 
          TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
-     .addTransition(TaskState.SCHEDULED, 
-        EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED), 
+     .addTransition(TaskStateInternal.SCHEDULED, 
+        EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED), 
         TaskEventType.T_ATTEMPT_FAILED, 
         new AttemptFailedTransition())
  
     // Transitions from RUNNING state
-    .addTransition(TaskState.RUNNING, TaskState.RUNNING, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, 
         TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later
-    .addTransition(TaskState.RUNNING, TaskState.RUNNING, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, 
         TaskEventType.T_ATTEMPT_COMMIT_PENDING,
         new AttemptCommitPendingTransition())
-    .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
         TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
-    .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED, 
         TaskEventType.T_ATTEMPT_SUCCEEDED,
         new AttemptSucceededTransition())
-    .addTransition(TaskState.RUNNING, TaskState.RUNNING, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, 
         TaskEventType.T_ATTEMPT_KILLED,
         ATTEMPT_KILLED_TRANSITION)
-    .addTransition(TaskState.RUNNING, 
-        EnumSet.of(TaskState.RUNNING, TaskState.FAILED), 
+    .addTransition(TaskStateInternal.RUNNING, 
+        EnumSet.of(TaskStateInternal.RUNNING, TaskStateInternal.FAILED), 
         TaskEventType.T_ATTEMPT_FAILED,
         new AttemptFailedTransition())
-    .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.KILL_WAIT, 
         TaskEventType.T_KILL, KILL_TRANSITION)
 
     // Transitions from KILL_WAIT state
-    .addTransition(TaskState.KILL_WAIT,
-        EnumSet.of(TaskState.KILL_WAIT, TaskState.KILLED),
+    .addTransition(TaskStateInternal.KILL_WAIT,
+        EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
         TaskEventType.T_ATTEMPT_KILLED,
         new KillWaitAttemptKilledTransition())
     // Ignore-able transitions.
     .addTransition(
-        TaskState.KILL_WAIT,
-        TaskState.KILL_WAIT,
+        TaskStateInternal.KILL_WAIT,
+        TaskStateInternal.KILL_WAIT,
         EnumSet.of(TaskEventType.T_KILL,
             TaskEventType.T_ATTEMPT_LAUNCHED,
             TaskEventType.T_ATTEMPT_COMMIT_PENDING,
@@ -188,31 +191,31 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
             TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from SUCCEEDED state
-    .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
-        EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED, TaskState.FAILED),
+    .addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks
+        EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED, TaskStateInternal.FAILED),
         TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
     // Ignore-able transitions.
     .addTransition(
-        TaskState.SUCCEEDED, TaskState.SUCCEEDED,
+        TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
         EnumSet.of(TaskEventType.T_KILL,
             TaskEventType.T_ADD_SPEC_ATTEMPT,
             TaskEventType.T_ATTEMPT_LAUNCHED,
             TaskEventType.T_ATTEMPT_KILLED))
 
     // Transitions from FAILED state        
-    .addTransition(TaskState.FAILED, TaskState.FAILED,
+    .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
         EnumSet.of(TaskEventType.T_KILL,
                    TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from KILLED state
-    .addTransition(TaskState.KILLED, TaskState.KILLED,
+    .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
         EnumSet.of(TaskEventType.T_KILL,
                    TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // create the topology tables
     .installTopology();
 
-  private final StateMachine<TaskState, TaskEventType, TaskEvent>
+  private final StateMachine<TaskStateInternal, TaskEventType, TaskEvent>
     stateMachine;
 
   // By default, the next TaskAttempt number is zero. Changes during recovery  
@@ -243,7 +246,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   @Override
   public TaskState getState() {
-    return stateMachine.getCurrentState();
+    readLock.lock();
+    try {
+      return getExternalState(getInternalState());
+    } finally {
+      readLock.unlock();
+    }
   }
 
   public TaskImpl(JobId jobId, TaskType taskType, int partition,
@@ -350,9 +358,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     readLock.lock();
     try {
      // TODO: Use stateMachine level method?
-      return (getState() == TaskState.SUCCEEDED ||
-          getState() == TaskState.FAILED ||
-          getState() == TaskState.KILLED);
+      return (getInternalState() == TaskStateInternal.SUCCEEDED ||
+              getInternalState() == TaskStateInternal.FAILED ||
+              getInternalState() == TaskStateInternal.KILLED);
     } finally {
       readLock.unlock();
     }
@@ -427,6 +435,24 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
+  @VisibleForTesting
+  public TaskStateInternal getInternalState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }    
+  }
+
+  private static TaskState getExternalState(TaskStateInternal smState) {
+    if (smState == TaskStateInternal.KILL_WAIT) {
+      return TaskState.KILLED;
+    } else {
+      return TaskState.valueOf(smState.name());
+    }
+  }
+
   //this is always called in read/write lock
   private long getLaunchTime() {
     long taskLaunchTime = 0;
@@ -478,8 +504,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     return finishTime;
   }
   
-  private TaskState finished(TaskState finalState) {
-    if (getState() == TaskState.RUNNING) {
+  private TaskStateInternal finished(TaskStateInternal finalState) {
+    if (getInternalState() == TaskStateInternal.RUNNING) {
       metrics.endRunningTask(this);
     }
     return finalState;
@@ -494,11 +520,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       switch (at.getState()) {
       
       // ignore all failed task attempts
-      case FAIL_CONTAINER_CLEANUP: 
-      case FAIL_TASK_CLEANUP: 
-      case FAILED: 
-      case KILL_CONTAINER_CLEANUP: 
-      case KILL_TASK_CLEANUP: 
+      case FAILED:
       case KILLED:
         continue;      
       }      
@@ -599,7 +621,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
     try {
       writeLock.lock();
-      TaskState oldState = getState();
+      TaskStateInternal oldState = getInternalState();
       try {
         stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
@@ -607,9 +629,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
             + this.taskId, e);
         internalError(event.getType());
       }
-      if (oldState != getState()) {
+      if (oldState != getInternalState()) {
         LOG.info(taskId + " Task Transitioned from " + oldState + " to "
-            + getState());
+            + getInternalState());
       }
 
     } finally {
@@ -653,7 +675,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
-  private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskState taskState) {
+  private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
     TaskFinishedEvent tfe =
       new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
         TypeConverter.fromYarn(task.successfulAttempt),
@@ -664,7 +686,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     return tfe;
   }
   
-  private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskState taskState, TaskAttemptId taId) {
+  private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
     StringBuilder errorSb = new StringBuilder();
     if (diag != null) {
       for (String d : diag) {
@@ -763,7 +785,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       // issue kill to all other attempts
       if (task.historyTaskStartGenerated) {
         TaskFinishedEvent tfe = createTaskFinishedEvent(task,
-            TaskState.SUCCEEDED);
+            TaskStateInternal.SUCCEEDED);
         task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
             tfe));
       }
@@ -779,7 +801,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
                   TaskAttemptEventType.TA_KILL));
         }
       }
-      task.finished(TaskState.SUCCEEDED);
+      task.finished(TaskStateInternal.SUCCEEDED);
     }
   }
 
@@ -799,12 +821,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
 
   private static class KillWaitAttemptKilledTransition implements
-      MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+      MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
-    protected TaskState finalState = TaskState.KILLED;
+    protected TaskStateInternal finalState = TaskStateInternal.KILLED;
 
     @Override
-    public TaskState transition(TaskImpl task, TaskEvent event) {
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       task.handleTaskAttemptCompletion(
           ((TaskTAttemptEvent) event).getTaskAttemptID(), 
           TaskAttemptCompletionEventStatus.KILLED);
@@ -821,18 +843,18 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         }
 
         task.eventHandler.handle(
-            new JobTaskEvent(task.taskId, finalState));
+            new JobTaskEvent(task.taskId, getExternalState(finalState)));
         return finalState;
       }
-      return task.getState();
+      return task.getInternalState();
     }
   }
 
   private static class AttemptFailedTransition implements
-    MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+    MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
     @Override
-    public TaskState transition(TaskImpl task, TaskEvent event) {
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       task.failedAttempts++;
       TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
       if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
@@ -863,7 +885,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         
         if (task.historyTaskStartGenerated) {
         TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
-            TaskState.FAILED, taId);
+            TaskStateInternal.FAILED, taId);
         task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
             taskFailedEvent));
         } else {
@@ -872,13 +894,13 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         }
         task.eventHandler.handle(
             new JobTaskEvent(task.taskId, TaskState.FAILED));
-        return task.finished(TaskState.FAILED);
+        return task.finished(TaskStateInternal.FAILED);
       }
       return getDefaultState(task);
     }
 
-    protected TaskState getDefaultState(Task task) {
-      return task.getState();
+    protected TaskStateInternal getDefaultState(TaskImpl task) {
+      return task.getInternalState();
     }
 
     protected void unSucceed(TaskImpl task) {
@@ -892,14 +914,14 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       extends AttemptFailedTransition {
 
     @Override
-    public TaskState transition(TaskImpl task, TaskEvent event) {
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       if (event instanceof TaskTAttemptEvent) {
         TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
-        if (task.getState() == TaskState.SUCCEEDED &&
+        if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
             !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
           // don't allow a different task attempt to override a previous
           // succeeded state
-          return TaskState.SUCCEEDED;
+          return TaskStateInternal.SUCCEEDED;
         }
       }
       
@@ -922,8 +944,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
 
     @Override
-    protected TaskState getDefaultState(Task task) {
-      return TaskState.SCHEDULED;
+    protected TaskStateInternal getDefaultState(TaskImpl task) {
+      return TaskStateInternal.SCHEDULED;
     }
   }
 
@@ -934,7 +956,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       
       if (task.historyTaskStartGenerated) {
       TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
-            TaskState.KILLED, null); // TODO Verify failedAttemptId is null
+            TaskStateInternal.KILLED, null); // TODO Verify failedAttemptId is null
       task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
           taskFailedEvent));
       }else {

+ 2 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java

@@ -44,9 +44,9 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -365,7 +365,7 @@ public class DefaultSpeculator extends AbstractService implements
 
     for (TaskAttempt taskAttempt : attempts.values()) {
       if (taskAttempt.getState() == TaskAttemptState.RUNNING
-          || taskAttempt.getState() == TaskAttemptState.ASSIGNED) {
+          || taskAttempt.getState() == TaskAttemptState.STARTING) {
         if (++numberRunningAttempts > 1) {
           return ALREADY_SPECULATING;
         }

+ 26 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -50,8 +50,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
@@ -60,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
@@ -240,6 +243,24 @@ public class MRApp extends MRAppMaster {
     return job;
   }
 
+  public void waitForInternalState(TaskAttemptImpl attempt,
+      TaskAttemptStateInternal finalState) throws Exception {
+    int timeoutSecs = 0;
+    TaskAttemptReport report = attempt.getReport();
+    TaskAttemptStateInternal iState = attempt.getInternalState();
+    while (!finalState.equals(iState) && timeoutSecs++ < 20) {
+      System.out.println("TaskAttempt Internal State is : " + iState
+          + " Waiting for Internal state : " + finalState + "   progress : "
+          + report.getProgress());
+      Thread.sleep(500);
+      report = attempt.getReport();
+      iState = attempt.getInternalState();
+    }   
+    System.out.println("TaskAttempt Internal State is : " + iState);
+    Assert.assertEquals("TaskAttempt Internal state is not correct (timedout)",
+        finalState, iState);
+  }
+
   public void waitForState(TaskAttempt attempt, 
       TaskAttemptState finalState) throws Exception {
     int timeoutSecs = 0;
@@ -501,18 +522,18 @@ public class MRApp extends MRAppMaster {
     //override the init transition
     private final TestInitTransition initTransition = new TestInitTransition(
         maps, reduces);
-    StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> localFactory
-        = stateMachineFactory.addTransition(JobState.NEW,
-            EnumSet.of(JobState.INITED, JobState.FAILED),
+    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
+        = stateMachineFactory.addTransition(JobStateInternal.NEW,
+            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
             JobEventType.JOB_INIT,
             // This is abusive.
             initTransition);
 
-    private final StateMachine<JobState, JobEventType, JobEvent>
+    private final StateMachine<JobStateInternal, JobEventType, JobEvent>
         localStateMachine;
 
     @Override
-    protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
+    protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
       return localStateMachine;
     }
 

+ 4 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java

@@ -36,8 +36,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
@@ -190,7 +192,8 @@ public class TestFail {
     Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts
         .size());
     TaskAttempt attempt = attempts.values().iterator().next();
-    app.waitForState(attempt, TaskAttemptState.ASSIGNED);
+    app.waitForInternalState((TaskAttemptImpl) attempt,
+        TaskAttemptStateInternal.ASSIGNED);
     app.getDispatcher().getEventHandler().handle(
         new TaskAttemptEvent(attempt.getID(),
             TaskAttemptEventType.TA_CONTAINER_COMPLETED));

+ 6 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

@@ -48,7 +48,6 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -56,9 +55,11 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
@@ -409,8 +410,8 @@ public class TestRMContainerAllocator {
     // Wait till all map-attempts request for containers
     for (Task t : job.getTasks().values()) {
       if (t.getType() == TaskType.MAP) {
-        mrApp.waitForState(t.getAttempts().values().iterator().next(),
-          TaskAttemptState.UNASSIGNED);
+        mrApp.waitForInternalState((TaskAttemptImpl) t.getAttempts().values()
+            .iterator().next(), TaskAttemptStateInternal.UNASSIGNED);
       }
     }
     amDispatcher.await();
@@ -560,8 +561,8 @@ public class TestRMContainerAllocator {
     amDispatcher.await();
     // Wait till all map-attempts request for containers
     for (Task t : job.getTasks().values()) {
-      mrApp.waitForState(t.getAttempts().values().iterator().next(),
-        TaskAttemptState.UNASSIGNED);
+      mrApp.waitForInternalState((TaskAttemptImpl) t.getAttempts().values()
+          .iterator().next(), TaskAttemptStateInternal.UNASSIGNED);
     }
     amDispatcher.await();
 

+ 15 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

@@ -42,8 +42,8 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -77,11 +77,11 @@ public class TestJobImpl {
     tasks.put(mockTask.getID(), mockTask);
     mockJob.tasks = tasks;
 
-    when(mockJob.getState()).thenReturn(JobState.ERROR);
+    when(mockJob.getInternalState()).thenReturn(JobStateInternal.ERROR);
     JobEvent mockJobEvent = mock(JobEvent.class);
-    JobState state = trans.transition(mockJob, mockJobEvent);
+    JobStateInternal state = trans.transition(mockJob, mockJobEvent);
     Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition",
-        JobState.ERROR, state);
+        JobStateInternal.ERROR, state);
   }
 
   @Test
@@ -96,9 +96,12 @@ public class TestJobImpl {
     when(mockJob.getCommitter()).thenReturn(mockCommitter);
     when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
     when(mockJob.getJobContext()).thenReturn(mockJobContext);
-    when(mockJob.finished(JobState.KILLED)).thenReturn(JobState.KILLED);
-    when(mockJob.finished(JobState.FAILED)).thenReturn(JobState.FAILED);
-    when(mockJob.finished(JobState.SUCCEEDED)).thenReturn(JobState.SUCCEEDED);
+    when(mockJob.finished(JobStateInternal.KILLED)).thenReturn(
+        JobStateInternal.KILLED);
+    when(mockJob.finished(JobStateInternal.FAILED)).thenReturn(
+        JobStateInternal.FAILED);
+    when(mockJob.finished(JobStateInternal.SUCCEEDED)).thenReturn(
+        JobStateInternal.SUCCEEDED);
 
     try {
       doThrow(new IOException()).when(mockCommitter).commitJob(any(JobContext.class));
@@ -106,11 +109,11 @@ public class TestJobImpl {
       // commitJob stubbed out, so this can't happen
     }
     doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
-    JobState jobState = JobImpl.checkJobCompleteSuccess(mockJob);
+    JobStateInternal jobState = JobImpl.checkJobCompleteSuccess(mockJob);
     Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
       "for successful job", jobState);
     Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
-        JobState.FAILED, jobState);
+        JobStateInternal.FAILED, jobState);
     verify(mockJob).abortJob(
         eq(org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
   }
@@ -129,7 +132,8 @@ public class TestJobImpl {
     when(mockJob.getJobContext()).thenReturn(mockJobContext);
     doNothing().when(mockJob).setFinishTime();
     doNothing().when(mockJob).logJobHistoryFinishedEvent();
-    when(mockJob.finished(any(JobState.class))).thenReturn(JobState.SUCCEEDED);
+    when(mockJob.finished(any(JobStateInternal.class))).thenReturn(
+        JobStateInternal.SUCCEEDED);
 
     try {
       doNothing().when(mockCommitter).commitJob(any(JobContext.class));
@@ -141,7 +145,7 @@ public class TestJobImpl {
       "for successful job",
       JobImpl.checkJobCompleteSuccess(mockJob));
     Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
-        JobState.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob));
+        JobStateInternal.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob));
   }
 
   @Test

+ 2 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

@@ -26,7 +26,6 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -48,13 +47,13 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -329,7 +328,7 @@ public class TestTaskImpl {
    * {@link TaskState#KILL_WAIT}
    */
   private void assertTaskKillWaitState() {
-    assertEquals(TaskState.KILL_WAIT, mockTask.getState());
+    assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState());
   }
   
   /**

+ 4 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java

@@ -37,7 +37,6 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -46,6 +45,8 @@ import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.ContainerManager;
@@ -260,7 +261,8 @@ public class TestContainerLauncher {
           attempts.size());
 
     TaskAttempt attempt = attempts.values().iterator().next();
-    app.waitForState(attempt, TaskAttemptState.ASSIGNED);
+      app.waitForInternalState((TaskAttemptImpl) attempt,
+          TaskAttemptStateInternal.ASSIGNED);
 
     app.waitForState(job, JobState.FAILED);
 

+ 19 - 9
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java

@@ -128,14 +128,26 @@ public class TypeConverter {
     return taskId;
   }
 
-  public static TaskAttemptState toYarn(org.apache.hadoop.mapred.TaskStatus.State state) {
-    if (state == org.apache.hadoop.mapred.TaskStatus.State.KILLED_UNCLEAN) {
-      return TaskAttemptState.KILLED;
-    }
-    if (state == org.apache.hadoop.mapred.TaskStatus.State.FAILED_UNCLEAN) {
+  public static TaskAttemptState toYarn(
+      org.apache.hadoop.mapred.TaskStatus.State state) {
+    switch (state) {
+    case COMMIT_PENDING:
+      return TaskAttemptState.COMMIT_PENDING;
+    case FAILED:
+    case FAILED_UNCLEAN:
       return TaskAttemptState.FAILED;
-    }
-    return TaskAttemptState.valueOf(state.toString());
+    case KILLED:
+    case KILLED_UNCLEAN:
+      return TaskAttemptState.KILLED;
+    case RUNNING:
+      return TaskAttemptState.RUNNING;
+    case SUCCEEDED:
+      return TaskAttemptState.SUCCEEDED;
+    case UNASSIGNED:
+      return TaskAttemptState.STARTING;
+    default:
+      throw new YarnException("Unrecognized State: " + state);
+    }   
   }
 
   public static Phase toYarn(org.apache.hadoop.mapred.TaskStatus.Phase phase) {
@@ -309,7 +321,6 @@ public class TypeConverter {
       return org.apache.hadoop.mapred.JobStatus.PREP;
     case RUNNING:
       return org.apache.hadoop.mapred.JobStatus.RUNNING;
-    case KILL_WAIT:
     case KILLED:
       return org.apache.hadoop.mapred.JobStatus.KILLED;
     case SUCCEEDED:
@@ -329,7 +340,6 @@ public class TypeConverter {
       return org.apache.hadoop.mapred.TIPStatus.PENDING;
     case RUNNING:
       return org.apache.hadoop.mapred.TIPStatus.RUNNING;
-    case KILL_WAIT:
     case KILLED:
       return org.apache.hadoop.mapred.TIPStatus.KILLED;
     case SUCCEEDED:

+ 0 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java

@@ -24,7 +24,6 @@ public enum JobState {
   RUNNING,
   SUCCEEDED,
   FAILED,
-  KILL_WAIT,
   KILLED,
   ERROR
 }

+ 6 - 12
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptState.java

@@ -19,17 +19,11 @@
 package org.apache.hadoop.mapreduce.v2.api.records;
 
 public enum TaskAttemptState {
-  NEW, 
-  UNASSIGNED, 
-  ASSIGNED, 
-  RUNNING, 
-  COMMIT_PENDING, 
-  SUCCESS_CONTAINER_CLEANUP, 
-  SUCCEEDED, 
-  FAIL_CONTAINER_CLEANUP, 
-  FAIL_TASK_CLEANUP, 
-  FAILED, 
-  KILL_CONTAINER_CLEANUP, 
-  KILL_TASK_CLEANUP, 
+  NEW,
+  STARTING,
+  RUNNING,
+  COMMIT_PENDING,
+  SUCCEEDED,
+  FAILED,
   KILLED
 }

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskState.java

@@ -19,5 +19,5 @@
 package org.apache.hadoop.mapreduce.v2.api.records;
 
 public enum TaskState {
-  NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED
+  NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILLED
 }

+ 2 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java

@@ -103,15 +103,10 @@ public class MRApps extends Apps {
   public static enum TaskAttemptStateUI {
     NEW(
         new TaskAttemptState[] { TaskAttemptState.NEW,
-        TaskAttemptState.UNASSIGNED, TaskAttemptState.ASSIGNED }),
+        TaskAttemptState.STARTING }),
     RUNNING(
         new TaskAttemptState[] { TaskAttemptState.RUNNING,
-            TaskAttemptState.COMMIT_PENDING,
-            TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
-            TaskAttemptState.FAIL_CONTAINER_CLEANUP,
-            TaskAttemptState.FAIL_TASK_CLEANUP,
-            TaskAttemptState.KILL_CONTAINER_CLEANUP,
-            TaskAttemptState.KILL_TASK_CLEANUP }),
+            TaskAttemptState.COMMIT_PENDING }),
     SUCCESSFUL(new TaskAttemptState[] { TaskAttemptState.SUCCEEDED}),
     FAILED(new TaskAttemptState[] { TaskAttemptState.FAILED}),
     KILLED(new TaskAttemptState[] { TaskAttemptState.KILLED});

+ 9 - 17
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto

@@ -50,8 +50,7 @@ enum TaskStateProto {
   TS_RUNNING = 3;
   TS_SUCCEEDED = 4;
   TS_FAILED = 5;
-  TS_KILL_WAIT = 6;
-  TS_KILLED = 7;
+  TS_KILLED = 6;
 }
 
 enum PhaseProto {
@@ -93,18 +92,12 @@ message TaskReportProto {
 
 enum TaskAttemptStateProto {
   TA_NEW = 1;
-  TA_UNASSIGNED = 2;
-  TA_ASSIGNED = 3;
-  TA_RUNNING = 4;
-  TA_COMMIT_PENDING = 5;
-  TA_SUCCESS_CONTAINER_CLEANUP = 6;
-  TA_SUCCEEDED = 7;
-  TA_FAIL_CONTAINER_CLEANUP = 8;
-  TA_FAIL_TASK_CLEANUP = 9;
-  TA_FAILED = 10;
-  TA_KILL_CONTAINER_CLEANUP = 11;
-  TA_KILL_TASK_CLEANUP = 12;
-  TA_KILLED = 13;
+  TA_STARTING= 2;
+  TA_RUNNING = 3;
+  TA_COMMIT_PENDING = 4;
+  TA_SUCCEEDED = 5;
+  TA_FAILED = 6;
+  TA_KILLED = 7;
 }
 
 message TaskAttemptReportProto {
@@ -131,9 +124,8 @@ enum JobStateProto {
   J_RUNNING = 3;
   J_SUCCEEDED = 4;
   J_FAILED = 5;
-  J_KILL_WAIT = 6;
-  J_KILLED = 7;
-  J_ERROR = 8;
+  J_KILLED = 6;
+  J_ERROR = 7;
 }
 
 message JobReportProto {