소스 검색

MAPREDUCE-6513. Job got hanged forever when one NM unstable for some
time. Contributed by Varun Saxena & Wangda Tan

Jian He 9 년 전
부모
커밋
1918da8fc9
10개의 변경된 파일388개의 추가작업 그리고 64개의 파일을 삭제
  1. 3 0
      hadoop-mapreduce-project/CHANGES.txt
  2. 14 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java
  3. 40 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptKilledEvent.java
  4. 3 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
  5. 46 21
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  6. 22 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
  7. 3 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
  8. 48 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
  9. 143 20
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
  10. 66 14
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -81,6 +81,9 @@ Release 2.7.3 - UNRELEASED
     MAPREDUCE-6689. MapReduce job can infinitely increase number of reducer
     resource requests (Wangda Tan via jlowe)
 
+    MAPREDUCE-6513. Job got hanged forever when one NM unstable for some
+    time. Contributed by Varun Saxena & Wangda Tan
+
 Release 2.7.2 - 2016-01-25
 
   INCOMPATIBLE CHANGES

+ 14 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java

@@ -24,14 +24,27 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 public class TaskAttemptKillEvent extends TaskAttemptEvent {
 
   private final String message;
+  // Next map attempt will be rescheduled(i.e. updated in ask with higher
+  // priority equivalent to that of a fast fail map)
+  private final boolean rescheduleAttempt;
 
   public TaskAttemptKillEvent(TaskAttemptId attemptID,
-      String message) {
+      String message, boolean rescheduleAttempt) {
     super(attemptID, TaskAttemptEventType.TA_KILL);
     this.message = message;
+    this.rescheduleAttempt = rescheduleAttempt;
+  }
+
+  public TaskAttemptKillEvent(TaskAttemptId attemptID,
+      String message) {
+    this(attemptID, message, false);
   }
 
   public String getMessage() {
     return message;
   }
+
+  public boolean getRescheduleAttempt() {
+    return rescheduleAttempt;
+  }
 }

+ 40 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptKilledEvent.java

@@ -0,0 +1,40 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+/**
+ * Task Attempt killed event.
+ */
+public class TaskTAttemptKilledEvent extends TaskTAttemptEvent {
+
+  // Next map attempt will be rescheduled(i.e. updated in ask with
+  // higher priority equivalent to that of a fast fail map)
+  private final boolean rescheduleAttempt;
+
+  public TaskTAttemptKilledEvent(TaskAttemptId id, boolean rescheduleAttempt) {
+    super(id, TaskEventType.T_ATTEMPT_KILLED);
+    this.rescheduleAttempt = rescheduleAttempt;
+  }
+
+  public boolean getRescheduleAttempt() {
+    return rescheduleAttempt;
+  }
+}

+ 3 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -1342,7 +1342,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           if (TaskType.MAP == id.getTaskId().getTaskType()) {
             // reschedule only map tasks because their outputs maybe unusable
             LOG.info(mesg + ". AttemptId:" + id);
-            eventHandler.handle(new TaskAttemptKillEvent(id, mesg));
+            // Kill the attempt and indicate that next map attempt should be
+            // rescheduled (i.e. considered as a fast fail map).
+            eventHandler.handle(new TaskAttemptKillEvent(id, mesg, true));
           }
         }
       }

+ 46 - 21
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -97,6 +97,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
@@ -184,6 +185,7 @@ public abstract class TaskAttemptImpl implements
   private int httpPort;
   private Locality locality;
   private Avataar avataar;
+  private boolean rescheduleNextAttempt = false;
 
   private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
     new CleanupContainerTransition();
@@ -1259,6 +1261,16 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
+  //always called in write lock
+  private boolean getRescheduleNextAttempt() {
+    return rescheduleNextAttempt;
+  }
+
+  //always called in write lock
+  private void setRescheduleNextAttempt(boolean reschedule) {
+    rescheduleNextAttempt = reschedule;
+  }
+
   //always called in write lock
   private void setFinishTime() {
     //set the finish time only if launch time is set
@@ -1611,9 +1623,8 @@ public abstract class TaskAttemptImpl implements
               TaskEventType.T_ATTEMPT_FAILED));
           break;
         case KILLED:
