فهرست منبع

MAPREDUCE-3921. MR AM should act on node health status changes. Contributed by Bikas Saha.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1349065 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 13 سال پیش
والد
کامیت
eff9fa1aad
16فایلهای تغییر یافته به همراه686 افزوده شده و 65 حذف شده
  1. 3 0
      hadoop-mapreduce-project/CHANGES.txt
  2. 6 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
  3. 5 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
  4. 40 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobUpdatedNodesEvent.java
  5. 37 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java
  6. 93 13
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
  7. 65 13
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  8. 56 11
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
  9. 74 18
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
  10. 6 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
  11. 178 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
  12. 100 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
  13. 6 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
  14. 5 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
  15. 6 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
  16. 6 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -132,6 +132,9 @@ Branch-2 ( Unreleased changes )
     MAPREDUCE-3871. Allow symlinking in LocalJobRunner DistributedCache.
     (tomwhite)
 
+    MAPREDUCE-3921. MR AM should act on node health status changes. 
+    (Bikas Saha via sseth)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 
 
 /**
@@ -54,6 +55,11 @@ public interface TaskAttempt {
    */
   String getAssignedContainerMgrAddress();
   
+  /**
+   * @return node's id if a container is assigned, otherwise null.
+   */
+  NodeId getNodeId();
+  
   /**
    * @return node's http address if a container is assigned, otherwise null.
    */

+ 5 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java

@@ -44,5 +44,9 @@ public enum JobEventType {
   JOB_COUNTER_UPDATE,
   
   //Producer:TaskAttemptListener
-  JOB_TASK_ATTEMPT_FETCH_FAILURE
+  JOB_TASK_ATTEMPT_FETCH_FAILURE,
+  
+  //Producer:RMContainerAllocator
+  JOB_UPDATED_NODES
+  
 }

+ 40 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobUpdatedNodesEvent.java

@@ -0,0 +1,40 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+
+
+
+public class JobUpdatedNodesEvent extends JobEvent {
+
+  private final List<NodeReport> updatedNodes;
+  public JobUpdatedNodesEvent(JobId jobId, List<NodeReport> updatedNodes) {
+    super(jobId, JobEventType.JOB_UPDATED_NODES);
+    this.updatedNodes = updatedNodes;
+  }
+
+  public List<NodeReport> getUpdatedNodes() {
+    return updatedNodes;
+  }
+
+}

+ 37 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java

