
MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol for Job, Task and TaskAttempt. (sseth)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-3902@1400225 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 12 years ago
parent
commit
6da80bc769
13 changed files with 459 additions and 253 deletions
  1. +2 -0    hadoop-mapreduce-project/CHANGES.txt.MR-3902
  2. +30 -0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/JobStateInternal.java
  3. +38 -0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/TaskAttemptStateInternal.java
  4. +23 -0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/TaskStateInternal.java
  5. +112 -93 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/JobImpl.java
  6. +116 -74 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java
  7. +92 -67  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java
  8. +1 -1    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/DefaultSpeculator.java
  9. +25 -5   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
  10. +4 -1   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java
  11. +10 -9  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java
  12. +2 -1   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskImpl.java
  13. +4 -2   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncher.java
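The core of the change is visible in the getExternalState() helpers added to JobImpl and TaskAttemptImpl in the diffs below: the state machines now run on internal-only enums, and a small mapping function converts the current internal state into the protocol-visible state on demand. The following is a minimal, self-contained sketch of that pattern; both enums are simplified stand-ins for illustration, not the real org.apache.hadoop.mapreduce.v2 types.

// Minimal sketch of the internal/external state split, not the actual Hadoop code.
// Both enums here are simplified stand-ins for the real MR v2 types.
public class StateSplitSketch {

  // Internal states: everything the state machine needs, including transient
  // states such as KILL_WAIT that clients should never observe.
  enum JobStateInternal { NEW, INITED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED, ERROR }

  // External states: what an MRClientProtocol-style report exposes.
  enum JobState { NEW, INITED, RUNNING, SUCCEEDED, FAILED, KILLED, ERROR }

  // Internal-only states are folded into the nearest externally visible state;
  // every other constant maps by name, mirroring JobImpl.getExternalState() below.
  static JobState getExternalState(JobStateInternal smState) {
    if (smState == JobStateInternal.KILL_WAIT) {
      return JobState.KILLED;
    }
    return JobState.valueOf(smState.name());
  }

  public static void main(String[] args) {
    // A job still waiting for its tasks to be killed is reported simply as KILLED.
    System.out.println(getExternalState(JobStateInternal.KILL_WAIT)); // KILLED
    System.out.println(getExternalState(JobStateInternal.RUNNING));   // RUNNING
  }
}

TaskAttemptImpl follows the same pattern with a larger mapping (for example, NEW and START_WAIT are reported as STARTING, and KILL_IN_PROGRESS as KILLED), as shown in its getExternalState() further down.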

+ 2 - 0
hadoop-mapreduce-project/CHANGES.txt.MR-3902

@@ -28,3 +28,5 @@ Branch MR-3902
   MAPREDUCE-4663. Container Launch should be independent of o.a.h.m.Task (sseth)
 
   MAPREDUCE-4727. Handle successful NM stop requests. (sseth)
+
+  MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol for Job, Task and TaskAttempt. (sseth)

+ 30 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/JobStateInternal.java

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.job;
+
+public enum JobStateInternal {
+  NEW,
+  INITED,
+  RUNNING,
+  SUCCEEDED,
+  FAILED,
+  KILL_WAIT,
+  KILLED,
+  ERROR
+}

+ 38 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/TaskAttemptStateInternal.java

@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.job;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+* TaskAttemptImpl internal state machine states.
+*
+*/
+@Private
+public enum TaskAttemptStateInternal {
+  NEW,
+  START_WAIT,
+  RUNNING,
+  COMMIT_PENDING,
+  KILL_IN_PROGRESS, 
+  FAIL_IN_PROGRESS,
+  KILLED,
+  FAILED,
+  SUCCEEDED
+}

+ 23 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/TaskStateInternal.java

@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.job;
+
+public enum TaskStateInternal {
+  NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED
+}

+ 112 - 93
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/JobImpl.java

@@ -77,6 +77,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app2.AppContext;
 import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app2.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app2.job.Task;
 import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobCounterUpdateEvent;
@@ -217,123 +218,123 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
       new UpdatedNodesTransition();
 
   protected static final
-    StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> 
+    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> 
        stateMachineFactory
-     = new StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent>
-              (JobState.NEW)
+     = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
+              (JobStateInternal.NEW)
 
           // Transitions from NEW state
-          .addTransition(JobState.NEW, JobState.NEW,
+          .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.NEW, JobState.NEW,
+          .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition
-              (JobState.NEW,
-              EnumSet.of(JobState.INITED, JobState.FAILED),
+              (JobStateInternal.NEW,
+              EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
               JobEventType.JOB_INIT,
               new InitTransition())
-          .addTransition(JobState.NEW, JobState.KILLED,
+          .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED,
               JobEventType.JOB_KILL,
               new KillNewJobTransition())
-          .addTransition(JobState.NEW, JobState.ERROR,
+          .addTransition(JobStateInternal.NEW, JobStateInternal.ERROR,
               JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(JobState.NEW, JobState.NEW,
+          .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
               JobEventType.JOB_UPDATED_NODES)
               
           // Transitions from INITED state
-          .addTransition(JobState.INITED, JobState.INITED,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.INITED, JobState.INITED,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
-          .addTransition(JobState.INITED, JobState.RUNNING,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.RUNNING,
               JobEventType.JOB_START,
               new StartTransition())
-          .addTransition(JobState.INITED, JobState.KILLED,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.KILLED,
               JobEventType.JOB_KILL,
               new KillInitedJobTransition())
-          .addTransition(JobState.INITED, JobState.ERROR,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.ERROR,
               JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(JobState.INITED, JobState.INITED,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
               JobEventType.JOB_UPDATED_NODES)
               
           // Transitions from RUNNING state
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
               TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
           .addTransition
-              (JobState.RUNNING,
-              EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED),
+              (JobStateInternal.RUNNING,
+              EnumSet.of(JobStateInternal.RUNNING, JobStateInternal.SUCCEEDED, JobStateInternal.FAILED),
               JobEventType.JOB_TASK_COMPLETED,
               new TaskCompletedTransition())
           .addTransition
-              (JobState.RUNNING,
-              EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED),
+              (JobStateInternal.RUNNING,
+              EnumSet.of(JobStateInternal.RUNNING, JobStateInternal.SUCCEEDED, JobStateInternal.FAILED),
               JobEventType.JOB_COMPLETED,
               new JobNoTasksCompletedTransition())
-          .addTransition(JobState.RUNNING, JobState.KILL_WAIT,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.KILL_WAIT,
               JobEventType.JOB_KILL, new KillTasksTransition())
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_UPDATED_NODES,
               UPDATED_NODES_TRANSITION)
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_MAP_TASK_RESCHEDULED,
               new MapTaskRescheduledTransition())
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
               new TaskAttemptFetchFailureTransition())
           .addTransition(
-              JobState.RUNNING,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR,
+              JobStateInternal.RUNNING,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
           // Transitions from KILL_WAIT state.
           .addTransition
-              (JobState.KILL_WAIT,
-              EnumSet.of(JobState.KILL_WAIT, JobState.KILLED),
+              (JobStateInternal.KILL_WAIT,
+              EnumSet.of(JobStateInternal.KILL_WAIT, JobStateInternal.KILLED),
               JobEventType.JOB_TASK_COMPLETED,
               new KillWaitTaskCompletedTransition())
-          .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+          .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
               JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
               TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
-          .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+          .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+          .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
-              JobState.KILL_WAIT,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR,
+              JobStateInternal.KILL_WAIT,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+          .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
               EnumSet.of(JobEventType.JOB_KILL,
                   JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_MAP_TASK_RESCHEDULED,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // Transitions from SUCCEEDED state
-          .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
+          .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
+          .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
-              JobState.SUCCEEDED,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR,
+              JobStateInternal.SUCCEEDED,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
+          .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
               EnumSet.of(JobEventType.JOB_KILL, 
                   JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
@@ -341,17 +342,17 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
                   JobEventType.JOB_TASK_COMPLETED))
 
           // Transitions from FAILED state
-          .addTransition(JobState.FAILED, JobState.FAILED,
+          .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.FAILED, JobState.FAILED,
+          .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
-              JobState.FAILED,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR,
+              JobStateInternal.FAILED,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(JobState.FAILED, JobState.FAILED,
+          .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
               EnumSet.of(JobEventType.JOB_KILL, 
                   JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
@@ -359,17 +360,17 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
                   JobEventType.JOB_TASK_COMPLETED))
 
           // Transitions from KILLED state
-          .addTransition(JobState.KILLED, JobState.KILLED,
+          .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.KILLED, JobState.KILLED,
+          .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
-              JobState.KILLED,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR,
+              JobStateInternal.KILLED,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(JobState.KILLED, JobState.KILLED,
+          .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
               EnumSet.of(JobEventType.JOB_KILL, 
                   JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
@@ -378,8 +379,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
 
           // No transitions from INTERNAL_ERROR state. Ignore all.
           .addTransition(
-              JobState.ERROR,
-              JobState.ERROR,
+              JobStateInternal.ERROR,
+              JobStateInternal.ERROR,
               EnumSet.of(JobEventType.JOB_INIT,
                   JobEventType.JOB_KILL,
                   JobEventType.JOB_TASK_COMPLETED,
@@ -389,12 +390,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
                   JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                   JobEventType.INTERNAL_ERROR))
-          .addTransition(JobState.ERROR, JobState.ERROR,
+          .addTransition(JobStateInternal.ERROR, JobStateInternal.ERROR,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           // create the topology tables
           .installTopology();
  
-  private final StateMachine<JobState, JobEventType, JobEvent> stateMachine;
+  private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;
 
   //changing fields while the job is running
   private int numMapTasks;
@@ -461,7 +462,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
     stateMachine = stateMachineFactory.make(this);
   }
 
-  protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
+  protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
     return stateMachine;
   }
 
@@ -540,9 +541,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
     readLock.lock();
 
     try {
-      JobState state = getState();
-      if (state == JobState.ERROR || state == JobState.FAILED
-          || state == JobState.KILLED || state == JobState.SUCCEEDED) {
+      JobStateInternal state = getInternalState();
+      if (state == JobStateInternal.ERROR || state == JobStateInternal.FAILED
+          || state == JobStateInternal.KILLED || state == JobStateInternal.SUCCEEDED) {
         this.mayBeConstructFinalFullCounters();
         return fullCounters;
       }
@@ -607,7 +608,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
         diagsb.append(s).append("\n");
       }
 
-      if (getState() == JobState.NEW) {
+      if (getInternalState() == JobStateInternal.NEW) {
         return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
             appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
             cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
@@ -693,7 +694,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
   public JobState getState() {
     readLock.lock();
     try {
-     return getStateMachine().getCurrentState();
+      return getExternalState(getStateMachine().getCurrentState());
     } finally {
       readLock.unlock();
     }
@@ -712,10 +713,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
    */
   public void handle(JobEvent event) {
     LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
-    LOG.info("XXX: Processing " + event.getJobId() + " of type " + event.getType() + " while in state: " + getState());
+    LOG.info("XXX: Processing " + event.getJobId() + " of type " + event.getType() + " while in state: " + getInternalState());
     try {
       writeLock.lock();
-      JobState oldState = getState();
+      JobStateInternal oldState = getInternalState();
       try {
          getStateMachine().doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
@@ -726,9 +727,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
             JobEventType.INTERNAL_ERROR));
       }
       //notify the eventhandler of state change
-      if (oldState != getState()) {
+      if (oldState != getInternalState()) {
         LOG.info(jobId + "Job Transitioned from " + oldState + " to "
-                 + getState());
+                 + getInternalState());
       }
     }
     
@@ -737,6 +738,24 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
     }
   }
 
+  @Private
+  public JobStateInternal getInternalState() {
+    readLock.lock();
+    try {
+     return getStateMachine().getCurrentState();
+    } finally {
+      readLock.unlock();
+    }    
+  }
+  
+  private static JobState getExternalState(JobStateInternal smState) {
+    if (smState == JobStateInternal.KILL_WAIT) {
+      return JobState.KILLED;
+    } else {
+      return JobState.valueOf(smState.name());
+    }    
+  }
+
   //helpful in testing
   protected void addTask(Task task) {
     synchronized (tasksSyncHandle) {
@@ -777,7 +796,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
     return FileSystem.get(conf);
   }
   
-  static JobState checkJobCompleteSuccess(JobImpl job) {
+  static JobStateInternal checkJobCompleteSuccess(JobImpl job) {
     // check for Job success
     if (job.completedTaskCount == job.tasks.size()) {
       try {
@@ -786,18 +805,18 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
       } catch (IOException e) {
         LOG.error("Could not do commit for Job", e);
         job.logJobHistoryFinishedEvent();
-        return job.finished(JobState.FAILED);
+        return job.finished(JobStateInternal.FAILED);
       }
       job.logJobHistoryFinishedEvent();
       // TODO: Maybe set cleanup progress. Otherwise job progress will
       // always stay at 0.95 when reported from an AM.
-      return job.finished(JobState.SUCCEEDED);
+      return job.finished(JobStateInternal.SUCCEEDED);
     }
     return null;
   }
 
-  JobState finished(JobState finalState) {
-    if (getState() == JobState.RUNNING) {
+  JobStateInternal finished(JobStateInternal finalState) {
+    if (getInternalState() == JobStateInternal.RUNNING) {
       metrics.endRunningJob(this);
     }
     if (finishTime == 0) setFinishTime();
@@ -1010,7 +1029,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
   */
 
   public static class InitTransition 
-      implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
+      implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
 
     /**
      * Note that this transition method is called directly (and synchronously)
@@ -1020,7 +1039,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
      * way; MR version is).
      */
     @Override
-    public JobState transition(JobImpl job, JobEvent event) {
+    public JobStateInternal transition(JobImpl job, JobEvent event) {
       job.metrics.submittedJob(job);
       job.metrics.preparingJob(job);
       try {
@@ -1086,7 +1105,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
         createReduceTasks(job);
 
         job.metrics.endPreparingJob(job);
-        return JobState.INITED;
+        return JobStateInternal.INITED;
         //TODO XXX Should JobInitedEvent be generated here (instead of in StartTransition)
 
       } catch (IOException e) {
@@ -1095,7 +1114,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
             + StringUtils.stringifyException(e));
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
         job.metrics.endPreparingJob(job);
-        return job.finished(JobState.FAILED);
+        return job.finished(JobStateInternal.FAILED);
       }
     }
 
@@ -1303,9 +1322,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
       JobUnsuccessfulCompletionEvent failedEvent =
           new JobUnsuccessfulCompletionEvent(job.oldJobId,
               job.finishTime, 0, 0,
-              JobState.KILLED.toString());
+              JobStateInternal.KILLED.toString());
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
-      job.finished(JobState.KILLED);
+      job.finished(JobStateInternal.KILLED);
     }
   }
 
@@ -1315,7 +1334,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
     public void transition(JobImpl job, JobEvent event) {
       job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
       job.addDiagnostic("Job received Kill in INITED state.");
-      job.finished(JobState.KILLED);
+      job.finished(JobStateInternal.KILLED);
     }
   }
 
@@ -1415,10 +1434,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
   }
 
   private static class TaskCompletedTransition implements
-      MultipleArcTransition<JobImpl, JobEvent, JobState> {
+      MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
 
     @Override
-    public JobState transition(JobImpl job, JobEvent event) {
+    public JobStateInternal transition(JobImpl job, JobEvent event) {
       job.completedTaskCount++;
       LOG.info("Num completed Tasks: " + job.completedTaskCount);
       JobTaskEvent taskEvent = (JobTaskEvent) event;
@@ -1434,7 +1453,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
       return checkJobForCompletion(job);
     }
 
-    protected JobState checkJobForCompletion(JobImpl job) {
+    protected JobStateInternal checkJobForCompletion(JobImpl job) {
       //check for Job failure
       if (job.failedMapTaskCount*100 > 
         job.allowedMapFailuresPercent*job.numMapTasks ||
@@ -1448,16 +1467,16 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
         LOG.info(diagnosticMsg);
         job.addDiagnostic(diagnosticMsg);
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
-        return job.finished(JobState.FAILED);
+        return job.finished(JobStateInternal.FAILED);
       }
       
-      JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
+      JobStateInternal jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
       if (jobCompleteSuccess != null) {
         return jobCompleteSuccess;
       }
       
       //return the current state, Job not finished yet
-      return job.getState();
+      return job.getInternalState();
     }
 
     private void taskSucceeded(JobImpl job, Task task) {
@@ -1491,17 +1510,17 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
 
   // Transition class for handling jobs with no tasks
   static class JobNoTasksCompletedTransition implements
-  MultipleArcTransition<JobImpl, JobEvent, JobState> {
+  MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
 
     @Override
-    public JobState transition(JobImpl job, JobEvent event) {
-      JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
+    public JobStateInternal transition(JobImpl job, JobEvent event) {
+      JobStateInternal jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
       if (jobCompleteSuccess != null) {
         return jobCompleteSuccess;
       }
       
       // Return the current state, Job not finished yet
-      return job.getState();
+      return job.getInternalState();
     }
   }
 
@@ -1518,14 +1537,14 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
   private static class KillWaitTaskCompletedTransition extends  
       TaskCompletedTransition {
     @Override
-    protected JobState checkJobForCompletion(JobImpl job) {
+    protected JobStateInternal checkJobForCompletion(JobImpl job) {
       if (job.completedTaskCount == job.tasks.size()) {
         job.setFinishTime();
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
-        return job.finished(JobState.KILLED);
+        return job.finished(JobStateInternal.KILLED);
       }
       //return the current state, Job not finished yet
-      return job.getState();
+      return job.getInternalState();
     }
   }
 
@@ -1579,9 +1598,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app2.job.Job,
       JobUnsuccessfulCompletionEvent failedEvent =
           new JobUnsuccessfulCompletionEvent(job.oldJobId,
               job.finishTime, 0, 0,
-              JobState.ERROR.toString());
+              JobStateInternal.ERROR.toString());
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
-      job.finished(JobState.ERROR);
+      job.finished(JobStateInternal.ERROR);
     }
   }
 

+ 116 - 74
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.app2.AppContext;
 import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttemptStateInternal;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
@@ -86,6 +87,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -100,6 +102,8 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.hadoop.yarn.util.Records;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public abstract class TaskAttemptImpl implements TaskAttempt,
     EventHandler<TaskAttemptEvent> {
   
@@ -151,82 +155,82 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
   
   
   private static StateMachineFactory
-      <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+      <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
       stateMachineFactory
       = new StateMachineFactory
-      <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
-      (TaskAttemptState.NEW);
+      <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
+      (TaskAttemptStateInternal.NEW);
   
   private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION;
   private static boolean stateMachineInited = false;
   private final StateMachine
-      <TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> stateMachine;
+      <TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent> stateMachine;
   
   // TODO XXX: Ensure MAPREDUCE-4457 is factored in. Also MAPREDUCE-4068.
   // TODO XXX: Rename all CONTAINER_COMPLETED transitions to TERMINATED.
   private void initStateMachine() {
     stateMachineFactory = 
     stateMachineFactory
-        .addTransition(TaskAttemptState.NEW, TaskAttemptState.START_WAIT, TaskAttemptEventType.TA_SCHEDULE, createScheduleTransition())
-        .addTransition(TaskAttemptState.NEW, TaskAttemptState.NEW, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptState.NEW, TaskAttemptState.FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestTransition())
-        .addTransition(TaskAttemptState.NEW, TaskAttemptState.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestTransition())
+        .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.START_WAIT, TaskAttemptEventType.TA_SCHEDULE, createScheduleTransition())
+        .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.NEW, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+        .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestTransition())
+        .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestTransition())
         
-        .addTransition(TaskAttemptState.START_WAIT, TaskAttemptState.RUNNING, TaskAttemptEventType.TA_STARTED_REMOTELY, createStartedTransition())
-        .addTransition(TaskAttemptState.START_WAIT, TaskAttemptState.START_WAIT, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptState.START_WAIT, TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestBeforeRunningTransition())
-        .addTransition(TaskAttemptState.START_WAIT, TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestBeforeRunningTransition())
-        .addTransition(TaskAttemptState.START_WAIT, TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, createNodeFailedBeforeRunningTransition())
-        .addTransition(TaskAttemptState.START_WAIT, TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, createContainerTerminatingBeforeRunningTransition())
-        .addTransition(TaskAttemptState.START_WAIT, TaskAttemptState.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createContainerCompletedBeforeRunningTransition())
+        .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STARTED_REMOTELY, createStartedTransition())
+        .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.START_WAIT, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+        .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestBeforeRunningTransition())
+        .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestBeforeRunningTransition())
+        .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, createNodeFailedBeforeRunningTransition())
+        .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, createContainerTerminatingBeforeRunningTransition())
+        .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createContainerCompletedBeforeRunningTransition())
         
-        .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING, TaskAttemptEventType.TA_STATUS_UPDATE, createStatusUpdateTransition())
-        .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING, createCommitPendingTransition())
-        .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.SUCCEEDED, TaskAttemptEventType.TA_DONE, createSucceededTransition())
-        .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, createFailRequestWhileRunningTransition())
-        .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, createFailRequestWhileRunningTransition())
-        .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestWhileRunningTransition())
-        .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestWhileRunningTransition())
-        .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, createNodeFailedWhileRunningTransition())
-        .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING, TaskAttemptEventType.TA_CONTAINER_TERMINATING, createContainerTerminatingWhileRunningTransition())
-        .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createContainerCompletedWhileRunningTransition())
+        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STATUS_UPDATE, createStatusUpdateTransition())
+        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING, createCommitPendingTransition())
+        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, createSucceededTransition())
+        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, createFailRequestWhileRunningTransition())
+        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, createFailRequestWhileRunningTransition())
+        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestWhileRunningTransition())
+        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestWhileRunningTransition())
+        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, createNodeFailedWhileRunningTransition())
+        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_CONTAINER_TERMINATING, createContainerTerminatingWhileRunningTransition())
+        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createContainerCompletedWhileRunningTransition())
         
         // XXX Maybe move getMessage / getDiagnosticInfo into the base TaskAttemptEvent ?
         
-        .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.COMMIT_PENDING, TaskAttemptEventType.TA_STATUS_UPDATE, createStatusUpdateTransition())
-        .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.COMMIT_PENDING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING) // TODO ensure this is an ignorable event.
-        .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.SUCCEEDED, TaskAttemptEventType.TA_DONE, createSucceededTransition())
-        .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, createFailRequestWhileRunningTransition())
-        .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, createFailRequestWhileRunningTransition())
-        .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestWhileRunningTransition())
-        .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestWhileRunningTransition())
-        .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, createNodeFailedWhileRunningTransition())
-        .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.RUNNING, TaskAttemptEventType.TA_CONTAINER_TERMINATING, createContainerTerminatingWhileRunningTransition())
-        .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createContainerCompletedWhileRunningTransition())
-
-        .addTransition(TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptState.KILLED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createTerminatedTransition())
-        .addTransition(TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptState.KILL_IN_PROGRESS, TaskAttemptState.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING))
+        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_STATUS_UPDATE, createStatusUpdateTransition())
+        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING) // TODO ensure this is an ignorable event.
+        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, createSucceededTransition())
+        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, createFailRequestWhileRunningTransition())
+        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, createFailRequestWhileRunningTransition())
+        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestWhileRunningTransition())
+        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestWhileRunningTransition())
+        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, createNodeFailedWhileRunningTransition())
+        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_CONTAINER_TERMINATING, createContainerTerminatingWhileRunningTransition())
+        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createContainerCompletedWhileRunningTransition())
+
+        .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createTerminatedTransition())
+        .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+        .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING))
         
-        .addTransition(TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptState.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createTerminatedTransition())
-        .addTransition(TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptState.FAIL_IN_PROGRESS, TaskAttemptState.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING))
+        .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createTerminatedTransition())
+        .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+        .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING))
         
-        .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED))
+        .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+        .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED))
 
-        .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED))
+        .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+        .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED))
         
         // TODO XXX: FailRequest / KillRequest at SUCCEEDED need to consider Map / Reduce task.
-        .addTransition(TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptState.SUCCEEDED, TaskAttemptState.FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestAfterSuccessTransition())
-        .addTransition(TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestAfterSuccessTransition())
-        .addTransition(TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED, TaskAttemptEventType.TA_NODE_FAILED, createNodeFailedAfterSuccessTransition())
-        .addTransition(TaskAttemptState.SUCCEEDED, TaskAttemptState.FAILED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, createTooManyFetchFailuresTransition())
-        .addTransition(TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_CONTAINER_TERMINATING))
+        .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+        .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestAfterSuccessTransition())
+        .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestAfterSuccessTransition())
+        .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_NODE_FAILED, createNodeFailedAfterSuccessTransition())
+        .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, createTooManyFetchFailuresTransition())
+        .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_CONTAINER_TERMINATING))
         
         
         .installTopology();
@@ -353,7 +357,7 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
   public TaskAttemptState getState() {
     readLock.lock();
     try {
-      return stateMachine.getCurrentState();
+      return getExternalState(stateMachine.getCurrentState());
     } finally {
       readLock.unlock();
     }
@@ -364,9 +368,12 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
     readLock.lock();
     try {
       // TODO: Use stateMachine level method?
-      return (getState() == TaskAttemptState.SUCCEEDED || 
-          getState() == TaskAttemptState.FAILED ||
-          getState() == TaskAttemptState.KILLED);
+      return (EnumSet.of(TaskAttemptStateInternal.SUCCEEDED,
+          TaskAttemptStateInternal.FAILED,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptStateInternal.KILLED,
+          TaskAttemptStateInternal.KILL_IN_PROGRESS)
+          .contains(getInternalState()));
     } finally {
       readLock.unlock();
     }
@@ -485,10 +492,10 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
           + event.getType());
     }
     LOG.info("XXX: Processing " + event.getTaskAttemptID() + " of type "
-        + event.getType() + " while in state: " + getState());
+        + event.getType() + " while in state: " + getInternalState());
     writeLock.lock();
     try {
-      final TaskAttemptState oldState = getState();
+      final TaskAttemptStateInternal oldState = getInternalState();
       try {
         stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
@@ -500,16 +507,51 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
         eventHandler.handle(new JobEvent(this.attemptId.getTaskId().getJobId(),
             JobEventType.INTERNAL_ERROR));
       }
-      if (oldState != getState()) {
+      if (oldState != getInternalState()) {
           LOG.info(attemptId + " TaskAttempt Transitioned from " 
            + oldState + " to "
-           + getState());
+           + getInternalState());
       }
     } finally {
       writeLock.unlock();
     }
   }
   
+  @VisibleForTesting
+  public TaskAttemptStateInternal getInternalState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }    
+  }
+
+  private static TaskAttemptState getExternalState(
+      TaskAttemptStateInternal smState) {
+    switch (smState) {
+    case NEW:
+    case START_WAIT:
+      return TaskAttemptState.STARTING;
+    case RUNNING:
+      return TaskAttemptState.RUNNING;
+    case COMMIT_PENDING:
+      return TaskAttemptState.COMMIT_PENDING;
+    case FAILED:
+    case FAIL_IN_PROGRESS:
+      return TaskAttemptState.FAILED;
+    case KILLED:
+    case KILL_IN_PROGRESS:
+      return TaskAttemptState.KILLED;
+    case SUCCEEDED:
+      return TaskAttemptState.SUCCEEDED;
+    default:
+      throw new YarnException("Attempt to convert invalid "
+          + "stateMachineTaskAttemptState to externalTaskAttemptState: "
+          + smState);
+    }
+  }
+
   @SuppressWarnings("unchecked")
   private void sendEvent(Event<?> event) {
     this.eventHandler.handle(event);
@@ -560,7 +602,7 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
 
   private static JobCounterUpdateEvent createJobCounterUpdateEventTATerminated(
       TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted,
-      TaskAttemptState taState) {
+      TaskAttemptStateInternal taState) {
     TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
     JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID()
         .getTaskId().getJobId());
@@ -568,9 +610,9 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
     long slotMillisIncrement = computeSlotMillis(taskAttempt);
 
     if (taskType == TaskType.MAP) {
-      if (taState == TaskAttemptState.FAILED) {
+      if (taState == TaskAttemptStateInternal.FAILED) {
         jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1);
-      } else if (taState == TaskAttemptState.KILLED) {
+      } else if (taState == TaskAttemptStateInternal.KILLED) {
         jce.addCounterUpdate(JobCounter.NUM_KILLED_MAPS, 1);
       }
       if (!taskAlreadyCompleted) {
@@ -578,9 +620,9 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
         jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
       }
     } else {
-      if (taState == TaskAttemptState.FAILED) {
+      if (taState == TaskAttemptStateInternal.FAILED) {
         jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1);
-      } else if (taState == TaskAttemptState.KILLED) {
+      } else if (taState == TaskAttemptStateInternal.KILLED) {
         jce.addCounterUpdate(JobCounter.NUM_KILLED_REDUCES, 1);
       }
       if (!taskAlreadyCompleted) {
@@ -615,7 +657,7 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
   private static
       TaskAttemptUnsuccessfulCompletionEvent
   createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
-      TaskAttemptState attemptState) {
+      TaskAttemptStateInternal attemptState) {
     TaskAttemptUnsuccessfulCompletionEvent tauce =
     new TaskAttemptUnsuccessfulCompletionEvent(
         TypeConverter.fromYarn(taskAttempt.attemptId),
@@ -700,7 +742,7 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
   }
   
   @SuppressWarnings({ "unchecked" })
-  private void logAttemptFinishedEvent(TaskAttemptState state) {
+  private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
     //Log finished events only if an attempt started.
     if (getLaunchTime() == 0) return; 
     if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
@@ -848,12 +890,12 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
       ta.setFinishTime();
 
       ta.sendEvent(createJobCounterUpdateEventTATerminated(ta, false,
-          TaskAttemptState.FAILED));
+          TaskAttemptStateInternal.FAILED));
       if (ta.getLaunchTime() != 0) {
+        // TODO XXX: For cases like this, recovery goes for a toss, since the attempt will not exist in the history file.
         ta.sendEvent(new JobHistoryEvent(ta.jobId,
             createTaskAttemptUnsuccessfulCompletionEvent(ta,
-                TaskAttemptState.FAILED)));
+                TaskAttemptStateInternal.FAILED)));
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Not generating HistoryFinish event since start event not " +
@@ -881,11 +923,11 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
       ta.setFinishTime();
 
       ta.sendEvent(createJobCounterUpdateEventTATerminated(ta, false,
-          TaskAttemptState.KILLED));
+          TaskAttemptStateInternal.KILLED));
       if (ta.getLaunchTime() != 0) {
         ta.sendEvent(new JobHistoryEvent(ta.jobId,
             createTaskAttemptUnsuccessfulCompletionEvent(ta,
-                TaskAttemptState.KILLED)));
+                TaskAttemptStateInternal.KILLED)));
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Not generating HistoryFinish event since start event not " +
@@ -1108,7 +1150,7 @@ public abstract class TaskAttemptImpl implements TaskAttempt,
       //Inform the speculator. Generate history event. Update counters.
       ta.setFinishTime();
       ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.finishTime));
-      ta.logAttemptFinishedEvent(TaskAttemptState.SUCCEEDED);
+      ta.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
       ta.sendEvent(createJobCounterUpdateEventSlotMillis(ta));
 
       // Inform the Scheduler.

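
The heart of this change to TaskAttemptImpl is that the state machine now runs on TaskAttemptStateInternal while MRClientProtocol callers continue to see TaskAttemptState, with getExternalState() collapsing the extra internal states (START_WAIT, FAIL_IN_PROGRESS, KILL_IN_PROGRESS) onto the protocol-visible ones. A minimal, standalone sketch of that mapping pattern follows; the classes below are illustrative only, not the Hadoop classes, and the enum members simply mirror the switch in the patch (the real enums may carry additional members).

// Sketch only -- a standalone illustration of the internal/external split.
public class TaskAttemptStateMappingSketch {

  enum External { STARTING, RUNNING, COMMIT_PENDING, FAILED, KILLED, SUCCEEDED }

  enum Internal {
    NEW, START_WAIT, RUNNING, COMMIT_PENDING,
    FAIL_IN_PROGRESS, FAILED, KILL_IN_PROGRESS, KILLED, SUCCEEDED
  }

  // Collapse the fine-grained state-machine states onto the protocol-visible
  // ones, the same shape as the patch's getExternalState().
  static External toExternal(Internal s) {
    switch (s) {
      case NEW:
      case START_WAIT:       return External.STARTING;
      case RUNNING:          return External.RUNNING;
      case COMMIT_PENDING:   return External.COMMIT_PENDING;
      case FAIL_IN_PROGRESS:
      case FAILED:           return External.FAILED;
      case KILL_IN_PROGRESS:
      case KILLED:           return External.KILLED;
      case SUCCEEDED:        return External.SUCCEEDED;
      default:
        throw new IllegalArgumentException("Unmapped internal state: " + s);
    }
  }

  public static void main(String[] args) {
    // The *_IN_PROGRESS states are already reported as terminal externally,
    // which is also why isFinished() in the patch treats them as finished.
    System.out.println(toExternal(Internal.KILL_IN_PROGRESS)); // KILLED
    System.out.println(toExternal(Internal.START_WAIT));       // STARTING
  }
}
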
+ 92 - 67
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app2.job.Task;
 import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskStateInternal;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
@@ -85,6 +86,8 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Implementation of Task interface.
  */
@@ -128,62 +131,62 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
      KILL_TRANSITION = new KillTransition();
 
   private static final StateMachineFactory
-               <TaskImpl, TaskState, TaskEventType, TaskEvent> 
+               <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent> 
             stateMachineFactory 
-           = new StateMachineFactory<TaskImpl, TaskState, TaskEventType, TaskEvent>
-               (TaskState.NEW)
+           = new StateMachineFactory<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
+               (TaskStateInternal.NEW)
 
     // define the state machine of Task
 
     // Transitions from NEW state
-    .addTransition(TaskState.NEW, TaskState.SCHEDULED, 
+    .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED, 
         TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
-    .addTransition(TaskState.NEW, TaskState.KILLED, 
+    .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED, 
         TaskEventType.T_KILL, new KillNewTransition())
 
     // Transitions from SCHEDULED state
       //when the first attempt is launched, the task state is set to RUNNING
-     .addTransition(TaskState.SCHEDULED, TaskState.RUNNING, 
+     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.RUNNING, 
          TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
-     .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT, 
+     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT, 
          TaskEventType.T_KILL, KILL_TRANSITION)
-     .addTransition(TaskState.SCHEDULED, TaskState.SCHEDULED, 
+     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED, 
          TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
-     .addTransition(TaskState.SCHEDULED, 
-        EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED), 
+     .addTransition(TaskStateInternal.SCHEDULED, 
+        EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED), 
         TaskEventType.T_ATTEMPT_FAILED, 
         new AttemptFailedTransition())
  
     // Transitions from RUNNING state
-    .addTransition(TaskState.RUNNING, TaskState.RUNNING, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, 
         TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later
-    .addTransition(TaskState.RUNNING, TaskState.RUNNING, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, 
         TaskEventType.T_ATTEMPT_COMMIT_PENDING,
         new AttemptCommitPendingTransition())
-    .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
         TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
-    .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED, 
         TaskEventType.T_ATTEMPT_SUCCEEDED,
         new AttemptSucceededTransition())
-    .addTransition(TaskState.RUNNING, TaskState.RUNNING, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, 
         TaskEventType.T_ATTEMPT_KILLED,
         ATTEMPT_KILLED_TRANSITION)
-    .addTransition(TaskState.RUNNING, 
-        EnumSet.of(TaskState.RUNNING, TaskState.FAILED), 
+    .addTransition(TaskStateInternal.RUNNING, 
+        EnumSet.of(TaskStateInternal.RUNNING, TaskStateInternal.FAILED), 
         TaskEventType.T_ATTEMPT_FAILED,
         new AttemptFailedTransition())
-    .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.KILL_WAIT, 
         TaskEventType.T_KILL, KILL_TRANSITION)
 
     // Transitions from KILL_WAIT state
-    .addTransition(TaskState.KILL_WAIT,
-        EnumSet.of(TaskState.KILL_WAIT, TaskState.KILLED),
+    .addTransition(TaskStateInternal.KILL_WAIT,
+        EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
         TaskEventType.T_ATTEMPT_KILLED,
         new KillWaitAttemptKilledTransition())
     // Ignore-able transitions.
     .addTransition(
-        TaskState.KILL_WAIT,
-        TaskState.KILL_WAIT,
+        TaskStateInternal.KILL_WAIT,
+        TaskStateInternal.KILL_WAIT,
         EnumSet.of(TaskEventType.T_KILL,
             TaskEventType.T_ATTEMPT_LAUNCHED,
             TaskEventType.T_ATTEMPT_COMMIT_PENDING,
@@ -192,32 +195,32 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
             TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from SUCCEEDED state
-    .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
-        EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED, TaskState.FAILED),
+    .addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks
+        EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED, TaskStateInternal.FAILED),
         TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
-    .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
-        EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED),
+    .addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks
+        EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED),
         TaskEventType.T_ATTEMPT_KILLED, new MapRetroactiveKilledTransition())
     // Ignore-able transitions.
     .addTransition(
-        TaskState.SUCCEEDED, TaskState.SUCCEEDED,
+        TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
         EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
             TaskEventType.T_ATTEMPT_LAUNCHED))
 
     // Transitions from FAILED state        
-    .addTransition(TaskState.FAILED, TaskState.FAILED,
+    .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
         EnumSet.of(TaskEventType.T_KILL,
                    TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from KILLED state
-    .addTransition(TaskState.KILLED, TaskState.KILLED,
+    .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
         EnumSet.of(TaskEventType.T_KILL,
                    TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // create the topology tables
     .installTopology();
 
-  private final StateMachine<TaskState, TaskEventType, TaskEvent>
+  private final StateMachine<TaskStateInternal, TaskEventType, TaskEvent>
     stateMachine;
 
   // By default, the next TaskAttempt number is zero. Changes during recovery  
@@ -248,7 +251,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   @Override
   public TaskState getState() {
-    return stateMachine.getCurrentState();
+    readLock.lock();
+    try {
+      return getExternalState(getInternalState());
+    } finally {
+      readLock.unlock();
+    }
   }
 
   public TaskImpl(JobId jobId, TaskType taskType, int partition,
@@ -358,9 +366,10 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     readLock.lock();
     try {
      // TODO: Use stateMachine level method?
-      return (getState() == TaskState.SUCCEEDED ||
-          getState() == TaskState.FAILED ||
-          getState() == TaskState.KILLED);
+      return (getInternalState() == TaskStateInternal.SUCCEEDED ||
+              getInternalState() == TaskStateInternal.FAILED ||
+              getInternalState() == TaskStateInternal.KILLED);
+      // TODO XXX Maybe KILL_WAIT as well.
     } finally {
       readLock.unlock();
     }
@@ -435,6 +444,24 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
+  @VisibleForTesting
+  public TaskStateInternal getInternalState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }    
+  }
+
+  private static TaskState getExternalState(TaskStateInternal smState) {
+    if (smState == TaskStateInternal.KILL_WAIT) {
+      return TaskState.KILLED;
+    } else {
+      return TaskState.valueOf(smState.name());
+    }    
+  }
+
   //this is always called in read/write lock
   private long getLaunchTime() {
     long taskLaunchTime = 0;
@@ -486,8 +513,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     return finishTime;
   }
   
-  private TaskState finished(TaskState finalState) {
-    if (getState() == TaskState.RUNNING) {
+  private TaskStateInternal finished(TaskStateInternal finalState) {
+    if (getInternalState() == TaskStateInternal.RUNNING) {
       metrics.endRunningTask(this);
     }
     return finalState;
@@ -502,9 +529,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       switch (at.getState()) {
 
       // ignore all failed task attempts
-      case FAIL_IN_PROGRESS:
       case FAILED:
-      case KILL_IN_PROGRESS:
       case KILLED:
         continue;
       default:
@@ -601,10 +626,10 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
           + event.getType());
     }
     LOG.debug("XXX: Processing " + event.getTaskID() + " of type "
-        + event.getType() + " while in state: " + getState());
+        + event.getType() + " while in state: " + getInternalState());
     try {
       writeLock.lock();
-      TaskState oldState = getState();
+      TaskStateInternal oldState = getInternalState();
       try {
         stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
@@ -612,9 +637,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
             + this.taskId, e);
         internalError(event.getType());
       }
-      if (oldState != getState()) {
+      if (oldState != getInternalState()) {
         LOG.info(taskId + " Task Transitioned from " + oldState + " to "
-            + getState());
+            + getInternalState());
       }
 
     } finally {
@@ -658,7 +683,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
-  private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskState taskState) {
+  private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
     TaskFinishedEvent tfe =
       new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
         TypeConverter.fromYarn(task.successfulAttempt),
@@ -669,7 +694,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     return tfe;
   }
   
-  private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskState taskState, TaskAttemptId taId) {
+  private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
     StringBuilder errorSb = new StringBuilder();
     if (diag != null) {
       for (String d : diag) {
@@ -775,7 +800,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       // issue kill to all other attempts
       if (task.historyTaskStartGenerated) {
         TaskFinishedEvent tfe = createTaskFinishedEvent(task,
-            TaskState.SUCCEEDED);
+            TaskStateInternal.SUCCEEDED);
         task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
             tfe));
       }
@@ -792,7 +817,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 //                  TaskAttemptEventType.TA_KILL));
         }
       }
-      task.finished(TaskState.SUCCEEDED);
+      task.finished(TaskStateInternal.SUCCEEDED);
     }
   }
 
@@ -813,12 +838,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
 
   private static class KillWaitAttemptKilledTransition implements
-      MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+      MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
-    protected TaskState finalState = TaskState.KILLED;
+    protected TaskStateInternal finalState = TaskStateInternal.KILLED;
 
     @Override
-    public TaskState transition(TaskImpl task, TaskEvent event) {
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       task.handleTaskAttemptCompletion(
           ((TaskTAttemptEvent) event).getTaskAttemptID(), 
           TaskAttemptCompletionEventStatus.KILLED);
@@ -836,18 +861,18 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         }
 
         task.eventHandler.handle(
-            new JobTaskEvent(task.taskId, finalState));
+            new JobTaskEvent(task.taskId, getExternalState(finalState)));
         return finalState;
       }
-      return task.getState();
+      return task.getInternalState();
     }
   }
 
   private static class AttemptFailedTransition implements
-    MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+    MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
     @Override
-    public TaskState transition(TaskImpl task, TaskEvent event) {
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       task.failedAttempts++;
       TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
       if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
@@ -879,7 +904,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         
         if (task.historyTaskStartGenerated) {
         TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
-            TaskState.FAILED, taId);
+            TaskStateInternal.FAILED, taId);
         task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
             taskFailedEvent));
         } else {
@@ -888,13 +913,13 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         }
         task.eventHandler.handle(
             new JobTaskEvent(task.taskId, TaskState.FAILED));
-        return task.finished(TaskState.FAILED);
+        return task.finished(TaskStateInternal.FAILED);
       }
       return getDefaultState(task);
     }
 
-    protected TaskState getDefaultState(Task task) {
-      return task.getState();
+    protected TaskStateInternal getDefaultState(TaskImpl task) {
+      return task.getInternalState();
     }
   }
 
@@ -902,14 +927,14 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       extends AttemptFailedTransition {
 
     @Override
-    public TaskState transition(TaskImpl task, TaskEvent event) {
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       if (event instanceof TaskTAttemptEvent) {
         TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
-        if (task.getState() == TaskState.SUCCEEDED &&
+        if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
             !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
           // don't allow a different task attempt to override a previous
           // succeeded state
-          return TaskState.SUCCEEDED;
+          return TaskStateInternal.SUCCEEDED;
         }
       }
       
@@ -934,16 +959,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
 
     @Override
-    protected TaskState getDefaultState(Task task) {
-      return TaskState.SCHEDULED;
+    protected TaskStateInternal getDefaultState(TaskImpl task) {
+      return TaskStateInternal.SCHEDULED;
     }
   }
 
   private static class MapRetroactiveKilledTransition implements
-    MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+    MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
     @Override
-    public TaskState transition(TaskImpl task, TaskEvent event) {
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       // verify that this occurs only for map task
       // TODO: consider moving it to MapTaskImpl
       if (!TaskType.MAP.equals(task.getType())) {
@@ -970,10 +995,10 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         // to the RM. But the RM would ignore that just like it would ignore 
         // currently pending container requests affinitized to bad nodes.
         task.addAndScheduleAttempt();
-        return TaskState.SCHEDULED;
+        return TaskStateInternal.SCHEDULED;
       } else {
         // nothing to do
-        return TaskState.SUCCEEDED;
+        return TaskStateInternal.SUCCEEDED;
       }
     }
   }
@@ -985,7 +1010,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       
       if (task.historyTaskStartGenerated) {
       TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
-            TaskState.KILLED, null); // TODO Verify failedAttemptId is null
+            TaskStateInternal.KILLED, null); // TODO Verify failedAttemptId is null
       task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
           taskFailedEvent));
       }else {

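
For TaskImpl the split is narrower: TaskStateInternal only adds KILL_WAIT on top of TaskState, so getExternalState() reports KILL_WAIT as KILLED and maps every other state by name. A small illustrative sketch of that mapping is below; it is not the Hadoop code, and the exact constant lists are assumed for illustration.

// Sketch only: KILL_WAIT is the single internal-only state; everything else
// maps one-to-one by name, which is what the valueOf() call relies on.
public class TaskStateMappingSketch {

  enum TaskState { NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILLED }

  enum TaskStateInternal {
    NEW, SCHEDULED, RUNNING, KILL_WAIT, SUCCEEDED, FAILED, KILLED
  }

  static TaskState getExternalState(TaskStateInternal smState) {
    if (smState == TaskStateInternal.KILL_WAIT) {
      return TaskState.KILLED;
    }
    return TaskState.valueOf(smState.name());
  }

  public static void main(String[] args) {
    // A task draining its attempts after a kill is already reported as KILLED,
    // matching the JobTaskEvent sent from KillWaitAttemptKilledTransition.
    System.out.println(getExternalState(TaskStateInternal.KILL_WAIT)); // KILLED
    System.out.println(getExternalState(TaskStateInternal.RUNNING));   // RUNNING
  }
}
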
+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/DefaultSpeculator.java

@@ -365,7 +365,7 @@ public class DefaultSpeculator extends AbstractService implements
 
     for (TaskAttempt taskAttempt : attempts.values()) {
       if (taskAttempt.getState() == TaskAttemptState.RUNNING
-          || taskAttempt.getState() == TaskAttemptState.ASSIGNED) {
+          || taskAttempt.getState() == TaskAttemptState.STARTING) {
         if (++numberRunningAttempts > 1) {
           return ALREADY_SPECULATING;
         }

+ 25 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java

@@ -57,8 +57,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app2.job.Task;
 import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttemptStateInternal;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobFinishEvent;
@@ -69,6 +71,7 @@ import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptRemoteStartEvent
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
@@ -253,6 +256,23 @@ public class MRApp extends MRAppMaster {
     return job;
   }
 
+  public void waitForInternalState(TaskAttemptImpl attempt,
+      TaskAttemptStateInternal finalState) throws Exception {
+    int timeoutSecs = 0;
+    TaskAttemptReport report = attempt.getReport();
+    TaskAttemptStateInternal iState = attempt.getInternalState();
+    while (!finalState.equals(iState) && timeoutSecs++ < 20) {
+      System.out.println("TaskAttempt Internal State is : " + iState
+          + " Waiting for Internal state : " + finalState + "   progress : "
+          + report.getProgress());
+      Thread.sleep(500);
+      report = attempt.getReport();
+      iState = attempt.getInternalState();
+    }   
+    System.out.println("TaskAttempt Internal State is : " + iState);
+    Assert.assertEquals("TaskAttempt Internal state is not correct (timedout)",
+        finalState, iState);
+  }
 
   public void waitForState(TaskAttempt attempt, 
       TaskAttemptState finalState) throws Exception {
@@ -727,18 +747,18 @@ public class MRApp extends MRAppMaster {
     //override the init transition
     private final TestInitTransition initTransition = new TestInitTransition(
         maps, reduces);
-    StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> localFactory
-        = stateMachineFactory.addTransition(JobState.NEW,
-            EnumSet.of(JobState.INITED, JobState.FAILED),
+    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
+        = stateMachineFactory.addTransition(JobStateInternal.NEW,
+            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
             JobEventType.JOB_INIT,
             // This is abusive.
             initTransition);
 
-    private final StateMachine<JobState, JobEventType, JobEvent>
+    private final StateMachine<JobStateInternal, JobEventType, JobEvent>
         localStateMachine;
 
     @Override
-    protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
+    protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
       return localStateMachine;
     }
 

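
The new MRApp#waitForInternalState follows the same poll-then-assert shape as the existing waitForState helpers: sample the attempt's internal state every 500 ms, give up after 20 polls, then assert. A generic sketch of that pattern is shown below, purely for illustration; the class and method names here are made up and are not part of the commit.

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Illustrative only -- a generic form of the poll-then-assert helper the patch
// adds as MRApp#waitForInternalState (500 ms polls, 20 polls for a ~10 s budget).
public class WaitForValueSketch {

  static <T> void waitForValue(Supplier<T> current, T expected,
      int maxPolls, long sleepMillis) throws InterruptedException {
    T value = current.get();
    int polls = 0;
    while (!expected.equals(value) && polls++ < maxPolls) {
      System.out.println("Current: " + value + ", waiting for: " + expected);
      Thread.sleep(sleepMillis);
      value = current.get();
    }
    if (!expected.equals(value)) {
      throw new AssertionError(
          "Timed out waiting for " + expected + ", last saw " + value);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Trivial demo: the supplier reaches the expected value after a few polls.
    AtomicInteger ticks = new AtomicInteger();
    waitForValue(() -> ticks.incrementAndGet(), 3, 20, 10L);
    System.out.println("Reached expected value");
  }
}

In the test diffs that follow, this helper is what lets TestFail and TestContainerLauncher keep waiting on START_WAIT, which is no longer observable through TaskAttempt#getState().
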
+ 4 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java

@@ -37,8 +37,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app2.job.Job;
 import org.apache.hadoop.mapreduce.v2.app2.job.Task;
 import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttemptStateInternal;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncherImpl;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
@@ -191,7 +193,8 @@ public class TestFail {
     Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts
         .size());
     TaskAttempt attempt = attempts.values().iterator().next();
-    app.waitForState(attempt, TaskAttemptState.START_WAIT);
+    app.waitForInternalState((TaskAttemptImpl) attempt,
+        TaskAttemptStateInternal.START_WAIT);
     // TODO XXX: This may not be a valid test.
     app.getDispatcher().getEventHandler().handle(
         new TaskAttemptEvent(attempt.getID(),

+ 10 - 9
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app2.MRApp;
 import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app2.job.Task;
 import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
@@ -81,11 +82,11 @@ public class TestJobImpl {
     tasks.put(mockTask.getID(), mockTask);
     mockJob.tasks = tasks;
 
-    when(mockJob.getState()).thenReturn(JobState.ERROR);
+    when(mockJob.getInternalState()).thenReturn(JobStateInternal.ERROR);
     JobEvent mockJobEvent = mock(JobEvent.class);
-    JobState state = trans.transition(mockJob, mockJobEvent);
+    JobStateInternal state = trans.transition(mockJob, mockJobEvent);
     Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition",
-        JobState.ERROR, state);
+        JobStateInternal.ERROR, state);
   }
 
   @Test
@@ -102,9 +103,9 @@ public class TestJobImpl {
     when(mockJob.getJobContext()).thenReturn(mockJobContext);
     doNothing().when(mockJob).setFinishTime();
     doNothing().when(mockJob).logJobHistoryFinishedEvent();
-    when(mockJob.finished(JobState.KILLED)).thenReturn(JobState.KILLED);
-    when(mockJob.finished(JobState.FAILED)).thenReturn(JobState.FAILED);
-    when(mockJob.finished(JobState.SUCCEEDED)).thenReturn(JobState.SUCCEEDED);
+    when(mockJob.finished(JobStateInternal.KILLED)).thenReturn(JobStateInternal.KILLED);
+    when(mockJob.finished(JobStateInternal.FAILED)).thenReturn(JobStateInternal.FAILED);
+    when(mockJob.finished(JobStateInternal.SUCCEEDED)).thenReturn(JobStateInternal.SUCCEEDED);
 
     try {
       doThrow(new IOException()).when(mockCommitter).commitJob(any(JobContext.class));
@@ -116,7 +117,7 @@ public class TestJobImpl {
       "for successful job",
       JobImpl.checkJobCompleteSuccess(mockJob));
     Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
-        JobState.FAILED, JobImpl.checkJobCompleteSuccess(mockJob));
+        JobStateInternal.FAILED, JobImpl.checkJobCompleteSuccess(mockJob));
   }
 
   @Test
@@ -133,7 +134,7 @@ public class TestJobImpl {
     when(mockJob.getJobContext()).thenReturn(mockJobContext);
     doNothing().when(mockJob).setFinishTime();
     doNothing().when(mockJob).logJobHistoryFinishedEvent();
-    when(mockJob.finished(any(JobState.class))).thenReturn(JobState.SUCCEEDED);
+    when(mockJob.finished(any(JobStateInternal.class))).thenReturn(JobStateInternal.SUCCEEDED);
 
     try {
       doNothing().when(mockCommitter).commitJob(any(JobContext.class));
@@ -145,7 +146,7 @@ public class TestJobImpl {
       "for successful job",
       JobImpl.checkJobCompleteSuccess(mockJob));
     Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
-        JobState.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob));
+        JobStateInternal.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob));
   }
 
   @Test

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskImpl.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app2.AppContext;
 import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskStateInternal;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskTAttemptEvent;
@@ -333,7 +334,7 @@ public class TestTaskImpl {
    * {@link TaskState#KILL_WAIT}
    */
   private void assertTaskKillWaitState() {
-    assertEquals(TaskState.KILL_WAIT, mockTask.getState());
+    assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState());
   }
   
   /**

+ 4 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncher.java

@@ -37,7 +37,6 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app2.AppContext;
@@ -45,6 +44,8 @@ import org.apache.hadoop.mapreduce.v2.app2.MRApp;
 import org.apache.hadoop.mapreduce.v2.app2.job.Job;
 import org.apache.hadoop.mapreduce.v2.app2.job.Task;
 import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
 import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
@@ -263,7 +264,8 @@ public class TestContainerLauncher {
           attempts.size());
 
     TaskAttempt attempt = attempts.values().iterator().next();
-    app.waitForState(attempt, TaskAttemptState.START_WAIT);
+    app.waitForInternalState((TaskAttemptImpl) attempt,
+        TaskAttemptStateInternal.START_WAIT);
 
     app.waitForState(job, JobState.FAILED);