-          taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
-              taskAttempt.attemptId,
-              TaskEventType.T_ATTEMPT_KILLED));
+          taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
+              taskAttempt.attemptId, false));
           break;
         default:
           LOG.error("Task final state is not FAILED or KILLED: " + finalState);
@@ -1851,7 +1862,6 @@ public abstract class TaskAttemptImpl implements
 
       // not setting a finish time since it was set on success
       assert (taskAttempt.getFinishTime() != 0);
-
       assert (taskAttempt.getLaunchTime() != 0);
       taskAttempt.eventHandler
           .handle(createJobCounterUpdateEventTAKilled(taskAttempt, true));
@@ -1859,8 +1869,13 @@ public abstract class TaskAttemptImpl implements
           taskAttempt, TaskAttemptStateInternal.KILLED);
       taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId
           .getTaskId().getJobId(), tauce));
-      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
-          taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED));
+      boolean rescheduleNextTaskAttempt = false;
+      if (event instanceof TaskAttemptKillEvent) {
+        rescheduleNextTaskAttempt =
+            ((TaskAttemptKillEvent)event).getRescheduleAttempt();
+      }
+      taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
+          taskAttempt.attemptId, rescheduleNextTaskAttempt));
       return TaskAttemptStateInternal.KILLED;
     }
   }
@@ -1891,9 +1906,8 @@ public abstract class TaskAttemptImpl implements
             ((TaskAttemptKillEvent) event).getMessage());
       }
 
-      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
-          taskAttempt.attemptId,
-          TaskEventType.T_ATTEMPT_KILLED));
+      taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
+          taskAttempt.attemptId, taskAttempt.getRescheduleNextAttempt()));
     }
   }
 
@@ -1907,22 +1921,33 @@ public abstract class TaskAttemptImpl implements
       // for it
       taskAttempt.taskAttemptListener.unregister(
           taskAttempt.attemptId, taskAttempt.jvmID);
-
+      sendContainerCleanup(taskAttempt, event);
+      // Store reschedule flag so that after clean up is completed, new
+      // attempt is scheduled/rescheduled based on it.
       if (event instanceof TaskAttemptKillEvent) {
-        taskAttempt.addDiagnosticInfo(
-            ((TaskAttemptKillEvent) event).getMessage());
+        taskAttempt.setRescheduleNextAttempt(
+            ((TaskAttemptKillEvent)event).getRescheduleAttempt());
       }
+    }
+  }
 
