
commit 380a3295972dd4f97b3e9c9b4102c676d538b09d
Author: Vinod Kumar Vavilapalli <vinodkv@yahoo-inc.com>
Date: Tue May 18 17:01:12 2010 +0530

MAPREDUCE-1664. Bug fix to enable queue admins to view jobs. From https://issues.apache.org/jira/secure/attachment/12444782/1664.qAdminsJobView.20S.v1.6.patch.

+++ b/YAHOO-CHANGES.txt
+
+ MAPREDUCE-1664. Bugfix to enable queue administrators of a queue to
+ view job details of jobs submitted to that queue even though they
+ are not part of acl-view-job.
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077471 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley committed 14 years ago
Parent
Current commit
4b76a6e588
33 changed files, with 622 additions and 389 deletions

  1. +6 -4    conf/mapred-queue-acls.xml.template
  2. +58 -6   src/c++/task-controller/task-controller.c
  3. +2 -0    src/c++/task-controller/task-controller.h
  4. +16 -0   src/c++/task-controller/tests/test-task-controller.c
  5. +34 -9   src/core/org/apache/hadoop/security/authorize/AccessControlList.java
  6. +3 -3    src/docs/src/documentation/content/xdocs/cluster_setup.xml
  7. +5 -3    src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
  8. +12 -7   src/mapred/mapred-default.xml
  9. +50 -97  src/mapred/org/apache/hadoop/mapred/ACLsManager.java
  10. +5 -4   src/mapred/org/apache/hadoop/mapred/CompletedJobStatusStore.java
  11. +5 -5   src/mapred/org/apache/hadoop/mapred/JSPUtil.java
  12. +0 -2   src/mapred/org/apache/hadoop/mapred/JobACLsManager.java
  13. +9 -0   src/mapred/org/apache/hadoop/mapred/JobClient.java
  14. +24 -13 src/mapred/org/apache/hadoop/mapred/JobHistory.java
  15. +1 -2   src/mapred/org/apache/hadoop/mapred/JobInProgress.java
  16. +15 -3  src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
  17. +34 -28 src/mapred/org/apache/hadoop/mapred/JobTracker.java
  18. +10 -2  src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
  19. +44 -0  src/mapred/org/apache/hadoop/mapred/Operation.java
  20. +39 -30 src/mapred/org/apache/hadoop/mapred/QueueManager.java
  21. +28 -16 src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
  22. +0 -31  src/mapred/org/apache/hadoop/mapred/TaskRunner.java
  23. +63 -3  src/mapred/org/apache/hadoop/mapred/TaskTracker.java
  24. +27 -17 src/test/org/apache/hadoop/mapred/TestJobACLs.java
  25. +5 -2   src/test/org/apache/hadoop/mapred/TestJobHistory.java
  26. +2 -2   src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
  27. +7 -0   src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
  28. +3 -3   src/test/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java
  29. +25 -47 src/test/org/apache/hadoop/mapred/TestQueueManager.java
  30. +2 -3   src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
  31. +37 -18 src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
  32. +3 -1   src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java
  33. +48 -28 src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java

+ 6 - 4
conf/mapred-queue-acls.xml.template

@@ -19,7 +19,7 @@
     configuration property mapred.acls.enabled to true.
 
     Irrespective of this ACL configuration, the user who started the cluster and
-    cluster administrators configured on JobTracker via
+    cluster administrators configured via
     mapreduce.cluster.administrators can submit jobs.
   </description>
 </property>
@@ -28,7 +28,7 @@
   <name>mapred.queue.default.acl-administer-jobs</name>
   <value> </value>
   <description> Comma separated list of user and group names that are allowed
-    to delete jobs or modify job's priority for all the jobs
+    to view job details, kill jobs or modify job's priority for all the jobs
     in the 'default' queue. The user list and the group list
     are separated by a blank. For e.g. user1,user2 group1,group2. 
     If set to the special value '*', it means all users are allowed to do 
@@ -39,8 +39,10 @@
     configuration property mapred.acls.enabled to true.
 
     Irrespective of this ACL configuration, the user who started the cluster and
-    cluster administrators configured on JobTracker via
-    mapreduce.cluster.administrators can do this operation.
+    cluster administrators configured via
+    mapreduce.cluster.administrators can do the above operations on all the jobs
+    in all the queues. The job owner can do all the above operations on his/her
+    job irrespective of this ACL configuration.
   </description>
 </property>
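
Note: the three value forms described above ("user1,user2 group1,group2", the wildcard '*', and a single blank) are parsed by org.apache.hadoop.security.authorize.AccessControlList. A minimal, illustrative Java sketch of how each form behaves (the user and group names below are made up):

    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.authorize.AccessControlList;

    public class QueueAclValueForms {
      public static void main(String[] args) {
        UserGroupInformation alice = UserGroupInformation.createRemoteUser("alice");

        // users and groups, separated by a single blank
        AccessControlList listed = new AccessControlList("alice,bob dev,qa");
        System.out.println(listed.isUserAllowed(alice));   // true

        // '*' means everybody is allowed
        AccessControlList everyone = new AccessControlList("*");
        System.out.println(everyone.isAllAllowed());        // true

        // a single blank means nobody is allowed through this ACL
        AccessControlList nobody = new AccessControlList(" ");
        System.out.println(nobody.isUserAllowed(alice));    // false
      }
    }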
 

+ 58 - 6
src/c++/task-controller/task-controller.c

@@ -225,6 +225,14 @@ char *get_job_log_dir(const char *log_dir, const char *job_id) {
   return concatenate(JOB_LOG_DIR_PATTERN, "job_log_dir", 2, log_dir, job_id);
 }
 
+/**
+ * Get the job ACLs file for the given job log dir.
+ */
+char *get_job_acls_file(const char *log_dir) {
+  return concatenate(JOB_LOG_DIR_TO_JOB_ACLS_FILE_PATTERN, "job_acls_file",
+                     1, log_dir);
+}
+
 /**
  * Function to check if the passed tt_root is present in mapred.local.dir
  * the task-controller is configured with.
@@ -516,12 +524,20 @@ int prepare_attempt_directories(const char *job_id, const char *attempt_id,
 }
 
 /**
- * Function to prepare the job log dir for the child. It gives the user
- * ownership of the job's log-dir to the user and group ownership to the
- * user running tasktracker.
- *     *  sudo chown user:mapred log-dir/userlogs/$jobid
- *     *  sudo chmod -R 2770 log-dir/userlogs/$jobid // user is same as tt_user
- *     *  sudo chmod -R 2570 log-dir/userlogs/$jobid // user is not tt_user
+ * Function to prepare the job log dir(and job acls file in it) for the child.
+ * It gives the user ownership of the job's log-dir to the user and
+ * group ownership to the user running tasktracker(i.e. tt_user).
+ *
+ *   *  sudo chown user:mapred log-dir/userlogs/$jobid
+ *   *    if user is not $tt_user,
+ *   *      sudo chmod 2570 log-dir/userlogs/$jobid
+ *   *    else
+ *   *      sudo chmod 2770 log-dir/userlogs/$jobid
+ *   *  sudo chown user:mapred log-dir/userlogs/$jobid/job-acls.xml
+ *   *    if user is not $tt_user,
+ *   *      sudo chmod 2570 log-dir/userlogs/$jobid/job-acls.xml
+ *   *    else
+ *   *      sudo chmod 2770 log-dir/userlogs/$jobid/job-acls.xml 
  */
 int prepare_job_logs(const char *log_dir, const char *job_id,
     mode_t permissions) {
@@ -558,6 +574,42 @@ int prepare_job_logs(const char *log_dir, const char *job_id,
     free(job_log_dir);
     return -1;
   }
+
+  //set ownership and permissions for job_log_dir/job-acls.xml, if exists.
+  char *job_acls_file = get_job_acls_file(job_log_dir);
+  if (job_acls_file == NULL) {
+    fprintf(LOGFILE, "Couldn't get job acls file %s.\n", job_acls_file);
+    free(job_log_dir);
+    return -1; 
+  }
+
+  struct stat filestat1;
+  if (stat(job_acls_file, &filestat1) != 0) {
+    if (errno == ENOENT) {
+#ifdef DEBUG
+      fprintf(LOGFILE, "job_acls_file %s doesn't exist. Not doing anything.\n",
+          job_acls_file);
+#endif
+      free(job_acls_file);
+      free(job_log_dir);
+      return 0;
+    } else {
+      // stat failed because of something else!
+      fprintf(LOGFILE, "Failed to stat the job_acls_file %s\n", job_acls_file);
+      free(job_acls_file);
+      free(job_log_dir);
+      return -1;
+    }
+  }
+
+  if (secure_single_path(job_acls_file, user_detail->pw_uid, tasktracker_gid,
+      permissions, 1) != 0) {
+    fprintf(LOGFILE, "Failed to secure the job acls file %s\n", job_acls_file);
+    free(job_acls_file);
+    free(job_log_dir);
+    return -1;
+  }
+  free(job_acls_file);
   free(job_log_dir);
   return 0;
 }

+ 2 - 0
src/c++/task-controller/task-controller.h

@@ -86,6 +86,8 @@ enum errorcodes {
 
 #define JOB_LOG_DIR_PATTERN "%s/userlogs/%s"
 
+#define JOB_LOG_DIR_TO_JOB_ACLS_FILE_PATTERN "%s/job-acls.xml"
+
 #define ATTEMPT_LOG_DIR_PATTERN JOB_LOG_DIR_PATTERN"/%s"
 
 #define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"

+ 16 - 0
src/c++/task-controller/tests/test-task-controller.c

@@ -183,6 +183,19 @@ void test_get_job_log_dir() {
   assert(ret == 0);
 }
 
+void test_get_job_acls_file() {
+  char *job_acls_file = (char *) get_job_acls_file(
+    "/tmp/testing/userlogs/job_200906101234_0001");
+  printf("job acls file obtained is %s\n", job_acls_file);
+  int ret = 0;
+  if (strcmp(job_acls_file,
+    "/tmp/testing/userlogs/job_200906101234_0001/job-acls.xml") != 0) {
+    ret = -1;
+  }
+  free(job_acls_file);
+  assert(ret == 0);
+}
+
 void test_get_task_log_dir() {
   char *logdir = (char *) get_task_log_dir("/tmp/testing",
     "job_200906101234_0001", "attempt_200906112028_0001_m_000000_0");
@@ -219,6 +232,9 @@ int main(int argc, char **argv) {
   printf("\nTesting get_job_log_dir()\n");
   test_get_job_log_dir();
 
+  printf("\nTesting get_job_acls_file()\n");
+  test_get_job_acls_file();
+
   printf("\nTesting get_task_log_dir()\n");
   test_get_task_log_dir();
 

+ 34 - 9
src/core/org/apache/hadoop/security/authorize/AccessControlList.java

@@ -25,13 +25,23 @@ import java.util.TreeSet;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * Class representing a configured access control list.
  */
 public class AccessControlList implements Writable {
-  
+
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (AccessControlList.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new AccessControlList(); }
+       });
+  }
+
   // Indicates an ACL string that represents access to all users
   public static final String WILDCARD_ACL_VALUE = "*";
   private static final int INITIAL_CAPACITY = 256;
@@ -42,7 +52,13 @@ public class AccessControlList implements Writable {
   private Set<String> groups;
   // Whether all users are granted access.
   private boolean allAllowed;
-  
+
+  /**
+   * This constructor exists primarily for AccessControlList to be Writable.
+   */
+  public AccessControlList() {
+  }
+
   /**
    * Construct a new ACL from a String representation of the same.
    * 
@@ -167,6 +183,21 @@ public class AccessControlList implements Writable {
 
   // Serializes the AccessControlList object
   public void write(DataOutput out) throws IOException {
+    Text.writeString(out, getACLString());
+  }
+
+  // Deserialize
+  public void readFields(DataInput in) throws IOException {
+    String aclString = Text.readString(in);
+    buildACL(aclString);
+  }
+
+  /** Returns the String representation of this ACL. Unlike toString() method's
+   *  return value, this String can be directly given to the constructor of
+   *  AccessControlList to build AccessControlList object.
+   *  This is the method used by the serialization method write().
+   */
+  public String getACLString() {
     StringBuilder sb = new StringBuilder(INITIAL_CAPACITY);
     if (allAllowed) {
       sb.append('*');
@@ -176,13 +207,7 @@ public class AccessControlList implements Writable {
       sb.append(" ");
       sb.append(getGroupsString());
     }
-    Text.writeString(out, sb.toString());
-  }
-
-  // Deserialize
-  public void readFields(DataInput in) throws IOException {
-    String aclString = Text.readString(in);
-    buildACL(aclString);
+    return sb.toString();
   }
 
   // Returns comma-separated concatenated single String of the set 'users'
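
For context, the Writable support added above (the no-arg constructor, the WritableFactory registration, and write()/readFields() built on the new getACLString()) is what lets an AccessControlList be shipped over RPC, which getQueueAdmins() relies on. A minimal round-trip sketch, assuming the patched class is on the classpath (the ACL string is arbitrary):

    import java.io.IOException;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.authorize.AccessControlList;

    public class AclRoundTrip {
      public static void main(String[] args) throws IOException {
        // "users groups": users before the single blank, groups after it
        AccessControlList acl = new AccessControlList("user1,user2 group1");

        // serialize with the Writable support added by this patch
        DataOutputBuffer out = new DataOutputBuffer();
        acl.write(out);

        // deserialize into a fresh instance built via the new no-arg constructor
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        AccessControlList copy = new AccessControlList();
        copy.readFields(in);

        // getACLString() can be fed back to the constructor, unlike toString()
        System.out.println(copy.getACLString());        // e.g. user1,user2 group1
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user1");
        System.out.println(copy.isUserAllowed(ugi));     // true
      }
    }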

+ 3 - 3
src/docs/src/documentation/content/xdocs/cluster_setup.xml

@@ -333,15 +333,15 @@
         </tr>
         <tr>
           <td>mapred.queue.<em>queue-name</em>.acl-administer-jobs</td>
-          <td>List of users and groups that can change the priority
-              or kill jobs that have been submitted to the
+          <td>List of users and groups that can view job details, change the
+              priority or kill jobs that have been submitted to the
               specified <em>queue-name</em>.</td>
           <td>
             The list of users and groups are both comma separated
             list of names. The two lists are separated by a blank.
             Example: <em>user1,user2 group1,group2</em>.
             If you wish to define only a list of groups, provide
-            a blank at the beginning of the value. Note that an
+            a blank at the beginning of the value. Note that the
             owner of a job can always change the priority or kill
             his/her own job, irrespective of the ACLs.
           </td>

+ 5 - 3
src/docs/src/documentation/content/xdocs/mapred_tutorial.xml

@@ -1523,10 +1523,12 @@
           <code>mapreduce.job.acl-modify-job</code> respectively. By default, 
           nobody is given access in these properties.</p> 
           
-          <p>However, irrespective of the ACLs configured, a job's owner,
+          <p>However, irrespective of the job ACLs configured, a job's owner,
           the superuser and cluster administrators
-          (<code>mapreduce.cluster.administrators</code>) always have access to
-          view and modify a job.</p>
+          (<code>mapreduce.cluster.administrators</code>) and queue
+          administrators of the queue to which the job was submitted to
+          (<code>mapred.queue.queue-name.acl-administer-jobs</code>) always
+          have access to view and modify a job.</p>
           
           <p> A job view ACL authorizes users against the configured 
           <code>mapreduce.job.acl-view-job</code> before returning possibly 

+ 12 - 7
src/mapred/mapred-default.xml

@@ -943,10 +943,11 @@
     job-level ACL.
 
     Irrespective of this ACL configuration, job-owner, the user who started the
-    cluster, cluster administrators configured on JobTracker via 
-    mapreduce.cluster.administrators
-    and administrators of the queue to which this job is submitted to 
-    can do all the modification operations.
+    cluster, cluster administrators configured via
+    mapreduce.cluster.administrators and queue administrators of the queue to
+    which this job is submitted to configured via
+    mapred.queue.queue-name.acl-administer-jobs  in mapred-queue-acls.xml can
+    do all the modification operations on a job.
 
     By default, nobody else besides job-owner, the user who started the cluster,
     cluster administrators and queue administrators can perform modification
@@ -976,11 +977,15 @@
     user, for e.g., JobStatus, JobProfile, list of jobs in the queue, etc.
     
     Irrespective of this ACL configuration, job-owner, the user who started the
-    cluster, cluster administrators configured on JobTracker via
-    mapreduce.cluster.administrators can do all the view operations.
+    cluster, cluster administrators configured via
+    mapreduce.cluster.administrators and queue administrators of the queue to
+    which this job is submitted to configured via
+    mapred.queue.queue-name.acl-administer-jobs in mapred-queue-acls.xml can do
+    all the view operations on a job.
     
     By default, nobody else besides job-owner, the user who started the
-    cluster and cluster administrators can perform view operations on a job.
+    cluster, cluster administrators and queue administrators can perform
+    view operations on a job.
   </description>
 </property>
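
As a hedged illustration of the descriptions above: a job owner can widen access at submission time by setting the job-level ACLs on the JobConf, while queue administrators of the target queue get view and modify access regardless. The property names come from this patch's documentation; the user and group names below are invented:

    import org.apache.hadoop.mapred.JobConf;

    public class JobAclConfExample {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // format: "user1,user2 group1,group2" (users, a single blank, groups)
        conf.set("mapreduce.job.acl-view-job", "analyst1,analyst2 qa-group");
        conf.set("mapreduce.job.acl-modify-job", " ops-group");  // groups only
        conf.setQueueName("default");
        // Queue admins of 'default' (mapred.queue.default.acl-administer-jobs)
        // can view and modify this job even though they are not listed above.
        System.out.println(conf.get("mapreduce.job.acl-view-job"));
      }
    }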
 

+ 50 - 97
src/mapred/org/apache/hadoop/mapred/ACLsManager.java

@@ -22,8 +22,6 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.AuditLogger.Constants;
-import org.apache.hadoop.mapred.QueueManager.QueueOperation;
-import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -89,38 +87,30 @@ class ACLsManager {
   }
 
   /**
-   * Check the ACLs for a user doing the passed queue-operation and the passed
-   * job operation.
+   * Check the ACLs for a user doing the passed operation.
    * <ul>
    * <li>If ACLs are disabled, allow all users.</li>
    * <li>If the operation is not a job operation(for eg. submit-job-to-queue),
    *  then allow only (a) clusterOwner(who started the cluster), (b) cluster 
-   *  administrators (c) members of queue admins acl for the queue.</li>
+   *  administrators (c) members of queue-submit-job-acl for the queue.</li>
    * <li>If the operation is a job operation, then allow only (a) jobOwner,
    * (b) clusterOwner(who started the cluster), (c) cluster administrators,
    * (d) members of queue admins acl for the queue and (e) members of job
    * acl for the jobOperation</li>
    * </ul>
    * 
-   * @param job
-   * @param callerUGI
-   * @param oper
-   * @param jobOperation
+   * @param job   the job on which operation is requested
+   * @param callerUGI  the user who is requesting the operation
+   * @param operation  the operation requested
    * @throws AccessControlException
-   * @throws IOException
    */
-  void checkAccess(JobInProgress job,
-      UserGroupInformation callerUGI, QueueOperation qOperation,
-      JobACL jobOperation, String operationName) throws AccessControlException {
+  void checkAccess(JobInProgress job, UserGroupInformation callerUGI,
+      Operation operation) throws AccessControlException {
 
     String queue = job.getProfile().getQueueName();
-    String jobId = job.getJobID().toString();
     JobStatus jobStatus = job.getStatus();
-    String jobOwner = jobStatus.getUsername();
-    AccessControlList jobAcl = jobStatus.getJobACLs().get(jobOperation);
 
-    checkAccess(jobId, callerUGI, queue, qOperation,
-        jobOperation, jobOwner, jobAcl, operationName);
+    checkAccess(jobStatus, callerUGI, queue, operation);
   }
 
   /**
@@ -133,132 +123,95 @@ class ACLsManager {
    * </ul>
    */
   void checkAccess(JobStatus jobStatus, UserGroupInformation callerUGI,
-      JobACL jobOperation, String operationName) throws AccessControlException {
+      String queue, Operation operation)
+      throws AccessControlException {
 
     String jobId = jobStatus.getJobID().toString();
     String jobOwner = jobStatus.getUsername();
-    AccessControlList jobAcl = jobStatus.getJobACLs().get(jobOperation);
-
-    // If acls are enabled, check if jobOwner, cluster admin or part of job ACL
-    checkAccess(jobId, callerUGI, jobOperation, jobOwner, jobAcl,
-        operationName);
-  }
+    AccessControlList jobAcl =
+      jobStatus.getJobACLs().get(operation.jobACLNeeded);
 
-  /**
-   * Check the ACLs for a user doing the passed job operation.
-   * <ul>
-   * <li>If ACLs are disabled, allow all users.</li>
-   * <li>Otherwise, allow only (a) jobOwner,
-   * (b) clusterOwner(who started the cluster), (c) cluster administrators,
-   * (d) members of job acl for the jobOperation</li>
-   * </ul>
-   */
-  void checkAccess(String jobId, UserGroupInformation callerUGI,
-      JobACL jobOperation, String jobOwner, AccessControlList jobAcl,
-      String operationName)
-      throws AccessControlException {
-    // TODO: Queue admins are to be allowed to do the job view operation.
-    checkAccess(jobId, callerUGI, null, null, jobOperation, jobOwner, jobAcl,
-        operationName);
+    // If acls are enabled, check if callerUGI is jobOwner, queue admin,
+    // cluster admin or part of job ACL
+    checkAccess(jobId, callerUGI, queue, operation, jobOwner, jobAcl);
   }
 
   /**
-   * Check the ACLs for a user doing the passed queue-operation and the passed
-   * job operation.
+   * Check the ACLs for a user doing the passed operation.
    * <ul>
    * <li>If ACLs are disabled, allow all users.</li>
    * <li>If the operation is not a job operation(for eg. submit-job-to-queue),
    *  then allow only (a) clusterOwner(who started the cluster), (b)cluster 
-   *  administrators and (c) members of queue admins acl for the queue.</li>
+   *  administrators and (c) members of queue-submit-job-acl for the queue.</li>
    * <li>If the operation is a job operation, then allow only (a) jobOwner,
    * (b) clusterOwner(who started the cluster), (c) cluster administrators,
    * (d) members of queue admins acl for the queue and (e) members of job
    * acl for the jobOperation</li>
    * </ul>
    * 
-   * callerUGI user who is trying to perform the qOperation/jobOperation.
-   * jobAcl could be job-view-acl or job-modify-acl depending on jobOperation.
+   * callerUGI is the user who is trying to perform the operation.
+   * jobAcl could be job-view-acl or job-modify-acl depending on job operation.
    */
   void checkAccess(String jobId, UserGroupInformation callerUGI,
-      String queue, QueueOperation qOperation,
-      JobACL jobOperation, String jobOwner, AccessControlList jobAcl,
-      String operationName)
-      throws AccessControlException {
+      String queue, Operation operation, String jobOwner,
+      AccessControlList jobAcl) throws AccessControlException {
     if (!aclsEnabled) {
       return;
     }
 
     String user = callerUGI.getShortUserName();
+    String targetResource = jobId + " in queue " + queue;
 
     // Allow mapreduce cluster admins to do any queue operation and
     // any job operation
     if (isMRAdmin(callerUGI)) {
-      if (qOperation == QueueOperation.SUBMIT_JOB) {
-        AuditLogger.logSuccess(user, operationName, queue);
-      } else {
-        AuditLogger.logSuccess(user, operationName, jobId);
-      }
+      AuditLogger.logSuccess(user, operation.name(), targetResource);
       return;
     }
 
-    if (qOperation == QueueOperation.SUBMIT_JOB) {
-      // This is strictly queue operation(not a job operation) like
-      // submit-job-to-queue.
-      if (!queueManager.hasAccess(queue, qOperation, callerUGI)) {
-        AuditLogger.logFailure(user, operationName, null, queue,
-            Constants.UNAUTHORIZED_USER + ", job : " + jobId);
+    if (operation == Operation.SUBMIT_JOB) {
+      // This is strictly queue operation(not a job operation)
+      if (!queueManager.hasAccess(queue, operation.qACLNeeded, callerUGI)) {
+        AuditLogger.logFailure(user, operation.name(),
+            queueManager.getQueueACL(queue, operation.qACLNeeded).toString(),
+            targetResource, Constants.UNAUTHORIZED_USER);
 
         throw new AccessControlException("User "
             + callerUGI.getShortUserName() + " cannot perform "
-            + "operation " + operationName + " on queue " + queue
+            + "operation " + operation.name() + " on queue " + queue
             + ".\n Please run \"hadoop queue -showacls\" "
             + "command to find the queues you have access to .");
       } else {
-        AuditLogger.logSuccess(user, operationName, queue);
+        AuditLogger.logSuccess(user, operation.name(), targetResource);
         return;
       }
     }
 
-    if (jobOperation == JobACL.VIEW_JOB) {
-      // check if jobOwner or part of acl-view-job
-      if (jobACLsManager.checkAccess(callerUGI, jobOperation,
+    // Check if callerUGI is queueAdmin, jobOwner or part of job-acl.
+    // queueManager and queue are null only when called from
+    // TaskTracker(i.e. from TaskLogServlet) for the operation VIEW_TASK_LOGS.
+    // Caller of this method takes care of checking if callerUGI is a
+    // queue administrator for that operation.
+    if (operation == Operation.VIEW_TASK_LOGS) {
+      if (jobACLsManager.checkAccess(callerUGI, operation.jobACLNeeded,
           jobOwner, jobAcl)) {
-        AuditLogger.logSuccess(user, operationName, jobId.toString());
+        AuditLogger.logSuccess(user, operation.name(), targetResource);
         return;
       }
-      else {
-        AuditLogger.logFailure(user, operationName, null,
-            jobId.toString(), Constants.UNAUTHORIZED_USER);
-        throw new AccessControlException("User "
-            + callerUGI.getShortUserName() + " cannot perform operation "
-            + operationName + " on " + jobId);
-      }
+    } else if (queueManager.hasAccess(queue, operation.qACLNeeded, callerUGI) ||
+               jobACLsManager.checkAccess(callerUGI, operation.jobACLNeeded,
+               jobOwner, jobAcl)) {
+      AuditLogger.logSuccess(user, operation.name(), targetResource);
+      return;
     }
 
-    if (jobOperation == JobACL.MODIFY_JOB) {
-      // check if queueAdmin, jobOwner or part of acl-modify-job
-      if (queueManager.hasAccess(queue, qOperation, callerUGI)) {
-        AuditLogger.logSuccess(user, operationName, queue);
-        return;
-      } else if (jobACLsManager.checkAccess(callerUGI, jobOperation,
-                 jobOwner, jobAcl)) {
-        AuditLogger.logSuccess(user, operationName, jobId);
-        return;
-      }
-      AuditLogger.logFailure(user, operationName, null,
-          jobId.toString(), Constants.UNAUTHORIZED_USER + ", queue : "
-          + queue);
-
-      throw new AccessControlException("User "
-          + callerUGI.getShortUserName() + " cannot perform operation "
-          + operationName + " on " + jobId + " that is in the queue "
-          + queue);
-    }
+    AuditLogger.logFailure(user, operation.name(), jobAcl.toString(),
+        targetResource, Constants.UNAUTHORIZED_USER);
 
-    throw new AccessControlException("Unsupported queue operation "
-        + qOperation + " on queue " + queue + ", job operation "
-        + jobOperation + " on job " + jobId + " and the actual-operation "
-        + operationName);
+    throw new AccessControlException("User "
+        + callerUGI.getShortUserName() + " cannot perform operation "
+        + operation.name() + " on " + jobId + " that is in the queue "
+        + queue);
   }
 
 }
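
The heart of the rewritten checkAccess() is: for a job operation, allow the caller if it passes either the queue's administer-jobs ACL or the relevant job ACL (the job owner is always allowed). A standalone sketch of that decision follows; isAllowed() is a hypothetical helper that mirrors, rather than reproduces, the ACLsManager code and deliberately omits the earlier cluster-owner/cluster-admin short-circuit:

    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.authorize.AccessControlList;

    public class QueueAdminOrJobAclCheck {

      // Hypothetical helper mirroring the job-operation branch above:
      // allow the job owner, a member of the queue's administer-jobs ACL,
      // or a member of the relevant job ACL. Cluster owner/admins are
      // handled earlier in the real ACLsManager and are omitted here.
      static boolean isAllowed(UserGroupInformation callerUGI, String jobOwner,
          AccessControlList queueAdminAcl, AccessControlList jobAcl) {
        if (callerUGI.getShortUserName().equals(jobOwner)) {
          return true;                                   // job owner
        }
        if (queueAdminAcl != null && queueAdminAcl.isUserAllowed(callerUGI)) {
          return true;                                   // queue administrator
        }
        return jobAcl != null && jobAcl.isUserAllowed(callerUGI);
      }

      public static void main(String[] args) {
        AccessControlList queueAdmins = new AccessControlList("qadmin");
        AccessControlList viewAcl = new AccessControlList("viewer1");
        UserGroupInformation qadmin =
            UserGroupInformation.createRemoteUser("qadmin");
        // true: qadmin is a queue admin even though absent from acl-view-job
        System.out.println(isAllowed(qadmin, "jobowner", queueAdmins, viewAcl));
      }
    }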

+ 5 - 4
src/mapred/org/apache/hadoop/mapred/CompletedJobStatusStore.java

@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.AccessControlException;
 
@@ -300,11 +299,13 @@ class CompletedJobStatusStore implements Runnable {
         FSDataInputStream dataIn = getJobInfoFile(jobId);
         if (dataIn != null) {
           JobStatus jobStatus = readJobStatus(dataIn);
+          JobProfile profile = readJobProfile(dataIn);
+          String queue = profile.getQueueName();
           // authorize the user for job view access
           aclsManager.checkAccess(jobStatus,
-              UserGroupInformation.getCurrentUser(), JobACL.VIEW_JOB,
-              JobACL.VIEW_JOB.name());
-          readJobProfile(dataIn);
+              UserGroupInformation.getCurrentUser(), queue,
+              Operation.VIEW_JOB_COUNTERS);
+
           counters = readCounters(dataIn);
           dataIn.close();
         }

+ 5 - 5
src/mapred/org/apache/hadoop/mapred/JSPUtil.java

@@ -110,8 +110,8 @@ class JSPUtil {
           public Void run() throws IOException, ServletException {
 
             // checks job view permission
-            jt.getACLsManager().checkAccess(job, ugi, null, JobACL.VIEW_JOB,
-                JobACL.VIEW_JOB.name());
+            jt.getACLsManager().checkAccess(job, ugi,
+                Operation.VIEW_JOB_DETAILS);
             return null;
           }
         });
@@ -489,9 +489,9 @@ class JSPUtil {
     }
 
     // Authorize the user for view access of this job
-    jobTracker.getACLsManager().checkAccess(jobid, currentUser, JobACL.VIEW_JOB,
-        jobInfo.get(Keys.USER), jobInfo.getJobACLs().get(JobACL.VIEW_JOB),
-        JobACL.VIEW_JOB.name());
+    jobTracker.getACLsManager().checkAccess(jobid, currentUser,
+        jobInfo.getJobQueue(), Operation.VIEW_JOB_DETAILS,
+        jobInfo.get(Keys.USER), jobInfo.getJobACLs().get(JobACL.VIEW_JOB));
 
     return jobInfo;
   }

+ 0 - 2
src/mapred/org/apache/hadoop/mapred/JobACLsManager.java

@@ -21,8 +21,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.mapreduce.JobACL;
-import org.apache.hadoop.mapred.AuditLogger;
-import org.apache.hadoop.mapred.AuditLogger.Constants;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;

+ 9 - 0
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -61,6 +61,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -71,6 +72,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -796,6 +798,13 @@ public class JobClient extends Configured implements MRConstants, Tool  {
           int maps = writeSplits(context, submitJobDir);
           jobCopy.setNumMapTasks(maps);
 
+          // write "queue admins of the queue to which job is being submitted"
+          // to job file.
+          String queue = jobCopy.getQueueName();
+          AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
+          jobCopy.set(QueueManager.toFullPropertyName(queue,
+              QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());
+
           // Write job file to JobTracker's fs        
           FSDataOutputStream out = 
             FileSystem.create(fs, submitJobFile,
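
The key written here follows the per-queue pattern documented in mapred-queue-acls.xml ("mapred.queue.<queue-name>.acl-administer-jobs"). A tiny sketch of how that key is formed; buildQueueAdminKey() is a hypothetical stand-in for the package-private QueueManager.toFullPropertyName():

    public class QueueAdminKey {

      // Hypothetical stand-in for the package-private
      // QueueManager.toFullPropertyName(queue, aclName)
      static String buildQueueAdminKey(String queue) {
        return "mapred.queue." + queue + ".acl-administer-jobs";
      }

      public static void main(String[] args) {
        // prints mapred.queue.default.acl-administer-jobs, the same key
        // that conf/mapred-queue-acls.xml.template documents
        System.out.println(buildQueueAdminKey("default"));
      }
    }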

+ 24 - 13
src/mapred/org/apache/hadoop/mapred/JobHistory.java

@@ -300,7 +300,7 @@ public class JobHistory {
     ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, 
     SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT, 
     TRACKER_NAME, STATE_STRING, VERSION, MAP_COUNTERS, REDUCE_COUNTERS,
-    VIEW_JOB, MODIFY_JOB
+    VIEW_JOB, MODIFY_JOB, JOB_QUEUE
   }
 
   /**
@@ -697,6 +697,7 @@ public class JobHistory {
     private Map<String, Task> allTasks = new TreeMap<String, Task>();
     private Map<JobACL, AccessControlList> jobACLs =
         new HashMap<JobACL, AccessControlList>();
+    private String queueName = null;// queue to which this job was submitted to
     
     /** Create new JobInfo */
     public JobInfo(String jobId){ 
@@ -719,19 +720,26 @@ public class JobHistory {
 
     @Override
     public synchronized void handle(Map<Keys, String> values) {
-      // construct the ACLs
-      String viewJobACL = values.get(Keys.VIEW_JOB);
-      String modifyJobACL = values.get(Keys.MODIFY_JOB);
-      if (viewJobACL != null) {
-        jobACLs.put(JobACL.VIEW_JOB, new AccessControlList(viewJobACL));
-      }
-      if (modifyJobACL != null) {
-        jobACLs.put(JobACL.MODIFY_JOB, new AccessControlList(modifyJobACL));
-
+      if (values.containsKey(Keys.SUBMIT_TIME)) {// job submission
+        // construct the job ACLs
+        String viewJobACL = values.get(Keys.VIEW_JOB);
+        String modifyJobACL = values.get(Keys.MODIFY_JOB);
+        if (viewJobACL != null) {
+          jobACLs.put(JobACL.VIEW_JOB, new AccessControlList(viewJobACL));
+        }
+        if (modifyJobACL != null) {
+          jobACLs.put(JobACL.MODIFY_JOB, new AccessControlList(modifyJobACL));
+        }
+        // get the job queue name
+        queueName = values.get(Keys.JOB_QUEUE);
       }
       super.handle(values);
     }
 
+    String getJobQueue() {
+      return queueName;
+    }
+
     /**
      * Get the path of the locally stored job file
      * @param jobId id of the job
@@ -1262,11 +1270,14 @@ public class JobHistory {
           }
           //add to writer as well 
           JobHistory.log(writers, RecordTypes.Job, 
-                         new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF, 
-                                      Keys.VIEW_JOB, Keys.MODIFY_JOB }, 
+                         new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER,
+                                    Keys.SUBMIT_TIME, Keys.JOBCONF,
+                                    Keys.VIEW_JOB, Keys.MODIFY_JOB,
+                                    Keys.JOB_QUEUE}, 
                          new String[]{jobId.toString(), jobName, user, 
                                       String.valueOf(submitTime) , jobConfPath,
-                                      viewJobACL, modifyJobACL}
+                                      viewJobACL, modifyJobACL,
+                                      jobConf.getQueueName()}
                         ); 
              
         }catch(IOException e){

+ 1 - 2
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -360,8 +360,7 @@ public class JobInProgress {
       String desc = "The username " + conf.getUser() + " obtained from the " +
       		"conf doesn't match the username " + user + " the user " +
       				"authenticated as";
-      AuditLogger.logFailure(user, 
-          QueueManager.QueueOperation.SUBMIT_JOB.name(), conf.getUser(), 
+      AuditLogger.logFailure(user, Operation.SUBMIT_JOB.name(), conf.getUser(), 
           jobId.toString(), desc);
       throw new IOException(desc);
     }

+ 15 - 3
src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIden
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
 
@@ -76,8 +77,10 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    * Version 23: Provide TokenStorage object while submitting a job
    * Version 24: Added delegation tokens (add, renew, cancel)
    * Version 25: Added JobACLs to JobStatus as part of MAPREDUCE-1307
+   * Version 26: Added the method getQueueAdmins(queueName) as part of
+   *             MAPREDUCE-1664.
    */
-  public static final long versionID = 25L;
+  public static final long versionID = 26L;
 
   /**
    * Allocate a name for the job.
@@ -100,8 +103,17 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    * @return summary of the state of the cluster
    */
   public ClusterStatus getClusterStatus(boolean detailed) throws IOException;
-  
-    
+
+  /**
+   * Get the administrators of the given job-queue.
+   * This method is for hadoop internal use only.
+   * @param queueName
+   * @return Queue administrators ACL for the queue to which job is
+   *         submitted to
+   * @throws IOException
+   */
+  public AccessControlList getQueueAdmins(String queueName) throws IOException;
+
   /**
    * Kill the indicated job
    */

+ 34 - 28
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -78,6 +78,7 @@ import org.apache.hadoop.mapred.JobHistory.Listener;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
@@ -91,6 +92,7 @@ import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
@@ -102,7 +104,6 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 
 import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -1692,9 +1693,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
           // check the access
           try {
-            aclsManager.checkAccess(job, ugi,
-                QueueManager.QueueOperation.SUBMIT_JOB, null,
-                QueueManager.QueueOperation.SUBMIT_JOB.name());
+            aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB);
           } catch (Throwable t) {
             LOG.warn("Access denied for user " + ugi.getShortUserName() 
                      + " in groups : [" 
@@ -3675,9 +3674,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
       // check for access
       try {
-        aclsManager.checkAccess(job, ugi,
-            QueueManager.QueueOperation.SUBMIT_JOB, null,
-            QueueManager.QueueOperation.SUBMIT_JOB.name());
+        aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB);
       } catch (IOException ioe) {
         LOG.warn("Access denied for user " + job.getJobConf().getUser()
             + ". Ignoring job " + jobId, ioe);
@@ -3762,7 +3759,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
              + job.getJobConf().getUser() + "' to queue '" 
              + job.getJobConf().getQueueName() + "'");
     AuditLogger.logSuccess(job.getUser(), 
-        QueueManager.QueueOperation.SUBMIT_JOB.name(), jobId.toString());
+        Operation.SUBMIT_JOB.name(), jobId.toString());
     return job.getStatus();
   }
 
@@ -3836,8 +3833,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         
     // check both queue-level and job-level access
     aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
-        QueueManager.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB,
-        "KILL_JOB");
+        Operation.KILL_JOB);
 
     killJob(job);
   }
@@ -4043,8 +4039,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       if (job != null) {
 
         // check the job-access
-        aclsManager.checkAccess(job, callerUGI, null, JobACL.VIEW_JOB,
-            JobACL.VIEW_JOB.name());
+        aclsManager.checkAccess(job, callerUGI, Operation.VIEW_JOB_COUNTERS);
 
         return isJobInited(job) ? job.getCounters() : EMPTY_COUNTERS;
       } 
@@ -4060,8 +4055,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
       // Check authorization
-      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(), null,
-          JobACL.VIEW_JOB, JobACL.VIEW_JOB.name());
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+          Operation.VIEW_JOB_DETAILS);
     }
     if (job == null || !isJobInited(job)) {
       return EMPTY_TASK_REPORTS;
@@ -4088,8 +4083,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
       // Check authorization
-      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(), null,
-          JobACL.VIEW_JOB, JobACL.VIEW_JOB.name());
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+          Operation.VIEW_JOB_DETAILS);
     }
     if (job == null || !isJobInited(job)) {
       return EMPTY_TASK_REPORTS;
@@ -4114,8 +4109,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
       // Check authorization
-      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(), null,
-          JobACL.VIEW_JOB, JobACL.VIEW_JOB.name());
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+          Operation.VIEW_JOB_DETAILS);
     }
     if (job == null || !isJobInited(job)) {
       return EMPTY_TASK_REPORTS;
@@ -4143,8 +4138,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
       // Check authorization
-      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(), null,
-          JobACL.VIEW_JOB, JobACL.VIEW_JOB.name());
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+          Operation.VIEW_JOB_DETAILS);
     }
     if (job == null || !isJobInited(job)) {
       return EMPTY_TASK_REPORTS;
@@ -4210,8 +4205,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     JobInProgress job = jobs.get(jobId);
     if (job != null) {
       // Check authorization
-      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(), null,
-          JobACL.VIEW_JOB, JobACL.VIEW_JOB.name());
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+          Operation.VIEW_JOB_DETAILS);
     }
     if (job != null && isJobInited(job)) {
       TaskInProgress tip = job.getTaskInProgress(tipId);
@@ -4266,14 +4261,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   /**
    * @see JobSubmissionProtocol#killTask(TaskAttemptID, boolean)
    */
-  public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail) throws IOException{
+  public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail)
+      throws IOException {
     TaskInProgress tip = taskidToTIPMap.get(taskid);
     if(tip != null) {
       // check both queue-level and job-level access
       aclsManager.checkAccess(tip.getJob(),
           UserGroupInformation.getCurrentUser(),
-          QueueManager.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB,
-          shouldFail ? "FAIL_TASK" : "KILL_TASK");
+          shouldFail ? Operation.FAIL_TASK : Operation.KILL_TASK);
 
       return tip.killTask(taskid, shouldFail);
     }
@@ -4312,7 +4307,19 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
     return fs.makeQualified(sysDir).toString();
   }
-  
+
+  /**
+   * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getQueueAdmins(String)
+   */
+  public AccessControlList getQueueAdmins(String queueName) throws IOException {
+    AccessControlList acl =
+        queueManager.getQueueACL(queueName, QueueACL.ADMINISTER_JOBS);
+    if (acl == null) {
+      acl = new AccessControlList(" ");
+    }
+    return acl;
+  }
+
   ///////////////////////////////////////////////////////////////
   // JobTracker methods
   ///////////////////////////////////////////////////////////////
@@ -4345,8 +4352,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
       // check both queue-level and job-level access
       aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
-          QueueManager.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB,
-          "SET_JOB_PRIORITY");
+          Operation.SET_JOB_PRIORITY);
 
       synchronized (taskScheduler) {
         JobStatus oldStatus = (JobStatus)job.getStatus().clone();

+ 10 - 2
src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
@@ -503,7 +504,14 @@ class LocalJobRunner implements JobSubmissionProtocol {
     Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
     return fs.makeQualified(sysDir).toString();
   }
-  
+
+  /**
+   * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getQueueAdmins()
+   */
+  public AccessControlList getQueueAdmins(String queueName) throws IOException {
+    return new AccessControlList(" ");// no queue admins for local job runner
+  }  
+
   /**
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir()
    */
@@ -557,6 +565,6 @@ class LocalJobRunner implements JobSubmissionProtocol {
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                       ) throws IOException,InterruptedException{
     return 0;
-  }  
+  }
 
 }

+ 44 - 0
src/mapred/org/apache/hadoop/mapred/Operation.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.mapred.QueueManager.QueueACL;
+import org.apache.hadoop.mapreduce.JobACL;
+
+/**
+ * Generic operation that maps to the dependent set of ACLs that drive the
+ * authorization of the operation.
+ */
+public enum Operation {
+  VIEW_JOB_COUNTERS(QueueACL.ADMINISTER_JOBS, JobACL.VIEW_JOB),
+  VIEW_JOB_DETAILS(QueueACL.ADMINISTER_JOBS, JobACL.VIEW_JOB),
+  VIEW_TASK_LOGS(QueueACL.ADMINISTER_JOBS, JobACL.VIEW_JOB),
+  KILL_JOB(QueueACL.ADMINISTER_JOBS, JobACL.MODIFY_JOB),
+  FAIL_TASK(QueueACL.ADMINISTER_JOBS, JobACL.MODIFY_JOB),
+  KILL_TASK(QueueACL.ADMINISTER_JOBS, JobACL.MODIFY_JOB),
+  SET_JOB_PRIORITY(QueueACL.ADMINISTER_JOBS, JobACL.MODIFY_JOB),
+  SUBMIT_JOB(QueueACL.SUBMIT_JOB, null);
+  
+  public QueueACL qACLNeeded;
+  public JobACL jobACLNeeded;
+  
+  Operation(QueueACL qACL, JobACL jobACL) {
+    this.qACLNeeded = qACL;
+    this.jobACLNeeded = jobACL;
+  }
+}
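
Each Operation value bundles the queue ACL and the job ACL that authorize it, so callers now pass a single Operation instead of a (QueueOperation, JobACL, operation-name) triple. An illustrative dump of that mapping, written as if it lived in the org.apache.hadoop.mapred package since QueueACL is package-private:

    package org.apache.hadoop.mapred;

    import org.apache.hadoop.mapred.QueueManager.QueueACL;
    import org.apache.hadoop.mapreduce.JobACL;

    /** Illustrative only: prints which ACLs each Operation is checked against. */
    public class OperationMappingDemo {
      public static void main(String[] args) {
        for (Operation op : Operation.values()) {
          QueueACL queueAcl = op.qACLNeeded;
          JobACL jobAcl = op.jobACLNeeded;          // null for SUBMIT_JOB
          System.out.println(op.name() + " -> queue ACL: " + queueAcl.getAclName()
              + ", job ACL: " + (jobAcl == null ? "none" : jobAcl.name()));
        }
      }
    }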

+ 39 - 30
src/mapred/org/apache/hadoop/mapred/QueueManager.java

@@ -27,8 +27,6 @@ import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.AuditLogger;
-import org.apache.hadoop.mapred.AuditLogger.Constants;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.util.StringUtils;
@@ -69,11 +67,15 @@ class QueueManager {
   static final String QUEUE_ACLS_FILE_NAME = "mapred-queue-acls.xml";
   
   /**
-   * Enum representing an operation that can be performed on a queue.
+   * Enum representing an AccessControlList that drives set of operations that
+   * can be performed on a queue.
    */
-  static enum QueueOperation {
+  static enum QueueACL {
     SUBMIT_JOB ("acl-submit-job"),
     ADMINISTER_JOBS ("acl-administer-jobs");
+    // Currently this ACL acl-administer-jobs is checked for the operations
+    // FAIL_TASK, KILL_TASK, KILL_JOB, SET_JOB_PRIORITY and VIEW_JOB.
+
     // TODO: Add ACL for LIST_JOBS when we have ability to authenticate 
     //       users in UI
     // TODO: Add ACL for CHANGE_ACL when we have an admin tool for 
@@ -81,7 +83,7 @@ class QueueManager {
     
     private final String aclName;
     
-    QueueOperation(String aclName) {
+    QueueACL(String aclName) {
       this.aclName = aclName;
     }
 
@@ -118,47 +120,38 @@ class QueueManager {
   }
   
   /**
-   * Return true if the given {@link QueueOperation} can be 
-   * performed by the specified user on the given queue.
+   * Return true if the given user is part of the ACL for the given
+   * {@link QueueACL} name for the given queue.
    * 
    * An operation is allowed if all users are provided access for this
    * operation, or if either the user or any of the groups specified is
    * provided access.
    * 
    * @param queueName Queue on which the operation needs to be performed. 
-   * @param oper The operation to perform
+   * @param qACL The queue ACL name to be checked
    * @param ugi The user and groups who wish to perform the operation.
    * 
    * @return true if the operation is allowed, false otherwise.
    */
   public synchronized boolean hasAccess(String queueName,
-                                QueueOperation oper, 
-                                UserGroupInformation ugi) {
+      QueueACL qACL, UserGroupInformation ugi) {
     if (!aclsEnabled) {
       return true;
     }
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("checking access for : " + toFullPropertyName(queueName, 
-                                            oper.getAclName()));      
+                                            qACL.getAclName()));      
     }
     
     AccessControlList acl = aclsMap.get(toFullPropertyName(
-        queueName, oper.getAclName()));
+        queueName, qACL.getAclName()));
     if (acl == null) {
       return false;
     }
     
-    // Check the ACL list
-    boolean allowed = acl.isAllAllowed();
-    if (!allowed) {
-      // Check the allowed users list
-      if (acl.isUserAllowed(ugi)) {
-        allowed = true;
-      }
-    }
-    
-    return allowed;    
+    // Check if user is part of the ACL
+    return acl.isUserAllowed(ugi);
   }
   
   /**
@@ -215,8 +208,8 @@ class QueueManager {
   
   private void checkDeprecation(Configuration conf) {
     for(String queue: queueNames) {
-      for (QueueOperation oper : QueueOperation.values()) {
-        String key = toFullPropertyName(queue, oper.getAclName());
+      for (QueueACL qACL : QueueACL.values()) {
+        String key = toFullPropertyName(queue, qACL.getAclName());
         String aclString = conf.get(key);
         if(aclString != null) {
           LOG.warn("Configuring queue ACLs in mapred-site.xml or " +
@@ -234,8 +227,8 @@ class QueueManager {
     HashMap<String, AccessControlList> aclsMap = 
       new HashMap<String, AccessControlList>();
     for (String queue : queueNames) {
-      for (QueueOperation oper : QueueOperation.values()) {
-        String key = toFullPropertyName(queue, oper.getAclName());
+      for (QueueACL qACL : QueueACL.values()) {
+        String key = toFullPropertyName(queue, qACL.getAclName());
         String aclString = conf.get(key, " ");// default is empty list of users
         aclsMap.put(key, new AccessControlList(aclString));
       }
@@ -297,16 +290,16 @@ class QueueManager {
     //List of all QueueAclsInfo objects , this list is returned
     ArrayList<QueueAclsInfo> queueAclsInfolist =
             new ArrayList<QueueAclsInfo>();
-    QueueOperation[] operations = QueueOperation.values();
+    QueueACL[] acls = QueueACL.values();
     for (String queueName : queueNames) {
       QueueAclsInfo queueAclsInfo = null;
       ArrayList<String> operationsAllowed = null;
-      for (QueueOperation operation : operations) {
-        if (hasAccess(queueName, operation, ugi)) {
+      for (QueueACL qACL : acls) {
+        if (hasAccess(queueName, qACL, ugi)) {
           if (operationsAllowed == null) {
             operationsAllowed = new ArrayList<String>();
           }
-          operationsAllowed.add(operation.getAclName());
+          operationsAllowed.add(qACL.getAclName());
         }
       }
       if (operationsAllowed != null) {
@@ -321,6 +314,22 @@ class QueueManager {
             queueAclsInfolist.size()]);
   }
 
+  /**
+   * Returns the specific queue ACL for the given queue.
+   * Returns null if the given queue does not exist or the acl is not
+   * configured for that queue.
+   * If acls are disabled(mapred.acls.enabled set to false), returns ACL with
+   * all users.
+   */
+  synchronized AccessControlList getQueueACL(String queueName,
+      QueueACL qACL) {
+    if (aclsEnabled) {
+      return aclsMap.get(toFullPropertyName(
+          queueName, qACL.getAclName()));
+    }
+    return new AccessControlList("*");
+  }
+
   /**
    * prints the configuration of QueueManager in Json format.
    * The method should be modified accordingly whenever
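
A hedged sketch of how the renamed QueueACL enum and the new getQueueACL() might be exercised, written as if in the org.apache.hadoop.mapred package. It assumes a queue ACL set on the passed Configuration is picked up; in a real deployment the ACL would normally live in mapred-queue-acls.xml, and configuring it elsewhere triggers the deprecation warning from checkDeprecation():

    package org.apache.hadoop.mapred;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.QueueManager.QueueACL;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.authorize.AccessControlList;

    /** Illustrative only: exercises hasAccess() and the new getQueueACL(). */
    public class QueueAclLookupDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean("mapred.acls.enabled", true);
        conf.set("mapred.queue.names", "default");
        conf.set("mapred.queue.default.acl-administer-jobs", "qadmin admins");

        QueueManager queueManager = new QueueManager(conf);
        UserGroupInformation qadmin =
            UserGroupInformation.createRemoteUser("qadmin");

        // true if qadmin is recognised as listed in acl-administer-jobs
        System.out.println(
            queueManager.hasAccess("default", QueueACL.ADMINISTER_JOBS, qadmin));

        // the raw ACL; with mapred.acls.enabled=false this would be "*"
        AccessControlList acl =
            queueManager.getQueueACL("default", QueueACL.ADMINISTER_JOBS);
        System.out.println(acl == null ? "not configured" : acl.getACLString());
      }
    }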

+ 28 - 16
src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java

@@ -28,8 +28,8 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -124,35 +124,47 @@ public class TaskLogServlet extends HttpServlet {
       return;
     }
 
-    // buiild job view acl by reading from conf
+    // build job view ACL by reading from conf
     AccessControlList jobViewACL = tracker.getJobACLsManager().
         constructJobACLs(conf).get(JobACL.VIEW_JOB);
 
+    // read job queue name from conf
+    String queue = conf.getQueueName();
+
+    // build queue admins ACL by reading from conf
+    AccessControlList queueAdminsACL = new AccessControlList(
+        conf.get(QueueManager.toFullPropertyName(queue,
+            QueueACL.ADMINISTER_JOBS.getAclName()), " "));
+
     String jobOwner = conf.get("user.name");
     UserGroupInformation callerUGI =
         UserGroupInformation.createRemoteUser(user);
 
-    tracker.getACLsManager().checkAccess(jobId, callerUGI, JobACL.VIEW_JOB,
-        jobOwner, jobViewACL, JobACL.VIEW_JOB.name());
+    // check if user is queue admin or cluster admin or jobOwner or member of
+    // job-view-acl
+    if (!queueAdminsACL.isUserAllowed(callerUGI)) {
+      tracker.getACLsManager().checkAccess(jobId, callerUGI, queue,
+          Operation.VIEW_TASK_LOGS, jobOwner, jobViewACL);
+    }
   }
 
   /**
-   * Builds a Configuration object by reading the xml file.
+   * Builds a JobConf object by reading the job-acls.xml file.
    * This doesn't load the default resources.
    *
-   * Returns null if job-acls.xml is not there in userlogs/$jobid/attempt-dir on
+   * Returns null if job-acls.xml is not there in userlogs/$jobid on
    * local file system. This can happen when we restart the cluster with job
    * level authorization enabled(but was disabled on earlier cluster) and
    * viewing task logs of old jobs(i.e. jobs finished on earlier unsecure
    * cluster).
    */
-  static Configuration getConfFromJobACLsFile(TaskAttemptID attemptId,
-      boolean isCleanup) {
+  static JobConf getConfFromJobACLsFile(JobID jobId) {
     Path jobAclsFilePath = new Path(
-        TaskLog.getAttemptDir(attemptId, isCleanup).toString(), TaskRunner.jobACLsFile);
-    Configuration conf = null;
+        TaskLog.getJobDir(jobId).toString(),
+        TaskTracker.jobACLsFile);
+    JobConf conf = null;
     if (new File(jobAclsFilePath.toUri().getPath()).exists()) {
-      conf = new Configuration(false);
+      conf = new JobConf(false);
       conf.addResource(jobAclsFilePath);
     }
     return conf;
@@ -224,15 +236,15 @@ public class TaskLogServlet extends HttpServlet {
       ServletContext context = getServletContext();
       TaskTracker taskTracker = (TaskTracker) context.getAttribute(
           "task.tracker");
+      JobID jobId = attemptId.getJobID();
+
       // get jobACLConf from ACLs file
-      Configuration jobACLConf = getConfFromJobACLsFile(attemptId, isCleanup);
+      JobConf jobACLConf = getConfFromJobACLsFile(jobId);
       // Ignore authorization if job-acls.xml is not found
       if (jobACLConf != null) {
-        String jobId = attemptId.getJobID().toString();
-
         try {
-          checkAccessForTaskLogs(new JobConf(jobACLConf), user, jobId,
-              taskTracker);
+          checkAccessForTaskLogs(jobACLConf, user,
+              jobId.toString(), taskTracker);
         } catch (AccessControlException e) {
           String errMsg = "User " + user + " failed to view tasklogs of job " +
               jobId + "!\n\n" + e.getMessage();

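The servlet change above short-circuits the job-view-acl check when the caller is a queue administrator. Below is a minimal, self-contained sketch of the AccessControlList semantics it relies on; the class and method names are the ones used in the patch, while the user, group and sketch class names are purely illustrative.

    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.authorize.AccessControlList;

    public class QueueAdminAclSketch {
      public static void main(String[] args) {
        // ACL strings use the "user1,user2 group1,group2" form: users before
        // the space, groups after it. A lone space allows nobody; "*" allows
        // everyone.
        AccessControlList queueAdmins =
            new AccessControlList("qAdmin qAdminsGroup");
        UserGroupInformation caller = UserGroupInformation
            .createUserForTesting("qAdmin", new String[] { "users" });

        // true: "qAdmin" is listed as a user in the ACL, so the servlet would
        // skip the job-view-acl check for this caller
        System.out.println(queueAdmins.isUserAllowed(caller));

        // false: the " " default used for an unset queue ACL allows no one,
        // so the servlet falls back to checkAccess(..., VIEW_TASK_LOGS, ...)
        System.out.println(new AccessControlList(" ").isUserAllowed(caller));
      }
    }
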
+ 0 - 31
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -19,7 +19,6 @@ package org.apache.hadoop.mapred;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -34,7 +33,6 @@ import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
@@ -80,8 +78,6 @@ abstract class TaskRunner extends Thread {
    */
   protected MapOutputFile mapOutputFile;
 
-  static String jobACLsFile = "job-acl.xml";
-
   public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker, 
       JobConf conf) {
     this.tip = tip;
@@ -290,36 +286,9 @@ abstract class TaskRunner extends Thread {
           Localizer.PermissionsHandler.sevenZeroZero);
     }
 
-    if (tracker.areACLsEnabled()) {
-      // write job acls into a file to know the access for task logs
-      writeJobACLs(logDir);
-    }
     return logFiles;
   }
 
-  // Writes job-view-acls and user name into an xml file
-  private void writeJobACLs(File logDir) throws IOException {
-    File aclFile = new File(logDir, TaskRunner.jobACLsFile);
-    Configuration aclConf = new Configuration(false);
-
-    // set the job view acl in aclConf
-    String jobViewACL = conf.get(JobContext.JOB_ACL_VIEW_JOB, " ");
-
-    aclConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACL);
-
-    // set jobOwner as user.name in aclConf
-    String jobOwner = conf.getUser();
-    aclConf.set("user.name", jobOwner);
-    FileOutputStream out = new FileOutputStream(aclFile);
-    try {
-      aclConf.writeXml(out);
-    } finally {
-      out.close();
-    }
-    Localizer.PermissionsHandler.setPermissions(aclFile,
-        Localizer.PermissionsHandler.sevenZeroZero);
-  }
-
   /**
    * Write the child's configuration to the disk and set it in configuration so
    * that the child can pick it up from there.

+ 63 - 3
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.mapred;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.RandomAccessFile;
@@ -70,6 +71,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
@@ -80,6 +82,7 @@ import org.apache.hadoop.mapred.TaskLog.LogName;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
@@ -153,6 +156,11 @@ public class TaskTracker
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
 
+  // The job ACLs file is created by the TaskTracker under the userlogs/$jobid
+  // directory for each job at job-localization time. It is used by
+  // TaskLogServlet to authorize viewing of that job's task logs.
+  static String jobACLsFile = "job-acls.xml";
+
   volatile boolean running = true;
 
   private LocalDirAllocator localDirAllocator;
@@ -944,7 +952,8 @@ public class TaskTracker
       if (!rjob.localized) {
         JobConf localJobConf = localizeJobFiles(t, rjob);
         // initialize job log directory
-        initializeJobLogDir(jobId);
+        initializeJobLogDir(jobId, localJobConf);
+
         // Now initialize the job via task-controller so as to set
         // ownership/permissions of jars, job-work-dir. Note that initializeJob
         // should be the last call after every other directory/file to be
@@ -1038,13 +1047,64 @@ public class TaskTracker
     return localJobConf;
   }
 
-  // create job userlog dir
-  void initializeJobLogDir(JobID jobId) {
+  // Create job userlog dir.
+  // Create job acls file in job log dir, if needed.
+  void initializeJobLogDir(JobID jobId, JobConf localJobConf)
+      throws IOException {
     // remove it from tasklog cleanup thread first,
     // it might be added there because of tasktracker reinit or restart
     JobStartedEvent jse = new JobStartedEvent(jobId);
     getUserLogManager().addLogEvent(jse);
     localizer.initializeJobLogDir(jobId);
+
+    if (areACLsEnabled()) {
+      // Create the job-acls.xml file in the job userlog dir and write the
+      // info needed to authorize users to view this job's task logs.
+      writeJobACLs(localJobConf, TaskLog.getJobDir(jobId));
+    }
+  }
+
+  /**
+   *  Creates job-acls.xml under the given directory logDir and writes
+   *  job-view-acl, queue-admins-acl, jobOwner name and queue name into this
+   *  file.
+   *  queue name is the queue to which the job was submitted.
+   *  queue-admins-acl is the queue admins ACL of the queue to which this
+   *  job was submitted.
+   * @param conf   job configuration
+   * @param logDir job userlog dir
+   * @throws IOException
+   */
+  private static void writeJobACLs(JobConf conf, File logDir) throws IOException {
+    File aclFile = new File(logDir, jobACLsFile);
+    JobConf aclConf = new JobConf(false);
+
+    // set the job view acl in aclConf
+    String jobViewACL = conf.get(JobContext.JOB_ACL_VIEW_JOB, " ");
+    aclConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACL);
+
+    // set the job queue name in aclConf
+    String queue = conf.getQueueName();
+    aclConf.setQueueName(queue);
+
+    // set the queue admins acl in aclConf
+    String qACLName = QueueManager.toFullPropertyName(queue,
+        QueueACL.ADMINISTER_JOBS.getAclName());
+    String queueAdminsACL = conf.get(qACLName, " ");
+    aclConf.set(qACLName, queueAdminsACL);
+
+    // set jobOwner as user.name in aclConf
+    String jobOwner = conf.getUser();
+    aclConf.set("user.name", jobOwner);
+
+    FileOutputStream out = new FileOutputStream(aclFile);
+    try {
+      aclConf.writeXml(out);
+    } finally {
+      out.close();
+    }
+    Localizer.PermissionsHandler.setPermissions(aclFile,
+        Localizer.PermissionsHandler.sevenZeroZero);
   }
 
   /**

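The writeJobACLs method added above pairs with the reworked getConfFromJobACLsFile in TaskLogServlet: whatever the TaskTracker writes into userlogs/$jobid/job-acls.xml is read back when task logs are served. A minimal sketch of reading those four values, written as if from inside the org.apache.hadoop.mapred package with the same imports as the patch; jobId stands for an existing JobID.

    // Load the per-job ACLs file the TaskTracker wrote at job-localization
    // time; no default resources, exactly like getConfFromJobACLsFile.
    JobConf aclConf = new JobConf(false);
    aclConf.addResource(new Path(TaskLog.getJobDir(jobId).toString(),
                                 TaskTracker.jobACLsFile));

    String jobOwner  = aclConf.get("user.name");
    String viewAcl   = aclConf.get(JobContext.JOB_ACL_VIEW_JOB, " ");
    String queue     = aclConf.getQueueName();
    String adminsAcl = aclConf.get(QueueManager.toFullPropertyName(
        queue, QueueACL.ADMINISTER_JOBS.getAclName()), " ");

When the file is missing (for example, jobs that finished before ACLs were enabled) the servlet skips authorization entirely, which is what the web-UI test further below verifies after deleting job-acls.xml.
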
+ 27 - 17
src/test/org/apache/hadoop/mapred/TestJobACLs.java

@@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobPriority;
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.QueueManager.QueueOperation;
+import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Before;
 import org.junit.Test;
@@ -55,7 +55,10 @@ public class TestJobACLs {
           TestJobACLs.class.getCanonicalName() + Path.SEPARATOR
               + "completed-job-store");
 
-  private String jobSubmitter = "user1";
+  private String jobSubmitter = "jobSubmitter";
+  private String viewColleague = "viewColleague";
+  private String modifyColleague = "modifyColleague";
+  private String qAdmin = "qAdmin";
 
   /**
    * Start the cluster before running the actual test.
@@ -74,11 +77,11 @@ public class TestJobACLs {
 
     // Enable queue and job level authorization
     conf.setBoolean(JobConf.MR_ACLS_ENABLED, true);
-    // no queue admins for default queue
+    // qAdmin is a queue administrator for default queue
     conf.set(QueueManager.toFullPropertyName(
-        "default", QueueOperation.ADMINISTER_JOBS.getAclName()), " ");
+        "default", QueueACL.ADMINISTER_JOBS.getAclName()), qAdmin);
     conf.set(QueueManager.toFullPropertyName(
-        "default", QueueOperation.SUBMIT_JOB.getAclName()), jobSubmitter);
+        "default", QueueACL.SUBMIT_JOB.getAclName()), jobSubmitter);
 
     // Enable CompletedJobStore
     FileSystem fs = FileSystem.getLocal(conf);
@@ -119,7 +122,8 @@ public class TestJobACLs {
   public void testACLS() throws IOException, InterruptedException,
       ClassNotFoundException {
     verifyACLViewJob();
-    verifyACLModifyJob();
+    verifyACLModifyJob(modifyColleague);
+    verifyACLModifyJob(qAdmin);
     verifyACLPersistence();
   }
 
@@ -133,7 +137,7 @@ public class TestJobACLs {
 
     // Set the job up.
     final JobConf myConf = mr.createJobConf();
-    myConf.set(JobContext.JOB_ACL_VIEW_JOB, "user3");
+    myConf.set(JobContext.JOB_ACL_VIEW_JOB, viewColleague);
 
     // Submit the job as user1
     RunningJob job = submitJobAsUser(myConf, jobSubmitter);
@@ -141,10 +145,13 @@ public class TestJobACLs {
     final JobID jobId = job.getID();
 
     // Try operations as an unauthorized user.
-    verifyViewJobAsUnauthorizedUser(myConf, jobId, "user2");
+    verifyViewJobAsUnauthorizedUser(myConf, jobId, modifyColleague);
 
-    // Try operations as an authorized user.
-    verifyViewJobAsAuthorizedUser(myConf, jobId, "user3");
+    // Try operations as an authorized user, who is part of view-job-acl.
+    verifyViewJobAsAuthorizedUser(myConf, jobId, viewColleague);
+
+    // Try operations as an authorized user, who is a queue administrator.
+    verifyViewJobAsAuthorizedUser(myConf, jobId, qAdmin);
 
     // Clean up the job
     job.killJob();
@@ -267,12 +274,12 @@ public class TestJobACLs {
    * @throws InterruptedException
    * @throws ClassNotFoundException
    */
-  private void verifyACLModifyJob() throws IOException,
+  private void verifyACLModifyJob(String authorizedUser) throws IOException,
       InterruptedException, ClassNotFoundException {
 
     // Set the job up.
     final JobConf myConf = mr.createJobConf();
-    myConf.set(JobContext.JOB_ACL_MODIFY_JOB, "user3");
+    myConf.set(JobContext.JOB_ACL_MODIFY_JOB, modifyColleague);
 
     // Submit the job as user1
     RunningJob job = submitJobAsUser(myConf, jobSubmitter);
@@ -280,10 +287,10 @@ public class TestJobACLs {
     final JobID jobId = job.getID();
 
     // Try operations as an unauthorized user.
-    verifyModifyJobAsUnauthorizedUser(myConf, jobId, "user2");
+    verifyModifyJobAsUnauthorizedUser(myConf, jobId, viewColleague);
 
     // Try operations as an authorized user.
-    verifyModifyJobAsAuthorizedUser(myConf, jobId, "user3");
+    verifyModifyJobAsAuthorizedUser(myConf, jobId, authorizedUser);
   }
 
   private void verifyModifyJobAsAuthorizedUser(
@@ -374,7 +381,7 @@ public class TestJobACLs {
 
     // Set the job up.
     final JobConf myConf = mr.createJobConf();
-    myConf.set(JobContext.JOB_ACL_VIEW_JOB, "user2 group2");
+    myConf.set(JobContext.JOB_ACL_VIEW_JOB, viewColleague + " group2");
 
     // Submit the job as user1
     RunningJob job = submitJobAsUser(myConf, jobSubmitter);
@@ -397,11 +404,14 @@ public class TestJobACLs {
 
     final JobConf myNewJobConf = mr.createJobConf();
     // Now verify view-job works off CompletedJobStore
-    verifyViewJobAsAuthorizedUser(myNewJobConf, jobId, "user2");
+    verifyViewJobAsAuthorizedUser(myNewJobConf, jobId, viewColleague);
+    verifyViewJobAsAuthorizedUser(myNewJobConf, jobId, qAdmin);
 
     // Only JobCounters is persisted on the JobStore. So test counters only.
     UserGroupInformation unauthorizedUGI =
-        UserGroupInformation.createUserForTesting("user3", new String[] {});
+        UserGroupInformation.createUserForTesting(
+            modifyColleague, new String[] {});
+
     unauthorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
       @SuppressWarnings("null")
       @Override

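TestJobACLs runs each client call under a different identity through UserGroupInformation.doAs, which is how the new qAdmin checks are exercised. A compact sketch of that pattern as a hypothetical test helper; it assumes the test's usual imports plus java.security.PrivilegedExceptionAction, and mr is the test's MiniMRCluster.

    private RunningJob getJobAs(String userName, final JobID jobId)
        throws IOException, InterruptedException {
      final JobConf clientConf = mr.createJobConf();
      UserGroupInformation ugi =
          UserGroupInformation.createUserForTesting(userName, new String[] {});
      return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
        @Override
        public RunningJob run() throws IOException {
          // JobClient is created inside run() so its RPCs to the JobTracker
          // carry userName's identity, not the test runner's
          return new JobClient(clientConf).getJob(jobId);
        }
      });
    }

This is roughly the kind of call the verifyViewJob* helpers wrap when they assert that viewColleague, and now also qAdmin, can read job details.
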
+ 5 - 2
src/test/org/apache/hadoop/mapred/TestJobHistory.java

@@ -37,7 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobHistory.*;
-import org.apache.hadoop.mapred.QueueManager.QueueOperation;
+import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.commons.logging.Log;
@@ -813,6 +813,9 @@ public class TestJobHistory extends TestCase {
       assertTrue(acl.toString().equals(
           jobInfo.getJobACLs().get(JobACL.MODIFY_JOB).toString()));
     }
+    
+    // Validate the job queue name
+    assertTrue(jobInfo.getJobQueue().equals(conf.getQueueName()));
   }
 
   public void testDoneFolderOnHDFS() throws IOException {
@@ -920,7 +923,7 @@ public class TestJobHistory extends TestCase {
       conf.setBoolean(JobConf.MR_ACLS_ENABLED, true);
       // no queue admins for default queue
       conf.set(QueueManager.toFullPropertyName(
-          "default", QueueOperation.ADMINISTER_JOBS.getAclName()), " ");
+          "default", QueueACL.ADMINISTER_JOBS.getAclName()), " ");
       
       mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
 

+ 2 - 2
src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java

@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.UtilsForTests;
-import org.apache.hadoop.mapred.QueueManager.QueueOperation;
+import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import junit.framework.TestCase;
@@ -536,7 +536,7 @@ public class TestJobTrackerRestart extends TestCase {
       // get the user group info
       UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
       jtConf.set(QueueManager.toFullPropertyName("default",
-          QueueOperation.SUBMIT_JOB.getAclName()), ugi.getUserName());
+          QueueACL.SUBMIT_JOB.getAclName()), ugi.getUserName());
       
       mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf);
       

+ 7 - 0
src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java

@@ -200,6 +200,13 @@ public class TestLocalizationWithLinuxTaskController extends
     File jobLogDir = TaskLog.getJobDir(jobId);
     checkFilePermissions(jobLogDir.toString(), expectedDirPerms, task.getUser(),
         ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+    // check job-acls.xml file permissions
+    checkFilePermissions(jobLogDir.toString() + Path.SEPARATOR
+        + TaskTracker.jobACLsFile, expectedFilePerms, task.getUser(),
+        ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+    
+    // validate the content of job ACLs file
+    validateJobACLsFileContent();
   }
 
   @Override

+ 3 - 3
src/test/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java

@@ -21,7 +21,7 @@ import java.io.IOException;
 import javax.security.auth.login.LoginException;
 import junit.framework.TestCase;
 
-import org.apache.hadoop.mapred.QueueManager.QueueOperation;
+import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -33,8 +33,8 @@ public class TestQueueAclsForCurrentUser extends TestCase {
   private QueueManager queueManager;
   private JobConf conf = null;
   UserGroupInformation currentUGI = null;
-  String submitAcl = QueueOperation.SUBMIT_JOB.getAclName();
-  String adminAcl  = QueueOperation.ADMINISTER_JOBS.getAclName();
+  String submitAcl = QueueACL.SUBMIT_JOB.getAclName();
+  String adminAcl  = QueueACL.ADMINISTER_JOBS.getAclName();
 
   private void setupConfForNoAccess() throws IOException,LoginException {
     currentUGI = UserGroupInformation.getLoginUser();

+ 25 - 47
src/test/org/apache/hadoop/mapred/TestQueueManager.java

@@ -38,15 +38,15 @@ import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.QueueManager.QueueOperation;
+import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.security.UserGroupInformation;
 
 public class TestQueueManager extends TestCase {
 
   private static final Log LOG = LogFactory.getLog(TestQueueManager.class);
 
-  String submitAcl = QueueOperation.SUBMIT_JOB.getAclName();
-  String adminAcl  = QueueOperation.ADMINISTER_JOBS.getAclName();
+  String submitAcl = QueueACL.SUBMIT_JOB.getAclName();
+  String adminAcl  = QueueACL.ADMINISTER_JOBS.getAclName();
 
   private MiniDFSCluster miniDFSCluster;
   private MiniMRCluster miniMRCluster;
@@ -389,62 +389,49 @@ public class TestQueueManager extends TestCase {
 
       //Job Submission should fail because ugi to be used is set to blank.
       assertFalse("User Job Submission Succeeded before refresh.",
-          queueManager.hasAccess("default", QueueManager.QueueOperation.
-              SUBMIT_JOB, ugi));
+          queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi));
       assertFalse("User Job Submission Succeeded before refresh.",
-          queueManager.hasAccess("q1", QueueManager.QueueOperation.
-              SUBMIT_JOB, ugi));
+          queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
       assertFalse("User Job Submission Succeeded before refresh.",
-          queueManager.hasAccess("q2", QueueManager.QueueOperation.
-              SUBMIT_JOB, ugi));
+          queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi));
       
       //Test job submission as alternate user.
       UserGroupInformation alternateUgi = 
         UserGroupInformation.createUserForTesting("u1", new String[]{"user"});
       assertTrue("Alternate User Job Submission failed before refresh.",
-          queueManager.hasAccess("q2", QueueManager.QueueOperation.
-              SUBMIT_JOB, alternateUgi));
+          queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, alternateUgi));
       
       //Set acl for user1.
       queueConfProps.put(QueueManager.toFullPropertyName(
-          "default", submitAcl),
-    		  ugi.getShortUserName());
+          "default", submitAcl), ugi.getShortUserName());
       queueConfProps.put(QueueManager.toFullPropertyName(
-          "q1", submitAcl),
-    		  ugi.getShortUserName());
+          "q1", submitAcl), ugi.getShortUserName());
       queueConfProps.put(QueueManager.toFullPropertyName(
-          "q2", submitAcl),
-    		  ugi.getShortUserName());
+          "q2", submitAcl), ugi.getShortUserName());
       //write out queue-acls.xml.
       UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
       //refresh configuration
       queueManager.refreshAcls(conf);
       //Submission should succeed
       assertTrue("User Job Submission failed after refresh.",
-          queueManager.hasAccess("default", QueueManager.QueueOperation.
-              SUBMIT_JOB, ugi));
+          queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi));
       assertTrue("User Job Submission failed after refresh.",
-          queueManager.hasAccess("q1", QueueManager.QueueOperation.
-              SUBMIT_JOB, ugi));
+          queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
       assertTrue("User Job Submission failed after refresh.",
-          queueManager.hasAccess("q2", QueueManager.QueueOperation.
-              SUBMIT_JOB, ugi));
+          queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi));
       assertFalse("Alternate User Job Submission succeeded after refresh.",
-          queueManager.hasAccess("q2", QueueManager.QueueOperation.
-              SUBMIT_JOB, alternateUgi));
+          queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, alternateUgi));
       //delete the ACL file.
       queueConfigFile.delete();
       
       //rewrite the mapred-site.xml
       hadoopConfProps.put(JobConf.MR_ACLS_ENABLED, "true");
       hadoopConfProps.put(QueueManager.toFullPropertyName(
-          "q1", submitAcl),
-          ugi.getShortUserName());
+          "q1", submitAcl), ugi.getShortUserName());
       UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
       queueManager.refreshAcls(conf);
       assertTrue("User Job Submission allowed after refresh and no queue acls file.",
-          queueManager.hasAccess("q1", QueueManager.QueueOperation.
-              SUBMIT_JOB, ugi));
+          queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
     } finally{
       if(queueConfigFile.exists()) {
         queueConfigFile.delete();
@@ -472,28 +459,22 @@ public class TestQueueManager extends TestCase {
       Properties queueConfProps = new Properties();
       UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
       queueConfProps.put(QueueManager.toFullPropertyName(
-          "default", submitAcl),
-          ugi.getShortUserName());
+          "default", submitAcl), ugi.getShortUserName());
       queueConfProps.put(QueueManager.toFullPropertyName(
-          "q1", submitAcl),
-          ugi.getShortUserName());
+          "q1", submitAcl), ugi.getShortUserName());
       queueConfProps.put(QueueManager.toFullPropertyName(
-          "q2", submitAcl),
-          ugi.getShortUserName());
+          "q2", submitAcl), ugi.getShortUserName());
       UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
       
       Configuration conf = new JobConf();
       QueueManager queueManager = new QueueManager(conf);
       //Testing access to queue.
       assertTrue("User Job Submission failed.",
-          queueManager.hasAccess("default", QueueManager.QueueOperation.
-              SUBMIT_JOB, ugi));
+          queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi));
       assertTrue("User Job Submission failed.",
-          queueManager.hasAccess("q1", QueueManager.QueueOperation.
-              SUBMIT_JOB, ugi));
+          queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
       assertTrue("User Job Submission failed.",
-          queueManager.hasAccess("q2", QueueManager.QueueOperation.
-              SUBMIT_JOB, ugi));
+          queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi));
       
       //Write out a new incomplete invalid configuration file.
       PrintWriter writer = new PrintWriter(new FileOutputStream(queueConfigFile));
@@ -509,14 +490,11 @@ public class TestQueueManager extends TestCase {
       } catch (Exception e) {
       }
       assertTrue("User Job Submission failed after invalid conf file refresh.",
-          queueManager.hasAccess("default", QueueManager.QueueOperation.
-              SUBMIT_JOB, ugi));
+          queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi));
       assertTrue("User Job Submission failed after invalid conf file refresh.",
-          queueManager.hasAccess("q1", QueueManager.QueueOperation.
-              SUBMIT_JOB, ugi));
+          queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
       assertTrue("User Job Submission failed after invalid conf file refresh.",
-          queueManager.hasAccess("q2", QueueManager.QueueOperation.
-              SUBMIT_JOB, ugi));
+          queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi));
     } finally {
       //Cleanup the configuration files in all cases
       if(hadoopConfigFile.exists()) {

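The rename from QueueOperation to QueueACL above does not change the property naming: toFullPropertyName still produces keys such as mapred.queue.default.acl-submit-job. A minimal sketch of the hasAccess call the test keeps exercising, written as if in the org.apache.hadoop.mapred package; the user name "alice" is illustrative.

    JobConf conf = new JobConf();
    conf.setBoolean(JobConf.MR_ACLS_ENABLED, true);
    // i.e. mapred.queue.default.acl-submit-job = alice
    conf.set(QueueManager.toFullPropertyName(
        "default", QueueACL.SUBMIT_JOB.getAclName()), "alice");

    QueueManager queueManager = new QueueManager(conf);
    UserGroupInformation alice =
        UserGroupInformation.createUserForTesting("alice", new String[] {"users"});

    // true: alice appears in the submit-job ACL of the default queue
    boolean canSubmit =
        queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, alice);
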
+ 2 - 3
src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
-import org.apache.hadoop.mapred.QueueManager.QueueOperation;
+import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.*;
@@ -237,8 +237,7 @@ public class TestRecoveryManager extends TestCase {
     mr.getJobTrackerConf().setBoolean(JobConf.MR_ACLS_ENABLED, true);
     UserGroupInformation ugi = UserGroupInformation.getLoginUser();
     mr.getJobTrackerConf().set(QueueManager.toFullPropertyName(
-        "default", QueueOperation.SUBMIT_JOB.getAclName()), 
-        ugi.getUserName());
+        "default", QueueACL.SUBMIT_JOB.getAclName()), ugi.getUserName());
 
     // start the jobtracker
     LOG.info("Starting jobtracker");

+ 37 - 18
src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.RunningJob;
@@ -127,6 +128,14 @@ public class TestTaskTrackerLocalization extends TestCase {
     jobConf.setInt("mapred.userlog.retain.hours", 0);
     jobConf.setUser(getJobOwner().getShortUserName());
 
+    // set job queue name in job conf
+    String queue = "default";
+    jobConf.setQueueName(queue);
+    // Set queue admins acl in job conf similar to what JobClient does
+    jobConf.set(QueueManager.toFullPropertyName(queue,
+        QueueACL.ADMINISTER_JOBS.getAclName()),
+        "qAdmin1,qAdmin2 qAdminsGroup1,qAdminsGroup2");
+
     String jtIdentifier = "200907202331";
     jobId = new JobID(jtIdentifier, 1);
 
@@ -468,6 +477,34 @@ public class TestTaskTrackerLocalization extends TestCase {
         .exists());
     checkFilePermissions(jobLogDir.toString(), "drwx------", task.getUser(),
         taskTrackerUGI.getGroupNames()[0]);
+
+    // Make sure that the job ACLs file job-acls.xml exists in job userlog dir
+    File jobACLsFile = new File(jobLogDir, TaskTracker.jobACLsFile);
+    assertTrue("JobACLsFile is missing in the job userlog dir " + jobLogDir,
+        jobACLsFile.exists());
+
+    // With default task controller, the job-acls.xml file is owned by TT and
+    // permissions are 700
+    checkFilePermissions(jobACLsFile.getAbsolutePath(), "-rwx------",
+        taskTrackerUGI.getShortUserName(), taskTrackerUGI.getGroupNames()[0]);
+
+    validateJobACLsFileContent();
+  }
+
+  // Validate the contents of jobACLsFile (i.e. user name, job-view-acl, queue
+  // name and queue-admins-acl).
+  protected void validateJobACLsFileContent() {
+    JobConf jobACLsConf = TaskLogServlet.getConfFromJobACLsFile(jobId);
+    assertTrue(jobACLsConf.get("user.name").equals(
+        localizedJobConf.getUser()));
+    assertTrue(jobACLsConf.get(JobContext.JOB_ACL_VIEW_JOB).
+        equals(localizedJobConf.get(JobContext.JOB_ACL_VIEW_JOB)));
+    String queue = localizedJobConf.getQueueName();
+    assertTrue(queue.equalsIgnoreCase(jobACLsConf.getQueueName()));
+    String qACLName = QueueManager.toFullPropertyName(queue,
+        QueueACL.ADMINISTER_JOBS.getAclName());
+    assertTrue(jobACLsConf.get(qACLName).equals(
+        localizedJobConf.get(qACLName)));
   }
 
   /**
@@ -525,24 +562,6 @@ public class TestTaskTrackerLocalization extends TestCase {
         + expectedStderr.toString() + " Observed : "
         + attemptLogFiles[1].toString(), expectedStderr.toString().equals(
         attemptLogFiles[1].toString()));
-
-    // Make sure that the job ACLs file exists in the task log dir
-    File jobACLsFile = new File(logDir, TaskRunner.jobACLsFile);
-    assertTrue("JobACLsFile is missing in the task log dir " + logDir,
-        jobACLsFile.exists());
-
-    // With default task controller, the job-acls file is owned by TT and
-    // permissions are 700
-    checkFilePermissions(jobACLsFile.getAbsolutePath(), "-rwx------",
-        taskTrackerUGI.getShortUserName(), taskTrackerUGI.getGroupNames()[0]);
-
-    // Validate the contents of jobACLsFile(both user name and job-view-acls)
-    Configuration jobACLsConf = TaskLogServlet.getConfFromJobACLsFile(
-        task.getTaskID(), task.isTaskCleanupTask());
-    assertTrue(jobACLsConf.get("user.name").equals(
-        localizedJobConf.getUser()));
-    assertTrue(jobACLsConf.get(JobContext.JOB_ACL_VIEW_JOB).
-        equals(localizedJobConf.get(JobContext.JOB_ACL_VIEW_JOB)));
   }
 
   /**

+ 3 - 1
src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java

@@ -60,8 +60,9 @@ public class TestUserLogCleanup {
 
   private File localizeJob(JobID jobid) throws IOException {
     File jobUserlog = TaskLog.getJobDir(jobid);
+    JobConf conf = new JobConf();
     // localize job log directory
-    tt.initializeJobLogDir(jobid);
+    tt.initializeJobLogDir(jobid, conf);
     assertTrue(jobUserlog + " directory is not created.", jobUserlog.exists());
     return jobUserlog;
   }
@@ -75,6 +76,7 @@ public class TestUserLogCleanup {
   private void startTT(Configuration conf) throws IOException {
     myClock = new FakeClock(); // clock is reset.
     tt = new TaskTracker();
+    tt.setConf(new JobConf(conf));
     localizer = new Localizer(FileSystem.get(conf), conf
         .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY),
         new DefaultTaskController());

+ 48 - 28
src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java

@@ -33,7 +33,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.http.TestHttpServer.DummyFilterInitializer;
 import org.apache.hadoop.mapred.JobHistory.Keys;
 import org.apache.hadoop.mapred.JobHistory.TaskAttempt;
-import org.apache.hadoop.mapred.QueueManager.QueueOperation;
+import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.SleepJob;
@@ -121,11 +121,13 @@ public class TestWebUIAuthorization extends ClusterMapReduceTestCase {
    * (1) jobSubmitter can view the job
    * (2) mrAdmin can view any job
    * (3) mrOwner can view any job
-   * (4) user mentioned in job-view-acls should be able to view the
-   *     job irrespective of job-modify-acls.
-   * (5) user mentioned in job-modify-acls but not in job-view-acls
+   * (4) qAdmins of the queue to which the job is submitted can view any job
+   *     in that queue.
+   * (5) user mentioned in job-view-acl should be able to view the
+   *     job irrespective of job-modify-acl.
+   * (6) user mentioned in job-modify-acl but not in job-view-acl
    *     cannot view the job
-   * (6) other unauthorized users cannot view the job
+   * (7) other unauthorized users cannot view the job
    */
   private void validateViewJob(String url, String method)
       throws IOException {
@@ -140,6 +142,8 @@ public class TestWebUIAuthorization extends ClusterMapReduceTestCase {
         getHttpStatusCode(url, mrAdminGroupMember, method));
     assertEquals("Incorrect return code for MR-owner " + mrOwner,
         HttpURLConnection.HTTP_OK, getHttpStatusCode(url, mrOwner, method));
+    assertEquals("Incorrect return code for queue admin " + qAdmin,
+        HttpURLConnection.HTTP_OK, getHttpStatusCode(url, qAdmin, method));
     assertEquals("Incorrect return code for user in job-view-acl " +
         viewColleague, HttpURLConnection.HTTP_OK,
         getHttpStatusCode(url, viewColleague, method));
@@ -160,9 +164,9 @@ public class TestWebUIAuthorization extends ClusterMapReduceTestCase {
    * (1) jobSubmitter, mrOwner, qAdmin and mrAdmin can modify the job.
    *     But we are not validating this in this method. Let the caller
    *     explicitly validate this, if needed.
-   * (2) user mentioned in job-view-acls but not in job-modify-acls cannot
+   * (2) user mentioned in job-view-acl but not in job-modify-acl cannot
    *     modify the job
-   * (3) user mentioned in job-modify-acls (irrespective of job-view-acls)
+   * (3) user mentioned in job-modify-acl (irrespective of job-view-acl)
    *     can modify the job
    * (4) other unauthorized users cannot modify the job
    */
@@ -297,9 +301,9 @@ public class TestWebUIAuthorization extends ClusterMapReduceTestCase {
 
     props.setProperty(JobConf.MR_ACLS_ENABLED, String.valueOf(true));
     props.setProperty(QueueManager.toFullPropertyName(
-        "default", QueueOperation.ADMINISTER_JOBS.getAclName()), qAdmin);
+        "default", QueueACL.ADMINISTER_JOBS.getAclName()), qAdmin);
     props.setProperty(QueueManager.toFullPropertyName(
-        "default", QueueOperation.SUBMIT_JOB.getAclName()), jobSubmitter);
+        "default", QueueACL.SUBMIT_JOB.getAclName()), jobSubmitter);
 
     props.setProperty("dfs.permissions", "false");
 
@@ -383,7 +387,9 @@ public class TestWebUIAuthorization extends ClusterMapReduceTestCase {
     JobHistory.parseHistoryFromFS(historyFilePath.toString().substring(5),
         l, historyFilePath.getFileSystem(conf));
 
-    Map<String, org.apache.hadoop.mapred.JobHistory.Task> tipsMap = jobInfo.getAllTasks();
+    Map<String, org.apache.hadoop.mapred.JobHistory.Task> tipsMap =
+        jobInfo.getAllTasks();
+
     for (String tip : tipsMap.keySet()) {
       // validate access of taskdetailshistory.jsp
       validateViewJob(jtURL + "/taskdetailshistory.jsp?logFile="
@@ -401,20 +407,37 @@ public class TestWebUIAuthorization extends ClusterMapReduceTestCase {
         String taskLogURL = TaskLogServlet.getTaskLogUrl("localhost",
             attemptsMap.get(attempt).get(Keys.HTTP_PORT), attempt.toString());
         validateViewJob(taskLogURL, "GET");
+      }
+    }
 
-        // delete job-acls.xml file from the task log dir of attempt and verify
-        // if unauthorized users can view task logs of attempt.
-        File attemptLogDir = TaskLog.getAttemptDir(TaskAttemptID
-            .forName(attempt), false);
-        Path jobACLsFilePath = new Path(attemptLogDir.toString(),
-            TaskRunner.jobACLsFile);
-        new File(jobACLsFilePath.toUri().getPath()).delete();
+    // For each tip, test the effect of deleting the job-acls.xml file and of
+    // deleting the task log dir for each of the tip's attempts.
+    
+    // delete job-acls.xml file from the job userlog dir and verify
+    // if unauthorized users can view task logs of each attempt.
+    Path jobACLsFilePath = new Path(TaskLog.getJobDir(jobid).toString(),
+        TaskTracker.jobACLsFile);
+    new File(jobACLsFilePath.toUri().getPath()).delete();
+
+    for (String tip : tipsMap.keySet()) {
+
+      Map<String, TaskAttempt> attemptsMap =
+        tipsMap.get(tip).getTaskAttempts();
+      for (String attempt : attemptsMap.keySet()) {
+
+        String taskLogURL = TaskLogServlet.getTaskLogUrl("localhost",
+            attemptsMap.get(attempt).get(Keys.HTTP_PORT), attempt.toString());
+
+        // unauthorized users can view task logs of each attempt because the
+        // job-acls.xml file has been deleted.
         assertEquals("Incorrect return code for " + unauthorizedUser,
             HttpURLConnection.HTTP_OK, getHttpStatusCode(taskLogURL,
                 unauthorizedUser, "GET"));
 
         // delete the whole task log dir of attempt and verify that we get
         // correct response code (i.e. HTTP_GONE) when task logs are accessed.
+        File attemptLogDir = TaskLog.getAttemptDir(TaskAttemptID
+            .forName(attempt), false);
         FileUtil.fullyDelete(attemptLogDir);
         assertEquals("Incorrect return code for " + jobSubmitter,
             HttpURLConnection.HTTP_GONE, getHttpStatusCode(taskLogURL,
@@ -477,10 +500,6 @@ public class TestWebUIAuthorization extends ClusterMapReduceTestCase {
           getHttpStatusCode(url, unauthorizedUser, "POST"));
       assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
           getHttpStatusCode(url, modifyColleague, "POST"));
-      // As qAdmin doesn't have view access to job, he cannot kill the job
-      // from jobdetails web page. But qAdmin can kill job from jobtracker page.
-      assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
-          getHttpStatusCode(url, qAdmin, "POST"));
 
       assertEquals(HttpURLConnection.HTTP_OK,
           getHttpStatusCode(url, viewAndModifyColleague, "POST"));
@@ -499,13 +518,15 @@ public class TestWebUIAuthorization extends ClusterMapReduceTestCase {
     // check if jobSubmitter, mrOwner and mrAdmin can do
     // killJob using jobdetails.jsp url
     confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
-                                       jobSubmitter);
+                                      jobSubmitter);
+    confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
+                                      mrOwner);
     confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
-                                       mrOwner);
+                                      mrAdminUser);
     confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
-                                       mrAdminGroupMember);
+                                      mrAdminGroupMember);
     confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
-        mrAdminUser);
+                                      qAdmin);
   }
 
   /**
@@ -647,9 +668,9 @@ public class TestWebUIAuthorization extends ClusterMapReduceTestCase {
 
     props.setProperty(JobConf.MR_ACLS_ENABLED, String.valueOf(true));
     props.setProperty(QueueManager.toFullPropertyName(
-        "default", QueueOperation.ADMINISTER_JOBS.getAclName()), qAdmin);
+        "default", QueueACL.ADMINISTER_JOBS.getAclName()), qAdmin);
     props.setProperty(QueueManager.toFullPropertyName(
-        "default", QueueOperation.SUBMIT_JOB.getAclName()),
+        "default", QueueACL.SUBMIT_JOB.getAclName()),
         jobSubmitter + "," + jobSubmitter1 + "," + jobSubmitter2 + "," +
         jobSubmitter3);
 
@@ -732,7 +753,6 @@ public class TestWebUIAuthorization extends ClusterMapReduceTestCase {
 
     startCluster(true, props);
     validateCommonServlets(getMRCluster());
-    stopCluster();
   }
 
   private void validateCommonServlets(MiniMRCluster cluster) throws IOException {