Quellcode durchsuchen

commit c1a7b67a52c2613600e6bc23a4d369142c4a673e
Author: Vinod Kumar <vinodkv@yahoo-inc.com>
Date: Sat Feb 27 15:44:10 2010 +0530

MAPREDUCE-1455 from https://issues.apache.org/jira/secure/attachment/12437322/1455.20S.2.patch


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077247 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley vor 14 Jahren
Ursprung
Commit
bec3e939a3
28 geänderte Dateien mit 1586 neuen und 266 gelöschten Zeilen
  1. 182 19
      src/mapred/org/apache/hadoop/mapred/JSPUtil.java
  2. 120 93
      src/mapred/org/apache/hadoop/mapred/JobACLsManager.java
  3. 1 1
      src/mapred/org/apache/hadoop/mapred/JobClient.java
  4. 4 0
      src/mapred/org/apache/hadoop/mapred/JobConf.java
  5. 1 1
      src/mapred/org/apache/hadoop/mapred/JobInProgress.java
  6. 16 12
      src/mapred/org/apache/hadoop/mapred/JobTracker.java
  7. 49 0
      src/mapred/org/apache/hadoop/mapred/JobTrackerJobACLsManager.java
  8. 12 3
      src/mapred/org/apache/hadoop/mapred/TaskGraphServlet.java
  9. 0 1
      src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
  10. 82 16
      src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
  11. 31 1
      src/mapred/org/apache/hadoop/mapred/TaskRunner.java
  12. 38 5
      src/mapred/org/apache/hadoop/mapred/TaskTracker.java
  13. 48 0
      src/mapred/org/apache/hadoop/mapred/TaskTrackerJobACLsManager.java
  14. 5 3
      src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
  15. 1 1
      src/test/org/apache/hadoop/mapred/TestJobACLs.java
  16. 1 1
      src/test/org/apache/hadoop/mapred/TestNodeRefresh.java
  17. 26 2
      src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
  18. 628 0
      src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java
  19. 52 0
      src/webapps/job/job_authorization_error.jsp
  20. 8 1
      src/webapps/job/jobblacklistedtrackers.jsp
  21. 16 3
      src/webapps/job/jobconf.jsp
  22. 98 21
      src/webapps/job/jobdetails.jsp
  23. 23 15
      src/webapps/job/jobfailures.jsp
  24. 1 1
      src/webapps/job/jobqueue_details.jsp
  25. 15 10
      src/webapps/job/jobtasks.jsp
  26. 3 3
      src/webapps/job/jobtracker.jsp
  27. 99 41
      src/webapps/job/taskdetails.jsp
  28. 26 12
      src/webapps/job/taskstats.jsp

+ 182 - 19
src/mapred/org/apache/hadoop/mapred/JSPUtil.java

@@ -20,12 +20,15 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
+import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import javax.servlet.RequestDispatcher;
+import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
@@ -36,11 +39,14 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.http.HtmlQuoting;
 import org.apache.hadoop.mapred.JobHistory.JobInfo;
 import org.apache.hadoop.mapred.JobTracker.RetireJobInfo;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ServletUtil;
 import org.apache.hadoop.util.StringUtils;
 
 class JSPUtil {
-  private static final String PRIVATE_ACTIONS_KEY = "webinterface.private.actions";
+  static final String PRIVATE_ACTIONS_KEY = "webinterface.private.actions";
   
   public static final Configuration conf = new Configuration();
 
@@ -52,6 +58,94 @@ class JSPUtil {
     conf.getInt("mapred.job.tracker.jobhistory.lru.cache.size", 5);
 
   private static final Log LOG = LogFactory.getLog(JSPUtil.class);
+
+  /**
+   * Wraps the {@link JobInProgress} object and contains boolean for
+   * 'job view access' allowed or not.
+   * This class is only for usage by JSPs and Servlets.
+   */
+  static class JobWithViewAccessCheck {
+    private JobInProgress job = null;
+    
+    // true if user is authorized to view this job
+    private boolean isViewAllowed = true;
+
+    JobWithViewAccessCheck(JobInProgress job) {
+      this.job = job;
+    }
+
+    JobInProgress getJob() {
+      return job;
+    }
+
+    boolean isViewJobAllowed() {
+      return isViewAllowed;
+    }
+
+    void setViewAccess(boolean isViewAllowed) {
+      this.isViewAllowed = isViewAllowed;
+    }
+  }
+
+  /**
+   * Validates if current user can view the job.
+   * If user is not authorized to view the job, this method will modify the
+   * response and forwards to an error page and returns Job with
+   * viewJobAccess flag set to false.
+   * @return JobWithViewAccessCheck object(contains JobInProgress object and
+   *         viewJobAccess flag). Callers of this method will check the flag
+   *         and decide if view should be allowed or not. Job will be null if
+   *         the job with given jobid doesnot exist at the JobTracker.
+   */
+  public static JobWithViewAccessCheck checkAccessAndGetJob(JobTracker jt,
+      JobID jobid, HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+    final JobInProgress job = jt.getJob(jobid);
+    JobWithViewAccessCheck myJob = new JobWithViewAccessCheck(job);
+
+    String user = request.getRemoteUser();
+    if (user != null && job != null && jt.isJobLevelAuthorizationEnabled()) {
+      final UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(user);
+      try {
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+          public Void run() throws IOException, ServletException {
+
+            // checks job view permission
+            job.checkAccess(ugi, JobACL.VIEW_JOB);
+            return null;
+          }
+        });
+      } catch (AccessControlException e) {
+        String errMsg = "User " + ugi.getShortUserName() +
+            " failed to view " + jobid + "!<br><br>" + e.getMessage() +
+            "<hr><a href=\"jobtracker.jsp\">Go back to JobTracker</a><br>";
+        JSPUtil.setErrorAndForward(errMsg, request, response);
+        myJob.setViewAccess(false);
+      } catch (InterruptedException e) {
+        String errMsg = " Interrupted while trying to access " + jobid +
+        "<hr><a href=\"jobtracker.jsp\">Go back to JobTracker</a><br>";
+        JSPUtil.setErrorAndForward(errMsg, request, response);
+        myJob.setViewAccess(false);
+      }
+    }
+    return myJob;
+  }
+
+  /**
+   * Sets error code SC_UNAUTHORIZED in response and forwards to
+   * error page which contains error message and a back link.
+   */
+  public static void setErrorAndForward(String errMsg,
+      HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+    request.setAttribute("error.msg", errMsg);
+    RequestDispatcher dispatcher = request.getRequestDispatcher(
+        "/job_authorization_error.jsp");
+    response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
+    dispatcher.forward(request, response);
+  }
+
   /**
    * Method used to process the request from the job page based on the 
    * request which it has received. For example like changing priority.
@@ -60,33 +154,99 @@ class JSPUtil {
    * @param response HTTP response object.
    * @param tracker {@link JobTracker} instance
    * @throws IOException
+   * @throws InterruptedException 
+   * @throws ServletException 
    */
   public static void processButtons(HttpServletRequest request,
-      HttpServletResponse response, JobTracker tracker) throws IOException {
+      HttpServletResponse response, final JobTracker tracker)
+      throws IOException, InterruptedException, ServletException {
 
-    if (conf.getBoolean(PRIVATE_ACTIONS_KEY, false)
+	String user = request.getRemoteUser();
+    if (privateActionsAllowed(tracker.conf)
         && request.getParameter("killJobs") != null) {
-      String[] jobs = request.getParameterValues("jobCheckBox");
-      if (jobs != null) {
-        for (String job : jobs) {
-          tracker.killJob(JobID.forName(job));
+        String[] jobs = request.getParameterValues("jobCheckBox");
+        if (jobs != null) {
+          boolean notAuthorized = false;
+          String errMsg = "User " + user
+              + " failed to kill the following job(s)!<br><br>";
+          for (String job : jobs) {
+            final JobID jobId = JobID.forName(job);
+            if (user != null) {
+              UserGroupInformation ugi =
+                UserGroupInformation.createRemoteUser(user);
+              try {
+                ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                  public Void run() throws IOException{
+
+                    tracker.killJob(jobId);// checks job modify permission
+                    return null;
+                  }
+                });
+              } catch(AccessControlException e) {
+                errMsg = errMsg.concat("<br>" + e.getMessage());
+                notAuthorized = true;
+                // We don't return right away so that we can try killing other
+                // jobs that are requested to be killed.
+                continue;
+              }
+            }
+            else {// no authorization needed
+              tracker.killJob(jobId);
+            }
+          }
+          if (notAuthorized) {// user is not authorized to kill some/all of jobs
+            errMsg = errMsg.concat(
+              "<br><hr><a href=\"jobtracker.jsp\">Go back to JobTracker</a><br>");
+            setErrorAndForward(errMsg, request, response);
+            return;
+          }
         }
       }
-    }
 
-    if (conf.getBoolean(PRIVATE_ACTIONS_KEY, false) && 
+    if (privateActionsAllowed(tracker.conf) && 
           request.getParameter("changeJobPriority") != null) {
-      String[] jobs = request.getParameterValues("jobCheckBox");
+        String[] jobs = request.getParameterValues("jobCheckBox");
+        if (jobs != null) {
+          final JobPriority jobPri = JobPriority.valueOf(request
+              .getParameter("setJobPriority"));
+          boolean notAuthorized = false;
+          String errMsg = "User " + user
+              + " failed to set priority for the following job(s)!<br><br>";
 
-      if (jobs != null) {
-        JobPriority jobPri = JobPriority.valueOf(request
-            .getParameter("setJobPriority"));
+          for (String job : jobs) {
+            final JobID jobId = JobID.forName(job);
+            if (user != null) {
+              UserGroupInformation ugi = UserGroupInformation.
+                  createRemoteUser(user);
+              try {
+                ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                  public Void run() throws IOException{
 
-        for (String job : jobs) {
-          tracker.setJobPriority(JobID.forName(job), jobPri);
+                    // checks job modify permission
+                    tracker.setJobPriority(jobId, jobPri);
+                    return null;
+                  }
+                });
+              } catch(AccessControlException e) {
+                errMsg = errMsg.concat("<br>" + e.getMessage());
+                notAuthorized = true;
+                // We don't return right away so that we can try operating on
+                // other jobs.
+                continue;
+              }
+            }
+            else {// no authorization needed
+              tracker.setJobPriority(jobId, jobPri);
+            }
+          }
+          if (notAuthorized) {// user is not authorized to kill some/all of jobs
+            errMsg = errMsg.concat(
+              "<br><hr><a href=\"jobtracker.jsp\">Go back to JobTracker</a><br>");
+            setErrorAndForward(errMsg, request, response);
+            return;
+          }
         }
       }
-    }
   }
 
   /**
@@ -100,11 +260,10 @@ class JSPUtil {
    * @throws IOException
    */
   public static String generateJobTable(String label, Collection<JobInProgress> jobs
-      , int refresh, int rowId) throws IOException {
+      , int refresh, int rowId, JobConf conf) throws IOException {
 
     boolean isModifiable = label.equals("Running") 
-                                && conf.getBoolean(
-                                      PRIVATE_ACTIONS_KEY, false);
+                                && privateActionsAllowed(conf);
     StringBuffer sb = new StringBuffer();
     
     sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n");
@@ -302,4 +461,8 @@ class JSPUtil {
       return jobInfo;
     }
   }
+
+  static boolean privateActionsAllowed(JobConf conf) {
+    return conf.getBoolean(PRIVATE_ACTIONS_KEY, false);
+  }
 }

+ 120 - 93
src/mapred/org/apache/hadoop/mapred/JobACLsManager.java