-      taskAttempt.reportedStatus.progress = 1.0f;
-      taskAttempt.updateProgressSplits();
-      //send the cleanup event to containerLauncher
-      taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
-          taskAttempt.attemptId, 
-          taskAttempt.container.getId(), StringInterner
-              .weakIntern(taskAttempt.container.getNodeId().toString()),
-          taskAttempt.container.getContainerToken(),
-          ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
+  @SuppressWarnings("unchecked")
+  private static void sendContainerCleanup(TaskAttemptImpl taskAttempt,
+      TaskAttemptEvent event) {
+    if (event instanceof TaskAttemptKillEvent) {
+      taskAttempt.addDiagnosticInfo(
+          ((TaskAttemptKillEvent) event).getMessage());
     }
+
+    taskAttempt.reportedStatus.progress = 1.0f;
+    taskAttempt.updateProgressSplits();
+    //send the cleanup event to containerLauncher
+    taskAttempt.eventHandler.handle(
+        new ContainerLauncherEvent(taskAttempt.attemptId,
+            taskAttempt.container.getId(), StringInterner
+            .weakIntern(taskAttempt.container.getNodeId().toString()),
+            taskAttempt.container.getContainerToken(),
+            ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
   }
 
   private void addDiagnosticInfo(String diag) {

+ 22 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java

@@ -76,6 +76,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@@ -594,10 +595,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   // This is always called in the Write Lock
   private void addAndScheduleAttempt(Avataar avataar) {
+    addAndScheduleAttempt(avataar, false);
+  }
+
+  // This is always called in the Write Lock
+  private void addAndScheduleAttempt(Avataar avataar, boolean reschedule) {
     TaskAttempt attempt = addAttempt(avataar);
     inProgressAttempts.add(attempt.getID());
     //schedule the nextAttemptNumber
-    if (failedAttempts.size() > 0) {
+    if (failedAttempts.size() > 0 || reschedule) {
       eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
           TaskAttemptEventType.TA_RESCHEDULE));
     } else {
@@ -968,7 +974,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.finishedAttempts.add(taskAttemptId);
       task.inProgressAttempts.remove(taskAttemptId);
       if (task.successfulAttempt == null) {
-        task.addAndScheduleAttempt(Avataar.VIRGIN);
+        boolean rescheduleNewAttempt = false;
+        if (event instanceof TaskTAttemptKilledEvent) {
+          rescheduleNewAttempt =
+              ((TaskTAttemptKilledEvent)event).getRescheduleAttempt();
+        }
+        task.addAndScheduleAttempt(Avataar.VIRGIN, rescheduleNewAttempt);
       }
       if ((task.commitAttempt != null) && (task.commitAttempt == taskAttemptId)) {
     	task.commitAttempt = null;
@@ -1175,7 +1186,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       // from the map splitInfo. So the bad node might be sent as a location
       // to the RM. But the RM would ignore that just like it would ignore
       // currently pending container requests affinitized to bad nodes.
-      task.addAndScheduleAttempt(Avataar.VIRGIN);
+      boolean rescheduleNextTaskAttempt = false;
+      if (event instanceof TaskTAttemptKilledEvent) {
+        // Decide whether to reschedule next task attempt. If true, this
+        // typically indicates that a successful map attempt was killed on an
+        // unusable node being reported.
+        rescheduleNextTaskAttempt =
+            ((TaskTAttemptKilledEvent)event).getRescheduleAttempt();
+      }
+      task.addAndScheduleAttempt(Avataar.VIRGIN, rescheduleNextTaskAttempt);
       return TaskStateInternal.SCHEDULED;
     }
   }

+ 3 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -887,9 +887,11 @@ public class RMContainerAllocator extends RMContainerRequestor
             LOG.info("Killing taskAttempt:" + tid
                 + " because it is running on unusable node:"
                 + taskAttemptNodeId);
+            // If map, reschedule next task attempt.
+            boolean rescheduleNextAttempt = (i == 0) ? true : false;
             eventHandler.handle(new TaskAttemptKillEvent(tid,
                 "TaskAttempt killed because it ran on unusable node"
-                    + taskAttemptNodeId));
+                    + taskAttemptNodeId, rescheduleNextAttempt));
           }
         }
       }

+ 48 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java