@@ -0,0 +1,37 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+
+public class TaskAttemptKillEvent extends TaskAttemptEvent {
+
+  private final String message;
+
+  public TaskAttemptKillEvent(TaskAttemptId attemptID,
+      String message) {
+    super(attemptID, TaskAttemptEventType.TA_KILL);
+    this.message = message;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+}

+ 93 - 13
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -77,6 +77,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -85,8 +86,10 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
@@ -100,6 +103,9 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -148,6 +154,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private final Object tasksSyncHandle = new Object();
   private final Set<TaskId> mapTasks = new LinkedHashSet<TaskId>();
   private final Set<TaskId> reduceTasks = new LinkedHashSet<TaskId>();
+  /**
+   * maps nodes to tasks that have run on those nodes
+   */
+  private final HashMap<NodeId, List<TaskAttemptId>> 
+    nodesToSucceededTaskAttempts = new HashMap<NodeId, List<TaskAttemptId>>();
+
   private final EventHandler eventHandler;
   private final MRAppMetrics metrics;
   private final String userName;
@@ -194,6 +206,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           new TaskAttemptCompletedEventTransition();
   private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION =
       new CounterUpdateTransition();
+  private static final UpdatedNodesTransition UPDATED_NODES_TRANSITION =
+      new UpdatedNodesTransition();
 
   protected static final
     StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> 
@@ -218,7 +232,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           .addTransition(JobState.NEW, JobState.ERROR,
               JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
-
+          // Ignore-able events
+          .addTransition(JobState.NEW, JobState.NEW,
+              JobEventType.JOB_UPDATED_NODES)
+              
           // Transitions from INITED state
           .addTransition(JobState.INITED, JobState.INITED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
@@ -234,7 +251,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           .addTransition(JobState.INITED, JobState.ERROR,
               JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
-
+          // Ignore-able events
+          .addTransition(JobState.INITED, JobState.INITED,
+              JobEventType.JOB_UPDATED_NODES)
+              
           // Transitions from RUNNING state
           .addTransition(JobState.RUNNING, JobState.RUNNING,
               JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
@@ -251,6 +271,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
               new JobNoTasksCompletedTransition())
           .addTransition(JobState.RUNNING, JobState.KILL_WAIT,
               JobEventType.JOB_KILL, new KillTasksTransition())
+          .addTransition(JobState.RUNNING, JobState.RUNNING,
+              JobEventType.JOB_UPDATED_NODES,
+              UPDATED_NODES_TRANSITION)
           .addTransition(JobState.RUNNING, JobState.RUNNING,
               JobEventType.JOB_MAP_TASK_RESCHEDULED,
               new MapTaskRescheduledTransition())
@@ -288,8 +311,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           // Ignore-able events
           .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
               EnumSet.of(JobEventType.JOB_KILL,
-                         JobEventType.JOB_MAP_TASK_RESCHEDULED,
-                         JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
+                  JobEventType.JOB_UPDATED_NODES,
+                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // Transitions from SUCCEEDED state
           .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
@@ -303,7 +327,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
           .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
-              EnumSet.of(JobEventType.JOB_KILL,
+              EnumSet.of(JobEventType.JOB_KILL, 
+                  JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // Transitions from FAILED state
@@ -318,7 +343,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
           .addTransition(JobState.FAILED, JobState.FAILED,
-              EnumSet.of(JobEventType.JOB_KILL,
+              EnumSet.of(JobEventType.JOB_KILL, 
+                  JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // Transitions from KILLED state
@@ -333,7 +359,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
           .addTransition(JobState.KILLED, JobState.KILLED,
-              EnumSet.of(JobEventType.JOB_KILL,
+              EnumSet.of(JobEventType.JOB_KILL, 
+                  JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // No transitions from INTERNAL_ERROR state. Ignore all.
@@ -346,6 +373,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
                   JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
                   JobEventType.JOB_MAP_TASK_RESCHEDULED,
                   JobEventType.JOB_DIAGNOSTIC_UPDATE,
+                  JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                   JobEventType.INTERNAL_ERROR))
           .addTransition(JobState.ERROR, JobState.ERROR,
@@ -895,7 +923,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       LOG.info(msg.toString());
     }
   }
-
+  
   /**
    * ChainMapper and ChainReducer must execute in parallel, so they're not
    * compatible with uberization/LocalContainerLauncher (100% sequential).
@@ -924,6 +952,24 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     }
     return isChainJob;
   }
+  
+  private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) {
+    // rerun previously successful map tasks
+    List<TaskAttemptId> taskAttemptIdList = nodesToSucceededTaskAttempts.get(nodeId);
+    if(taskAttemptIdList != null) {
+      String mesg = "TaskAttempt killed because it ran on unusable node "
+          + nodeId;
+      for(TaskAttemptId id : taskAttemptIdList) {
+        if(TaskType.MAP == id.getTaskId().getTaskType()) {
+          // reschedule only map tasks because their outputs maybe unusable
+          LOG.info(mesg + ". AttemptId:" + id);
+          eventHandler.handle(new TaskAttemptKillEvent(id, mesg));
+        }
+      }
+    }
+    // currently running task attempts on unusable nodes are handled in
+    // RMContainerAllocator
+  }
 
   /*
   private int getBlockSize() {
@@ -1269,18 +1315,37 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       tce.setEventId(job.taskAttemptCompletionEvents.size());
       job.taskAttemptCompletionEvents.add(tce);
       
+      TaskAttemptId attemptId = tce.getAttemptId();
+      TaskId taskId = attemptId.getTaskId();
       //make the previous completion event as obsolete if it exists
       Object successEventNo = 
-        job.successAttemptCompletionEventNoMap.remove(tce.getAttemptId().getTaskId());
+        job.successAttemptCompletionEventNoMap.remove(taskId);
       if (successEventNo != null) {
         TaskAttemptCompletionEvent successEvent = 
           job.taskAttemptCompletionEvents.get((Integer) successEventNo);
         successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE);
       }
-
+      
+      // if this attempt is not successful then why is the previous successful 
+      // attempt being removed above - MAPREDUCE-4330
       if (TaskAttemptCompletionEventStatus.SUCCEEDED.equals(tce.getStatus())) {
-        job.successAttemptCompletionEventNoMap.put(tce.getAttemptId().getTaskId(), 
-            tce.getEventId());
+        job.successAttemptCompletionEventNoMap.put(taskId, tce.getEventId());
+        
+        // here we could have simply called Task.getSuccessfulAttempt() but
+        // the event that triggers this code is sent before
+        // Task.successfulAttempt is set and so there is no guarantee that it
+        // will be available now
+        Task task = job.tasks.get(taskId);
+        TaskAttempt attempt = task.getAttempt(attemptId);
+        NodeId nodeId = attempt.getNodeId();
+        assert (nodeId != null); // node must exist for a successful event
+        List<TaskAttemptId> taskAttemptIdList = job.nodesToSucceededTaskAttempts
+            .get(nodeId);
+        if (taskAttemptIdList == null) {
+          taskAttemptIdList = new ArrayList<TaskAttemptId>();
+          job.nodesToSucceededTaskAttempts.put(nodeId, taskAttemptIdList);
+        }
+        taskAttemptIdList.add(attempt.getID());
       }
     }
   }
@@ -1460,7 +1525,22 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       }
     }
   }
-
+  
+  private static class UpdatedNodesTransition implements
+      SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      JobUpdatedNodesEvent updateEvent = (JobUpdatedNodesEvent) event;
+      for(NodeReport nr: updateEvent.getUpdatedNodes()) {
+        NodeState nodeState = nr.getNodeState();
+        if(nodeState.isUnusable()) {
+          // act on the updates
+          job.actOnUnusableNode(nr.getNodeId(), nodeState);
+        }
+      }
+    }
+  }
+  
   private static class InternalErrorTransition implements
       SingleArcTransition<JobImpl, JobEvent> {
     @Override

+ 65 - 13
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -84,6 +84,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
@@ -403,6 +404,10 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptState.FAILED,
          TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
          new TooManyFetchFailureTransition())
+     .addTransition(
+         TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED,
+         TaskAttemptEventType.TA_KILL,
+         new KilledAfterSuccessTransition())
      .addTransition(
          TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
@@ -410,8 +415,7 @@ public abstract class TaskAttemptImpl implements
      // Ignore-able events for SUCCEEDED state
      .addTransition(TaskAttemptState.SUCCEEDED,
          TaskAttemptState.SUCCEEDED,
-         EnumSet.of(TaskAttemptEventType.TA_KILL,
-             TaskAttemptEventType.TA_FAILMSG,
+         EnumSet.of(TaskAttemptEventType.TA_FAILMSG,
              TaskAttemptEventType.TA_CONTAINER_CLEANED,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED))
 
@@ -818,6 +822,16 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
+  @Override 
+  public NodeId getNodeId() {
+    readLock.lock();
+    try {
+      return containerNodeId;
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
   /**If container Assigned then return the node's address, otherwise null.
    */
   @Override
@@ -999,7 +1013,7 @@ public abstract class TaskAttemptImpl implements
   }
 
   private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
-      TaskAttemptImpl taskAttempt) {
+      TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) {
     TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
     JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
     
@@ -1007,16 +1021,22 @@ public abstract class TaskAttemptImpl implements
     
     if (taskType == TaskType.MAP) {
       jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1);
-      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
+      if(!taskAlreadyCompleted) {
+        // dont double count the elapsed time
+        jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
+      }
     } else {
       jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1);
-      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
+      if(!taskAlreadyCompleted) {
+        // dont double count the elapsed time
+        jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
+      }
     }
     return jce;
   }
   
   private static JobCounterUpdateEvent createJobCounterUpdateEventTAKilled(
-      TaskAttemptImpl taskAttempt) {
+      TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) {
     TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
     JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
     
@@ -1024,10 +1044,16 @@ public abstract class TaskAttemptImpl implements
     
     if (taskType == TaskType.MAP) {
       jce.addCounterUpdate(JobCounter.NUM_KILLED_MAPS, 1);
-      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
+      if(!taskAlreadyCompleted) {
+        // dont double count the elapsed time
+        jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
+      }
     } else {
       jce.addCounterUpdate(JobCounter.NUM_KILLED_REDUCES, 1);
-      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
+      if(!taskAlreadyCompleted) {
+        // dont double count the elapsed time
+        jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
+      }
     }
     return jce;
   }  
@@ -1259,10 +1285,10 @@ public abstract class TaskAttemptImpl implements
                 finalState);
         if(finalState == TaskAttemptState.FAILED) {
           taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+            .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
         } else if(finalState == TaskAttemptState.KILLED) {
           taskAttempt.eventHandler
-          .handle(createJobCounterUpdateEventTAKilled(taskAttempt));
+          .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
         }
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
@@ -1394,7 +1420,7 @@ public abstract class TaskAttemptImpl implements
       
       if (taskAttempt.getLaunchTime() != 0) {
         taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+            .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
         TaskAttemptUnsuccessfulCompletionEvent tauce =
             createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
                 TaskAttemptState.FAILED);
@@ -1463,7 +1489,7 @@ public abstract class TaskAttemptImpl implements
       
       if (taskAttempt.getLaunchTime() != 0) {
         taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+            .handle(createJobCounterUpdateEventTAFailed(taskAttempt, true));
         TaskAttemptUnsuccessfulCompletionEvent tauce =
             createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
                 TaskAttemptState.FAILED);
@@ -1477,6 +1503,32 @@ public abstract class TaskAttemptImpl implements
           taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
     }
   }
+  
+  private static class KilledAfterSuccessTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt, 
+        TaskAttemptEvent event) {
+      TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event;
+      //add to diagnostic
+      taskAttempt.addDiagnosticInfo(msgEvent.getMessage());
+
+      // not setting a finish time since it was set on success
+      assert (taskAttempt.getFinishTime() != 0);
+
+      assert (taskAttempt.getLaunchTime() != 0);
+      taskAttempt.eventHandler
+          .handle(createJobCounterUpdateEventTAKilled(taskAttempt, true));
+      TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
+          taskAttempt, TaskAttemptState.KILLED);
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId
+          .getTaskId().getJobId(), tauce));
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+          taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED));
+    }
+  }
 
   private static class KilledTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@@ -1489,7 +1541,7 @@ public abstract class TaskAttemptImpl implements
       taskAttempt.setFinishTime();
       if (taskAttempt.getLaunchTime() != 0) {
         taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAKilled(taskAttempt));
