Browse Source

commit 251fe311a016404e3cb426e1b45aaeea9c71a448
Author: Devaraj Das <ddas@yahoo-inc.com>
Date: Sun Sep 19 10:09:10 2010 -0700

Passing the TT address to the TaskController's initializeJob to avoid a deadlock


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077704 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
parent
commit
e1b0dbe091

+ 3 - 1
src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -163,7 +164,8 @@ public class DefaultTaskController extends TaskController {
   @Override
   public void initializeJob(String user, String jobid, 
                             Path credentials, Path jobConf, 
-                            TaskUmbilicalProtocol taskTracker
+                            TaskUmbilicalProtocol taskTracker,
+                            InetSocketAddress ttAddr
                             ) throws IOException {
     final LocalDirAllocator lDirAlloc = allocator;
     FileSystem localFs = FileSystem.getLocal(getConf());

+ 2 - 3
src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java

@@ -142,7 +142,8 @@ class LinuxTaskController extends TaskController {
 
   @Override
   public void initializeJob(String user, String jobid, Path credentials,
-                            Path jobConf, TaskUmbilicalProtocol taskTracker
+                            Path jobConf, TaskUmbilicalProtocol taskTracker,
+                            InetSocketAddress ttAddr
                             ) throws IOException {
     List<String> command = new ArrayList<String>(
       Arrays.asList(taskControllerExe, 
@@ -162,8 +163,6 @@ class LinuxTaskController extends TaskController {
     command.add(user);
     command.add(jobid);
     // add the task tracker's reporting address
-    InetSocketAddress ttAddr = 
-      ((TaskTracker) taskTracker).getTaskTrackerReportAddress();
     command.add(ttAddr.getHostName());
     command.add(Integer.toString(ttAddr.getPort()));
     String[] commandArray = command.toArray(new String[0]);

+ 4 - 1
src/mapred/org/apache/hadoop/mapred/TaskController.java

@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -88,11 +89,13 @@ public abstract class TaskController implements Configurable {
    * @param credentials a filename containing the job secrets
    * @param jobConf the path to the localized configuration file
    * @param taskTracker the connection the task tracker
+   * @param ttAddr the tasktracker's RPC address
    * @throws IOException
    */
   public abstract void initializeJob(String user, String jobid, 
                                      Path credentials, Path jobConf,
-                                     TaskUmbilicalProtocol taskTracker) 
+                                     TaskUmbilicalProtocol taskTracker,
+                                     InetSocketAddress ttAddr) 
   throws IOException;
   
   /**

+ 7 - 3
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -941,10 +941,11 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
     Task t = tip.getTask();
     JobID jobId = t.getJobID();
     RunningJob rjob = addTaskToJob(jobId, tip);
+    InetSocketAddress ttAddr = getTaskTrackerReportAddress();
 
     synchronized (rjob) {
       if (!rjob.localized) {
-        Path localJobConfPath = initializeJob(t, rjob);
+        Path localJobConfPath = initializeJob(t, rjob, ttAddr);
         JobConf localJobConf = new JobConf(localJobConfPath);
         //to be doubly sure, overwrite the user in the config with the one the TT 
         //thinks it is
@@ -976,11 +977,13 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
    *
    * @param t task whose job has to be localized on this TT
    * @param rjob the {@link RunningJob}
+   * @param ttAddr the tasktracker's RPC address
    * @return the path to the job configuration to be used for all the tasks
    *         of this job as a starting point.
    * @throws IOException
    */
-  Path initializeJob(final Task t, final RunningJob rjob)
+  Path initializeJob(final Task t, final RunningJob rjob, 
+      final InetSocketAddress ttAddr)
   throws IOException, InterruptedException {
     final JobID jobId = t.getJobID();
 
@@ -1038,7 +1041,8 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
           // distributed cache manager makes as well)
           JobLocalizer.writeLocalJobFile(localJobFile, localJobConf);
           taskController.initializeJob(t.getUser(), jobId.toString(), 
-              new Path(localJobTokenFile), localJobFile, TaskTracker.this);
+              new Path(localJobTokenFile), localJobFile, TaskTracker.this,
+              ttAddr);
         } catch (IOException e) {
           LOG.warn("Exception while localization " + 
               StringUtils.stringifyException(e));