ソースを参照

MAPREDUCE-2705. Permits parallel multiple task launches. Contributed by Thomas Graves.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1153717 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 14 年 前
コミット
44b7e03114

+ 3 - 0
mapreduce/CHANGES.txt

@@ -209,6 +209,9 @@ Trunk (unreleased changes)
     MAPREDUCE-2602. Allow setting of end-of-record delimiter for
     TextInputFormat for the old API. (Ahmed Radwan via todd)
 
+    MAPREDUCE-2705. Permits parallel multiple task launches. 
+    (Thomas Graves via ddas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

+ 28 - 23
mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -1253,7 +1253,7 @@ public class TaskTracker
     }
   }
 
-  private void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
+  protected void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
       UserGroupInformation ugi) throws IOException {
     synchronized (tip) {
       tip.setJobConf(jobConf);
@@ -2351,30 +2351,35 @@ public class TaskTracker
    * All exceptions are handled locally, so that we don't mess up the
    * task tracker.
    */
-  void startNewTask(TaskInProgress tip) {
-    try {
-      RunningJob rjob = localizeJob(tip);
-      // Localization is done. Neither rjob.jobConf nor rjob.ugi can be null
-      launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob.ugi); 
-    } catch (Throwable e) {
-      String msg = ("Error initializing " + tip.getTask().getTaskID() + 
-                    ":\n" + StringUtils.stringifyException(e));
-      LOG.warn(msg);
-      tip.reportDiagnosticInfo(msg);
-      try {
-        tip.kill(true);
-        tip.cleanup(true);
-      } catch (IOException ie2) {
-        LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n" +
-                 StringUtils.stringifyException(ie2));          
+  void startNewTask(final TaskInProgress tip) {
+    Thread launchThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          RunningJob rjob = localizeJob(tip);
+          // Localization is done. Neither rjob.jobConf nor rjob.ugi can be null
+          launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob.ugi); 
+        } catch (Throwable e) {
+          String msg = ("Error initializing " + tip.getTask().getTaskID() + 
+                        ":\n" + StringUtils.stringifyException(e));
+          LOG.warn(msg);
+          tip.reportDiagnosticInfo(msg);
+          try {
+            tip.kill(true);
+            tip.cleanup(true);
+          } catch (IOException ie2) {
+            LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n" +
+                     StringUtils.stringifyException(ie2));          
+          }
+          if (e instanceof Error) {
+            LOG.error("TaskLauncher error " +
+                StringUtils.stringifyException(e));
+          }
+        }
       }
+    });
+    launchThread.start();
         
-      // Careful! 
-      // This might not be an 'Exception' - don't handle 'Error' here!
-      if (e instanceof Error) {
-        throw ((Error) e);
-      }
-    }
   }
   
   void addToMemoryManager(TaskAttemptID attemptId, boolean isMap,