Przeglądaj źródła

commit 2eca4af6dd8eb9e709bc7c0f7d68c310734d54b0
Author: Vinod Kumar <vinodkv@yahoo-inc.com>
Date: Wed May 5 11:00:56 2010 +0530

MAPREDUCE:1707 from https://issues.apache.org/jira/secure/attachment/12443680/MAPREDUCE-1707-20100504-ydist.txt

+++ b/YAHOO-CHANGES.txt
+
+ MAPREDUCE-1707. TaskRunner can get NPE in getting ugi from TaskTracker. (vinodkv)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077440 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 lat temu
rodzic
commit
fd705c4df6

+ 4 - 6
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -164,10 +164,7 @@ abstract class TaskRunner extends Thread {
       
       // We don't create any symlinks yet, so presence/absence of workDir
       // actually on the file system doesn't matter.
-      UserGroupInformation ugi =
-        //UserGroupInformation.createRemoteUser(conf.getUser());
-        tracker.getRunningJob(t.getJobID()).getUGI();
-      ugi.doAs(new PrivilegedExceptionAction<Void>() {
+      tip.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
         public Void run() throws IOException {
           taskDistributedCacheManager =
             tracker.getTrackerDistributedCacheManager()
@@ -242,8 +239,9 @@ abstract class TaskRunner extends Thread {
       }
     } finally {
       try{
-        taskDistributedCacheManager.release();
-
+        if (taskDistributedCacheManager != null) {
+          taskDistributedCacheManager.release();
+        }
       }catch(IOException ie){
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }

+ 18 - 4
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -1124,10 +1124,11 @@ public class TaskTracker
     }
   }
 
-  private void launchTaskForJob(TaskInProgress tip, JobConf jobConf)
-      throws IOException{
+  private void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
+      UserGroupInformation ugi) throws IOException {
     synchronized (tip) {
       tip.setJobConf(jobConf);
+      tip.setUGI(ugi);
       tip.launchTask();
     }
   }
@@ -2103,7 +2104,8 @@ public class TaskTracker
   void startNewTask(TaskInProgress tip) {
     try {
       RunningJob rjob = localizeJob(tip);
-      launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
+      // Localization is done. Neither rjob.jobConf nor rjob.ugi can be null
+      launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob.ugi); 
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
@@ -2232,7 +2234,19 @@ public class TaskTracker
     private String debugCommand;
     private volatile boolean slotTaken = false;
     private TaskLauncher launcher;
-        
+
+    // The ugi of the user who is running the job. This contains all the tokens
+    // too which will be populated during job-localization
+    private UserGroupInformation ugi;
+
+    UserGroupInformation getUGI() {
+      return ugi;
+    }
+
+    void setUGI(UserGroupInformation userUGI) {
+      ugi = userUGI;
+    }
+
     /**
      */
     public TaskInProgress(Task task, JobConf conf) {