@@ -20,101 +20,128 @@ package org.apache.hadoop.mapred;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 
-public class JobACLsManager {
-
-  private JobTracker jobTracker = null;
-
-  public JobACLsManager(JobTracker tracker) {
-    jobTracker = tracker;
-  }
-
-  /**
-   * Construct the jobACLs from the configuration so that they can be kept in
-   * the memory. If authorization is disabled on the JT, nothing is constructed
-   * and an empty map is returned.
-   * 
-   * @return JobACl to AccessControlList map.
-   */
-  Map<JobACL, AccessControlList> constructJobACLs(JobConf conf) {
-    
-    Map<JobACL, AccessControlList> acls =
-      new HashMap<JobACL, AccessControlList>();
-
-    // Don't construct anything if authorization is disabled.
-    if (!jobTracker.isJobLevelAuthorizationEnabled()) {
-      return acls;
-    }
-
-    for (JobACL aclName : JobACL.values()) {
-      String aclConfigName = aclName.getAclName();
-      String aclConfigured = conf.get(aclConfigName);
-      if (aclConfigured == null) {
-        // If ACLs are not configured at all, we grant no access to anyone. So
-        // jobOwner and superuser/supergroup _only_ can do 'stuff'
-        aclConfigured = "";
-      }
-      acls.put(aclName, new AccessControlList(aclConfigured));
-    }
-    return acls;
-  }
-
-  /**
-   * If authorization is enabled on the JobTracker, checks whether the user (in
-   * the callerUGI) is authorized to perform the operation specify by
-   * 'jobOperation' on the job.
-   * <ul>
-   * <li>The owner of the job can do any operation on the job</li>
-   * <li>The superuser/supergroup of the JobTracker is always permitted to do
-   * operations on any job.</li>
-   * <li>For all other users/groups job-acls are checked</li>
-   * </ul>
-   * 
-   * @param jobStatus
-   * @param callerUGI
-   * @param jobOperation
-   */
-  void checkAccess(JobStatus jobStatus, UserGroupInformation callerUGI,
-      JobACL jobOperation) throws AccessControlException {
-
-    if (!jobTracker.isJobLevelAuthorizationEnabled()) {
-      return;
-    }
-
-    JobID jobId = jobStatus.getJobID();
-
-    // Check for superusers/supergroups
-    if (jobTracker.isSuperUserOrSuperGroup(callerUGI)) {
-      JobInProgress.LOG.info("superuser/supergroup "
-          + callerUGI.getShortUserName() + " trying to perform "
-          + jobOperation.toString() + " on " + jobId);
-      return;
-    }
-
-    // Job-owner is always part of all the ACLs
-    if (callerUGI.getShortUserName().equals(jobStatus.getUsername())) {
-      JobInProgress.LOG.info("Jobowner " + callerUGI.getShortUserName()
-          + " trying to perform " + jobOperation.toString() + " on "
-          + jobId);
-      return;
-    }
-
-    AccessControlList acl = jobStatus.getJobACLs().get(jobOperation);
-    if (acl.isUserAllowed(callerUGI)) {
-      JobInProgress.LOG.info("Normal user " + callerUGI.getShortUserName()
-          + " trying to perform " + jobOperation.toString() + " on "
-          + jobId);
-      return;
-    }
-
-    throw new AccessControlException(callerUGI
-        + " not authorized for performing the operation "
-        + jobOperation.toString() + " on " + jobId + ". "
-        + jobOperation.toString() + " configured for this job : "
-        + acl.toString());
-  }
-}
+public abstract class JobACLsManager {
+
+	  static final Log LOG = LogFactory.getLog(JobACLsManager.class);
+
+	  protected abstract boolean isJobLevelAuthorizationEnabled();
+
+	  protected abstract boolean isSuperUserOrSuperGroup(
+	      UserGroupInformation callerUGI);
+
+	  /**
+	   * Construct the jobACLs from the configuration so that they can be kept in
+	   * the memory. If authorization is disabled on the JT, nothing is constructed
+	   * and an empty map is returned.
+	   * 
+	   * @return JobACL to AccessControlList map.
+	   */
+	  Map<JobACL, AccessControlList> constructJobACLs(JobConf conf) {
+	    
+	    Map<JobACL, AccessControlList> acls =
+	      new HashMap<JobACL, AccessControlList>();
+
+	    // Don't construct anything if authorization is disabled.
+	    if (!isJobLevelAuthorizationEnabled()) {
+	      return acls;
+	    }
+
+	    for (JobACL aclName : JobACL.values()) {
+	      String aclConfigName = aclName.getAclName();
+	      String aclConfigured = conf.get(aclConfigName);
+	      if (aclConfigured == null) {
+	        // If ACLs are not configured at all, we grant no access to anyone. So
+	        // jobOwner and superuser/supergroup _only_ can do 'stuff'
+	        aclConfigured = "";
+	      }
+	      acls.put(aclName, new AccessControlList(aclConfigured));
+	    }
+	    return acls;
+	  }
+
+	  /**
+	   * If authorization is enabled, checks whether the user (in the callerUGI) is
+	   * authorized to perform the operation specified by 'jobOperation' on the job.
+	   * <ul>
+	   * <li>The owner of the job can do any operation on the job</li>
+	   * <li>The superuser/supergroup is always permitted to do operations on any
+	   * job.</li>
+	   * <li>For all other users/groups job-acls are checked</li>
+	   * </ul>
+	   * 
+	   * @param jobStatus
+	   * @param callerUGI
+	   * @param jobOperation
+	   */
+	  void checkAccess(JobStatus jobStatus, UserGroupInformation callerUGI,
+	      JobACL jobOperation) throws AccessControlException {
+
+	    JobID jobId = jobStatus.getJobID();
+	    String jobOwner = jobStatus.getUsername();
+	    AccessControlList acl = jobStatus.getJobACLs().get(jobOperation);
+	    checkAccess(jobId, callerUGI, jobOperation, jobOwner, acl);
+	  }
+
+	  /**
+	   * If authorization is enabled, checks whether the user (in the callerUGI) is
+	   * authorized to perform the operation specified by 'jobOperation' on the job.
+	   * <ul>
+	   * <li>The owner of the job can do any operation on the job</li>
+	   * <li>The superuser/supergroup is always permitted to do operations on any
+	   * job.</li>
+	   * <li>For all other users/groups job-acls are checked</li>
+	   * </ul>
+	   * @param jobId
+	   * @param callerUGI
+	   * @param jobOperation
+	   * @param jobOwner
+	   * @param jobACL
+	   * @throws AccessControlException
+	   */
+	  void checkAccess(JobID jobId, UserGroupInformation callerUGI,
+	      JobACL jobOperation, String jobOwner, AccessControlList jobACL)
+	      throws AccessControlException {
+
+	    if (!isJobLevelAuthorizationEnabled()) {
+	      return;
+	    }
+
+	    // Check for superusers/supergroups
+	    if (isSuperUserOrSuperGroup(callerUGI)) {
+	      LOG.info("superuser/supergroupMember "
+	          + callerUGI.getShortUserName() + " trying to perform "
+	          + jobOperation.toString() + " on " + jobId);
+	      return;
+	    }
+
+	    // Job-owner is always part of all the ACLs
+	    if (callerUGI.getShortUserName().equals(jobOwner)) {
+	      LOG.info("Jobowner " + callerUGI.getShortUserName()
+	          + " trying to perform " + jobOperation.toString() + " on "
+	          + jobId);
+	      return;
+	    }
+
+	    
+	    if (jobACL.isUserAllowed(callerUGI)) {
+	      LOG.info("Normal user " + callerUGI.getShortUserName()
+	          + " trying to perform " + jobOperation.toString() + " on "
+	          + jobId);
+	      return;
+	    }
+
+	    throw new AccessControlException(callerUGI
+	        + " is not authorized for performing the operation "
+	        + jobOperation.toString() + " on " + jobId + ". "
+	        + jobOperation.toString()
+	        + " Access control list configured for this job : "
+	        + jobACL.toString());
+	  }
+	}

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -1228,7 +1228,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   }
 
   static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
-    return (baseUrl + "/tasklog?plaintext=true&taskid=" + taskId); 
+    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
   }
   
   private static void displayTaskLogs(TaskAttemptID taskId, String baseUrl)

+ 4 - 0
src/mapred/org/apache/hadoop/mapred/JobConf.java