+            .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
         TaskAttemptUnsuccessfulCompletionEvent tauce =
             createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
                 TaskAttemptState.KILLED);

+ 56 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java

@@ -191,13 +191,14 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
         EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED),
         TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
+    .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
+        EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED),
+        TaskEventType.T_ATTEMPT_KILLED, new MapRetroactiveKilledTransition())
     // Ignore-able transitions.
     .addTransition(
         TaskState.SUCCEEDED, TaskState.SUCCEEDED,
-        EnumSet.of(TaskEventType.T_KILL,
-            TaskEventType.T_ADD_SPEC_ATTEMPT,
-            TaskEventType.T_ATTEMPT_LAUNCHED,
-            TaskEventType.T_ATTEMPT_KILLED))
+        EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
+            TaskEventType.T_ATTEMPT_LAUNCHED))
 
     // Transitions from FAILED state        
     .addTransition(TaskState.FAILED, TaskState.FAILED,
@@ -629,7 +630,6 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
   // always called inside a transition, in turn inside the Write Lock
   private void handleTaskAttemptCompletion(TaskAttemptId attemptId,
       TaskAttemptCompletionEventStatus status) {
-    finishedAttempts++;
     TaskAttempt attempt = attempts.get(attemptId);
     //raise the completion event only if the container is assigned
     // to nextAttemptNumber
@@ -681,6 +681,11 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         taId == null ? null : TypeConverter.fromYarn(taId));
     return taskFailedEvent;
   }
