svn merge -c 1589524 FIXES: MAPREDUCE-5841. uber job doesn't terminate on getting mapred job kill. Contributed by Sangjin Lee

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1589526 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe 11 years ago
parent
commit
c02ab95ee6
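
Why this change: in uber mode the AM ran each subtask inline on the same thread
that drained the launcher event queue (SubtaskRunner), so while a subtask was
running the CONTAINER_REMOTE_CLEANUP event produced by `mapred job -kill` was
never dequeued and the job could not terminate. The fix splits the two roles: a
dedicated event-handling thread keeps draining the queue, subtasks run serially
on a single-thread daemon executor, and each running attempt's Future is kept
in a map so a cleanup event can cancel (interrupt) it.

A minimal sketch of that pattern, assuming Java 8+; the class and member names
below are illustrative, not the actual Hadoop APIs:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class CancellableTaskRunner {
      // single thread: subtasks still run serially, as uber mode requires
      private final ExecutorService runner =
          Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "subtask-runner");
            t.setDaemon(true); // JVM can exit even if a task ignores interrupts
            return t;
          });
      private final Map<String, Future<?>> futures = new ConcurrentHashMap<>();

      public void launch(final String attemptId, final Runnable task) {
        futures.put(attemptId, runner.submit(() -> {
          try {
            task.run();
          } finally {
            futures.remove(attemptId); // drop the handle once the task ends
          }
        }));
      }

      public void kill(String attemptId) {
        Future<?> f = futures.remove(attemptId);
        if (f != null) {
          f.cancel(true); // delivers an interrupt to the running task
        }
      }
    }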

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -85,6 +85,9 @@ Release 2.4.1 - UNRELEASED
     MAPREDUCE-5832. Fixed TestJobClient to not fail on JDK7 or on Windows. (Jian
     He and Vinod Kumar Vavilapalli via vinodkv)
 
+    MAPREDUCE-5841. uber job doesn't terminate on getting mapred job kill
+    (Sangjin Lee via jlowe)
+
 Release 2.4.0 - 2014-04-07 
 
   INCOMPATIBLE CHANGES

+ 123 - 76
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java

@@ -24,6 +24,10 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
@@ -51,11 +55,13 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Runs the container task locally in a thread.
  * Since all (sub)tasks share the same local directory, they must be executed
@@ -71,7 +77,8 @@ public class LocalContainerLauncher extends AbstractService implements
   private final HashSet<File> localizedFiles;
   private final AppContext context;
   private final TaskUmbilicalProtocol umbilical;
-  private Thread eventHandlingThread;
+  private ExecutorService taskRunner;
+  private Thread eventHandler;
   private BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
 
@@ -115,14 +122,24 @@ public class LocalContainerLauncher extends AbstractService implements
   }
 
   public void serviceStart() throws Exception {
-    eventHandlingThread = new Thread(new SubtaskRunner(), "uber-SubtaskRunner");
-    eventHandlingThread.start();
+    // create a single thread for serial execution of tasks
+    // make it a daemon thread so that the process can exit even if the task is
+    // not interruptible
+    taskRunner =
+        Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().
+            setDaemon(true).setNameFormat("uber-SubtaskRunner").build());
+    // create and start an event handling thread
+    eventHandler = new Thread(new EventHandler(), "uber-EventHandler");
+    eventHandler.start();
     super.serviceStart();
   }
 
   public void serviceStop() throws Exception {
-    if (eventHandlingThread != null) {
-      eventHandlingThread.interrupt();
+    if (eventHandler != null) {
+      eventHandler.interrupt();
+    }
+    if (taskRunner != null) {
+      taskRunner.shutdownNow();
     }
     super.serviceStop();
   }
@@ -158,12 +175,15 @@ public class LocalContainerLauncher extends AbstractService implements
    *   - runs Task (runSubMap() or runSubReduce())
    *     - TA can safely send TA_UPDATE since in RUNNING state
    */
-  private class SubtaskRunner implements Runnable {
+  private class EventHandler implements Runnable {
 
-    private boolean doneWithMaps = false;
-    private int finishedSubMaps = 0;
+    private volatile boolean doneWithMaps = false;
+    private volatile int finishedSubMaps = 0;
 
-    SubtaskRunner() {
+    private final Map<TaskAttemptId,Future<?>> futures =
+        new ConcurrentHashMap<TaskAttemptId,Future<?>>();
+
+    EventHandler() {
     }
 
     @SuppressWarnings("unchecked")
@@ -172,7 +192,7 @@ public class LocalContainerLauncher extends AbstractService implements
       ContainerLauncherEvent event = null;
 
       // Collect locations of map outputs to give to reduces
-      Map<TaskAttemptID, MapOutputFile> localMapFiles =
+      final Map<TaskAttemptID, MapOutputFile> localMapFiles =
           new HashMap<TaskAttemptID, MapOutputFile>();
       
       // _must_ either run subtasks sequentially or accept expense of new JVMs
@@ -183,81 +203,41 @@ public class LocalContainerLauncher extends AbstractService implements
           event = eventQueue.take();
         } catch (InterruptedException e) {  // mostly via T_KILL? JOB_KILL?
           LOG.error("Returning, interrupted : " + e);
-          return;
+          break;
         }
 
         LOG.info("Processing the event " + event.toString());
 
         if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
 
-          ContainerRemoteLaunchEvent launchEv =
+          final ContainerRemoteLaunchEvent launchEv =
               (ContainerRemoteLaunchEvent)event;
-          TaskAttemptId attemptID = launchEv.getTaskAttemptID(); 
-
-          Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
-          int numMapTasks = job.getTotalMaps();
-          int numReduceTasks = job.getTotalReduces();
-
-          // YARN (tracking) Task:
-          org.apache.hadoop.mapreduce.v2.app.job.Task ytask =
-              job.getTask(attemptID.getTaskId());
-          // classic mapred Task:
-          org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask();
-
-          // after "launching," send launched event to task attempt to move
-          // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must
-          // do getRemoteTask() call first)
           
-          //There is no port number because we are not really talking to a task
-          // tracker.  The shuffle is just done through local files.  So the
-          // port number is set to -1 in this case.
-          context.getEventHandler().handle(
-              new TaskAttemptContainerLaunchedEvent(attemptID, -1));
-
-          if (numMapTasks == 0) {
-            doneWithMaps = true;
-          }
-
-          try {
-            if (remoteTask.isMapOrReduce()) {
-              JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
-              jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1);
-              if (remoteTask.isMapTask()) {
-                jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1);
-              } else {
-                jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1);
-              }
-              context.getEventHandler().handle(jce);
+          // execute the task on a separate thread
+          Future<?> future = taskRunner.submit(new Runnable() {
+            public void run() {
+              runTask(launchEv, localMapFiles);
             }
-            runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
-                       (numReduceTasks > 0), localMapFiles);
-            
-          } catch (RuntimeException re) {
-            JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
-            jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);
-            context.getEventHandler().handle(jce);
-            // this is our signal that the subtask failed in some way, so
-            // simulate a failed JVM/container and send a container-completed
-            // event to task attempt (i.e., move state machine from RUNNING
-            // to FAIL_CONTAINER_CLEANUP [and ultimately to FAILED])
-            context.getEventHandler().handle(new TaskAttemptEvent(attemptID,
-                TaskAttemptEventType.TA_CONTAINER_COMPLETED));
-          } catch (IOException ioe) {
-            // if umbilical itself barfs (in error-handler of runSubMap()),
-            // we're pretty much hosed, so do what YarnChild main() does
-            // (i.e., exit clumsily--but can never happen, so no worries!)
-            LOG.fatal("oopsie...  this can never happen: "
-                + StringUtils.stringifyException(ioe));
-            System.exit(-1);
-          }
+          });
+          // remember the current attempt
+          futures.put(event.getTaskAttemptID(), future);
 
         } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
 
-          // no container to kill, so just send "cleaned" event to task attempt
-          // to move us from SUCCESS_CONTAINER_CLEANUP to SUCCEEDED state
-          // (or {FAIL|KILL}_CONTAINER_CLEANUP to {FAIL|KILL}_TASK_CLEANUP)
+          // cancel (and interrupt) the current running task associated with the
+          // event
+          TaskAttemptId taId = event.getTaskAttemptID();
+          Future<?> future = futures.remove(taId);
+          if (future != null) {
+            LOG.info("canceling the task attempt " + taId);
+            future.cancel(true);
+          }
+
+          // send "cleaned" event to task attempt to move us from
+          // SUCCESS_CONTAINER_CLEANUP to SUCCEEDED state (or 
+          // {FAIL|KILL}_CONTAINER_CLEANUP to {FAIL|KILL}_TASK_CLEANUP)
           context.getEventHandler().handle(
-              new TaskAttemptEvent(event.getTaskAttemptID(),
+              new TaskAttemptEvent(taId,
                   TaskAttemptEventType.TA_CONTAINER_CLEANED));
 
         } else {
@@ -267,7 +247,75 @@ public class LocalContainerLauncher extends AbstractService implements
       }
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("unchecked")
+    private void runTask(ContainerRemoteLaunchEvent launchEv,
+        Map<TaskAttemptID, MapOutputFile> localMapFiles) {
+      TaskAttemptId attemptID = launchEv.getTaskAttemptID(); 
+
+      Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
+      int numMapTasks = job.getTotalMaps();
+      int numReduceTasks = job.getTotalReduces();
+
+      // YARN (tracking) Task:
+      org.apache.hadoop.mapreduce.v2.app.job.Task ytask =
+          job.getTask(attemptID.getTaskId());
+      // classic mapred Task:
+      org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask();
+
+      // after "launching," send launched event to task attempt to move
+      // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must
+      // do getRemoteTask() call first)
+      
+      //There is no port number because we are not really talking to a task
+      // tracker.  The shuffle is just done through local files.  So the
+      // port number is set to -1 in this case.
+      context.getEventHandler().handle(
+          new TaskAttemptContainerLaunchedEvent(attemptID, -1));
+
+      if (numMapTasks == 0) {
+        doneWithMaps = true;
+      }
+
+      try {
+        if (remoteTask.isMapOrReduce()) {
+          JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
+          jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1);
+          if (remoteTask.isMapTask()) {
+            jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1);
+          } else {
+            jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1);
+          }
+          context.getEventHandler().handle(jce);
+        }
+        runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
+                   (numReduceTasks > 0), localMapFiles);
+        
+      } catch (RuntimeException re) {
+        JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
+        jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);
+        context.getEventHandler().handle(jce);
+        // this is our signal that the subtask failed in some way, so
+        // simulate a failed JVM/container and send a container-completed
+        // event to task attempt (i.e., move state machine from RUNNING
+        // to FAIL_CONTAINER_CLEANUP [and ultimately to FAILED])
+        context.getEventHandler().handle(new TaskAttemptEvent(attemptID,
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+      } catch (IOException ioe) {
+        // if umbilical itself barfs (in error-handler of runSubMap()),
+        // we're pretty much hosed, so do what YarnChild main() does
+        // (i.e., exit clumsily--but can never happen, so no worries!)
+        LOG.fatal("oopsie...  this can never happen: "
+            + StringUtils.stringifyException(ioe));
+        ExitUtil.terminate(-1);
+      } finally {
+        // remove my future
+        if (futures.remove(attemptID) != null) {
+          LOG.info("removed attempt " + attemptID +
+              " from the futures to keep track of");
+        }
+      }
+    }
+
     private void runSubtask(org.apache.hadoop.mapred.Task task,
                             final TaskType taskType,
                             TaskAttemptId attemptID,
@@ -397,7 +445,6 @@ public class LocalContainerLauncher extends AbstractService implements
      * filenames instead of "file.out". (All of this is entirely internal,
      * so there are no particular compatibility issues.)
      */
-    @SuppressWarnings("deprecation")
     private MapOutputFile renameMapOutputForReduce(JobConf conf,
         TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
       FileSystem localFs = FileSystem.getLocal(conf);
@@ -456,7 +503,7 @@ public class LocalContainerLauncher extends AbstractService implements
       }
     }
 
-  } // end SubtaskRunner
+  } // end EventHandler
   
   private static class RenamedMapOutputFile extends MapOutputFile {
     private Path path;
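
Design note: future.cancel(true) only delivers a thread interrupt, so a subtask
unblocks promptly only if it is blocked on an interruptible call or polls the
interrupt flag. That is why the executor thread is marked daemon (the AM JVM
can still exit if a task swallows the interrupt) and why System.exit(-1)
becomes ExitUtil.terminate(-1), Hadoop's exit hook that tests can intercept.
A small illustrative sketch of how the interrupt surfaces in a task body:

    // Illustrative only: how cancel(true) surfaces inside a running task.
    Runnable task = () -> {
      try {
        // blocking calls unblock with InterruptedException when cancelled
        Thread.sleep(60_000);
      } catch (InterruptedException ie) {
        // restore the interrupt flag and return promptly
        Thread.currentThread().interrupt();
      }
    };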

+ 144 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java

@@ -0,0 +1,144 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestLocalContainerLauncher {
+  private static final Log LOG =
+      LogFactory.getLog(TestLocalContainerLauncher.class);
+
+  @SuppressWarnings("rawtypes")
+  @Test(timeout=10000)
+  public void testKillJob() throws Exception {
+    JobConf conf = new JobConf();
+    AppContext context = mock(AppContext.class);
+    // a simple event handler solely to detect the container cleaned event
+    final CountDownLatch isDone = new CountDownLatch(1);
+    EventHandler handler = new EventHandler() {
+      @Override
+      public void handle(Event event) {
+        LOG.info("handling event " + event.getClass() +
+            " with type " + event.getType());
+        if (event instanceof TaskAttemptEvent) {
+          if (event.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
+            isDone.countDown();
+          }
+        }
+      }
+    };
+    when(context.getEventHandler()).thenReturn(handler);
+
+    // create and start the launcher
+    LocalContainerLauncher launcher =
+        new LocalContainerLauncher(context, mock(TaskUmbilicalProtocol.class));
+    launcher.init(conf);
+    launcher.start();
+
+    // create mocked job, task, and task attempt
+    // a single-mapper job
+    JobId jobId = MRBuilderUtils.newJobId(System.currentTimeMillis(), 1, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId taId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+
+    Job job = mock(Job.class);
+    when(job.getTotalMaps()).thenReturn(1);
+    when(job.getTotalReduces()).thenReturn(0);
+    Map<JobId,Job> jobs = new HashMap<JobId,Job>();
+    jobs.put(jobId, job);
+    // app context returns the one and only job
+    when(context.getAllJobs()).thenReturn(jobs);
+
+    org.apache.hadoop.mapreduce.v2.app.job.Task ytask =
+        mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
+    when(ytask.getType()).thenReturn(TaskType.MAP);
+    when(job.getTask(taskId)).thenReturn(ytask);
+
+    // create a sleeping mapper that runs beyond the test timeout
+    MapTask mapTask = mock(MapTask.class);
+    when(mapTask.isMapOrReduce()).thenReturn(true);
+    when(mapTask.isMapTask()).thenReturn(true);
+    TaskAttemptID taskID = TypeConverter.fromYarn(taId);
+    when(mapTask.getTaskID()).thenReturn(taskID);
+    when(mapTask.getJobID()).thenReturn(taskID.getJobID());
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        // sleep for a long time
+        LOG.info("sleeping for 5 minutes...");
+        Thread.sleep(5*60*1000);
+        return null;
+      }
+    }).when(mapTask).run(isA(JobConf.class), isA(TaskUmbilicalProtocol.class));
+
+    // pump in a task attempt launch event
+    ContainerLauncherEvent launchEvent =
+        new ContainerRemoteLaunchEvent(taId, null, createMockContainer(), mapTask);
+    launcher.handle(launchEvent);
+
+    Thread.sleep(200);
+    // now pump in a container clean-up event
+    ContainerLauncherEvent cleanupEvent =
+        new ContainerLauncherEvent(taId, null, null, null,
+            ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP);
+    launcher.handle(cleanupEvent);
+
+    // wait for the event to fire: this should be received promptly
+    isDone.await();
+
+    launcher.close();
+  }
+
+  private static Container createMockContainer() {
+    Container container = mock(Container.class);
+    NodeId nodeId = NodeId.newInstance("foo.bar.org", 1234);
+    when(container.getNodeId()).thenReturn(nodeId);
+    return container;
+  }
+}
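
How the test exercises the fix: the mocked map task sleeps for five minutes,
far beyond the 10-second test timeout, so the latch can only reach zero if the
CONTAINER_REMOTE_CLEANUP event is handled while the task is still running; the
Thread.sleep(200) simply gives the launch event time to be consumed before the
cleanup event is pumped in. To run just this test from the
hadoop-mapreduce-client-app module, a standard Surefire invocation should work:

    mvn test -Dtest=TestLocalContainerLauncher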