@@ -163,6 +163,10 @@ public class JobConf extends Configuration {
   static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
       "mapred.job.reduce.memory.mb";
 
+  public static final String JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG = 
+	    "mapreduce.cluster.job-authorization-enabled";
+  static final String MR_SUPERGROUP = "mapred.permissions.supergroup";
+
   /**
    * Configuration key to set the java command line options for the child
    * map and reduce tasks.

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -1016,7 +1016,7 @@ public class JobInProgress {
           host = ttStatus.getHost();
         }
         httpTaskLogLocation = "http://" + host + ":" + ttStatus.getHttpPort(); 
-           //+ "/tasklog?plaintext=true&taskid=" + status.getTaskID();
+           //+ "/tasklog?plaintext=true&attemptid=" + status.getTaskID();
       }
 
       TaskCompletionEvent taskEvent = null;

+ 16 - 12
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -1930,7 +1930,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
                                                 "expireLaunchingTasks");
 
   CompletedJobStatusStore completedJobStatusStore = null;
-  private JobACLsManager jobACLsManager;
+  private JobTrackerJobACLsManager jobACLsManager;
   Thread completedJobsStoreThread = null;
   RecoveryManager recoveryManager;
 
@@ -1969,7 +1969,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   static final String SUBDIR = "jobTracker";
   FileSystem fs = null;
   Path systemDir = null;
-  private JobConf conf;
+  JobConf conf;
   private final UserGroupInformation mrOwner;
   private final String supergroup;
 
@@ -2018,7 +2018,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       mrOwner = UserGroupInformation.getCurrentUser();
     }
   
-    supergroup = conf.get("mapred.permissions.supergroup", "supergroup");
+    supergroup = conf.get(JobConf.MR_SUPERGROUP,
+                          "supergroup");
     LOG.info("Starting jobtracker with owner as " + mrOwner.getShortUserName() 
              + " and supergroup as " + supergroup);
     long secretKeyInterval = 
@@ -2269,7 +2270,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
     // Initialize the jobACLSManager
-    jobACLsManager = new JobACLsManager(this);
+    jobACLsManager = new JobTrackerJobACLsManager(this);
     //initializes the job status store
     completedJobStatusStore = new CompletedJobStatusStore(jobACLsManager, conf);
   }
@@ -3750,7 +3751,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    * @return
    */
   boolean isJobLevelAuthorizationEnabled() {
-    return conf.getBoolean(JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
+    return conf.getBoolean(JobConf.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
   }
 
   /**
@@ -4196,9 +4197,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   static final String MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY =
       "mapred.cluster.max.reduce.memory.mb";
 
-  public static final String JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG =
-      "mapreduce.cluster.job-authorization-enabled";
-
   /* 
    * Returns a list of TaskCompletionEvent for the given job, 
    * starting from fromEventId.
@@ -4561,13 +4559,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    * Is the calling user a super user? Or part of the supergroup?
    * @return true, if it is a super user
    */
-  boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI) {
-    if (mrOwner.getShortUserName().equals(callerUGI.getShortUserName())) {
+  static boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI,
+      UserGroupInformation superUser, String superGroup) {
+    if (superUser.getShortUserName().equals(callerUGI.getShortUserName())) {
       return true;
     }
     String[] groups = callerUGI.getGroupNames();
     for(int i=0; i < groups.length; ++i) {
-      if (groups[i].equals(supergroup)) {
+      if (groups[i].equals(superGroup)) {
         return true;
       }
     }
@@ -4580,7 +4579,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    */
   public synchronized void refreshNodes() throws IOException {
     // check access
-    if (!isSuperUserOrSuperGroup(UserGroupInformation.getCurrentUser())) {
+    if (!isSuperUserOrSuperGroup(UserGroupInformation.getCurrentUser(), mrOwner,
+                                 supergroup)) {
       String user = UserGroupInformation.getCurrentUser().getShortUserName();
       throw new AccessControlException(user + 
                                        " is not authorized to refresh nodes.");
@@ -4590,6 +4590,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     refreshHosts();
   }
   
+  UserGroupInformation getMROwner() {
+    return mrOwner;
+  }
+
   String getSuperGroup() {
     return supergroup;
   }

+ 49 - 0
src/mapred/org/apache/hadoop/mapred/JobTrackerJobACLsManager.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Manages the job ACLs and the operations on them at JobTracker.
+ *
+ */
+public class JobTrackerJobACLsManager extends JobACLsManager {
+
+  static final Log LOG = LogFactory.getLog(JobTrackerJobACLsManager.class);
+
+  private JobTracker jobTracker = null;
+
+  public JobTrackerJobACLsManager(JobTracker tracker) {
+    jobTracker = tracker;
+  }
+
+  @Override
+  protected boolean isJobLevelAuthorizationEnabled() {
+    return jobTracker.isJobLevelAuthorizationEnabled();
+  }
+
+  @Override
+  protected boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI) {
+    return JobTracker.isSuperUserOrSuperGroup(callerUGI,
+        jobTracker.getMROwner(), jobTracker.getSuperGroup());
+  }
+
+}

+ 12 - 3
src/mapred/org/apache/hadoop/mapred/TaskGraphServlet.java

@@ -19,12 +19,14 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 import java.io.PrintWriter;
-
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.hadoop.mapred.JSPUtil.JobWithViewAccessCheck;
+import org.apache.hadoop.security.UserGroupInformation;
+
 /** The servlet that outputs svg graphics for map / reduce task
  *  statuses
  */
@@ -52,13 +54,20 @@ public class TaskGraphServlet extends HttpServlet {
 
     response.setContentType("image/svg+xml");
 
-    JobTracker tracker = 
+    final JobTracker tracker = 
       (JobTracker) getServletContext().getAttribute("job.tracker");
     
     String jobIdStr = request.getParameter("jobid");
     if(jobIdStr == null)
       return;
-    JobID jobId = JobID.forName(jobIdStr);
+    final JobID jobId = JobID.forName(jobIdStr);
+
+    // verify if user has view access for this job
+    JobWithViewAccessCheck myJob = JSPUtil.checkAccessAndGetJob(
+        tracker, jobId, request, response);
+    if (!myJob.isViewJobAllowed()) {
+      return;// user is not authorized to view this job
+    }
 
     final boolean isMap = "map".equalsIgnoreCase(request.getParameter("type"));
     final TaskReport[] reports = isMap? tracker.getMapTaskReports(jobId) 

+ 0 - 1
src/mapred/org/apache/hadoop/mapred/TaskInProgress.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;

+ 82 - 16
src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java

@@ -22,11 +22,19 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.http.HtmlQuoting;
 import org.apache.hadoop.util.StringUtils;
 
@@ -51,7 +59,7 @@ public class TaskLogServlet extends HttpServlet {
   public static String getTaskLogUrl(String taskTrackerHostName,
       String httpPort, String taskAttemptID) {
     return ("http://" + taskTrackerHostName + ":" + httpPort
-        + "/tasklog?taskid=" + taskAttemptID);
+        + "/tasklog?attemptid=" + taskAttemptID);
   }
 
   private void printTaskLog(HttpServletResponse response,
@@ -102,6 +110,42 @@ public class TaskLogServlet extends HttpServlet {
     }
   }
 
+  /**
+   * Validates if the given user has job view permissions for this job.
+   * conf contains jobOwner and job-view-ACLs.
+   * We allow jobOwner, superUser(i.e. mrOwner) and members of superGroup and
+   * users and groups specified in configuration using
+   * mapreduce.job.acl-view-job to view job.
+   */
+  private void checkAccessForTaskLogs(JobConf conf, String user, JobID jobId,
+      TaskTracker tracker) throws AccessControlException {
+
+    if (!tracker.isJobLevelAuthorizationEnabled()) {
+      return;
+    }
+
+    // buiild job view acl by reading from conf
+    AccessControlList jobViewACL = tracker.getJobACLsManager().
+        constructJobACLs(conf).get(JobACL.VIEW_JOB);
+
+    String jobOwner = conf.get("user.name");
+    UserGroupInformation callerUGI = UserGroupInformation.createRemoteUser(user);
+
+    tracker.getJobACLsManager().checkAccess(jobId, callerUGI, JobACL.VIEW_JOB,
+        jobOwner, jobViewACL);
+  }
+
+  /**
+   * Builds a Configuration object by reading the xml file.
+   * This doesn't load the default resources.
+   */
+  static Configuration getConfFromJobACLsFile(String attemptIdStr) {
+    Configuration conf = new Configuration(false);
+    conf.addResource(new Path(TaskLog.getBaseDir(attemptIdStr).toString(),
+        TaskRunner.jobACLsFile));
+    return conf;
+  }
+
   /**
    * Get the logs via http.
    */
@@ -115,13 +159,35 @@ public class TaskLogServlet extends HttpServlet {
     TaskLog.LogName filter = null;
     boolean isCleanup = false;
 
-    String taskIdStr = request.getParameter("taskid");
-    if (taskIdStr == null) {
+    String attemptIdStr = request.getParameter("attemptid");
+    if (attemptIdStr == null) {
       response.sendError(HttpServletResponse.SC_BAD_REQUEST, 
-                         "Argument taskid is required");
+                         "Argument attemptid is required");
       return;
     }
-    TaskAttemptID taskId = TaskAttemptID.forName(taskIdStr);
+
+    TaskAttemptID attemptId = TaskAttemptID.forName(attemptIdStr);
+
+    // get user name who is accessing
+    String user = request.getRemoteUser();
+    if (user != null) {
+      // get jobACLConf from ACLs file
+      JobConf jobACLConf = new JobConf(getConfFromJobACLsFile(attemptIdStr));
+      ServletContext context = getServletContext();
+      TaskTracker taskTracker = (TaskTracker) context.getAttribute(
+          "task.tracker");
+      JobID jobId = attemptId.getJobID();
+
+      try {
+        checkAccessForTaskLogs(jobACLConf, user, jobId, taskTracker);
+      } catch (AccessControlException e) {
+        String errMsg = "User " + user + " failed to view tasklogs of job " +
+            jobId + "!\n\n" + e.getMessage();
+        response.sendError(HttpServletResponse.SC_UNAUTHORIZED, errMsg);
+        return;
+      }
+    }
+
     String logFilter = request.getParameter("filter");
     if (logFilter != null) {
       try {
@@ -157,27 +223,27 @@ public class TaskLogServlet extends HttpServlet {
     OutputStream out = response.getOutputStream();
     if( !plainText ) {
       out.write(("<html>\n" +
-                 "<title>Task Logs: '" + taskId + "'</title>\n" +
+                 "<title>Task Logs: '" + attemptId + "'</title>\n" +
                  "<body>\n" +
-                 "<h1>Task Logs: '" +  taskId +  "'</h1><br>\n").getBytes()); 
+                 "<h1>Task Logs: '" +  attemptId +  "'</h1><br>\n").getBytes()); 
 
       if (filter == null) {
-        printTaskLog(response, out, taskId, start, end, plainText, 
+        printTaskLog(response, out, attemptId, start, end, plainText, 
                      TaskLog.LogName.STDOUT, isCleanup);
-        printTaskLog(response, out, taskId, start, end, plainText, 
+        printTaskLog(response, out, attemptId, start, end, plainText, 
                      TaskLog.LogName.STDERR, isCleanup);
-        printTaskLog(response, out, taskId, start, end, plainText, 
+        printTaskLog(response, out, attemptId, start, end, plainText, 
                      TaskLog.LogName.SYSLOG, isCleanup);
-        if (haveTaskLog(taskId, TaskLog.LogName.DEBUGOUT)) {
-          printTaskLog(response, out, taskId, start, end, plainText, 
+        if (haveTaskLog(attemptId, TaskLog.LogName.DEBUGOUT)) {
+          printTaskLog(response, out, attemptId, start, end, plainText, 
                        TaskLog.LogName.DEBUGOUT, isCleanup);
         }
-        if (haveTaskLog(taskId, TaskLog.LogName.PROFILE)) {
-          printTaskLog(response, out, taskId, start, end, plainText, 
+        if (haveTaskLog(attemptId, TaskLog.LogName.PROFILE)) {
+          printTaskLog(response, out, attemptId, start, end, plainText, 
                        TaskLog.LogName.PROFILE, isCleanup);
         }
       } else {
-        printTaskLog(response, out, taskId, start, end, plainText, filter,
+        printTaskLog(response, out, attemptId, start, end, plainText, filter,
                      isCleanup);
       }
       
@@ -187,7 +253,7 @@ public class TaskLogServlet extends HttpServlet {
       response.sendError(HttpServletResponse.SC_BAD_REQUEST,
           "You must supply a value for `filter' (STDOUT, STDERR, or SYSLOG) if you set plainText = true");
     } else {
-      printTaskLog(response, out, taskId, start, end, plainText, filter, 
+      printTaskLog(response, out, attemptId, start, end, plainText, filter, 
                    isCleanup);
     } 
   }

+ 31 - 1
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.mapred;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -33,6 +34,7 @@ import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
@@ -78,6 +80,8 @@ abstract class TaskRunner extends Thread {
    */
   protected MapOutputFile mapOutputFile;
 
+  static String jobACLsFile = "job-acl.xml";
+
   public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker, 
       JobConf conf) {
     this.tip = tip;
@@ -264,8 +268,9 @@ abstract class TaskRunner extends Thread {
    * 
    * @param taskid
    * @return an array of files. The first file is stdout, the second is stderr.
+   * @throws IOException 
    */
-  static File[] prepareLogFiles(TaskAttemptID taskid) {
+  File[] prepareLogFiles(TaskAttemptID taskid) throws IOException {
     File[] logFiles = new File[2];
     logFiles[0] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
     logFiles[1] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
@@ -277,9 +282,34 @@ abstract class TaskRunner extends Thread {
       Localizer.PermissionsHandler.setPermissions(logDir,
           Localizer.PermissionsHandler.sevenZeroZero);
     }
+    // write job acls into a file to know the access for task logs
+    writeJobACLs(logDir);
     return logFiles;
   }
 
+  // Writes job-view-acls and user name into an xml file
+  private void writeJobACLs(File logDir) throws IOException {
+    File aclFile = new File(logDir, TaskRunner.jobACLsFile);
+    Configuration aclConf = new Configuration(false);
+
+    // set the job view acls in aclConf
+    String jobViewACLs = conf.get(JobContext.JOB_ACL_VIEW_JOB);
+    if (jobViewACLs != null) {
+      aclConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
+    }
+    // set jobOwner as mapreduce.job.user.name in aclConf
+    String jobOwner = conf.getUser();
+    aclConf.set("user.name", jobOwner);
+    FileOutputStream out = new FileOutputStream(aclFile);
+    try {
+      aclConf.writeXml(out);
+    } finally {
+      out.close();
+    }
+    Localizer.PermissionsHandler.setPermissions(aclFile,
+        Localizer.PermissionsHandler.sevenZeroZero);
+  }
+
   /**
    * Write the child's configuration to the disk and set it in configuration so
    * that the child can pick it up from there.

+ 38 - 5
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -247,6 +247,10 @@ public class TaskTracker
   private int maxMapSlots;
   private int maxReduceSlots;
   private int failures;
+
+  // MROwner's ugi
+  private UserGroupInformation mrOwner;
+  private String supergroup;
   
   // Performance-related config knob to send an out-of-band heartbeat
   // on task completion
@@ -275,6 +279,9 @@ public class TaskTracker
   static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
       "mapred.tasktracker.memory_calculator_plugin";
 
+  // Manages job acls of jobs in TaskTracker
+  private TaskTrackerJobACLsManager jobACLsManager;
+
   /**
    * the minimum interval between jobtracker polls
    */
@@ -578,18 +585,22 @@ public class TaskTracker
   synchronized void initialize() throws IOException, InterruptedException {
     this.fConf = new JobConf(originalConf);
     String keytabFilename = fConf.get(TT_KEYTAB_FILE);
-    UserGroupInformation ttUgi;
     UserGroupInformation.setConfiguration(fConf);
     if (keytabFilename != null) {
       String desiredUser = fConf.get(TT_USER_NAME,
                                     System.getProperty("user.name"));
       UserGroupInformation.loginUserFromKeytab(desiredUser,
                                                keytabFilename);
-      ttUgi = UserGroupInformation.getLoginUser();
+      mrOwner = UserGroupInformation.getLoginUser();
 
     } else {
-      ttUgi = UserGroupInformation.getCurrentUser();
+      mrOwner = UserGroupInformation.getCurrentUser();
     }
+    supergroup = fConf.get(JobConf.MR_SUPERGROUP,
+                           "supergroup");
+    LOG.info("Starting tasktracker with owner as " + mrOwner.getShortUserName()
+             + " and supergroup as " + supergroup);
+
     localFs = FileSystem.getLocal(fConf);
     if (fConf.get("slave.host.name") != null) {
       this.localHostname = fConf.get("slave.host.name");
@@ -685,7 +696,7 @@ public class TaskTracker
         this.fConf, taskController);
 
     this.jobClient = (InterTrackerProtocol) 
-    ttUgi.doAs(new PrivilegedExceptionAction<Object>() {
+    mrOwner.doAs(new PrivilegedExceptionAction<Object>() {
       public Object run() throws IOException {
         return RPC.waitForProxy(InterTrackerProtocol.class,
             InterTrackerProtocol.versionID,
@@ -727,6 +738,22 @@ public class TaskTracker
       fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
   }
 
+  UserGroupInformation getMROwner() {
+    return mrOwner;
+  }
+
+  String getSuperGroup() {
+    return supergroup;
+  }
+  
+  /**
+   * Is job level authorization enabled on the TT ?
+   */
+  boolean isJobLevelAuthorizationEnabled() {
+    return fConf.getBoolean(
+        JobConf.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
+  }
+
   public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
     Configuration conf) {
     return conf.getClass("mapred.tasktracker.instrumentation",
@@ -1229,10 +1256,13 @@ public class TaskTracker
     server.setAttribute("localDirAllocator", localDirAllocator);
     server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
     server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
-    server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);
+    server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
     server.start();
     this.httpPort = server.getPort();
     checkJettyPort(httpPort);
+    
+    // Initialize the jobACLSManager
+    jobACLsManager = new TaskTrackerJobACLsManager(this);
     initialize();
   }
 
@@ -3793,4 +3823,7 @@ public class TaskTracker
       return localJobTokenFileStr;
     }
 
+    TaskTrackerJobACLsManager getJobACLsManager() {
+      return jobACLsManager;
+    }
 }

+ 48 - 0
src/mapred/org/apache/hadoop/mapred/TaskTrackerJobACLsManager.java

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Manages the job ACLs and the operations on them at TaskTracker.
+ *
+ */
+public class TaskTrackerJobACLsManager extends JobACLsManager {
+
+  static final Log LOG = LogFactory.getLog(TaskTrackerJobACLsManager.class);
+
+  private TaskTracker taskTracker = null;
+
+  public TaskTrackerJobACLsManager(TaskTracker tracker) {
+    taskTracker = tracker;
+  }
+
+  @Override
+  protected boolean isJobLevelAuthorizationEnabled() {
+    return taskTracker.isJobLevelAuthorizationEnabled();
+  }
+
+  @Override
+  protected boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI) {
+    return JobTracker.isSuperUserOrSuperGroup(callerUGI,
+        taskTracker.getMROwner(), taskTracker.getSuperGroup());
+  }
+}

+ 5 - 3
src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java

@@ -82,7 +82,8 @@ public abstract class ClusterMapReduceTestCase extends TestCase {
 
       ConfigurableMiniMRCluster.setConfiguration(props);
       //noinspection deprecation
-      mrCluster = new ConfigurableMiniMRCluster(2, getFileSystem().getName(), 1);
+      mrCluster = new ConfigurableMiniMRCluster(2, getFileSystem().getName(),
+                                                1, conf);
     }
   }
 
@@ -94,8 +95,9 @@ public abstract class ClusterMapReduceTestCase extends TestCase {
     }
 
     public ConfigurableMiniMRCluster(int numTaskTrackers, String namenode,
-                                     int numDir) throws Exception {
-      super(numTaskTrackers, namenode, numDir);
+                                     int numDir, JobConf conf)
+        throws Exception {
+      super(0,0, numTaskTrackers, namenode, numDir, null, null, null, conf);
     }
 
     public JobConf createJobConf() {

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestJobACLs.java

@@ -71,7 +71,7 @@ public class TestJobACLs {
     JobConf conf = new JobConf();
 
     // Enable job-level authorization
-    conf.setBoolean(JobTracker.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, true);
+    conf.setBoolean(JobConf.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, true);
 
     // Enable CompletedJobStore
     FileSystem fs = FileSystem.getLocal(conf);

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestNodeRefresh.java

@@ -210,7 +210,7 @@ public class TestNodeRefresh extends TestCase {
     // start a cluster with 1 host and specified superuser and supergroup
     Configuration conf = new Configuration();
     // set the supergroup
-    conf.set("mapred.permissions.supergroup", "abc");
+    conf.set("mapreduce.cluster.permissions.supergroup", "abc");
     startCluster(2, 1, 0, UserGroupInformation.createRemoteUser("user1"), conf);
     
     conf = mr.createJobConf(new JobConf(conf));

+ 26 - 2
src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java

@@ -28,6 +28,7 @@ import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -457,6 +458,11 @@ public class TestTaskTrackerLocalization extends TestCase {
     localizedJobConf = tracker.localizeJobFiles(task, 
         new TaskTracker.RunningJob(task.getJobID()));
 
+    // Set job view ACLs in conf sothat validation of contents of jobACLsFile
+    // can be done against this value. Have both users and groups
+    String jobViewACLs = "user1,user2, group1,group2";
+    localizedJobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
+
     // Now initialize the job via task-controller so as to set
     // ownership/permissions of jars, job-work-dir
     JobInitializationContext jobContext = new JobInitializationContext();
@@ -496,7 +502,7 @@ public class TestTaskTrackerLocalization extends TestCase {
     runner.setupChildTaskConfiguration(lDirAlloc);
     TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
         localizedJobConf);
-    attemptLogFiles = TaskRunner.prepareLogFiles(task.getTaskID());
+    attemptLogFiles = runner.prepareLogFiles(task.getTaskID());
 
     // Make sure the task-conf file is created
     Path localTaskFile =
@@ -566,6 +572,24 @@ public class TestTaskTrackerLocalization extends TestCase {
         + expectedStderr.toString() + " Observed : "
         + attemptLogFiles[1].toString(), expectedStderr.toString().equals(
         attemptLogFiles[1].toString()));
+
+    // Make sure that the job ACLs file exists in the task log dir
+    File jobACLsFile = new File(logDir, TaskRunner.jobACLsFile);
+    assertTrue("JobACLsFile is missing in the task log dir " + logDir,
+        jobACLsFile.exists());
+
+    // With default task controller, the job-acls file is owned by TT and
+    // permissions are 700
+    checkFilePermissions(jobACLsFile.getAbsolutePath(), "-rwx------",
+        taskTrackerUGI.getShortUserName(), taskTrackerUGI.getGroupNames()[0]);
+
+    // Validate the contents of jobACLsFile(both user name and job-view-acls)
+    Configuration jobACLsConf =
+        TaskLogServlet.getConfFromJobACLsFile(task.getTaskID().toString());
+    assertTrue(jobACLsConf.get("user.name").equals(
+        localizedJobConf.getUser()));
+    assertTrue(jobACLsConf.get(JobContext.JOB_ACL_VIEW_JOB).
+        equals(localizedJobConf.get(JobContext.JOB_ACL_VIEW_JOB)));
   }
 
   /**
@@ -711,7 +735,7 @@ public class TestTaskTrackerLocalization extends TestCase {
     runner.setupChildTaskConfiguration(lDirAlloc);
     TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
         localizedJobConf);
-    TaskRunner.prepareLogFiles(task.getTaskID());
+    runner.prepareLogFiles(task.getTaskID());
     Path localTaskFile =
         lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
             .getUser(), task.getJobID().toString(), task.getTaskID()

+ 628 - 0
src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java

@@ -0,0 +1,628 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URL;
+import java.net.HttpURLConnection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.http.TestHttpServer.DummyFilterInitializer;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class TestWebUIAuthorization extends ClusterMapReduceTestCase {
+
+  private static final Log LOG = LogFactory.getLog(
+      TestWebUIAuthorization.class);
+
+  // user1 submits the jobs
+  private static final String jobSubmitter = "user1";
+  // mrOwner starts the cluster
+  private static String mrOwner = null;
+  // member of supergroup
+  private static final String superGroupMember = "user2";
+  // "colleague1" is there in job-view-acls config
+  private static final String viewColleague = "colleague1";
+  // "colleague2" is there in job-modify-acls config
+  private static final String modifyColleague = "colleague2";
+  // "colleague3" is there in both job-view-acls and job-modify-acls
+  private static final String viewAndModifyColleague = "colleague3";
+  // "evilJohn" is not having view/modify access on the jobs
+  private static final String unauthorizedUser = "evilJohn";
+
+  protected void setUp() throws Exception {
+    // do not do anything
+  };
+
+  /** access a url, ignoring some IOException such as the page does not exist */
+  static int getHttpStatusCode(String urlstring, String userName,
+      String method) throws IOException {
+    LOG.info("Accessing " + urlstring + " as user " + userName);
+    URL url = new URL(urlstring + "&user.name=" + userName);
+    HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+    connection.setRequestMethod(method);
+    if (method.equals("POST")) {
+      String encodedData = "action=kill&user.name=" + userName;      
+      connection.setRequestProperty("Content-Type",
+                                    "application/x-www-form-urlencoded");
+      connection.setRequestProperty("Content-Length",
+                                    Integer.toString(encodedData.length()));
+      connection.setDoOutput(true);
+
+      OutputStream os = connection.getOutputStream();
+      os.write(encodedData.getBytes());
+    }
+    connection.connect();
+
+    return connection.getResponseCode();
+  }
+
+  public static class MyGroupsProvider extends ShellBasedUnixGroupsMapping {
+    static Map<String, List<String>> mapping = new HashMap<String, List<String>>();
+
+    @Override
+    public List<String> getGroups(String user) throws IOException {
+      return mapping.get(user);
+    }
+  }
+
+  /**
+   * Validates the given jsp/servlet against different user names who
+   * can(or cannot) view the job.
+   * (1) jobSubmitter can view the job
+   * (2) superGroupMember can view the job
+   * (3) user mentioned in job-view-acls should be able to view the job
+   * (4) user mentioned in job-modify-acls but not in job-view-acls
+   *     cannot view the job
+   * (5) other unauthorized users cannot view the job
+   */
+  private void validateViewJob(String url, String method) throws IOException {
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(url, jobSubmitter, method));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(url, superGroupMember, method));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(url, mrOwner, method));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(url, viewColleague, method));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(url, viewAndModifyColleague, method));
+    assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
+        getHttpStatusCode(url, modifyColleague, method));
+    assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
+        getHttpStatusCode(url, unauthorizedUser, method));
+  }
+
+  /**
+   * Validates the given jsp/servlet against different user names who
+   * can(or cannot) modify the job.
+   * (1) jobSubmitter and superGroupMember can modify the job. But we are not
+   *     validating this in this method. Let the caller explicitly validate
+   *     this, if needed.
+   * (2) user mentioned in job-view-acls but not in job-modify-acls cannot
+   *     modify the job
+   * (3) user mentioned in job-modify-acls (irrespective of job-view-acls)
+   *     can modify the job
+   * (4) other unauthorized users cannot modify the job
+   */
+  private void validateModifyJob(String url, String method) throws IOException {
+    assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
+        getHttpStatusCode(url, viewColleague, method));
+    assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
+        getHttpStatusCode(url, unauthorizedUser, method));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(url, modifyColleague, method));
+  }
+
+  // starts a sleep job with 1 map task that runs for a long time
+  private RunningJob startSleepJobAsUser(String user, final JobConf conf)
+      throws Exception {
+	final SleepJob sleepJob = new SleepJob();
+    sleepJob.setConf(conf);
+    UserGroupInformation jobSubmitterUGI = 
+        UserGroupInformation.createRemoteUser(user);
+    RunningJob job =
+        jobSubmitterUGI.doAs(new PrivilegedExceptionAction<RunningJob>() {
+        public RunningJob run() throws Exception {
+          JobClient jobClient = new JobClient(conf);
+          SleepJob sleepJob = new SleepJob();
+          sleepJob.setConf(conf);
+          JobConf jobConf =
+              sleepJob.setupJobConf(1, 0, 900000, 1, 1000, 1000);
+          RunningJob runningJob = jobClient.submitJob(jobConf);
+          return runningJob;
+        }
+      });
+    return job;
+  }
+
+  // Waits till the map task gets started and gets its tipId from map reports
+  // and returns the tipId
+  private TaskID getTIPId(MiniMRCluster cluster,
+      org.apache.hadoop.mapreduce.JobID jobid) throws Exception {
+    JobClient client = new JobClient(cluster.createJobConf());
+    JobID jobId = (JobID) jobid;
+    TaskReport[] mapReports = null;
+
+    TaskID tipId = null;
+    do { // make sure that the map task is running
+      Thread.sleep(200);
+      mapReports = client.getMapTaskReports(jobId);
+    } while (mapReports.length == 0);
+
+    for (TaskReport r : mapReports) {
+      tipId = r.getTaskID();
+      break;// because we have only one map
+    }
+    return tipId;
+  }
+
+  /**
+   * Make sure that the given user can do killJob using jobdetails.jsp url
+   * @param cluster
+   * @param conf
+   * @param jtURL
+   * @param jobTrackerJSP
+   * @param user
+   * @throws Exception
+   */
+  private void confirmJobDetailsJSPKillJobAsUser(MiniMRCluster cluster,
+      JobConf conf, String jtURL, String jobTrackerJSP, String user)
+      throws Exception {
+    RunningJob job = startSleepJobAsUser(jobSubmitter, conf);
+    org.apache.hadoop.mapreduce.JobID jobid = job.getID();
+    getTIPId(cluster, jobid);// wait till the map task is started
+    // jobDetailsJSP killJob url
+    String url = jtURL + "/jobdetails.jsp?" +
+        "action=kill&jobid="+ jobid.toString();
+    try {
+      assertEquals(HttpURLConnection.HTTP_OK,
+          getHttpStatusCode(url, user, "POST"));
+    } finally {
+      if (!job.isComplete()) {
+        LOG.info("Killing job " + jobid + " from finally block");
+        assertEquals(HttpURLConnection.HTTP_OK,
+            getHttpStatusCode(jobTrackerJSP + "&killJobs=true&jobCheckBox=" +
+            jobid.toString(), jobSubmitter, "GET"));
+      }
+    }
+  }
+
+  /**
+   * Starts a sleep job and tries to kill the job using jobdetails.jsp as
+   * (1) viewColleague (2) unauthorizedUser (3) modifyColleague
+   * (4) viewAndModifyColleague (5) mrOwner (6) superGroupMember and
+   * (7) jobSubmitter
+   *
+   * Validates the given jsp/servlet against different user names who
+   * can(or cannot) do both view and modify on the job.
+   * (1) jobSubmitter, mrOwner and superGroupMember can do both view and modify
+   *     on the job. But we are not validating this in this method. Let the
+   *     caller explicitly validate this, if needed.
+   * (2) user mentioned in job-view-acls but not in job-modify-acls cannot
+   *     do this
+   * (3) user mentioned in job-modify-acls but not in job-view-acls cannot
+   *     do this
+   * (4) other unauthorized users cannot do this
+   *
+   * @throws Exception
+   */
+  private void validateJobDetailsJSPKillJob(MiniMRCluster cluster,
+      JobConf clusterConf, String jtURL) throws Exception {
+
+    JobConf conf = new JobConf(cluster.createJobConf());
+    conf.set(JobContext.JOB_ACL_VIEW_JOB, viewColleague + " group3");
+
+    // Let us add group1 and group3 to modify-job-acl. So modifyColleague and
+    // viewAndModifyColleague will be able to modify the job
+    conf.set(JobContext.JOB_ACL_MODIFY_JOB, " group1,group3");
+
+    String jobTrackerJSP =  jtURL + "/jobtracker.jsp?a=b";
+    RunningJob job = startSleepJobAsUser(jobSubmitter, conf);
+    org.apache.hadoop.mapreduce.JobID jobid = job.getID();
+    getTIPId(cluster, jobid);// wait till the map task is started
+    // jobDetailsJSPKillJobAction url
+    String url = jtURL + "/jobdetails.jsp?" +
+        "action=kill&jobid="+ jobid.toString();
+    try {
+      assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
+          getHttpStatusCode(url, viewColleague, "POST"));
+      assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
+          getHttpStatusCode(url, unauthorizedUser, "POST"));
+      assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
+          getHttpStatusCode(url, modifyColleague, "POST"));
+      assertEquals(HttpURLConnection.HTTP_OK,
+          getHttpStatusCode(url, viewAndModifyColleague, "POST"));
+    } finally {
+      if (!job.isComplete()) {
+        LOG.info("Killing job " + jobid + " from finally block");
+        assertEquals(HttpURLConnection.HTTP_OK,
+            getHttpStatusCode(jobTrackerJSP + "&killJobs=true&jobCheckBox=" +
+            jobid.toString(), jobSubmitter, "GET"));
+      }
+    }
+
+    // check if jobSubmitter, mrOwner and superGroupMember can do killJob
+    // using jobdetails.jsp url
+    confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
+                                       jobSubmitter);
+    confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
+                                       mrOwner);
+    confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
+                                       superGroupMember);
+  }
+
+  /**
+   * Make sure that the given user can do killJob using jobtracker.jsp url
+   * @param cluster
+   * @param conf
+   * @param jtURL
+   * @param user
+   * @throws Exception
+   */
+  private void confirmJobTrackerJSPKillJobAsUser(MiniMRCluster cluster,
+      JobConf conf, String jtURL, String user)
+      throws Exception {
+    String jobTrackerJSP =  jtURL + "/jobtracker.jsp?a=b";
+    RunningJob job = startSleepJobAsUser(jobSubmitter, conf);
+    org.apache.hadoop.mapreduce.JobID jobid = job.getID();
+    getTIPId(cluster, jobid);// wait till the map task is started
+    // jobTrackerJSP killJob url
+    String url = jobTrackerJSP +
+        "&killJobs=true&jobCheckBox=" + jobid.toString();
+    try {
+      assertEquals(HttpURLConnection.HTTP_OK,
+          getHttpStatusCode(url, user, "POST"));
+    } finally {
+      if (!job.isComplete()) {
+        LOG.info("Killing job " + jobid + " from finally block");
+        assertEquals(HttpURLConnection.HTTP_OK,
+            getHttpStatusCode(jobTrackerJSP + "&killJobs=true&jobCheckBox=" +
+            jobid.toString(), jobSubmitter, "GET"));
+      }
+    }
+  }
+
+  /**
+   * Waits for a while for the job to become completed
+   * @param job
+   * @throws IOException
+   */
+  void waitForKillJobToFinish(RunningJob job) throws IOException {
+    for (int i = 0;!job.isComplete() && i < 20;i++) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e){
+        LOG.warn("Interrupted while waiting for killJob() to finish for "
+                 + job.getID());
+      }
+    }
+  }
+
+  /**
+   * Make sure that multiple jobs get killed using jobtracker.jsp url when
+   * user has modify access on only some of those jobs.
+   * @param cluster
+   * @param conf
+   * @param jtURL
+   * @param user
+   * @throws Exception
+   */
+  private void validateKillMultipleJobs(MiniMRCluster cluster,
+      JobConf conf, String jtURL) throws Exception {
+    String jobTrackerJSP =  jtURL + "/jobtracker.jsp?a=b";
+    // jobTrackerJSP killJob url
+    String url = jobTrackerJSP + "&killJobs=true";
+    // view-job-acl doesn't matter for killJob from jobtracker jsp page
+    conf.set(JobContext.JOB_ACL_VIEW_JOB, "");
+    
+    // Let us start jobs as 4 different users(none of these 4 users is
+    // mrOwner and none of these users is a member of superGroup). So only
+    // based on the config JobContext.JOB_ACL_MODIFY_JOB being set here,
+    // killJob on each of the jobs will be succeeded.
+
+    // start 1st job.
+    // Out of these 4 users, only jobSubmitter can do killJob on 1st job
+    conf.set(JobContext.JOB_ACL_MODIFY_JOB, "");
+    RunningJob job1 = startSleepJobAsUser(jobSubmitter, conf);
+    org.apache.hadoop.mapreduce.JobID jobid = job1.getID();
+    getTIPId(cluster, jobid);// wait till the map task is started
+    url = url.concat("&jobCheckBox=" + jobid.toString());
+    // start 2nd job.
+    // Out of these 4 users, only viewColleague can do killJob on 2nd job
+    RunningJob job2 = startSleepJobAsUser(viewColleague, conf);
+    jobid = job2.getID();
+    getTIPId(cluster, jobid);// wait till the map task is started
+    url = url.concat("&jobCheckBox=" + jobid.toString());
+    // start 3rd job.
+    // Out of these 4 users, only modifyColleague can do killJob on 3rd job
+    RunningJob job3 = startSleepJobAsUser(modifyColleague, conf);
+    jobid = job3.getID();
+    getTIPId(cluster, jobid);// wait till the map task is started
+    url = url.concat("&jobCheckBox=" + jobid.toString());
+    // start 4rd job.
+    // Out of these 4 users, viewColleague and viewAndModifyColleague
+    // can do killJob on 4th job
+    conf.set(JobContext.JOB_ACL_MODIFY_JOB, viewColleague);
+    RunningJob job4 = startSleepJobAsUser(viewAndModifyColleague, conf);
+    jobid = job4.getID();
+    getTIPId(cluster, jobid);// wait till the map task is started
+    url = url.concat("&jobCheckBox=" + jobid.toString());
+
+    try {
+      // Try killing all the 4 jobs as user viewColleague who can kill only
+      // 2nd and 4th jobs. Check if 1st and 3rd jobs are not killed and
+      // 2nd and 4th jobs get killed
+      assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
+          getHttpStatusCode(url, viewColleague, "POST"));
+      
+      waitForKillJobToFinish(job2);
+      assertTrue("killJob failed for a job for which user has "
+          + "job-modify permission", job2.isComplete());
+      waitForKillJobToFinish(job4);
+      assertTrue("killJob failed for a job for which user has "
+          + "job-modify permission", job4.isComplete());
+      assertFalse("killJob succeeded for a job for which user doesnot "
+          + " have job-modify permission", job1.isComplete());
+      assertFalse("killJob succeeded for a job for which user doesnot "
+          + " have job-modify permission", job3.isComplete());
+    } finally {
+      // Kill all 4 jobs as user mrOwner(even though some of them
+      // were already killed)
+      assertEquals(HttpURLConnection.HTTP_OK,
+          getHttpStatusCode(url, mrOwner, "GET"));
+    }
+  }
+
+  /**
+   * Run a job and validate if JSPs/Servlets are going through authentication
+   * and authorization.
+   * @throws Exception 
+   */
+  @Test
+  public void testWebUIAuthorization() throws Exception {
+    JobConf conf = new JobConf();
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        MyGroupsProvider.class.getName());
+    Groups.getUserToGroupsMappingService(conf);
+    Properties props = new Properties();
+    props.setProperty("hadoop.http.filter.initializers",
+        DummyFilterInitializer.class.getName());
+    props.setProperty(
+       JobConf.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, String.valueOf(true));
+    props.setProperty("dfs.permissions", "false");
+    
+    props.setProperty(JSPUtil.PRIVATE_ACTIONS_KEY, "true");
+    props.setProperty("mapreduce.job.committer.setup.cleanup.needed", "false");
+    props.setProperty(JobConf.MR_SUPERGROUP, "superGroup");
+
+    MyGroupsProvider.mapping.put(jobSubmitter, Arrays.asList("group1"));
+    MyGroupsProvider.mapping.put(viewColleague, Arrays.asList("group2"));
+    MyGroupsProvider.mapping.put(modifyColleague, Arrays.asList("group1"));
+    MyGroupsProvider.mapping.put(unauthorizedUser, Arrays.asList("evilSociety"));
+    MyGroupsProvider.mapping.put(superGroupMember, Arrays.asList("superGroup"));
+    MyGroupsProvider.mapping.put(viewAndModifyColleague, Arrays.asList("group3"));
+
+    mrOwner = UserGroupInformation.getCurrentUser().getShortUserName();
+    MyGroupsProvider.mapping.put(mrOwner, Arrays.asList(
+        new String[] { "group4", "group5" }));
+
+    startCluster(true, props);
+    MiniMRCluster cluster = getMRCluster();
+    int infoPort = cluster.getJobTrackerRunner().getJobTrackerInfoPort();
+
+    JobConf clusterConf = cluster.createJobConf();
+    conf = new JobConf(clusterConf);
+    conf.set(JobContext.JOB_ACL_VIEW_JOB, viewColleague + " group3");
+
+    // Let us add group1 and group3 to modify-job-acl. So modifyColleague and
+    // viewAndModifyColleague will be able to modify the job
+    conf.set(JobContext.JOB_ACL_MODIFY_JOB, " group1,group3");
+
+    RunningJob job = startSleepJobAsUser(jobSubmitter, conf);
+
+    org.apache.hadoop.mapreduce.JobID jobid = job.getID();
+
+    String jtURL = "http://localhost:" + infoPort;
+
+    String jobTrackerJSP =  jtURL + "/jobtracker.jsp?a=b";
+    try {
+      // Currently, authorization is not done for jobtracker page. So allow
+      // everyone to view it.
+      validateJobTrackerJSPAccess(jtURL);
+      validateJobDetailsJSPAccess(jobid, jtURL);
+      validateTaskGraphServletAccess(jobid, jtURL);
+      validateJobTasksJSPAccess(jobid, jtURL);
+      validateJobConfJSPAccess(jobid, jtURL);
+      validateJobFailuresJSPAccess(jobid, jtURL);
+      valiateJobBlacklistedTrackerJSPAccess(jobid, jtURL);
+      validateJobTrackerJSPSetPriorityAction(jobid, jtURL);
+
+      // Wait for the tip to start so as to test task related JSP
+      TaskID tipId = getTIPId(cluster, jobid);
+      validateTaskStatsJSPAccess(jobid, jtURL, tipId);
+      validateTaskDetailsJSPAccess(jobid, jtURL, tipId);
+      validateJobTrackerJSPKillJobAction(jobid, jtURL);
+    } finally {
+      if (!job.isComplete()) { // kill the job(as jobSubmitter) if needed
+        LOG.info("Killing job " + jobid + " from finally block");
+        assertEquals(HttpURLConnection.HTTP_OK,
+            getHttpStatusCode(jobTrackerJSP + "&killJobs=true&jobCheckBox=" +
+            jobid.toString(), jobSubmitter, "GET"));
+      }
+    }
+
+    // validate killJob of jobdetails.jsp
+    validateJobDetailsJSPKillJob(cluster, clusterConf, jtURL);
+
+    // validate killJob of jobtracker.jsp as users viewAndModifyColleague,
+    // jobSubmitter, mrOwner and superGroupMember
+    confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL,
+        viewAndModifyColleague);
+    confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, jobSubmitter);
+    confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, mrOwner);
+    confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL,
+        superGroupMember);
+
+    // validate killing of multiple jobs using jobtracker jsp and check
+    // if all the jobs which can be killed by user are actually the ones that
+    // got killed
+    validateKillMultipleJobs(cluster, conf, jtURL);
+  }
+
+  // validate killJob of jobtracker.jsp
+  private void validateJobTrackerJSPKillJobAction(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
+      throws IOException {
+    String jobTrackerJSP =  jtURL + "/jobtracker.jsp?a=b";
+    String jobTrackerJSPKillJobAction = jobTrackerJSP +
+        "&killJobs=true&jobCheckBox="+ jobid.toString();
+    validateModifyJob(jobTrackerJSPKillJobAction, "GET");
+  }
+
+  // validate viewing of job of taskdetails.jsp
+  private void validateTaskDetailsJSPAccess(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL, TaskID tipId)
+      throws IOException {
+    String taskDetailsJSP =  jtURL + "/taskdetails.jsp?jobid=" +
+        jobid.toString() + "&tipid=" + tipId;
+    validateViewJob(taskDetailsJSP, "GET");
+  }
+
+  // validate taskstats.jsp
+  private void validateTaskStatsJSPAccess(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL, TaskID tipId)
+      throws IOException {
+    String taskStatsJSP =  jtURL + "/taskstats.jsp?jobid=" +
+        jobid.toString() + "&tipid=" + tipId;
+    validateViewJob(taskStatsJSP, "GET");
+  }
+
+  // validate setJobPriority
+  private void validateJobTrackerJSPSetPriorityAction(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
+      throws IOException {
+    String jobTrackerJSP =  jtURL + "/jobtracker.jsp?a=b";
+    String jobTrackerJSPSetJobPriorityAction = jobTrackerJSP +
+        "&changeJobPriority=true&setJobPriority="+"HIGH"+"&jobCheckBox=" +
+        jobid.toString();
+    validateModifyJob(jobTrackerJSPSetJobPriorityAction, "GET");
+    // jobSubmitter, mrOwner and superGroupMember are not validated for
+    // job-modify permission in validateModifyJob(). So let us do it
+    // explicitly here
+    assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
+        jobTrackerJSPSetJobPriorityAction, jobSubmitter, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
+        jobTrackerJSPSetJobPriorityAction, superGroupMember, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
+        jobTrackerJSPSetJobPriorityAction, mrOwner, "GET"));
+  }
+
+  // validate access of jobblacklistedtrackers.jsp
+  private void valiateJobBlacklistedTrackerJSPAccess(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
+      throws IOException {
+    String jobBlacklistedTrackersJSP =  jtURL +
+        "/jobblacklistedtrackers.jsp?jobid="+jobid.toString();
+    validateViewJob(jobBlacklistedTrackersJSP, "GET");
+  }
+
+  // validate access of jobfailures.jsp
+  private void validateJobFailuresJSPAccess(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
+      throws IOException {
+    String jobFailuresJSP =  jtURL + "/jobfailures.jsp?jobid="+jobid.toString();
+    validateViewJob(jobFailuresJSP, "GET");
+  }
+
+  // validate access of jobconf.jsp
+  private void validateJobConfJSPAccess(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
+      throws IOException {
+    String jobConfJSP =  jtURL + "/jobconf.jsp?jobid="+jobid.toString();
+    validateViewJob(jobConfJSP, "GET");
+  }
+
+  // validate access of jobtasks.jsp
+  private void validateJobTasksJSPAccess(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
+      throws IOException {
+    String jobTasksJSP =  jtURL + "/jobtasks.jsp?jobid="
+        + jobid.toString() + "&type=map&pagenum=1&state=running";
+    validateViewJob(jobTasksJSP, "GET");
+  }
+
+  // validate access of TaskGraphServlet
+  private void validateTaskGraphServletAccess(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
+      throws IOException {
+    String taskGraphServlet = jtURL + "/taskgraph?type=map&jobid="
+        + jobid.toString();
+    validateViewJob(taskGraphServlet, "GET");
+    taskGraphServlet = jtURL + "/taskgraph?type=reduce&jobid="
+        + jobid.toString();
+    validateViewJob(taskGraphServlet, "GET");
+  }
+
+  // validate access of jobdetails.jsp
+  private void validateJobDetailsJSPAccess(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
+      throws IOException {
+    String jobDetailsJSP =  jtURL + "/jobdetails.jsp?jobid="
+        + jobid.toString();
+    validateViewJob(jobDetailsJSP, "GET");
+  }
+
+  // validate access of jobtracker.jsp
+  private void validateJobTrackerJSPAccess(String jtURL)
+      throws IOException {
+    String jobTrackerJSP =  jtURL + "/jobtracker.jsp?a=b";
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(jobTrackerJSP, jobSubmitter, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(jobTrackerJSP, viewColleague, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(jobTrackerJSP, unauthorizedUser, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(jobTrackerJSP, modifyColleague, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(jobTrackerJSP, viewAndModifyColleague, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(jobTrackerJSP, mrOwner, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(jobTrackerJSP, superGroupMember, "GET"));
+  }
+}

+ 52 - 0
src/webapps/job/job_authorization_error.jsp

@@ -0,0 +1,52 @@
+<%
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file 
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+%>
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="javax.servlet.*"
+  import="javax.servlet.http.*"
+  import="java.io.*"
+  import="java.net.URL"
+  import="org.apache.hadoop.util.*"
+%>
+<%!	private static final long serialVersionUID = 1L;
+%>
+
+<html>
+<head>
+<title>Error: User cannot access this Job</title>
+</head>
+<body>
+<h2>Error: User cannot do this operation on this Job</h2><br>
+
+<%
+  String errorMsg = (String) request.getAttribute("error.msg");
+%>
+
+<font size="5"> 
+<%
+  out.println(errorMsg);
+%>
+</font>
+
+<hr>
+
+<%
+out.println(ServletUtil.htmlFooter());
+%>

+ 8 - 1
src/webapps/job/jobblacklistedtrackers.jsp

@@ -6,6 +6,7 @@
   import="java.util.*"
   import="org.apache.hadoop.http.HtmlQuoting"
   import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.mapred.JSPUtil.JobWithViewAccessCheck"
   import="org.apache.hadoop.util.*"
 %>
 
@@ -38,7 +39,13 @@
   	  return;
     }
     
-    JobInProgress job = (JobInProgress) tracker.getJob(JobID.forName(jobId));
+    JobWithViewAccessCheck myJob = JSPUtil.checkAccessAndGetJob(tracker,
+        JobID.forName(jobId), request, response);
+    if (!myJob.isViewJobAllowed()) {
+      return; // user is not authorized to view this job
+    }
+
+    JobInProgress job = myJob.getJob();
     if (job == null) {
       out.print("<b>Job " + jobId + " not found.</b><br>\n");
       return;

+ 16 - 3
src/webapps/job/jobconf.jsp

@@ -5,16 +5,17 @@
   import="java.io.*"
   import="java.net.URL"
   import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.mapred.JSPUtil.JobWithViewAccessCheck"
   import="org.apache.hadoop.util.*"
 %>
 
 
 <%
-  JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
-  String jobId = request.getParameter("jobid");
+  final JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+  final String jobId = request.getParameter("jobid");
   if (jobId == null) {
     out.println("<h2>Missing 'jobid' for fetching job configuration!</h2>");
- 	return;
+    return;
   }
 %>
   
@@ -26,6 +27,18 @@
 <h2>Job Configuration: JobId - <%= jobId %></h2><br>
 
 <%
+  JobWithViewAccessCheck myJob = JSPUtil.checkAccessAndGetJob(tracker,
+      JobID.forName(jobId), request, response);
+  if (!myJob.isViewJobAllowed()) {
+    return; // user is not authorized to view this job
+  }
+
+  JobInProgress job = myJob.getJob();
+  if (job == null) {
+    out.print("<b>Job " + jobId + " not found.</b><br>\n");
+    return;
+  }
+
   String jobFilePath = JobTracker.getLocalJobFilePath(JobID.forName(jobId));
   FileInputStream jobFile = null;
   try {

+ 98 - 21
src/webapps/job/jobdetails.jsp

@@ -8,18 +8,23 @@
   import="java.text.DecimalFormat"
   import="org.apache.hadoop.http.HtmlQuoting"
   import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.mapred.JSPUtil.JobWithViewAccessCheck"
   import="org.apache.hadoop.mapreduce.TaskType"
   import="org.apache.hadoop.util.*"
+  import="org.apache.hadoop.mapreduce.JobACL"
+  import="org.apache.hadoop.security.UserGroupInformation"
+  import="java.security.PrivilegedExceptionAction"
+  import="org.apache.hadoop.security.AccessControlException"
+  import="org.apache.hadoop.security.authorize.AccessControlList"
 %>
 
 <%
-  JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+  final JobTracker tracker = (JobTracker) application.getAttribute(
+      "job.tracker");
   String trackerName = 
            StringUtils.simpleHostname(tracker.getJobTrackerMachine());
 %>
 <%!
-  private static final String PRIVATE_ACTIONS_KEY 
-		= "webinterface.private.actions";
  
   private void printTaskSummary(JspWriter out,
                                 String jobId,
@@ -157,27 +162,83 @@
         catch (NumberFormatException ignored) {
         }
     }
-    JobID jobIdObj = JobID.forName(jobId);
-    JobInProgress job = (JobInProgress) tracker.getJob(jobIdObj);
-    
+    final JobID jobIdObj = JobID.forName(jobId);
+    JobWithViewAccessCheck myJob = JSPUtil.checkAccessAndGetJob(tracker, jobIdObj,
+                                                     request, response);
+    if (!myJob.isViewJobAllowed()) {
+      return; // user is not authorized to view this job
+    }
+
+    JobInProgress job = myJob.getJob();
+
+    final String newPriority = request.getParameter("prio");
+    String user = request.getRemoteUser();
+    UserGroupInformation ugi = null;
+    if (user != null) {
+      ugi = UserGroupInformation.createRemoteUser(user);
+    }
+
     String action = request.getParameter("action");
-    if(JSPUtil.conf.getBoolean(PRIVATE_ACTIONS_KEY, false) && 
+    if(JSPUtil.privateActionsAllowed(tracker.conf) && 
         "changeprio".equalsIgnoreCase(action) 
         && request.getMethod().equalsIgnoreCase("POST")) {
-      tracker.setJobPriority(jobIdObj, 
-                             JobPriority.valueOf(request.getParameter("prio")));
+      if (ugi != null) {
+        try {
+          ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            public Void run() throws IOException{
+
+              // checks job modify permission
+              tracker.setJobPriority(jobIdObj, 
+                  JobPriority.valueOf(newPriority));
+              return null;
+            }
+          });
+        } catch(AccessControlException e) {
+          String errMsg = "User " + user + " failed to modify priority of " +
+              jobIdObj + "!<br><br>" + e.getMessage() +
+              "<hr><a href=\"jobdetails.jsp?jobid=" + jobId +
+              "\">Go back to Job</a><br>";
+          JSPUtil.setErrorAndForward(errMsg, request, response);
+          return;
+        }
+      }
+      else {// no authorization needed
+        tracker.setJobPriority(jobIdObj,
+             JobPriority.valueOf(newPriority));;
+      }
     }
     
-    if(JSPUtil.conf.getBoolean(PRIVATE_ACTIONS_KEY, false)) {
-        action = request.getParameter("action");
-	    if(action!=null && action.equalsIgnoreCase("confirm")) {
-  	      printConfirm(out, jobId);
-    	    return;
-	    }
-  	    else if(action != null && action.equalsIgnoreCase("kill") && 
-  	        request.getMethod().equalsIgnoreCase("POST")) {
-	      tracker.killJob(jobIdObj);
-	    }
+    if(JSPUtil.privateActionsAllowed(tracker.conf)) {
+      action = request.getParameter("action");
+      if(action!=null && action.equalsIgnoreCase("confirm")) {
+        printConfirm(out, jobId);
+        return;
+      }
+      else if(action != null && action.equalsIgnoreCase("kill") &&
+          request.getMethod().equalsIgnoreCase("POST")) {
+        if (ugi != null) {
+          try {
+            ugi.doAs(new PrivilegedExceptionAction<Void>() {
+              public Void run() throws IOException{
+
+                // checks job modify permission
+                tracker.killJob(jobIdObj);// checks job modify permission
+                return null;
+              }
+            });
+          } catch(AccessControlException e) {
+            String errMsg = "User " + user + " failed to kill " + jobIdObj +
+                "!<br><br>" + e.getMessage() +
+                "<hr><a href=\"jobdetails.jsp?jobid=" + jobId +
+                "\">Go back to Job</a><br>";
+            JSPUtil.setErrorAndForward(errMsg, request, response);
+            return;
+          }
+        }
+        else {// no authorization needed
+          tracker.killJob(jobIdObj);
+        }
+      }
     }
 %>
 
@@ -219,6 +280,22 @@
         HtmlQuoting.quoteHtmlChars(profile.getJobName()) + "<br>\n");
     out.print("<b>Job File:</b> <a href=\"jobconf.jsp?jobid=" + jobId + "\">" +
         profile.getJobFile() + "</a><br>\n");
+
+    if (tracker.isJobLevelAuthorizationEnabled()) {
+      // Display job-view-acls and job-modify-acls configured for this job
+      Map<JobACL, AccessControlList> jobAcls = status.getJobACLs();
+      out.print("<b>Job-ACLs:</b><br>");
+      for (JobACL aclName : JobACL.values()) {
+        String aclConfigName = aclName.getAclName();
+        AccessControlList aclConfigured = jobAcls.get(aclName);
+        String aclStr = "";
+        if (aclConfigured != null) {
+          aclStr = aclConfigured.toString();
+        }
+        out.print("&nbsp;&nbsp;&nbsp;&nbsp;" + aclConfigName + ": "
+                  + aclStr + "<br>");
+      }
+    }
     out.print("<b>Job Setup:</b>");
     printJobLevelTaskSummary(out, jobId, "setup", 
                              job.getTasks(TaskType.JOB_SETUP));
@@ -365,7 +442,7 @@ if("off".equals(session.getAttribute("map.graph"))) { %>
 <%} }%>
 
 <hr>
-<% if(JSPUtil.conf.getBoolean(PRIVATE_ACTIONS_KEY, false)) { %>
+<% if(JSPUtil.privateActionsAllowed(tracker.conf)) { %>
   <table border="0"> <tr> <td>
   Change priority from <%=job.getPriority()%> to:
   <form action="jobdetails.jsp" method="post">
@@ -385,7 +462,7 @@ if("off".equals(session.getAttribute("map.graph"))) { %>
 
 <table border="0"> <tr>
     
-<% if(JSPUtil.conf.getBoolean(PRIVATE_ACTIONS_KEY, false) 
+<% if(JSPUtil.privateActionsAllowed(tracker.conf) 
     	&& runState == JobStatus.RUNNING) { %>
 	<br/><a href="jobdetails.jsp?action=confirm&jobid=<%=jobId%>"> Kill this job </a>
 <% } %>

+ 23 - 15
src/webapps/job/jobfailures.jsp

@@ -6,6 +6,7 @@
   import="java.util.*"
   import="org.apache.hadoop.http.HtmlQuoting"
   import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.mapred.JSPUtil.JobWithViewAccessCheck"
   import="org.apache.hadoop.mapreduce.TaskType"
   import="org.apache.hadoop.util.*"
 %>
@@ -18,7 +19,6 @@
 <%! 
   private void printFailedAttempts(JspWriter out,
                                    JobTracker tracker,
-                                   JobID jobId,
                                    TaskInProgress tip,
                                    TaskStatus.State failState) throws IOException {
     TaskStatus[] statuses = tip.getTaskStatuses();
@@ -30,9 +30,8 @@
         String taskTrackerName = statuses[i].getTaskTracker();
         TaskTrackerStatus taskTracker = tracker.getTaskTrackerStatus(taskTrackerName);
         out.print("<tr><td>" + statuses[i].getTaskID() +
-                  "</td><td><a href=\"taskdetails.jsp?jobid="+ jobId + 
-                  "&tipid=" + tipId + "\">" + tipId +
-                  "</a></td>");
+                  "</td><td><a href=\"taskdetails.jsp?tipid=" + tipId + "\">" +
+                  tipId + "</a></td>");
         if (taskTracker == null) {
           out.print("<td>" + taskTrackerName + "</td>");
         } else {
@@ -82,14 +81,10 @@
              
   private void printFailures(JspWriter out, 
                              JobTracker tracker,
-                             JobID jobId,
+                             JobInProgress job,
                              String kind, 
-                             String cause) throws IOException {
-    JobInProgress job = (JobInProgress) tracker.getJob(jobId);
-    if (job == null) {
-      out.print("<b>Job " + jobId + " not found.</b><br>\n");
-      return;
-    }
+                             String cause)
+      throws IOException, InterruptedException, ServletException {
     
     boolean includeMap = false;
     boolean includeReduce = false;
@@ -130,13 +125,13 @@
     if (includeMap) {
       TaskInProgress[] tips = job.getTasks(TaskType.MAP);
       for(int i=0; i < tips.length; ++i) {
-        printFailedAttempts(out, tracker, jobId, tips[i], state);
+        printFailedAttempts(out, tracker, tips[i], state);
       }
     }
     if (includeReduce) {
       TaskInProgress[] tips = job.getTasks(TaskType.REDUCE);
       for(int i=0; i < tips.length; ++i) {
-        printFailedAttempts(out, tracker, jobId, tips[i], state);
+        printFailedAttempts(out, tracker, tips[i], state);
       }
     }
     out.print("</table>\n");
@@ -150,6 +145,19 @@
       return;
     }
     JobID jobIdObj = JobID.forName(jobId);
+    
+    JobWithViewAccessCheck myJob = JSPUtil.checkAccessAndGetJob(
+        tracker, jobIdObj, request, response);
+    if (!myJob.isViewJobAllowed()) {
+      return; // user is not authorized to view this job
+    }
+
+    JobInProgress job = myJob.getJob();
+    if (job == null) {
+      out.print("<b>Job " + jobId + " not found.</b><br>\n");
+      return;
+    }
+
     String kind = request.getParameter("kind");
     String cause = request.getParameter("cause");
 %>
@@ -160,8 +168,8 @@
 <h1>Hadoop <a href="jobdetails.jsp?jobid=<%=jobId%>"><%=jobId%></a>
 failures on <a href="jobtracker.jsp"><%=trackerName%></a></h1>
 
-<% 
-    printFailures(out, tracker, jobIdObj, kind, cause); 
+<%
+    printFailures(out, tracker, job, kind, cause); 
 %>
 
 <hr>

+ 1 - 1
src/webapps/job/jobqueue_details.jsp

@@ -63,7 +63,7 @@ if(jobs == null || jobs.isEmpty()) {
 <br/>
 <hr/>
 <%=
-  JSPUtil.generateJobTable("Job List", jobs, 30, 0)
+  JSPUtil.generateJobTable("Job List", jobs, 30, 0, tracker.conf)
 %>
 <hr>
 <% } %>

+ 15 - 10
src/webapps/job/jobtasks.jsp

@@ -6,13 +6,14 @@
   import="java.util.*"
   import="org.apache.hadoop.http.HtmlQuoting"
   import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.mapred.JSPUtil.JobWithViewAccessCheck"
   import="org.apache.hadoop.util.*"
   import="java.lang.Integer"
   import="java.text.SimpleDateFormat"
 %>
 <%! static SimpleDateFormat dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss") ; %>
 <%
-  JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+  final JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
   String trackerName = 
            StringUtils.simpleHostname(tracker.getJobTrackerMachine());
   String jobid = request.getParameter("jobid");
@@ -20,6 +21,16 @@
     out.println("<h2>Missing 'jobid'!</h2>");
     return;
   }
+  final JobID jobidObj = JobID.forName(jobid);
+
+  JobWithViewAccessCheck myJob = JSPUtil.checkAccessAndGetJob(tracker, jobidObj,
+      request, response);
+  if (!myJob.isViewJobAllowed()) {
+    return; // user is not authorized to view this job
+  }
+
+  JobInProgress job = myJob.getJob();
+
   String type = request.getParameter("type");
   String pagenum = request.getParameter("pagenum");
   String state = request.getParameter("state");
@@ -27,10 +38,6 @@
   int pnum = Integer.parseInt(pagenum);
   int next_page = pnum+1;
   int numperpage = 2000;
-  JobID jobidObj = JobID.forName(jobid);
-  JobInProgress job = (JobInProgress) tracker.getJob(jobidObj);
-  JobProfile profile = (job != null) ? (job.getProfile()) : null;
-  JobStatus status = (job != null) ? (job.getStatus()) : null;
   TaskReport[] reports = null;
   int start_index = (pnum - 1) * numperpage;
   int end_index = start_index + numperpage;
@@ -92,9 +99,8 @@
     }
     for (int i = start_index ; i < end_index; i++) {
           TaskReport report = reports[i];
-          out.print("<tr><td><a href=\"taskdetails.jsp?jobid=" + jobid + 
-                    "&tipid=" + report.getTaskID() + "\">"  + 
-                    report.getTaskID() + "</a></td>");
+          out.print("<tr><td><a href=\"taskdetails.jsp?tipid=" +
+            report.getTaskID() + "\">"  + report.getTaskID() + "</a></td>");
          out.print("<td>" + StringUtils.formatPercent(report.getProgress(),2) +
         		   ServletUtil.percentageGraph(report.getProgress() * 100f, 80) + "</td>");
          out.print("<td>"  + HtmlQuoting.quoteHtmlChars(report.getState()) + "<br/></td>");
@@ -108,8 +114,7 @@
          }
          out.println("</pre><br/></td>");
          out.println("<td>" + 
-             "<a href=\"taskstats.jsp?jobid=" + jobid + 
-             "&tipid=" + report.getTaskID() +
+             "<a href=\"taskstats.jsp?tipid=" + report.getTaskID() +
              "\">" + report.getCounters().size() +
              "</a></td></tr>");
     }

+ 3 - 3
src/webapps/job/jobtracker.jsp

@@ -141,14 +141,14 @@ for(JobQueueInfo queue: queues) {
 <hr>
 
 <h2 id="running_jobs">Running Jobs</h2>
-<%=JSPUtil.generateJobTable("Running", runningJobs, 30, 0)%>
+<%=JSPUtil.generateJobTable("Running", runningJobs, 30, 0, tracker.conf)%>
 <hr>
 
 <%
 if (completedJobs.size() > 0) {
   out.print("<h2 id=\"completed_jobs\">Completed Jobs</h2>");
   out.print(JSPUtil.generateJobTable("Completed", completedJobs, 0, 
-    runningJobs.size()));
+    runningJobs.size(), tracker.conf));
   out.print("<hr>");
 }
 %>
@@ -157,7 +157,7 @@ if (completedJobs.size() > 0) {
 if (failedJobs.size() > 0) {
   out.print("<h2 id=\"failed_jobs\">Failed Jobs</h2>");
   out.print(JSPUtil.generateJobTable("Failed", failedJobs, 0, 
-    (runningJobs.size()+completedJobs.size())));
+    (runningJobs.size()+completedJobs.size()), tracker.conf));
   out.print("<hr>");
 }
 %>

+ 99 - 41
src/webapps/job/taskdetails.jsp

@@ -7,21 +7,22 @@
   import="java.util.*"
   import="org.apache.hadoop.http.HtmlQuoting"
   import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.mapred.JSPUtil.JobWithViewAccessCheck"
   import="org.apache.hadoop.util.*"
   import="java.text.SimpleDateFormat"  
-  import="org.apache.hadoop.util.*"
+  import="org.apache.hadoop.security.UserGroupInformation"
+  import="java.security.PrivilegedExceptionAction"
+  import="org.apache.hadoop.security.AccessControlException"
 %>
 <%!static SimpleDateFormat dateFormat = new SimpleDateFormat(
       "d-MMM-yyyy HH:mm:ss");
-
-  private static final String PRIVATE_ACTIONS_KEY = "webinterface.private.actions";%>
-<%!private void printConfirm(JspWriter out, String jobid, String tipid,
-      String taskid, String action) throws IOException {
-    String url = "taskdetails.jsp?jobid=" + jobid + "&tipid=" + tipid
-        + "&taskid=" + taskid;
+%>
+<%!private void printConfirm(JspWriter out,
+      String attemptid, String action) throws IOException {
+    String url = "taskdetails.jsp?attemptid=" + attemptid;
     out.print("<html><head><META http-equiv=\"refresh\" content=\"15;URL="
         + url + "\"></head>" + "<body><h3> Are you sure you want to kill/fail "
-        + taskid + " ?<h3><br><table border=\"0\"><tr><td width=\"100\">"
+        + attemptid + " ?<h3><br><table border=\"0\"><tr><td width=\"100\">"
         + "<form action=\"" + url + "\" method=\"post\">"
         + "<input type=\"hidden\" name=\"action\" value=\"" + action + "\" />"
         + "<input type=\"submit\" name=\"Kill/Fail\" value=\"Kill/Fail\" />"
@@ -31,53 +32,109 @@
         + "/></form></td></tr></table></body></html>");
   }%>
 <%
-    JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
-    String jobid = request.getParameter("jobid");
-    String tipid = request.getParameter("tipid");
-    String taskid = request.getParameter("taskid");
-    JobID jobidObj = JobID.forName(jobid);
-    TaskID tipidObj = TaskID.forName(tipid);
-    TaskAttemptID taskidObj = TaskAttemptID.forName(taskid);
-    
-    JobInProgress job = (JobInProgress) tracker.getJob(jobidObj);
-    
-    boolean privateActions = JSPUtil.conf.getBoolean(PRIVATE_ACTIONS_KEY,
-        false);
+    final JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+
+    String attemptid = request.getParameter("attemptid");
+    final TaskAttemptID attemptidObj = TaskAttemptID.forName(attemptid);
+
+    // Obtain tipid for attemptid, if attemptid is available.
+    TaskID tipidObj =
+        (attemptidObj == null) ? TaskID.forName(request.getParameter("tipid"))
+                               : attemptidObj.getTaskID();
+    if (tipidObj == null) {
+      out.print("<b>tipid sent is not valid.</b><br>\n");
+      return;
+    }
+    // Obtain jobid from tipid
+    final JobID jobidObj = tipidObj.getJobID();
+    String jobid = jobidObj.toString();
+
+    JobWithViewAccessCheck myJob = JSPUtil.checkAccessAndGetJob(tracker, jobidObj,
+        request, response);
+    if (!myJob.isViewJobAllowed()) {
+      return; // user is not authorized to view this job
+    }
+
+    JobInProgress job = myJob.getJob();
+    if (job == null) {
+      out.print("<b>Job " + jobid + " not found.</b><br>\n");
+      return;
+    }
+    boolean privateActions = JSPUtil.privateActionsAllowed(tracker.conf);
     if (privateActions) {
       String action = request.getParameter("action");
       if (action != null) {
+        String user = request.getRemoteUser();
+        UserGroupInformation ugi = null;
+        if (user != null) {
+          ugi = UserGroupInformation.createRemoteUser(user);
+        }
         if (action.equalsIgnoreCase("confirm")) {
           String subAction = request.getParameter("subaction");
           if (subAction == null)
             subAction = "fail-task";
-          printConfirm(out, jobid, tipid, taskid, subAction);
+          printConfirm(out, attemptid, subAction);
           return;
         }
         else if (action.equalsIgnoreCase("kill-task") 
             && request.getMethod().equalsIgnoreCase("POST")) {
-          tracker.killTask(taskidObj, false);
+          if (ugi != null) {
+            try {
+              ugi.doAs(new PrivilegedExceptionAction<Void>() {
+              public Void run() throws IOException{
+
+                tracker.killTask(attemptidObj, false);// checks job modify permission
+                return null;
+              }
+              });
+            } catch(AccessControlException e) {
+              String errMsg = "User " + user + " failed to kill task "
+                  + attemptidObj + "!<br><br>" + e.getMessage() +
+                  "<hr><a href=\"jobdetails.jsp?jobid=" + jobid +
+                  "\">Go back to Job</a><br>";
+              JSPUtil.setErrorAndForward(errMsg, request, response);
+              return;
+            }
+          } else {// no authorization needed
+            tracker.killTask(attemptidObj, false);
+          }
           //redirect again so that refreshing the page will not attempt to rekill the task
-          response.sendRedirect("/taskdetails.jsp?" + "&subaction=kill-task"
-              + "&jobid=" + jobid + "&tipid=" + tipid);
+          response.sendRedirect("/taskdetails.jsp?subaction=kill-task" +
+              "&tipid=" + tipidObj.toString());
         }
         else if (action.equalsIgnoreCase("fail-task")
             && request.getMethod().equalsIgnoreCase("POST")) {
-          tracker.killTask(taskidObj, true);
-          response.sendRedirect("/taskdetails.jsp?" + "&subaction=fail-task"
-              + "&jobid=" + jobid + "&tipid=" + tipid);
+          if (ugi != null) {
+            try {
+              ugi.doAs(new PrivilegedExceptionAction<Void>() {
+              public Void run() throws IOException{
+
+                tracker.killTask(attemptidObj, true);// checks job modify permission
+                return null;
+              }
+              });
+            } catch(AccessControlException e) {
+              String errMsg = "User " + user + " failed to fail task "
+                  + attemptidObj + "!<br><br>" + e.getMessage() +
+                  "<hr><a href=\"jobdetails.jsp?jobid=" + jobid +
+                  "\">Go back to Job</a><br>";
+              JSPUtil.setErrorAndForward(errMsg, request, response);
+              return;
+            }
+          } else {// no authorization needed
+            tracker.killTask(attemptidObj, true);
+          }
+
+          response.sendRedirect("/taskdetails.jsp?subaction=fail-task" +
+              "&tipid=" + tipidObj.toString());
         }
       }
     }
-    TaskInProgress tip = null;
-    if (job != null && tipidObj != null) {
-      tip = job.getTaskInProgress(tipidObj);
-    }
+    TaskInProgress tip = job.getTaskInProgress(tipidObj);
     TaskStatus[] ts = null;
+    boolean isCleanupOrSetup = false;
     if (tip != null) { 
       ts = tip.getTaskStatuses();
-    }
-    boolean isCleanupOrSetup = false;
-    if ( tip != null) {
       isCleanupOrSetup = tip.isJobCleanupTask();
       if (!isCleanupOrSetup) {
         isCleanupOrSetup = tip.isJobSetupTask();
@@ -228,18 +285,19 @@
             out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
           }
         }
-        out.print("</td><td>" + "<a href=\"/taskstats.jsp?jobid=" + jobid
-          + "&tipid=" + tipid + "&taskid=" + status.getTaskID() + "\">"
-          + ((status.getCounters() != null) ? status.getCounters().size() : 0) + "</a></td>");
+        out.print("</td><td>" + "<a href=\"/taskstats.jsp?attemptid=" +
+          status.getTaskID() + "\">"
+          + ((status.getCounters() != null) ? status.getCounters().size() : 0)
+          + "</a></td>");
         out.print("<td>");
         if (privateActions
           && status.getRunState() == TaskStatus.State.RUNNING) {
         out.print("<a href=\"/taskdetails.jsp?action=confirm"
-          + "&subaction=kill-task" + "&jobid=" + jobid + "&tipid="
-          + tipid + "&taskid=" + status.getTaskID() + "\" > Kill </a>");
+          + "&subaction=kill-task" + "&attemptid=" + status.getTaskID()
+          + "\" > Kill </a>");
         out.print("<br><a href=\"/taskdetails.jsp?action=confirm"
-          + "&subaction=fail-task" + "&jobid=" + jobid + "&tipid="
-          + tipid + "&taskid=" + status.getTaskID() + "\" > Fail </a>");
+          + "&subaction=fail-task" + "&attemptid=" + status.getTaskID()
+          + "\" > Fail </a>");
         }
         else
           out.print("<pre>&nbsp;</pre>");

+ 26 - 12
src/webapps/job/taskstats.jsp

@@ -8,6 +8,7 @@
   import="java.util.*"
   import="org.apache.hadoop.http.HtmlQuoting"
   import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.mapred.JSPUtil.JobWithViewAccessCheck"
   import="org.apache.hadoop.util.*"
   import="java.text.SimpleDateFormat"  
 %>
@@ -15,33 +16,46 @@
   JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
   String trackerName = 
            StringUtils.simpleHostname(tracker.getJobTrackerMachine());
-  String jobid = request.getParameter("jobid");
-  String tipid = request.getParameter("tipid");
-  String taskid = request.getParameter("taskid");
-  JobID jobidObj = JobID.forName(jobid);
-  TaskID tipidObj = TaskID.forName(tipid);
-  TaskAttemptID taskidObj = TaskAttemptID.forName(taskid);
+  String attemptid = request.getParameter("attemptid");
+  TaskAttemptID attemptidObj = TaskAttemptID.forName(attemptid);
+  // Obtain tipid for attemptId, if attemptId is available.
+  TaskID tipidObj =
+      (attemptidObj == null) ? TaskID.forName(request.getParameter("tipid"))
+                             : attemptidObj.getTaskID();
+  // Obtain jobid from tipid
+  final JobID jobidObj = tipidObj.getJobID();
+  String jobid = jobidObj.toString();
   
-  JobInProgress job = (JobInProgress) tracker.getJob(jobidObj);
+  JobWithViewAccessCheck myJob = JSPUtil.checkAccessAndGetJob(tracker, jobidObj,
+      request, response);
+  if (!myJob.isViewJobAllowed()) {
+    return; // user is not authorized to view this job
+  }
+
+  JobInProgress job = myJob.getJob();
+  if (job == null) {
+    out.print("<b>Job " + jobid + " not found.</b><br>\n");
+    return;
+  }
   
   Format decimal = new DecimalFormat();
   Counters counters;
-  if (taskid == null) {
+  if (attemptid == null) {
     counters = tracker.getTipCounters(tipidObj);
-    taskid = tipid; // for page title etc
+    attemptid = tipidObj.toString(); // for page title etc
   }
   else {
-    TaskStatus taskStatus = tracker.getTaskStatus(taskidObj);
+    TaskStatus taskStatus = tracker.getTaskStatus(attemptidObj);
     counters = taskStatus.getCounters();
   }
 %>
 
 <html>
   <head>
-    <title>Counters for <%=taskid%></title>
+    <title>Counters for <%=attemptid%></title>
   </head>
 <body>
-<h1>Counters for <%=taskid%></h1>
+<h1>Counters for <%=attemptid%></h1>
 
 <hr>