Procházet zdrojové kódy

MAPREDUCE-6513. MR job got hanged forever when one NM unstable for some time. (Varun Saxena via wangda)

Wangda Tan před 9 roky
rodič
revize
8b2880c0b6
9 změnil soubory, kde provedl 311 přidání a 38 odebrání
  1. 14 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java
  2. 40 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptKilledEvent.java
  3. 3 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
  4. 37 11
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  5. 22 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
  6. 3 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
  7. 48 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
  8. 82 5
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
  9. 62 13
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

+ 14 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java

@@ -24,14 +24,27 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 public class TaskAttemptKillEvent extends TaskAttemptEvent {
 
   private final String message;
+  // Next map attempt will be rescheduled(i.e. updated in ask with higher
+  // priority equivalent to that of a fast fail map)
+  private final boolean rescheduleAttempt;
 
   public TaskAttemptKillEvent(TaskAttemptId attemptID,
-      String message) {
+      String message, boolean rescheduleAttempt) {
     super(attemptID, TaskAttemptEventType.TA_KILL);
     this.message = message;
+    this.rescheduleAttempt = rescheduleAttempt;
+  }
+
+  public TaskAttemptKillEvent(TaskAttemptId attemptID,
+      String message) {
+    this(attemptID, message, false);
   }
 
   public String getMessage() {
     return message;
   }
+
+  public boolean getRescheduleAttempt() {
+    return rescheduleAttempt;
+  }
 }

+ 40 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptKilledEvent.java

@@ -0,0 +1,40 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+/**
+ * Task Attempt killed event.
+ */
+public class TaskTAttemptKilledEvent extends TaskTAttemptEvent {
+
+  // Next map attempt will be rescheduled(i.e. updated in ask with
+  // higher priority equivalent to that of a fast fail map)
+  private final boolean rescheduleAttempt;
+
+  public TaskTAttemptKilledEvent(TaskAttemptId id, boolean rescheduleAttempt) {
+    super(id, TaskEventType.T_ATTEMPT_KILLED);
+    this.rescheduleAttempt = rescheduleAttempt;
+  }
+
+  public boolean getRescheduleAttempt() {
+    return rescheduleAttempt;
+  }
+}

+ 3 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -1349,7 +1349,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           if (TaskType.MAP == id.getTaskId().getTaskType()) {
             // reschedule only map tasks because their outputs maybe unusable
             LOG.info(mesg + ". AttemptId:" + id);
-            eventHandler.handle(new TaskAttemptKillEvent(id, mesg));
+            // Kill the attempt and indicate that next map attempt should be
+            // rescheduled (i.e. considered as a fast fail map).
+            eventHandler.handle(new TaskAttemptKillEvent(id, mesg, true));
           }
         }
       }

+ 37 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -98,6 +98,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
@@ -184,6 +185,7 @@ public abstract class TaskAttemptImpl implements
   private int httpPort;
   private Locality locality;
   private Avataar avataar;
+  private boolean rescheduleNextAttempt = false;
 
   private static final CleanupContainerTransition
       CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition();
@@ -1377,6 +1379,16 @@ public abstract class TaskAttemptImpl implements
     return container != null;
   }
 
+  //always called in write lock
+  private boolean getRescheduleNextAttempt() {
+    return rescheduleNextAttempt;
+  }
+
+  //always called in write lock
+  private void setRescheduleNextAttempt(boolean reschedule) {
+    rescheduleNextAttempt = reschedule;
+  }
+
   //always called in write lock
   private void setFinishTime() {
     //set the finish time only if launch time is set
@@ -1745,9 +1757,8 @@ public abstract class TaskAttemptImpl implements
               TaskEventType.T_ATTEMPT_FAILED));
           break;
         case KILLED:
-          taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
-              taskAttempt.attemptId,
-              TaskEventType.T_ATTEMPT_KILLED));
+          taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
+              taskAttempt.attemptId, false));
           break;
         default:
           LOG.error("Task final state is not FAILED or KILLED: " + finalState);