@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Supplier;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -56,13 +57,19 @@ import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Tests the state machine of MR App.
@@ -201,13 +208,18 @@ public class TestMRApp {
   @Test
   public void testUpdatedNodes() throws Exception {
     int runCount = 0;
+    Dispatcher disp = Mockito.spy(new AsyncDispatcher());
     MRApp app = new MRAppWithHistory(2, 2, false, this.getClass().getName(),
-        true, ++runCount);
+        true, ++runCount, disp);
     Configuration conf = new Configuration();
     // after half of the map completion, reduce will start
     conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
     // uberization forces full slowstart (1.0), so disable that
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+
+    ContainerAllocEventHandler handler = new ContainerAllocEventHandler();
+    disp.register(ContainerAllocator.EventType.class, handler);
+
     final Job job1 = app.submit(conf);
     app.waitForState(job1, JobState.RUNNING);
     Assert.assertEquals("Num tasks not correct", 4, job1.getTasks().size());
@@ -285,6 +297,12 @@ public class TestMRApp {
     events = job1.getTaskAttemptCompletionEvents(0, 100);
     Assert.assertEquals("Expecting 2 more completion events for killed", 4,
         events.length);
+    // 2 map task attempts which were killed above should be requested from
+    // container allocator with the previous map task marked as failed. If
+    // this happens allocator will request the container for this mapper from
+    // RM at a higher priority of 5(i.e. with a priority equivalent to that of
+    // a fail fast map).
+    handler.waitForFailedMapContainerReqEvents(2);
 
     // all maps must be back to running
     app.waitForState(mapTask1, TaskState.RUNNING);
@@ -324,7 +342,7 @@ public class TestMRApp {
     // rerun
     // in rerun the 1st map will be recovered from previous run
     app = new MRAppWithHistory(2, 2, false, this.getClass().getName(), false,
-        ++runCount);
+        ++runCount, (Dispatcher)new AsyncDispatcher());
     conf = new Configuration();
     conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
@@ -420,6 +438,25 @@ public class TestMRApp {
     app.waitForState(job2, JobState.SUCCEEDED);
   }
 
+  private final class ContainerAllocEventHandler
+      implements EventHandler<ContainerAllocatorEvent> {
+    private AtomicInteger failedMapContainerReqEventCnt = new AtomicInteger(0);
+    @Override
+    public void handle(ContainerAllocatorEvent event) {
+      if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ &&
+          ((ContainerRequestEvent)event).getEarlierAttemptFailed()) {
+        failedMapContainerReqEventCnt.incrementAndGet();
+      }
+    }
+    public void waitForFailedMapContainerReqEvents(int count)
+        throws InterruptedException {
+      while(failedMapContainerReqEventCnt.get() != count) {
+        Thread.sleep(50);
+      }
+      failedMapContainerReqEventCnt.set(0);
+    }
+  }
+
   private static void waitFor(Supplier<Boolean> predicate, int
       checkIntervalMillis, int checkTotalMillis) throws InterruptedException {
     try {
@@ -590,9 +627,17 @@ public class TestMRApp {
   }
 
   private final class MRAppWithHistory extends MRApp {
+    private Dispatcher dispatcher;
     public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
-        String testName, boolean cleanOnStart, int startCount) {
+        String testName, boolean cleanOnStart, int startCount,
+        Dispatcher disp) {
       super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
+      this.dispatcher = disp;
+    }
+
+    @Override
+    protected Dispatcher createDispatcher() {
+      return dispatcher;
     }
 
     @Override

+ 143 - 20
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java

@@ -18,22 +18,6 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -71,6 +55,10 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.Credentials;
@@ -87,9 +75,25 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class TestTaskAttempt{
 	
@@ -944,7 +948,46 @@ public class TestTaskAttempt{
 		+ " Task attempt finish time is not the same ",
 		finishTime, Long.valueOf(taImpl.getFinishTime()));  
   }
-  
+
+  private void containerKillBeforeAssignment(boolean scheduleAttempt)
+      throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+
+    TaskAttemptImpl taImpl =
+        new MapTaskAttemptImpl(taskId, 1, eventHandler, mock(Path.class), 1,
+            mock(TaskSplitMetaInfo.class), new JobConf(),
+            mock(TaskAttemptListener.class), mock(Token.class),
+            new Credentials(), new SystemClock(),
+            mock(AppContext.class));
+    if (scheduleAttempt) {
+      taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+          TaskAttemptEventType.TA_SCHEDULE));
+    }
+    taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true));
+    assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
+        TaskAttemptState.KILLED);
+    assertEquals("Task attempt's internal state is not KILLED",
+        taImpl.getInternalState(), TaskAttemptStateInternal.KILLED);
+    assertFalse("InternalError occurred", eventHandler.internalError);
+    TaskEvent event = eventHandler.lastTaskEvent;
+    assertEquals(TaskEventType.T_ATTEMPT_KILLED, event.getType());
+    // In NEW state, new map attempt should not be rescheduled.
+    assertFalse(((TaskTAttemptKilledEvent)event).getRescheduleAttempt());
+  }
+
+  @Test
+  public void testContainerKillOnNew() throws Exception {
+    containerKillBeforeAssignment(false);
+  }
+
+  @Test
+  public void testContainerKillOnUnassigned() throws Exception {
+    containerKillBeforeAssignment(true);
+  }
+
   @Test
   public void testContainerKillAfterAssigned() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
@@ -994,7 +1037,7 @@ public class TestTaskAttempt{
         taImpl.getInternalState(), TaskAttemptStateInternal.ASSIGNED);
     taImpl.handle(new TaskAttemptEvent(attemptId,
         TaskAttemptEventType.TA_KILL));
-    assertEquals("Task should be in KILLED state",
+    assertEquals("Task should be in KILL_CONTAINER_CLEANUP state",
         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
         taImpl.getInternalState());
   }
@@ -1051,7 +1094,7 @@ public class TestTaskAttempt{
         TaskAttemptEventType.TA_KILL));
     assertFalse("InternalError occurred trying to handle TA_KILL",
         eventHandler.internalError);