+  
+  private static void unSucceed(TaskImpl task) {
+    task.commitAttempt = null;
+    task.successfulAttempt = null;
+  }
 
   /**
   * @return a String representation of the splits.
@@ -755,6 +760,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.handleTaskAttemptCompletion(
           ((TaskTAttemptEvent) event).getTaskAttemptID(), 
           TaskAttemptCompletionEventStatus.SUCCEEDED);
+      task.finishedAttempts++;
       --task.numberUncompletedAttempts;
       task.successfulAttempt = ((TaskTAttemptEvent) event).getTaskAttemptID();
       task.eventHandler.handle(new JobTaskEvent(
@@ -790,6 +796,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.handleTaskAttemptCompletion(
           ((TaskTAttemptEvent) event).getTaskAttemptID(), 
           TaskAttemptCompletionEventStatus.KILLED);
+      task.finishedAttempts++;
       --task.numberUncompletedAttempts;
       if (task.successfulAttempt == null) {
         task.addAndScheduleAttempt();
@@ -808,6 +815,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.handleTaskAttemptCompletion(
           ((TaskTAttemptEvent) event).getTaskAttemptID(), 
           TaskAttemptCompletionEventStatus.KILLED);
+      task.finishedAttempts++;
       // check whether all attempts are finished
       if (task.finishedAttempts == task.attempts.size()) {
         if (task.historyTaskStartGenerated) {
@@ -845,6 +853,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
             attempt.getAssignedContainerMgrAddress()));
       }
       
+      task.finishedAttempts++;
       if (task.failedAttempts < task.maxAttempts) {
         task.handleTaskAttemptCompletion(
             ((TaskTAttemptEvent) event).getTaskAttemptID(), 
@@ -880,12 +889,6 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     protected TaskState getDefaultState(Task task) {
       return task.getState();
     }
-
-    protected void unSucceed(TaskImpl task) {
-      ++task.numberUncompletedAttempts;
-      task.commitAttempt = null;
-      task.successfulAttempt = null;
-    }
   }
 
   private static class MapRetroactiveFailureTransition
@@ -908,6 +911,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       //  fails, we have to let AttemptFailedTransition.transition
       //  believe that there's no redundancy.
       unSucceed(task);
+      // fake increase in Uncomplete attempts for super.transition
+      ++task.numberUncompletedAttempts;
       return super.transition(task, event);
     }
 
@@ -917,6 +922,45 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
+  private static class MapRetroactiveKilledTransition implements
+    MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+
+    @Override
+    public TaskState transition(TaskImpl task, TaskEvent event) {
+      // verify that this occurs only for map task
+      // TODO: consider moving it to MapTaskImpl
+      if (!TaskType.MAP.equals(task.getType())) {
+        LOG.error("Unexpected event for REDUCE task " + event.getType());
+        task.internalError(event.getType());
+      }
+
+      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
+      TaskAttemptId attemptId = attemptEvent.getTaskAttemptID();
+      if(task.successfulAttempt == attemptId) {
+        // successful attempt is now killed. reschedule
+        // tell the job about the rescheduling
+        unSucceed(task);
+        task.handleTaskAttemptCompletion(
+            attemptId, 
+            TaskAttemptCompletionEventStatus.KILLED);
+        task.eventHandler.handle(new JobMapTaskRescheduledEvent(task.taskId));
+        // typically we are here because this map task was run on a bad node and 
+        // we want to reschedule it on a different node.
+        // Depending on whether there are previous failed attempts or not this 
+        // can SCHEDULE or RESCHEDULE the container allocate request. If this
+        // SCHEDULE's then the dataLocal hosts of this taskAttempt will be used
+        // from the map splitInfo. So the bad node might be sent as a location 
+        // to the RM. But the RM would ignore that just like it would ignore 
+        // currently pending container requests affinitized to bad nodes.
+        task.addAndScheduleAttempt();
+        return TaskState.SCHEDULED;
+      } else {
+        // nothing to do
+        return TaskState.SUCCEEDED;
+      }
+    }
+  }
+
   private static class KillNewTransition 
     implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
@@ -966,6 +1010,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     public void transition(TaskImpl task, TaskEvent event) {
       task.metrics.launchedTask(task);
       task.metrics.runningTask(task);
+      
     }
   }
 }

+ 74 - 18
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.rm;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -46,19 +47,27 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.RackResolver;
@@ -583,7 +592,9 @@ public class RMContainerAllocator extends RMContainerRequestor
 
     //Called on each allocation. Will know about newly blacklisted/added hosts.
     computeIgnoreBlacklisting();
-    
+
+    handleUpdatedNodes(response);
+
     for (ContainerStatus cont : finishedContainers) {
       LOG.info("Received completed container " + cont.getContainerId());
       TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
@@ -600,10 +611,48 @@ public class RMContainerAllocator extends RMContainerRequestor
         String diagnostics = cont.getDiagnostics();
         eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
             diagnostics));
-      }
+      }      
     }
     return newContainers;
   }
+  
+  @SuppressWarnings("unchecked")
+  private void handleUpdatedNodes(AMResponse response) {
+    // send event to the job about on updated nodes
+    List<NodeReport> updatedNodes = response.getUpdatedNodes();
+    if (!updatedNodes.isEmpty()) {
+
+      // send event to the job to act upon completed tasks
+      eventHandler.handle(new JobUpdatedNodesEvent(getJob().getID(),
+          updatedNodes));
+
+      // act upon running tasks
+      HashSet<NodeId> unusableNodes = new HashSet<NodeId>();
+      for (NodeReport nr : updatedNodes) {
+        NodeState nodeState = nr.getNodeState();
+        if (nodeState.isUnusable()) {
+          unusableNodes.add(nr.getNodeId());
+        }
+      }
+      for (int i = 0; i < 2; ++i) {
+        HashMap<TaskAttemptId, Container> taskSet = i == 0 ? assignedRequests.maps
+            : assignedRequests.reduces;
+        // kill running containers
+        for (Map.Entry<TaskAttemptId, Container> entry : taskSet.entrySet()) {
+          TaskAttemptId tid = entry.getKey();
+          NodeId taskAttemptNodeId = entry.getValue().getNodeId();
+          if (unusableNodes.contains(taskAttemptNodeId)) {
+            LOG.info("Killing taskAttempt:" + tid
+                + " because it is running on unusable node:"
+                + taskAttemptNodeId);
+            eventHandler.handle(new TaskAttemptKillEvent(tid,
+                "TaskAttempt killed because it ran on unusable node"
+                    + taskAttemptNodeId));
+          }
+        }
+      }
+    }
+  }
 
   @Private
   public int getMemLimit() {
@@ -743,7 +792,6 @@ public class RMContainerAllocator extends RMContainerRequestor
         boolean blackListed = false;         
         ContainerRequest assigned = null;
         
-        ContainerId allocatedContainerId = allocated.getId();
         if (isAssignable) {
           // do not assign if allocated container is on a  
           // blacklisted host
@@ -790,7 +838,7 @@ public class RMContainerAllocator extends RMContainerRequestor
               eventHandler.handle(new TaskAttemptContainerAssignedEvent(
                   assigned.attemptID, allocated, applicationACLs));
 
-              assignedRequests.add(allocatedContainerId, assigned.attemptID);
+              assignedRequests.add(allocated, assigned.attemptID);
 
               if (LOG.isDebugEnabled()) {
                 LOG.info("Assigned container (" + allocated + ") "
@@ -811,7 +859,7 @@ public class RMContainerAllocator extends RMContainerRequestor
         // or if we could not assign it 
         if (blackListed || assigned == null) {
           containersReleased++;
-          release(allocatedContainerId);
+          release(allocated.getId());
         }
       }
     }
@@ -974,20 +1022,20 @@ public class RMContainerAllocator extends RMContainerRequestor
   private class AssignedRequests {
     private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
       new HashMap<ContainerId, TaskAttemptId>();
-    private final LinkedHashMap<TaskAttemptId, ContainerId> maps = 
-      new LinkedHashMap<TaskAttemptId, ContainerId>();
-    private final LinkedHashMap<TaskAttemptId, ContainerId> reduces = 
-      new LinkedHashMap<TaskAttemptId, ContainerId>();
+    private final LinkedHashMap<TaskAttemptId, Container> maps = 
+      new LinkedHashMap<TaskAttemptId, Container>();
+    private final LinkedHashMap<TaskAttemptId, Container> reduces = 
+      new LinkedHashMap<TaskAttemptId, Container>();
     private final Set<TaskAttemptId> preemptionWaitingReduces = 
       new HashSet<TaskAttemptId>();
     
-    void add(ContainerId containerId, TaskAttemptId tId) {
-      LOG.info("Assigned container " + containerId.toString() + " to " + tId);
-      containerToAttemptMap.put(containerId, tId);
+    void add(Container container, TaskAttemptId tId) {
+      LOG.info("Assigned container " + container.getId().toString() + " to " + tId);
+      containerToAttemptMap.put(container.getId(), tId);
       if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
-        maps.put(tId, containerId);
+        maps.put(tId, container);
       } else {
-        reduces.put(tId, containerId);
+        reduces.put(tId, container);
       }
     }
 
@@ -1017,9 +1065,9 @@ public class RMContainerAllocator extends RMContainerRequestor
     boolean remove(TaskAttemptId tId) {
       ContainerId containerId = null;
       if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
-        containerId = maps.remove(tId);
+        containerId = maps.remove(tId).getId();
       } else {
-        containerId = reduces.remove(tId);
+        containerId = reduces.remove(tId).getId();
         if (containerId != null) {
           boolean preempted = preemptionWaitingReduces.remove(tId);
           if (preempted) {
@@ -1038,12 +1086,20 @@ public class RMContainerAllocator extends RMContainerRequestor
     TaskAttemptId get(ContainerId cId) {
       return containerToAttemptMap.get(cId);
     }
+    
+    NodeId getNodeId(TaskAttemptId tId) {
+      if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
+        return maps.get(tId).getNodeId();
+      } else {
+        return reduces.get(tId).getNodeId();
+      }
+    }
 
     ContainerId get(TaskAttemptId tId) {
       if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
-        return maps.get(tId);
+        return maps.get(tId).getId();
       } else {
-        return reduces.get(tId);
+        return reduces.get(tId).getId();
       }
     }
   }

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java

@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -231,6 +232,11 @@ public class MockJobs extends MockApps {
     final List<String> diags = Lists.newArrayList();
     diags.add(DIAGS.next());
     return new TaskAttempt() {
+      @Override
+      public NodeId getNodeId() throws UnsupportedOperationException{
+        throw new UnsupportedOperationException();
+      }
+      
       @Override
       public TaskAttemptId getID() {
         return taid;

+ 178 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java

@@ -22,6 +22,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 
 import junit.framework.Assert;
@@ -29,17 +30,26 @@ import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.junit.Test;
 
 /**
@@ -160,6 +170,159 @@ public class TestMRApp {
     
     app.waitForState(job, JobState.SUCCEEDED);
   }
+  
+  /**
+   * The test verifies that the AM re-runs maps that have run on bad nodes. It
+   * also verifies that the AM records all success/killed events so that reduces
+   * are notified about map output status changes. It also verifies that the
+   * re-run information is preserved across AM restart
+   */
+  @Test
+  public void testUpdatedNodes() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    // after half of the map completion, reduce will start
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
+    // uberization forces full slowstart (1.0), so disable that
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator()
+        .next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator()
+        .next();
+    NodeId node1 = task1Attempt.getNodeId();
+    NodeId node2 = task2Attempt.getNodeId();
+    Assert.assertEquals(node1, node2);
+
+    // send the done signal to the task
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task1Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task2Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+
+    // all maps must be succeeded
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    TaskAttemptCompletionEvent[] events = job.getTaskAttemptCompletionEvents(0,
+        100);
+    Assert.assertEquals("Expecting 2 completion events for success", 2,
+        events.length);
+
+    // send updated nodes info
+    ArrayList<NodeReport> updatedNodes = new ArrayList<NodeReport>();
+    NodeReport nr = RecordFactoryProvider.getRecordFactory(null)
+        .newRecordInstance(NodeReport.class);
+    nr.setNodeId(node1);
+    nr.setNodeState(NodeState.UNHEALTHY);
+    updatedNodes.add(nr);
+    app.getContext().getEventHandler()
+        .handle(new JobUpdatedNodesEvent(job.getID(), updatedNodes));
+
+    app.waitForState(task1Attempt, TaskAttemptState.KILLED);
+    app.waitForState(task2Attempt, TaskAttemptState.KILLED);
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Expecting 2 more completion events for killed", 4,
+        events.length);
+
+    // all maps must be back to running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    Iterator<TaskAttempt> itr = mapTask1.getAttempts().values().iterator();
+    itr.next();
+    task1Attempt = itr.next();
+
+    // send the done signal to the task
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task1Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+
+    // map1 must be succeeded. map2 must be running
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Expecting 1 more completion events for success", 5,
+        events.length);
+
+    // Crash the app again.
+    app.stop();
+
+    // rerun
+    // in rerun the 1st map will be recovered from previous run
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    Task reduceTask = it.next();
+
+    // map 1 will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals(
+        "Expecting 2 completion events for killed & success of map1", 2,
+        events.length);
+
+    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task2Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Expecting 1 more completion events for success", 3,
+        events.length);
+
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt task3Attempt = reduceTask.getAttempts().values().iterator()
+        .next();
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task3Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+    app.waitForState(reduceTask, TaskState.SUCCEEDED);
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Expecting 1 more completion events for success", 4,
+        events.length);
+
+    // job succeeds
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
 
   @Test
   public void testJobError() throws Exception {
@@ -194,10 +357,6 @@ public class TestMRApp {
       ((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob);
       return spiedJob;
     }
-
-    JobImpl getSpiedJob() {
-      return this.spiedJob;
-    }
   }
 
   @Test
@@ -232,6 +391,21 @@ public class TestMRApp {
       TypeConverter.fromYarn(state);
     }
   }