@@ -2014,8 +2025,13 @@ public abstract class TaskAttemptImpl implements
           taskAttempt, TaskAttemptStateInternal.KILLED);
       taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId
           .getTaskId().getJobId(), tauce));
-      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
-          taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED));
+      boolean rescheduleNextTaskAttempt = false;
+      if (event instanceof TaskAttemptKillEvent) {
+        rescheduleNextTaskAttempt =
+            ((TaskAttemptKillEvent)event).getRescheduleAttempt();
+      }
+      taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
+          taskAttempt.attemptId, rescheduleNextTaskAttempt));
       return TaskAttemptStateInternal.KILLED;
     }
   }
@@ -2044,6 +2060,12 @@ public abstract class TaskAttemptImpl implements
             taskAttempt.getID().toString());
         return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP;
       } else {
+        // Store reschedule flag so that after clean up is completed, new
+        // attempt is scheduled/rescheduled based on it.
+        if (event instanceof TaskAttemptKillEvent) {
+          taskAttempt.setRescheduleNextAttempt(
+              ((TaskAttemptKillEvent)event).getRescheduleAttempt());
+        }
         return TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP;
       }
     }
@@ -2075,9 +2097,8 @@ public abstract class TaskAttemptImpl implements
             ((TaskAttemptKillEvent) event).getMessage());
       }
 
-      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
-          taskAttempt.attemptId,
-          TaskEventType.T_ATTEMPT_KILLED));
+      taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
+          taskAttempt.attemptId, taskAttempt.getRescheduleNextAttempt()));
     }
   }
 
@@ -2095,9 +2116,8 @@ public abstract class TaskAttemptImpl implements
           taskAttempt.getAssignedContainerID(), taskAttempt.getAssignedContainerMgrAddress(),
           taskAttempt.container.getContainerToken(),
           ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
-      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
-          taskAttempt.attemptId,
-          TaskEventType.T_ATTEMPT_KILLED));
+      taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
+          taskAttempt.attemptId, false));
 
     }
   }
@@ -2137,6 +2157,12 @@ public abstract class TaskAttemptImpl implements
       // for it.
       finalizeProgress(taskAttempt);
       sendContainerCleanup(taskAttempt, event);
+      // Store reschedule flag so that after clean up is completed, new
+      // attempt is scheduled/rescheduled based on it.
+      if (event instanceof TaskAttemptKillEvent) {
+        taskAttempt.setRescheduleNextAttempt(
+            ((TaskAttemptKillEvent)event).getRescheduleAttempt());
+      }
     }
   }
 

+ 22 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java

@@ -76,6 +76,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@@ -594,10 +595,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   // This is always called in the Write Lock
   private void addAndScheduleAttempt(Avataar avataar) {
+    addAndScheduleAttempt(avataar, false);
+  }
+
+  // This is always called in the Write Lock
+  private void addAndScheduleAttempt(Avataar avataar, boolean reschedule) {
     TaskAttempt attempt = addAttempt(avataar);
     inProgressAttempts.add(attempt.getID());
     //schedule the nextAttemptNumber
-    if (failedAttempts.size() > 0) {
+    if (failedAttempts.size() > 0 || reschedule) {
       eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
           TaskAttemptEventType.TA_RESCHEDULE));
     } else {
@@ -968,7 +974,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.finishedAttempts.add(taskAttemptId);
       task.inProgressAttempts.remove(taskAttemptId);
       if (task.successfulAttempt == null) {
-        task.addAndScheduleAttempt(Avataar.VIRGIN);
+        boolean rescheduleNewAttempt = false;
+        if (event instanceof TaskTAttemptKilledEvent) {
+          rescheduleNewAttempt =
+              ((TaskTAttemptKilledEvent)event).getRescheduleAttempt();
+        }
+        task.addAndScheduleAttempt(Avataar.VIRGIN, rescheduleNewAttempt);
       }
       if ((task.commitAttempt != null) && (task.commitAttempt == taskAttemptId)) {
     	task.commitAttempt = null;
@@ -1187,7 +1198,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       // from the map splitInfo. So the bad node might be sent as a location
       // to the RM. But the RM would ignore that just like it would ignore
       // currently pending container requests affinitized to bad nodes.
-      task.addAndScheduleAttempt(Avataar.VIRGIN);
+      boolean rescheduleNextTaskAttempt = false;
+      if (event instanceof TaskTAttemptKilledEvent) {
+        // Decide whether to reschedule next task attempt. If true, this
+        // typically indicates that a successful map attempt was killed on an
+        // unusable node being reported.
+        rescheduleNextTaskAttempt =
+            ((TaskTAttemptKilledEvent)event).getRescheduleAttempt();
+      }
+      task.addAndScheduleAttempt(Avataar.VIRGIN, rescheduleNextTaskAttempt);
       return TaskStateInternal.SCHEDULED;
     }
   }