-    assertEquals("Task should be in KILLED state",
+    assertEquals("Task should be in KILL_CONTAINER_CLEANUP state",
         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
         taImpl.getInternalState());
   }
@@ -1112,16 +1155,96 @@ public class TestTaskAttempt{
         TaskAttemptEventType.TA_KILL));
     assertFalse("InternalError occurred trying to handle TA_KILL",
         eventHandler.internalError);
-    assertEquals("Task should be in KILLED state",
+    assertEquals("Task should be in KILL_CONTAINER_CLEANUP state",
         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
         taImpl.getInternalState());
   }
 
+  @Test
+  public void testKillMapTaskAfterSuccess() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in SUCCEEDED state",
+        taImpl.getInternalState(),
+        TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    // Send a map task attempt kill event indicating next map attempt has to be
+    // reschedule
+    taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true));
+    assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
+        TaskAttemptState.KILLED);
+    assertEquals("Task attempt's internal state is not KILLED",
+        taImpl.getInternalState(), TaskAttemptStateInternal.KILLED);
+    assertFalse("InternalError occurred", eventHandler.internalError);
+    TaskEvent event = eventHandler.lastTaskEvent;
+    assertEquals(TaskEventType.T_ATTEMPT_KILLED, event.getType());
+    // Send an attempt killed event to TaskImpl forwarding the same reschedule
+    // flag we received in task attempt kill event.
+    assertTrue(((TaskTAttemptKilledEvent)event).getRescheduleAttempt());
+  }
+
+  private TaskAttemptImpl createTaskAttemptImpl(
+      MockEventHandler eventHandler) {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+
+    TaskAttemptImpl taImpl =
+        new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+            splits, jobConf, taListener,
+            mock(Token.class), new Credentials(),
+            new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+        container, mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    return taImpl;
+  }
+
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;
+    public TaskEvent lastTaskEvent;
 
     @Override
     public void handle(Event event) {
+      if (event instanceof TaskEvent) {
+        lastTaskEvent = (TaskEvent)event;
+      }
       if (event instanceof JobEvent) {
         JobEvent je = ((JobEvent) event);
         if (JobEventType.INTERNAL_ERROR == je.getType()) {

+ 66 - 14
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

@@ -50,13 +50,17 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.util.Clock;
@@ -89,7 +93,8 @@ public class TestTaskImpl {
   private int taskCounter = 0;
   private final int partition = 1;
   
-  private InlineDispatcher dispatcher;   
+  private InlineDispatcher dispatcher;
+  private MockTaskAttemptEventHandler taskAttemptEventHandler;
   private List<MockTaskAttemptImpl> taskAttempts;
   
   private class MockTaskImpl extends TaskImpl {
@@ -234,7 +239,10 @@ public class TestTaskImpl {
     taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
     when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations); 
     
-    taskAttempts = new ArrayList<MockTaskAttemptImpl>();    
+    taskAttempts = new ArrayList<MockTaskAttemptImpl>();
+
+    taskAttemptEventHandler = new MockTaskAttemptEventHandler();
+    dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler);
   }
   
   private MockTaskImpl createMockTask(TaskType taskType) {
@@ -271,8 +279,12 @@ public class TestTaskImpl {
   }
   
   private void killScheduledTaskAttempt(TaskAttemptId attemptId) {
-    mockTask.handle(new TaskTAttemptEvent(attemptId, 
-        TaskEventType.T_ATTEMPT_KILLED));
+    killScheduledTaskAttempt(attemptId, false);
+  }
+
+  private void killScheduledTaskAttempt(TaskAttemptId attemptId,
+      boolean reschedule) {
+    mockTask.handle(new TaskTAttemptKilledEvent(attemptId, reschedule));
     assertTaskScheduledState();
   }
 
@@ -301,11 +313,15 @@ public class TestTaskImpl {
   }
   
   private void killRunningTaskAttempt(TaskAttemptId attemptId) {
-    mockTask.handle(new TaskTAttemptEvent(attemptId, 
-        TaskEventType.T_ATTEMPT_KILLED));
+    killRunningTaskAttempt(attemptId, false);
+  }
+
+  private void killRunningTaskAttempt(TaskAttemptId attemptId,
+      boolean reschedule) {
+    mockTask.handle(new TaskTAttemptKilledEvent(attemptId, reschedule));
     assertTaskRunningState();  
   }
-  
+
   private void failRunningTaskAttempt(TaskAttemptId attemptId) {
     mockTask.handle(new TaskTAttemptEvent(attemptId, 
         TaskEventType.T_ATTEMPT_FAILED));
@@ -334,7 +350,7 @@ public class TestTaskImpl {
   }
     
   /**
-   * {@link TaskState#KILL_WAIT}
+   * {@link TaskStateInternal#KILL_WAIT}
    */
   private void assertTaskKillWaitState() {
     assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState());
@@ -398,10 +414,12 @@ public class TestTaskImpl {
    */
   public void testKillScheduledTaskAttempt() {
     LOG.info("--- START: testKillScheduledTaskAttempt ---");
-    mockTask = createMockTask(TaskType.MAP);        
+    mockTask = createMockTask(TaskType.MAP);
     TaskId taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
-    killScheduledTaskAttempt(getLastAttempt().getAttemptId());
+    killScheduledTaskAttempt(getLastAttempt().getAttemptId(), true);
+    assertEquals(TaskAttemptEventType.TA_RESCHEDULE,
+        taskAttemptEventHandler.lastTaskAttemptEvent.getType());
   }
   
   @Test 
@@ -424,11 +442,13 @@ public class TestTaskImpl {
    */
   public void testKillRunningTaskAttempt() {
     LOG.info("--- START: testKillRunningTaskAttempt ---");
-    mockTask = createMockTask(TaskType.MAP);        
+    mockTask = createMockTask(TaskType.MAP);
     TaskId taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
     launchTaskAttempt(getLastAttempt().getAttemptId());
-    killRunningTaskAttempt(getLastAttempt().getAttemptId());    
+    killRunningTaskAttempt(getLastAttempt().getAttemptId(), true);
+    assertEquals(TaskAttemptEventType.TA_RESCHEDULE,
+        taskAttemptEventHandler.lastTaskAttemptEvent.getType());
   }
 
   @Test
@@ -446,6 +466,28 @@ public class TestTaskImpl {
     assertTaskSucceededState();
   }
 
+  @Test
+  /**
+   * Kill map attempt for succeeded map task
+   * {@link TaskState#SUCCEEDED}->{@link TaskState#SCHEDULED}
+   */
+  public void testKillAttemptForSuccessfulTask() {
+    LOG.info("--- START: testKillAttemptForSuccessfulTask ---");
+    mockTask = createMockTask(TaskType.MAP);
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    assertTaskSucceededState();
+    mockTask.handle(
+        new TaskTAttemptKilledEvent(getLastAttempt().getAttemptId(), true));
+    assertEquals(TaskAttemptEventType.TA_RESCHEDULE,
+        taskAttemptEventHandler.lastTaskAttemptEvent.getType());
+    assertTaskScheduledState();
+  }
+
   @Test 
   public void testTaskProgress() {
     LOG.info("--- START: testTaskProgress ---");
@@ -703,8 +745,8 @@ public class TestTaskImpl {
     assertEquals(TaskState.FAILED, mockTask.getState());
     taskAttempt = taskAttempts.get(3);
     taskAttempt.setState(TaskAttemptState.KILLED);
-    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
-        TaskEventType.T_ATTEMPT_KILLED));
+    mockTask.handle(new TaskTAttemptKilledEvent(taskAttempt.getAttemptId(),
+        false));
     assertEquals(TaskState.FAILED, mockTask.getState());
   }
 
@@ -750,4 +792,14 @@ public class TestTaskImpl {
     Counters taskCounters = mockTask.getCounters();
     assertEquals("wrong counters for task", specAttemptCounters, taskCounters);
   }
+
+  public static class MockTaskAttemptEventHandler implements EventHandler {
+    public TaskAttemptEvent lastTaskAttemptEvent;
+    @Override
+    public void handle(Event event) {
+      if (event instanceof TaskAttemptEvent) {
+        lastTaskAttemptEvent = (TaskAttemptEvent)event;
+      }
+    }
+  };
 }