+  
+  private final class MRAppWithHistory extends MRApp {
+    public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart, int startCount) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
+    }
+
+    @Override
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context, 
+          getStartCount());
+      return eventHandler;
+    }
+  }
 
   public static void main(String[] args) throws Exception {
     TestMRApp t = new TestMRApp();

+ 100 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

@@ -46,9 +46,11 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
@@ -594,6 +596,88 @@ public class TestRMContainerAllocator {
     Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
   }
+  
+  @Test
+  public void testUpdatedNodes() throws Exception {
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+    
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    // add resources to scheduler
+    MockNM nm1 = rm.registerNode("h1:1234", 10240);
+    MockNM nm2 = rm.registerNode("h2:1234", 10240);
+    dispatcher.await();
+
+    // create the map container request
+    ContainerRequestEvent event = createReq(jobId, 1, 1024,
+        new String[] { "h1" });
+    allocator.sendRequest(event);
+    TaskAttemptId attemptId = event.getAttemptID();
+    
+    TaskAttempt mockTaskAttempt = mock(TaskAttempt.class);
+    when(mockTaskAttempt.getNodeId()).thenReturn(nm1.getNodeId());
+    Task mockTask = mock(Task.class);
+    when(mockTask.getAttempt(attemptId)).thenReturn(mockTaskAttempt);
+    when(mockJob.getTask(attemptId.getTaskId())).thenReturn(mockTask);
+
+    // this tells the scheduler about the requests
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    dispatcher.await();
+
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+    // get the assignment
+    assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(1, assigned.size());
+    Assert.assertEquals(nm1.getNodeId(), assigned.get(0).getContainer().getNodeId());
+    // no updated nodes reported
+    Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty());
+    Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty());
+    
+    // mark nodes bad
+    nm1.nodeHeartbeat(false);
+    nm2.nodeHeartbeat(false);
+    dispatcher.await();
+    
+    // schedule response returns updated nodes
+    assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(0, assigned.size());
+    // updated nodes are reported
+    Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size());
+    Assert.assertEquals(1, allocator.getTaskAttemptKillEvents().size());
+    Assert.assertEquals(2, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size());
+    Assert.assertEquals(attemptId, allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID());
+    allocator.getJobUpdatedNodeEvents().clear();
+    allocator.getTaskAttemptKillEvents().clear();
+    
+    assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(0, assigned.size());
+    // no updated nodes reported
+    Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty());
+    Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty());
+  }
 
   @Test
   public void testBlackListedNodes() throws Exception {
@@ -1100,7 +1184,10 @@ public class TestRMContainerAllocator {
   private static class MyContainerAllocator extends RMContainerAllocator {
     static final List<TaskAttemptContainerAssignedEvent> events
       = new ArrayList<TaskAttemptContainerAssignedEvent>();
-
+    static final List<TaskAttemptKillEvent> taskAttemptKillEvents 
+      = new ArrayList<TaskAttemptKillEvent>();
+    static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents 
+    = new ArrayList<JobUpdatedNodesEvent>();
     private MyResourceManager rm;
 
     private static AppContext createAppContext(
@@ -1119,6 +1206,10 @@ public class TestRMContainerAllocator {
           // Only capture interesting events.
           if (event instanceof TaskAttemptContainerAssignedEvent) {
             events.add((TaskAttemptContainerAssignedEvent) event);
+          } else if (event instanceof TaskAttemptKillEvent) {
+            taskAttemptKillEvents.add((TaskAttemptKillEvent)event);
+          } else if (event instanceof JobUpdatedNodesEvent) {
+            jobUpdatedNodeEvents.add((JobUpdatedNodesEvent)event);
           }
         }
       });
@@ -1202,6 +1293,14 @@ public class TestRMContainerAllocator {
       events.clear();
       return result;
     }
+    
+    List<TaskAttemptKillEvent> getTaskAttemptKillEvents() {
+      return taskAttemptKillEvents;
+    }
+    
+    List<JobUpdatedNodesEvent> getJobUpdatedNodeEvents() {
+      return jobUpdatedNodeEvents;
+    }
 
     @Override
     protected void startAllocatorThread() {

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java

@@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -528,6 +529,11 @@ public class TestRuntimeEstimators {
       dispatcher.getEventHandler().handle(event);
     }
 
+    @Override
+    public NodeId getNodeId() throws UnsupportedOperationException{
+      throw new UnsupportedOperationException();
+    }
+    
     @Override
     public TaskAttemptId getID() {
       return myAttemptID;

+ 5 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java

@@ -282,9 +282,12 @@ public class JobHistoryParser {
       if(attemptInfo.getAttemptId().equals(taskInfo.getSuccessfulAttemptId()))
       {
         // the failed attempt is the one that made this task successful
-        // so its no longer successful
+        // so its no longer successful. Reset fields set in
+        // handleTaskFinishedEvent()
+        taskInfo.counters = null;
+        taskInfo.finishTime = -1;
         taskInfo.status = null;
-        // not resetting the other fields set in handleTaskFinishedEvent()
+        taskInfo.successfulAttemptId = null;
       }
     }
   }

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.util.Records;
 
 public class CompletedTaskAttempt implements TaskAttempt {
@@ -57,6 +58,11 @@ public class CompletedTaskAttempt implements TaskAttempt {
     }
   }
 
+  @Override
+  public NodeId getNodeId() throws UnsupportedOperationException{
+    throw new UnsupportedOperationException();
+  }
+  
   @Override
   public ContainerId getAssignedContainerID() {
     return attemptInfo.getContainerId();

+ 6 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java

@@ -38,5 +38,9 @@ public enum NodeState {
   LOST, 
   
   /** Node has rebooted */
-  REBOOTED
-}
+  REBOOTED;
+  
+  public boolean isUnusable() {
+    return (this == UNHEALTHY || this == DECOMMISSIONED || this == LOST);
+  }
+}