+ 3 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -925,9 +925,11 @@ public class RMContainerAllocator extends RMContainerRequestor
             LOG.info("Killing taskAttempt:" + tid
                 + " because it is running on unusable node:"
                 + taskAttemptNodeId);
+            // If map, reschedule next task attempt.
+            boolean rescheduleNextAttempt = (i == 0) ? true : false;
             eventHandler.handle(new TaskAttemptKillEvent(tid,
                 "TaskAttempt killed because it ran on unusable node"
-                    + taskAttemptNodeId));
+                    + taskAttemptNodeId, rescheduleNextAttempt));
           }
         }
       }

+ 48 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java

@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Supplier;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -56,13 +57,19 @@ import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Tests the state machine of MR App.
@@ -201,13 +208,18 @@ public class TestMRApp {
   @Test
   public void testUpdatedNodes() throws Exception {
     int runCount = 0;
+    Dispatcher disp = Mockito.spy(new AsyncDispatcher());
     MRApp app = new MRAppWithHistory(2, 2, false, this.getClass().getName(),
-        true, ++runCount);
+        true, ++runCount, disp);
     Configuration conf = new Configuration();
     // after half of the map completion, reduce will start
     conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
     // uberization forces full slowstart (1.0), so disable that
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+
+    ContainerAllocEventHandler handler = new ContainerAllocEventHandler();
+    disp.register(ContainerAllocator.EventType.class, handler);
+
     final Job job1 = app.submit(conf);
     app.waitForState(job1, JobState.RUNNING);
     Assert.assertEquals("Num tasks not correct", 4, job1.getTasks().size());
@@ -285,6 +297,12 @@ public class TestMRApp {
     events = job1.getTaskAttemptCompletionEvents(0, 100);
     Assert.assertEquals("Expecting 2 more completion events for killed", 4,
         events.length);
+    // 2 map task attempts which were killed above should be requested from
+    // container allocator with the previous map task marked as failed. If
+    // this happens allocator will request the container for this mapper from
+    // RM at a higher priority of 5(i.e. with a priority equivalent to that of
+    // a fail fast map).
+    handler.waitForFailedMapContainerReqEvents(2);
 
     // all maps must be back to running
     app.waitForState(mapTask1, TaskState.RUNNING);
@@ -324,7 +342,7 @@ public class TestMRApp {
     // rerun
     // in rerun the 1st map will be recovered from previous run
     app = new MRAppWithHistory(2, 2, false, this.getClass().getName(), false,
-        ++runCount);
+        ++runCount, (Dispatcher)new AsyncDispatcher());
     conf = new Configuration();
     conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
@@ -420,6 +438,25 @@ public class TestMRApp {
     app.waitForState(job2, JobState.SUCCEEDED);
   }
 
+  private final class ContainerAllocEventHandler
+      implements EventHandler<ContainerAllocatorEvent> {
+    private AtomicInteger failedMapContainerReqEventCnt = new AtomicInteger(0);
+    @Override
+    public void handle(ContainerAllocatorEvent event) {
+      if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ &&
+          ((ContainerRequestEvent)event).getEarlierAttemptFailed()) {
+        failedMapContainerReqEventCnt.incrementAndGet();
+      }
+    }
+    public void waitForFailedMapContainerReqEvents(int count)
+        throws InterruptedException {
+      while(failedMapContainerReqEventCnt.get() != count) {
+        Thread.sleep(50);
+      }
+      failedMapContainerReqEventCnt.set(0);
+    }
+  }
+
   private static void waitFor(Supplier<Boolean> predicate, int
       checkIntervalMillis, int checkTotalMillis) throws InterruptedException {
     try {
@@ -590,9 +627,17 @@ public class TestMRApp {
   }
 
   private final class MRAppWithHistory extends MRApp {
+    private Dispatcher dispatcher;
     public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
-        String testName, boolean cleanOnStart, int startCount) {
+        String testName, boolean cleanOnStart, int startCount,
+        Dispatcher disp) {
       super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
+      this.dispatcher = disp;
+    }
+
+    @Override
+    protected Dispatcher createDispatcher() {
+      return dispatcher;
     }
 
     @Override

+ 82 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java

@@ -78,9 +78,13 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.Credentials;
@@ -982,7 +986,46 @@ public class TestTaskAttempt{
         + " Task attempt finish time is not the same ",
         finishTime, Long.valueOf(taImpl.getFinishTime()));
   }
-  
+
+  private void containerKillBeforeAssignment(boolean scheduleAttempt)
+      throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+
+    TaskAttemptImpl taImpl =
+        new MapTaskAttemptImpl(taskId, 1, eventHandler, mock(Path.class), 1,
+            mock(TaskSplitMetaInfo.class), new JobConf(),
+            mock(TaskAttemptListener.class), mock(Token.class),
+            new Credentials(), SystemClock.getInstance(),
+            mock(AppContext.class));
+    if (scheduleAttempt) {
+      taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+          TaskAttemptEventType.TA_SCHEDULE));
+    }
+    taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true));
+    assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
+        TaskAttemptState.KILLED);
+    assertEquals("Task attempt's internal state is not KILLED",
+        taImpl.getInternalState(), TaskAttemptStateInternal.KILLED);
+    assertFalse("InternalError occurred", eventHandler.internalError);
+    TaskEvent event = eventHandler.lastTaskEvent;
+    assertEquals(TaskEventType.T_ATTEMPT_KILLED, event.getType());
+    // In NEW state, new map attempt should not be rescheduled.
+    assertFalse(((TaskTAttemptKilledEvent)event).getRescheduleAttempt());
+  }
+
+  @Test
+  public void testContainerKillOnNew() throws Exception {
+    containerKillBeforeAssignment(false);
+  }
+
+  @Test
+  public void testContainerKillOnUnassigned() throws Exception {
+    containerKillBeforeAssignment(true);
+  }
+
   @Test
   public void testContainerKillAfterAssigned() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
