Browse Source

commit dd4956feca64b86eb358b512cfdbac7bfc0a7942
Author: Devaraj Das <ddas@yahoo-inc.com>
Date: Sun Feb 7 00:22:29 2010 -0800

MAPREDUCE:1457 from https://issues.apache.org/jira/secure/attachment/12435115/MAPREDUCE-1457-BPY20.patch.1

+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1457. Fixes JobTracker to get the FileSystem object within getStagingAreaDir
+ within a privileged block. Fixes Child.java to use the appropriate UGIs while getting
+ the TaskUmbilicalProtocol proxy and while executing the task.
+ Contributed by Jakob Homan. (ddas)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077153 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
parent
commit
6b8c9117b5

+ 54 - 13
src/mapred/org/apache/hadoop/mapred/Child.java

@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -58,10 +59,10 @@ class Child {
   public static void main(String[] args) throws Throwable {
     LOG.debug("Child starting");
 
-    JobConf defaultConf = new JobConf();
+    final JobConf defaultConf = new JobConf();
     String host = args[0];
     int port = Integer.parseInt(args[1]);
-    InetSocketAddress address = new InetSocketAddress(host, port);
+    final InetSocketAddress address = new InetSocketAddress(host, port);
     final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
     final int SLEEP_LONGER_COUNT = 5;
     int jvmIdInt = Integer.parseInt(args[3]);
@@ -81,11 +82,21 @@ class Child {
     UserGroupInformation current = UserGroupInformation.getCurrentUser();
     current.addToken(jt);
 
-    TaskUmbilicalProtocol umbilical =
-      (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
-          TaskUmbilicalProtocol.versionID,
-          address,
-          defaultConf);
+    UserGroupInformation taskOwner 
+     = UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
+    taskOwner.addToken(jt);
+    
+    final TaskUmbilicalProtocol umbilical = 
+      taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
+        @Override
+        public TaskUmbilicalProtocol run() throws Exception {
+          return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
+              TaskUmbilicalProtocol.versionID,
+              address,
+              defaultConf);
+        }
+    });
+    
     int numTasksToExecute = -1; //-1 signifies "no limit"
     int numTasksExecuted = 0;
     Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -127,6 +138,9 @@ class Child {
     JvmContext context = new JvmContext(jvmId, pid);
     int idleLoopCount = 0;
     Task task = null;
+    
+    UserGroupInformation childUGI = null;
+    
     try {
       while (true) {
         taskid = null;
@@ -156,7 +170,7 @@ class Child {
         //create the index file so that the log files 
         //are viewable immediately
         TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
-        JobConf job = new JobConf(task.getJobFile());
+        final JobConf job = new JobConf(task.getJobFile());
 
         // set the jobTokenFile into task
         task.setJobTokenSecret(JobTokenSecretManager.
@@ -181,11 +195,27 @@ class Child {
         JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
         // use job-specified working directory
         FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
-        try {
-          task.run(job, umbilical);             // run the task
-        } finally {
-          TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+        LOG.debug("Creating remote user to execute task: " + job.get("user.name"));
+        childUGI = UserGroupInformation.createRemoteUser(job.get("user.name"));
+        // Add tokens to new user so that it may execute its task correctly.
+        for(Token<?> token : UserGroupInformation.getCurrentUser().getTokens()) {
+          childUGI.addToken(token);
         }
+        
+        // Create a final reference to the task for the doAs block
+        final Task taskFinal = task;
+        childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+          @Override
+          public Object run() throws Exception {
+            try {
+              taskFinal.run(job, umbilical);             // run the task
+            } finally {
+              TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+            }
+
+            return null;
+          }
+        });
         if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
           break;
         }
@@ -198,7 +228,18 @@ class Child {
       try {
         if (task != null) {
           // do cleanup for the task
-          task.taskCleanup(umbilical);
+          if(childUGI == null) {
+            task.taskCleanup(umbilical);
+          } else {
+            final Task taskFinal = task;
+            childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                taskFinal.taskCleanup(umbilical);
+                return null;
+              }
+            });
+          }
         }
       } catch (Exception e) {
         LOG.info("Error cleaning up", e);

+ 25 - 7
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -271,6 +271,8 @@ class JobInProgress {
   private Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces = 
     new HashMap<TaskTracker, FallowSlotInfo>();
   private Path jobSubmitDir = null;
+
+  final private UserGroupInformation userUGI;
   
   /**
    * Create an almost empty JobInProgress, which can be used only for tests
@@ -284,6 +286,11 @@ class JobInProgress {
     this.anyCacheLevel = this.maxLevel+1;
     this.jobtracker = tracker;
     this.restartCount = 0;
+    try {
+      this.userUGI = UserGroupInformation.getCurrentUser();
+    } catch (IOException ie){
+      throw new RuntimeException(ie);
+    }
   }
   
   /**
@@ -320,14 +327,14 @@ class JobInProgress {
     // use the user supplied token to add user credentials to the conf
     jobSubmitDir = jobInfo.getJobSubmitDir();
     user = jobInfo.getUser().toString();
-    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    userUGI = UserGroupInformation.createRemoteUser(user);
     if (ts != null) {
       for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
-        ugi.addToken(token);
+        userUGI.addToken(token);
       }
     }
 
-    fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+    fs = userUGI.doAs(new PrivilegedExceptionAction<FileSystem>() {
       public FileSystem run() throws IOException {
         return jobSubmitDir.getFileSystem(default_conf);
       }});
@@ -525,10 +532,21 @@ class JobInProgress {
     }
 
     LOG.info("Initializing " + jobId);
-
-    // log job info
-    JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile, 
-                                    this.startTime, hasRestarted());
+    final long startTimeFinal = this.startTime;
+    // log job info as the user running the job
+    try {
+    userUGI.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile, 
+            startTimeFinal, hasRestarted());
+        return null;
+      }
+    });
+    } catch(InterruptedException ie) {
+      throw new IOException(ie);
+    }
+    
     // log the job priority
     setPriority(this.priority);
     

+ 30 - 14
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -2031,8 +2031,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         tmpInfoPort == 0, conf);
     infoServer.setAttribute("job.tracker", this);
     // initialize history parameters.
-    boolean historyInitialized = JobHistory.init(this, conf, this.localMachine,
-                                                 this.startTime);
+    final JobTracker jtFinal = this;
+    boolean historyInitialized = 
+      mrOwner.doAs(new PrivilegedExceptionAction<Boolean>() {
+        @Override
+        public Boolean run() throws Exception {
+          return JobHistory.init(jtFinal, conf,jtFinal.localMachine, 
+              jtFinal.startTime);
+        }
+      });
     
     infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
     infoServer.start();
@@ -2147,16 +2154,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
     // Initialize history DONE folder
     if (historyInitialized) {
-      JobHistory.initDone(conf, fs);
-      final String historyLogDir = 
-        JobHistory.getCompletedJobHistoryLocation().toString();
-      infoServer.setAttribute("historyLogDir", historyLogDir);
       FileSystem historyFS = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
         public FileSystem run() throws IOException {
+          JobHistory.initDone(conf, fs);
+          final String historyLogDir = 
+            JobHistory.getCompletedJobHistoryLocation().toString();
+          infoServer.setAttribute("historyLogDir", historyLogDir);
+          
           return new Path(historyLogDir).getFileSystem(conf);
         }
       });
-      infoServer.setAttribute("historyLogDir", historyLogDir);
       infoServer.setAttribute("fileSys", historyFS);
     }
 
@@ -3570,13 +3577,22 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir()
    */
   public String getStagingAreaDir() throws IOException {
-    Path stagingRootDir =
-      new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
-          "/tmp/hadoop/mapred/staging"));
-    FileSystem fs = stagingRootDir.getFileSystem(conf);
-    String user = UserGroupInformation.getCurrentUser().getShortUserName();
-    return fs.makeQualified(new Path(stagingRootDir,
-                                user+"/.staging")).toString();
+    try{
+      final String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      return mrOwner.doAs(new PrivilegedExceptionAction<String>() {
+        @Override
+        public String run() throws Exception {
+          Path stagingRootDir =
+            new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
+                "/tmp/hadoop/mapred/staging"));
+          FileSystem fs = stagingRootDir.getFileSystem(conf);
+          return fs.makeQualified(new Path(stagingRootDir,
+                                    user+"/.staging")).toString();
+        }
+      });
+    } catch(InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**