@@ -1032,7 +1075,7 @@ public class TestTaskAttempt{
         taImpl.getInternalState(), TaskAttemptStateInternal.ASSIGNED);
     taImpl.handle(new TaskAttemptEvent(attemptId,
         TaskAttemptEventType.TA_KILL));
-    assertEquals("Task should be in KILLED state",
+    assertEquals("Task should be in KILL_CONTAINER_CLEANUP state",
         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
         taImpl.getInternalState());
   }
@@ -1089,7 +1132,7 @@ public class TestTaskAttempt{
         TaskAttemptEventType.TA_KILL));
     assertFalse("InternalError occurred trying to handle TA_KILL",
         eventHandler.internalError);
-    assertEquals("Task should be in KILLED state",
+    assertEquals("Task should be in KILL_CONTAINER_CLEANUP state",
         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
         taImpl.getInternalState());
   }
@@ -1150,12 +1193,11 @@ public class TestTaskAttempt{
         TaskAttemptEventType.TA_KILL));
     assertFalse("InternalError occurred trying to handle TA_KILL",
         eventHandler.internalError);
-    assertEquals("Task should be in KILLED state",
+    assertEquals("Task should be in KILL_CONTAINER_CLEANUP state",
         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
         taImpl.getInternalState());
   }
 
-
   @Test
   public void testKillMapTaskWhileSuccessFinishing() throws Exception {
     MockEventHandler eventHandler = new MockEventHandler();
@@ -1195,6 +1237,37 @@ public class TestTaskAttempt{
     assertFalse("InternalError occurred", eventHandler.internalError);
   }
 
+  @Test
+  public void testKillMapTaskAfterSuccess() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in SUCCEEDED state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(),
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    // Send a map task attempt kill event indicating next map attempt has to be
+    // reschedule
+    taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true));
+    assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
+        TaskAttemptState.KILLED);
+    assertEquals("Task attempt's internal state is not KILLED",
+        taImpl.getInternalState(), TaskAttemptStateInternal.KILLED);
+    assertFalse("InternalError occurred", eventHandler.internalError);
+    TaskEvent event = eventHandler.lastTaskEvent;
+    assertEquals(TaskEventType.T_ATTEMPT_KILLED, event.getType());
+    // Send an attempt killed event to TaskImpl forwarding the same reschedule
+    // flag we received in task attempt kill event.
+    assertTrue(((TaskTAttemptKilledEvent)event).getRescheduleAttempt());
+  }
+
   @Test
   public void testKillMapTaskWhileFailFinishing() throws Exception {
     MockEventHandler eventHandler = new MockEventHandler();
@@ -1406,9 +1479,13 @@ public class TestTaskAttempt{
 
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;
+    public TaskEvent lastTaskEvent;
 
     @Override
     public void handle(Event event) {
+      if (event instanceof TaskEvent) {
+        lastTaskEvent = (TaskEvent)event;
+      }
       if (event instanceof JobEvent) {
         JobEvent je = ((JobEvent) event);
         if (JobEventType.INTERNAL_ERROR == je.getType()) {

+ 62 - 13
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

@@ -55,6 +55,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -92,7 +93,8 @@ public class TestTaskImpl {
   private int taskCounter = 0;
   private final int partition = 1;
   
-  private InlineDispatcher dispatcher;   
+  private InlineDispatcher dispatcher;
+  private MockTaskAttemptEventHandler taskAttemptEventHandler;
   private List<MockTaskAttemptImpl> taskAttempts;
   
   private class MockTaskImpl extends TaskImpl {
@@ -257,7 +259,10 @@ public class TestTaskImpl {
     taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
     when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations); 
     
-    taskAttempts = new ArrayList<MockTaskAttemptImpl>();    
+    taskAttempts = new ArrayList<MockTaskAttemptImpl>();
+
+    taskAttemptEventHandler = new MockTaskAttemptEventHandler();
+    dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler);
   }
   
   private MockTaskImpl createMockTask(TaskType taskType) {
@@ -294,8 +299,12 @@ public class TestTaskImpl {
   }
   
   private void killScheduledTaskAttempt(TaskAttemptId attemptId) {
-    mockTask.handle(new TaskTAttemptEvent(attemptId, 
-        TaskEventType.T_ATTEMPT_KILLED));
+    killScheduledTaskAttempt(attemptId, false);
+  }
+
+  private void killScheduledTaskAttempt(TaskAttemptId attemptId,
+      boolean reschedule) {
+    mockTask.handle(new TaskTAttemptKilledEvent(attemptId, reschedule));
     assertTaskScheduledState();
   }
 
@@ -326,11 +335,15 @@ public class TestTaskImpl {
   }
   
   private void killRunningTaskAttempt(TaskAttemptId attemptId) {
-    mockTask.handle(new TaskTAttemptEvent(attemptId, 
-        TaskEventType.T_ATTEMPT_KILLED));
+    killRunningTaskAttempt(attemptId, false);
+  }
+
+  private void killRunningTaskAttempt(TaskAttemptId attemptId,
+      boolean reschedule) {
+    mockTask.handle(new TaskTAttemptKilledEvent(attemptId, reschedule));
     assertTaskRunningState();  
   }
-  
+
   private void failRunningTaskAttempt(TaskAttemptId attemptId) {
     mockTask.handle(new TaskTAttemptEvent(attemptId, 
         TaskEventType.T_ATTEMPT_FAILED));
@@ -423,10 +436,12 @@ public class TestTaskImpl {
    */
   public void testKillScheduledTaskAttempt() {
     LOG.info("--- START: testKillScheduledTaskAttempt ---");
-    mockTask = createMockTask(TaskType.MAP);        
+    mockTask = createMockTask(TaskType.MAP);
     TaskId taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
-    killScheduledTaskAttempt(getLastAttempt().getAttemptId());
+    killScheduledTaskAttempt(getLastAttempt().getAttemptId(), true);
+    assertEquals(TaskAttemptEventType.TA_RESCHEDULE,
+        taskAttemptEventHandler.lastTaskAttemptEvent.getType());
   }
   
   @Test 
@@ -449,11 +464,13 @@ public class TestTaskImpl {
    */
   public void testKillRunningTaskAttempt() {
     LOG.info("--- START: testKillRunningTaskAttempt ---");
-    mockTask = createMockTask(TaskType.MAP);        
+    mockTask = createMockTask(TaskType.MAP);
     TaskId taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
     launchTaskAttempt(getLastAttempt().getAttemptId());
-    killRunningTaskAttempt(getLastAttempt().getAttemptId());    
+    killRunningTaskAttempt(getLastAttempt().getAttemptId(), true);
+    assertEquals(TaskAttemptEventType.TA_RESCHEDULE,
+        taskAttemptEventHandler.lastTaskAttemptEvent.getType());
   }
 
   @Test
@@ -471,6 +488,28 @@ public class TestTaskImpl {
     assertTaskSucceededState();
   }
 
+  @Test
+  /**
+   * Kill map attempt for succeeded map task
+   * {@link TaskState#SUCCEEDED}->{@link TaskState#SCHEDULED}
+   */
+  public void testKillAttemptForSuccessfulTask() {
+    LOG.info("--- START: testKillAttemptForSuccessfulTask ---");
+    mockTask = createMockTask(TaskType.MAP);
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    assertTaskSucceededState();
+    mockTask.handle(
+        new TaskTAttemptKilledEvent(getLastAttempt().getAttemptId(), true));
+    assertEquals(TaskAttemptEventType.TA_RESCHEDULE,
+        taskAttemptEventHandler.lastTaskAttemptEvent.getType());
+    assertTaskScheduledState();
+  }
+
   @Test 
   public void testTaskProgress() {
     LOG.info("--- START: testTaskProgress ---");
@@ -728,8 +767,8 @@ public class TestTaskImpl {
     assertEquals(TaskState.FAILED, mockTask.getState());
     taskAttempt = taskAttempts.get(3);
     taskAttempt.setState(TaskAttemptState.KILLED);
-    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
-        TaskEventType.T_ATTEMPT_KILLED));
+    mockTask.handle(new TaskTAttemptKilledEvent(taskAttempt.getAttemptId(),
+        false));
     assertEquals(TaskState.FAILED, mockTask.getState());
   }
 
@@ -840,4 +879,14 @@ public class TestTaskImpl {
     Counters taskCounters = mockTask.getCounters();
     assertEquals("wrong counters for task", specAttemptCounters, taskCounters);
   }
+
+  public static class MockTaskAttemptEventHandler implements EventHandler {
+    public TaskAttemptEvent lastTaskAttemptEvent;
+    @Override
+    public void handle(Event event) {
+      if (event instanceof TaskAttemptEvent) {
+        lastTaskAttemptEvent = (TaskAttemptEvent)event;
+      }
+    }
+  };
 }