Browse Source

commit edd67b2aea06f831a016fb9b9c0075eb1207c94c
Author: Vinod Kumar <vinodkv@yahoo-inc.com>
Date: Sat Feb 27 15:33:40 2010 +0530

MAPREDUCE-1307 from https://issues.apache.org/jira/secure/attachment/12437331/MAPREDUCE-1307-20100227-ydist.txt


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077246 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
parent
commit
1c98829b45

+ 66 - 0
src/mapred/mapred-default.xml

@@ -916,6 +916,72 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.cluster.job-authorization-enabled</name>
+  <value>false</value>
+  <description> Boolean flag that specifies if job-level authorization checks
+  should be enabled on the jobs submitted to the cluster.  Job-level
+  authorization is enabled if this flag is set to true or disabled otherwise.
+  It is disabled by default. If enabled, access control checks are made by
+  JobTracker and TaskTracker when requests are made by users for viewing the
+  job-details (See mapreduce.job.acl-view-job) or for modifying the job
+  (See mapreduce.job.acl-modify-job) using Map/Reduce APIs, RPCs or via the
+  console and web user interfaces.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.job.acl-modify-job</name>
+  <value></value>
+  <description> Job specific access-control list for 'modifying' the job. It
+    is only used if authorization is enabled in Map/Reduce by setting the
+    configuration property mapreduce.cluster.job-authorization-enabled to true.
+    This specifies the list of users and/or groups who can do modification
+    operations on the job. For specifying a list of users and groups the
+    format to use is "user1,user2 group1,group". If set to '*', it allows all
+    users/groups to modify this job. If set to '', it allows none. This
+    configuration is used to guard all the modifications with respect to this
+    job and takes care of all the following operations:
+      o killing this job
+      o killing a task of this job, failing a task of this job
+      o setting the priority of this job
+    Each of these operations are also protected by the per-queue level ACL
+    "acl-administer-jobs" configured via mapred-queues.xml. So a caller should
+    have the authorization to satisfy both the queue-level ACL and the
+    job-level ACL.
+
+    Irrespective of this ACL configuration, job-owner, superuser and members
+    of supergroup configured on JobTracker via mapred.permissions.supergroup,
+    can do all the modification operations.
+
+    By default, nobody else besides job-owner, superuser/supergroup can
+    perform modification operations on a job that they don't own.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.job.acl-view-job</name>
+  <value></value>
+  <description> Job specific access-control list for 'viewing' the job. It is
+    only used if authorization is enabled in Map/Reduce by setting the
+    configuration property mapreduce.cluster.job-authorization-enabled to true.
+    This specifies the list of users and/or groups who can view private details
+    about the job. For specifying a list of users and groups the
+    format to use is "user1,user2 group1,group". If set to '*', it allows all
+    users/groups to modify this job. If set to '', it allows none. This
+    configuration is used to guard some of the job-views and at present only
+    protects APIs that can return possibly sensitive information of the
+    job-owner like
+      o job-level counters
+      o task-level counters
+      o tasks' diagnostic information
+      o task-logs displayed on the TaskTracker web-UI and
+      o job.xml showed by the JobTracker's web-UI
+    Every other piece information of jobs is still accessible by any other
+    users, for e.g., JobStatus, JobProfile, list of jobs in the queue, etc.
+  </description>
+</property>
+
 <property>
   <name>mapred.tasktracker.indexcache.mb</name>
   <value>10</value>

+ 24 - 4
src/mapred/org/apache/hadoop/mapred/CompletedJobStatusStore.java

@@ -28,6 +28,10 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.AccessControlException;
+
 /**
  * Persists and retrieves the Job info of a job into/from DFS.
  * <p/>
@@ -45,13 +49,16 @@ class CompletedJobStatusStore implements Runnable {
   private FileSystem fs;
   private static final String JOB_INFO_STORE_DIR = "/jobtracker/jobsInfo";
 
+  private JobACLsManager jobACLsManager = null;
+
   public static final Log LOG =
           LogFactory.getLog(CompletedJobStatusStore.class);
 
   private static long HOUR = 1000 * 60 * 60;
   private static long SLEEP_TIME = 1 * HOUR;
 
-  CompletedJobStatusStore(Configuration conf) throws IOException {
+  CompletedJobStatusStore(JobACLsManager aclsManager, Configuration conf)
+      throws IOException {
     active =
       conf.getBoolean("mapred.job.tracker.persist.jobstatus.active", false);
 
@@ -67,13 +74,21 @@ class CompletedJobStatusStore implements Runnable {
       // set the fs
       this.fs = path.getFileSystem(conf);
       if (!fs.exists(path)) {
-        fs.mkdirs(path);
+        if (!fs.mkdirs(path)) {
+          active = false;
+          LOG.warn("Couldn't create " + jobInfoDir
+              + ". CompletedJobStore will be inactive.");
+          return;
+        }
       }
 
       if (retainTime == 0) {
         // as retain time is zero, all stored jobstatuses are deleted.
         deleteJobStatusDirs();
       }
+
+      this.jobACLsManager = aclsManager;
+
       LOG.info("Completed job store activated/configured with retain-time : " 
                + retainTime + " , job-info-dir : " + jobInfoDir);
     } else {
@@ -275,18 +290,23 @@ class CompletedJobStatusStore implements Runnable {
    *
    * @param jobId the jobId for which Counters is queried
    * @return Counters object, null if not able to retrieve
+   * @throws AccessControlException 
    */
-  public Counters readCounters(JobID jobId) {
+  public Counters readCounters(JobID jobId) throws AccessControlException {
     Counters counters = null;
     if (active) {
       try {
         FSDataInputStream dataIn = getJobInfoFile(jobId);
         if (dataIn != null) {
-          readJobStatus(dataIn);
+          JobStatus jobStatus = readJobStatus(dataIn);
+          jobACLsManager.checkAccess(jobStatus,
+              UserGroupInformation.getCurrentUser(), JobACL.VIEW_JOB);
           readJobProfile(dataIn);
           counters = readCounters(dataIn);
           dataIn.close();
         }
+      } catch (AccessControlException ace) {
+        throw ace;
       } catch (IOException ex) {
         LOG.warn("Could not read [" + jobId + "] job counters : " + ex, ex);
       }

+ 120 - 0
src/mapred/org/apache/hadoop/mapred/JobACLsManager.java

@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+public class JobACLsManager {
+
+  private JobTracker jobTracker = null;
+
+  public JobACLsManager(JobTracker tracker) {
+    jobTracker = tracker;
+  }
+
+  /**
+   * Construct the jobACLs from the configuration so that they can be kept in
+   * the memory. If authorization is disabled on the JT, nothing is constructed
+   * and an empty map is returned.
+   * 
+   * @return JobACl to AccessControlList map.
+   */
+  Map<JobACL, AccessControlList> constructJobACLs(JobConf conf) {
+    
+    Map<JobACL, AccessControlList> acls =
+      new HashMap<JobACL, AccessControlList>();
+
+    // Don't construct anything if authorization is disabled.
+    if (!jobTracker.isJobLevelAuthorizationEnabled()) {
+      return acls;
+    }
+
+    for (JobACL aclName : JobACL.values()) {
+      String aclConfigName = aclName.getAclName();
+      String aclConfigured = conf.get(aclConfigName);
+      if (aclConfigured == null) {
+        // If ACLs are not configured at all, we grant no access to anyone. So
+        // jobOwner and superuser/supergroup _only_ can do 'stuff'
+        aclConfigured = "";
+      }
+      acls.put(aclName, new AccessControlList(aclConfigured));
+    }
+    return acls;
+  }
+
+  /**
+   * If authorization is enabled on the JobTracker, checks whether the user (in
+   * the callerUGI) is authorized to perform the operation specify by
+   * 'jobOperation' on the job.
+   * <ul>
+   * <li>The owner of the job can do any operation on the job</li>
+   * <li>The superuser/supergroup of the JobTracker is always permitted to do
+   * operations on any job.</li>
+   * <li>For all other users/groups job-acls are checked</li>
+   * </ul>
+   * 
+   * @param jobStatus
+   * @param callerUGI
+   * @param jobOperation
+   */
+  void checkAccess(JobStatus jobStatus, UserGroupInformation callerUGI,
+      JobACL jobOperation) throws AccessControlException {
+
+    if (!jobTracker.isJobLevelAuthorizationEnabled()) {
+      return;
+    }
+
+    JobID jobId = jobStatus.getJobID();
+
+    // Check for superusers/supergroups
+    if (jobTracker.isSuperUserOrSuperGroup(callerUGI)) {
+      JobInProgress.LOG.info("superuser/supergroup "
+          + callerUGI.getShortUserName() + " trying to perform "
+          + jobOperation.toString() + " on " + jobId);
+      return;
+    }
+
+    // Job-owner is always part of all the ACLs
+    if (callerUGI.getShortUserName().equals(jobStatus.getUsername())) {
+      JobInProgress.LOG.info("Jobowner " + callerUGI.getShortUserName()
+          + " trying to perform " + jobOperation.toString() + " on "
+          + jobId);
+      return;
+    }
+
+    AccessControlList acl = jobStatus.getJobACLs().get(jobOperation);
+    if (acl.isUserAllowed(callerUGI)) {
+      JobInProgress.LOG.info("Normal user " + callerUGI.getShortUserName()
+          + " trying to perform " + jobOperation.toString() + " on "
+          + jobId);
+      return;
+    }
+
+    throw new AccessControlException(callerUGI
+        + " not authorized for performing the operation "
+        + jobOperation.toString() + " on " + jobId + ". "
+        + jobOperation.toString() + " configured for this job : "
+        + acl.toString());
+  }
+}

+ 8 - 1
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -1562,9 +1562,9 @@ public class JobClient extends Configured implements MRConstants, Tool  {
         if (job == null) {
           System.out.println("Could not find job " + jobid);
         } else {
+          Counters counters = job.getCounters();
           System.out.println();
           System.out.println(job);
-          Counters counters = job.getCounters();
           if (counters != null) {
             System.out.println(counters);
           } else {
@@ -1644,6 +1644,13 @@ public class JobClient extends Configured implements MRConstants, Tool  {
           exitCode = -1;
         }
       }
+    } catch (RemoteException re){
+      IOException unwrappedException = re.unwrapRemoteException();
+      if (unwrappedException instanceof AccessControlException) {
+        System.out.println(unwrappedException.getMessage());
+      } else {
+        throw re;
+      }
     } finally {
       close();
     }

+ 26 - 0
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -60,6 +61,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -300,6 +302,7 @@ public class JobInProgress {
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.resourceEstimator = new ResourceEstimator(this);
     this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
+    this.status.setUsername(conf.getUser());
     this.profile = new JobProfile(conf.getUser(), jobid, "", "",
                                   conf.getJobName(), conf.getQueueName());
     this.memoryPerMap = conf.getMemoryForMapTask();
@@ -340,6 +343,7 @@ public class JobInProgress {
         + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobId;
     this.jobtracker = jobtracker;
     this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
+    this.status.setUsername(jobInfo.getUser().toString());
     this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
     this.startTime = jobtracker.getClock().getTime();
     status.setStartTime(startTime);
@@ -388,6 +392,9 @@ public class JobInProgress {
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
        (numMapTasks + numReduceTasks + 10);
 
+    // Construct the jobACLs
+    status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
+
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
         
@@ -709,6 +716,25 @@ public class JobInProgress {
     return allTaskSplitMetaInfo;
   }
 
+  /**
+   * If authorization is enabled on the JobTracker, checks whether the user (in
+   * the callerUGI) is authorized to perform the operation specify by
+   * 'jobOperation' on the job.
+   * <ul>
+   * <li>The owner of the job can do any operation on the job</li>
+   * <li>The superuser/supergroup of the JobTracker is always permitted to do
+   * operations on any job.</li>
+   * <li>For all other users/groups job-acls are checked</li>
+   * </ul>
+   * 
+   * @param callerUGI
+   * @param jobOperation
+   */
+  void checkAccess(UserGroupInformation callerUGI, JobACL jobOperation)
+      throws AccessControlException {
+    jobtracker.getJobACLsManager().checkAccess(status, callerUGI, jobOperation);
+  }
+
   /////////////////////////////////////////////////////
   // Accessors for the JobInProgress
   /////////////////////////////////////////////////////

+ 35 - 2
src/mapred/org/apache/hadoop/mapred/JobStatus.java

@@ -27,6 +27,13 @@ import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 import org.apache.hadoop.io.WritableUtils;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
 /**************************************************
  * Describes the current status of a job.  This is
  * not intended to be a comprehensive piece of data.
@@ -48,6 +55,9 @@ public class JobStatus implements Writable, Cloneable {
   public static final int PREP = 4;
   public static final int KILLED = 5;
 
+  private Map<JobACL, AccessControlList> jobACLs =
+    new HashMap<JobACL, AccessControlList>();
+
   private static final String UNKNOWN = "UNKNOWN";
   private static final String[] runStates =
       {UNKNOWN, "RUNNING", "SUCCEEDED", "FAILED", "PREP", "KILLED"};
@@ -145,7 +155,7 @@ public class JobStatus implements Writable, Cloneable {
      }
      priority = jp;
    }
-   
+
   /**
    * @deprecated use getJobID instead
    */
@@ -161,7 +171,11 @@ public class JobStatus implements Writable, Cloneable {
    * @return Percentage of progress in maps 
    */
   public synchronized float mapProgress() { return mapProgress; }
-    
+
+  protected synchronized void setJobACLs(Map<JobACL, AccessControlList> acls) {
+    this.jobACLs = acls;
+  }
+
   /**
    * Sets the map progress of this job
    * @param p The value of map progress to set to
@@ -269,6 +283,10 @@ public class JobStatus implements Writable, Cloneable {
     this.schedulingInfo = schedulingInfo;
   }
   
+  public synchronized Map<JobACL, AccessControlList> getJobACLs() {
+    return jobACLs;
+  }
+
   /**
    * Return the priority of the job
    * @return job priority
@@ -308,6 +326,13 @@ public class JobStatus implements Writable, Cloneable {
     Text.writeString(out, user);
     WritableUtils.writeEnum(out, priority);
     Text.writeString(out, schedulingInfo);
+
+    // Serialize the job's ACLs
+    out.writeInt(jobACLs.size());
+    for (Entry<JobACL, AccessControlList> entry : jobACLs.entrySet()) {
+      WritableUtils.writeEnum(out, entry.getKey());
+      Text.writeString(out, entry.getValue().toString());
+    }
   }
 
   public synchronized void readFields(DataInput in) throws IOException {
@@ -321,6 +346,14 @@ public class JobStatus implements Writable, Cloneable {
     this.user = Text.readString(in);
     this.priority = WritableUtils.readEnum(in, JobPriority.class);
     this.schedulingInfo = Text.readString(in);
+
+    // De-serialize the job's ACLs
+    int numACLs = in.readInt();
+    for (int i = 0; i < numACLs; i++) {
+      JobACL aclType = WritableUtils.readEnum(in, JobACL.class);
+      String acl = Text.readString(in);
+      this.jobACLs.put(aclType, new AccessControlList(acl));
+    }
   }
 
   // A utility to convert new job runstates to the old ones.

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -75,8 +75,9 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    *             staging area using user credentials passed via the rpc. 
    * Version 23: Provide TokenStorage object while submitting a job
    * Version 24: Added delegation tokens (add, renew, cancel)
+   * Version 25: Added JobACLs to JobStatus as part of MAPREDUCE-1307
    */
-  public static final long versionID = 24L;
+  public static final long versionID = 25L;
 
   /**
    * Allocate a name for the job.

+ 144 - 42
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -67,6 +67,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobSubmissionProtocol;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.http.HttpServer;
@@ -102,6 +103,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 
 import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -1302,13 +1304,21 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
     
     private JobStatusChangeEvent updateJob(JobInProgress jip, 
-                                           JobHistory.JobInfo job) {
+        JobHistory.JobInfo job) {
       // Change the job priority
       String jobpriority = job.get(Keys.JOB_PRIORITY);
       JobPriority priority = JobPriority.valueOf(jobpriority);
       // It's important to update this via the jobtracker's api as it will 
       // take care of updating the event listeners too
-      setJobPriority(jip.getJobID(), priority);
+      
+      try {
+        setJobPriority(jip.getJobID(), priority);
+      } catch (IOException e) {
+        // This will not happen. JobTracker can set jobPriority of any job
+        // as mrOwner has the needed permissions.
+        LOG.warn("Unexpected. JobTracker could not do SetJobPriority on "
+                 + jip.getJobID() + ". " + e);
+      }
 
       // Save the previous job status
       JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
@@ -1675,7 +1685,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
           // check the access
           try {
-            checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB, ugi);
+            checkAccess(job, ugi, QueueManager.QueueOperation.SUBMIT_JOB,
+                        null);
           } catch (Throwable t) {
             LOG.warn("Access denied for user " + ugi.getShortUserName() 
                      + " in groups : [" 
@@ -1919,6 +1930,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
                                                 "expireLaunchingTasks");
 
   CompletedJobStatusStore completedJobStatusStore = null;
+  private JobACLsManager jobACLsManager;
   Thread completedJobsStoreThread = null;
   RecoveryManager recoveryManager;
 
@@ -2256,8 +2268,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
+    // Initialize the jobACLSManager
+    jobACLsManager = new JobACLsManager(this);
     //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(conf);
+    completedJobStatusStore = new CompletedJobStatusStore(jobACLsManager, conf);
   }
 
   private static SimpleDateFormat getDateFormat() {
@@ -3620,12 +3634,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   public JobStatus submitJob(JobID jobId, String jobSubmitDir, TokenStorage ts)
       throws IOException {
     JobInfo jobInfo = null;
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     synchronized (this) {
       if (jobs.containsKey(jobId)) {
         // job already running, don't start twice
         return jobs.get(jobId).getStatus();
       }
-      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
       jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()),
           new Path(jobSubmitDir));
     }
@@ -3648,7 +3662,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
       // check for access
       try {
-        checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
+        checkAccess(job, ugi, QueueManager.QueueOperation.SUBMIT_JOB, null);
       } catch (IOException ioe) {
         LOG.warn("Access denied for user " + job.getJobConf().getUser()
             + ". Ignoring job " + jobId, ioe);
@@ -3730,30 +3744,55 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     return job.getStatus();
   }
 
-  // Check whether the specified operation can be performed
-  // related to the job.
-  private void checkAccess(JobInProgress job, 
-                                QueueManager.QueueOperation oper) 
-                                  throws IOException {
-    // get the user group info
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    checkAccess(job, oper, ugi);
+  /**
+   * Is job-level authorization enabled on the JT?
+   * 
+   * @return
+   */
+  boolean isJobLevelAuthorizationEnabled() {
+    return conf.getBoolean(JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
   }
 
-  // use the passed ugi for checking the access
-  private void checkAccess(JobInProgress job, QueueManager.QueueOperation oper,
-                           UserGroupInformation ugi) throws IOException {
-    // get the queue
+  /**
+   * Check the ACLs for a user doing the passed queue-operation and the passed
+   * job operation.
+   * <ul>
+   * <li>Superuser/supergroup can do any operation on the job</li>
+   * <li>For any other user/group, the configured ACLs for the corresponding
+   * queue and the job are checked.</li>
+   * </ul>
+   * 
+   * @param job
+   * @param callerUGI
+   * @param oper
+   * @param jobOperation
+   * @throws AccessControlException
+   * @throws IOException
+   */
+  private void checkAccess(JobInProgress job,
+      UserGroupInformation callerUGI, QueueManager.QueueOperation oper,
+      JobACL jobOperation) throws AccessControlException {
+
+    // get the queue and verify the queue access
     String queue = job.getProfile().getQueueName();
-    if (!queueManager.hasAccess(queue, job, oper, ugi)) {
+    if (!queueManager.hasAccess(queue, job, oper, callerUGI)) {
       throw new AccessControlException("User " 
-                            + ugi.getShortUserName() 
+                            + callerUGI.getShortUserName() 
                             + " cannot perform "
                             + "operation " + oper + " on queue " + queue +
                             ".\n Please run \"hadoop queue -showacls\" " +
                             "command to find the queues you have access" +
                             " to .");
     }
+
+    // check nulls, for e.g., submitJob RPC doesn't have a jobOperation as the
+    // job itself isn't created by that time.
+    if (jobOperation == null) {
+      return;
+    }
+
+    // check the access to the job
+    job.checkAccess(callerUGI, jobOperation);
   }
 
   /**@deprecated use {@link #getClusterStatus(boolean)}*/
@@ -3799,6 +3838,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       getBlacklistedTrackerCount(), getExcludedNodes().size()) ;
   }
 
+  /**
+   * @see JobSubmissionProtocol#killJob
+   */
   public synchronized void killJob(JobID jobid) throws IOException {
     if (null == jobid) {
       LOG.info("Null jobid object sent to JobTracker.killJob()");
@@ -3812,7 +3854,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       return;
     }
         
-    checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
+    // check both queue-level and job-level access
+    checkAccess(job, UserGroupInformation.getCurrentUser(),
+        QueueManager.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+
     killJob(job);
   }
   
@@ -3941,9 +3986,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   }
   
   /**
-   * Set the priority of a job
-   * @param jobid id of the job
-   * @param priority new priority of the job
+   * @see ClientProtocol#setJobPriority(JobID, String)
    */
   public synchronized void setJobPriority(JobID jobid, 
                                           String priority)
@@ -3954,7 +3997,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
             + " is not a valid job");
         return;
     }
-    checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
+
     JobPriority newPriority = JobPriority.valueOf(priority);
     setJobPriority(jobid, newPriority);
   }
@@ -4015,10 +4058,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   }
   
   private static final Counters EMPTY_COUNTERS = new Counters();
-  public Counters getJobCounters(JobID jobid) {
+  public Counters getJobCounters(JobID jobid) throws IOException {
     synchronized (this) {
       JobInProgress job = jobs.get(jobid);
       if (job != null) {
+
+        // check the job-access
+        job.checkAccess(UserGroupInformation.getCurrentUser(),
+            JobACL.VIEW_JOB);
+
         return isJobInited(job) ? job.getCounters() : EMPTY_COUNTERS;
       } 
     }
@@ -4027,8 +4075,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   
   private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];
   
-  public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
+  public synchronized TaskReport[] getMapTaskReports(JobID jobid)
+      throws IOException {
     JobInProgress job = jobs.get(jobid);
+    if (job != null) {
+      // Check authorization
+      job.checkAccess(UserGroupInformation.getCurrentUser(),
+          JobACL.VIEW_JOB);
+    }
     if (job == null || !isJobInited(job)) {
       return EMPTY_TASK_REPORTS;
     } else {
@@ -4049,8 +4103,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
   }
 
-  public synchronized TaskReport[] getReduceTaskReports(JobID jobid) {
+  public synchronized TaskReport[] getReduceTaskReports(JobID jobid)
+      throws IOException {
     JobInProgress job = jobs.get(jobid);
+    if (job != null) {
+      // Check authorization
+      job.checkAccess(UserGroupInformation.getCurrentUser(),
+          JobACL.VIEW_JOB);
+    }
     if (job == null || !isJobInited(job)) {
       return EMPTY_TASK_REPORTS;
     } else {
@@ -4069,8 +4129,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
   }
 
-  public synchronized TaskReport[] getCleanupTaskReports(JobID jobid) {
+  public synchronized TaskReport[] getCleanupTaskReports(JobID jobid)
+      throws IOException {
     JobInProgress job = jobs.get(jobid);
+    if (job != null) {
+      // Check authorization
+      job.checkAccess(UserGroupInformation.getCurrentUser(),
+          JobACL.VIEW_JOB);
+    }
     if (job == null || !isJobInited(job)) {
       return EMPTY_TASK_REPORTS;
     } else {
@@ -4092,8 +4158,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   
   }
   
-  public synchronized TaskReport[] getSetupTaskReports(JobID jobid) {
+  public synchronized TaskReport[] getSetupTaskReports(JobID jobid)
+      throws IOException {
     JobInProgress job = jobs.get(jobid);
+    if (job != null) {
+      // Check authorization
+      job.checkAccess(UserGroupInformation.getCurrentUser(),
+          JobACL.VIEW_JOB);
+    }
     if (job == null || !isJobInited(job)) {
       return EMPTY_TASK_REPORTS;
     } else {
@@ -4123,7 +4195,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       "mapred.cluster.max.map.memory.mb";
   static final String MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY =
       "mapred.cluster.max.reduce.memory.mb";
-  
+
+  public static final String JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG =
+      "mapreduce.cluster.job-authorization-enabled";
+
   /* 
    * Returns a list of TaskCompletionEvent for the given job, 
    * starting from fromEventId.
@@ -4156,6 +4231,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     JobID jobId = taskId.getJobID();
     TaskID tipId = taskId.getTaskID();
     JobInProgress job = jobs.get(jobId);
+    if (job != null) {
+      // Check authorization
+      job.checkAccess(UserGroupInformation.getCurrentUser(),
+          JobACL.VIEW_JOB);
+    }
     if (job != null && isJobInited(job)) {
       TaskInProgress tip = job.getTaskInProgress(tipId);
       if (tip != null) {
@@ -4206,11 +4286,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     return (job == null ? null : job.getTaskInProgress(tipid));
   }
     
-  /** Mark a Task to be killed */
+  /**
+   * @see JobSubmissionProtocol#killTask(TaskAttemptID, boolean)
+   */
   public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail) throws IOException{
     TaskInProgress tip = taskidToTIPMap.get(taskid);
     if(tip != null) {
-      checkAccess(tip.getJob(), QueueManager.QueueOperation.ADMINISTER_JOBS);
+      // check both queue-level and job-level access
+      checkAccess(tip.getJob(), UserGroupInformation.getCurrentUser(),
+          QueueManager.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+
       return tip.killTask(taskid, shouldFail);
     }
     else {
@@ -4231,7 +4316,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   public JobStatus[] jobsToComplete() {
     return getJobStatus(jobs.values(), true);
   } 
-  
+  /**
+   * @see JobSubmissionProtocol#getAllJobs()
+   */
   public JobStatus[] getAllJobs() {
     List<JobStatus> list = new ArrayList<JobStatus>();
     list.addAll(Arrays.asList(getJobStatus(jobs.values(),false)));
@@ -4266,12 +4353,21 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
   /**
    * Change the run-time priority of the given job.
+   * 
    * @param jobId job id
    * @param priority new {@link JobPriority} for the job
+   * @throws IOException
+   * @throws AccessControlException
    */
-  synchronized void setJobPriority(JobID jobId, JobPriority priority) {
+  synchronized void setJobPriority(JobID jobId, JobPriority priority)
+      throws AccessControlException, IOException {
     JobInProgress job = jobs.get(jobId);
     if (job != null) {
+
+      // check both queue-level and job-level access
+      checkAccess(job, UserGroupInformation.getCurrentUser(),
+          QueueManager.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+
       synchronized (taskScheduler) {
         JobStatus oldStatus = (JobStatus)job.getStatus().clone();
         job.setPriority(priority);
@@ -4462,16 +4558,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   }
   
   /**
-   * Is the current user a super user?
+   * Is the calling user a super user? Or part of the supergroup?
    * @return true, if it is a super user
-   * @throws IOException if there are problems getting the current user
    */
-  private synchronized boolean isSuperUser() throws IOException {
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    if (mrOwner.getShortUserName().equals(ugi.getShortUserName()) ) {
+  boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI) {
+    if (mrOwner.getShortUserName().equals(callerUGI.getShortUserName())) {
       return true;
     }
-    String[] groups = ugi.getGroupNames();
+    String[] groups = callerUGI.getGroupNames();
     for(int i=0; i < groups.length; ++i) {
       if (groups[i].equals(supergroup)) {
         return true;
@@ -4486,7 +4580,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    */
   public synchronized void refreshNodes() throws IOException {
     // check access
-    if (!isSuperUser()) {
+    if (!isSuperUserOrSuperGroup(UserGroupInformation.getCurrentUser())) {
       String user = UserGroupInformation.getCurrentUser().getShortUserName();
       throw new AccessControlException(user + 
                                        " is not authorized to refresh nodes.");
@@ -4496,6 +4590,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     refreshHosts();
   }
   
+  String getSuperGroup() {
+    return supergroup;
+  }
+  
   private synchronized void refreshHosts() throws IOException {
     // Reread the config to get mapred.hosts and mapred.hosts.exclude filenames.
     // Update the file names and refresh internal includes and excludes list
@@ -4853,4 +4951,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
     return authMethod;
   }
+
+  JobACLsManager getJobACLsManager() {
+    return jobACLsManager;
+  }
 }

+ 53 - 0
src/mapred/org/apache/hadoop/mapreduce/JobACL.java

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+/**
+ * Job related ACLs
+ */
+public enum JobACL {
+
+  /**
+   * ACL for 'viewing' job. Dictates who can 'view' some or all of the job
+   * related details.
+   */
+  VIEW_JOB(JobContext.JOB_ACL_VIEW_JOB),
+
+  /**
+   * ACL for 'modifying' job. Dictates who can 'modify' the job for e.g., by
+   * killing the job, killing/failing a task of the job or setting priority of
+   * the job.
+   */
+  MODIFY_JOB(JobContext.JOB_ACL_MODIFY_JOB);
+
+  String aclName;
+
+  JobACL(String name) {
+    this.aclName = name;
+  }
+
+  /**
+   * Get the name of the ACL. Here it is same as the name of the configuration
+   * property for specifying the ACL for the job.
+   * 
+   * @return aclName
+   */
+  public String getAclName() {
+    return aclName;
+  }
+}

+ 4 - 0
src/mapred/org/apache/hadoop/mapreduce/JobContext.java

@@ -51,6 +51,10 @@ public class JobContext {
 
   public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
 
+  public static final String JOB_ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
+  public static final String JOB_ACL_MODIFY_JOB =
+    "mapreduce.job.acl-modify-job";
+
   public static final String CACHE_FILE_VISIBILITIES = 
     "mapreduce.job.cache.files.visibilities";
   public static final String CACHE_ARCHIVES_VISIBILITIES = 

+ 6 - 1
src/test/org/apache/hadoop/mapred/MiniMRCluster.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.StaticMapping;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -529,8 +530,12 @@ public class MiniMRCluster {
 
   /**
    * Change the job's priority
+   * 
+   * @throws IOException
+   * @throws AccessControlException
    */
-  public void setJobPriority(JobID jobId, JobPriority priority) {
+  public void setJobPriority(JobID jobId, JobPriority priority)
+      throws AccessControlException, IOException {
     jobTracker.getJobTracker().setJobPriority(jobId, priority);
   }
 

+ 415 - 0
src/test/org/apache/hadoop/mapred/TestJobACLs.java

@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobPriority;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.After;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Verify the job-ACLs
+ * 
+ */
+public class TestJobACLs {
+
+  static final Log LOG = LogFactory.getLog(TestJobACLs.class);
+
+  private MiniMRCluster mr = null;
+
+  private static final Path TEST_DIR =
+      new Path(System.getProperty("test.build.data", "/tmp"),
+          TestJobACLs.class.getCanonicalName() + Path.SEPARATOR
+              + "completed-job-store");
+
+  /**
+   * Start the cluster before running the actual test.
+   * 
+   * @throws IOException
+   */
+  @Before
+  public void setup() throws IOException {
+    // Start the cluster
+    startCluster(false);
+  }
+
+  private void startCluster(boolean reStart) throws IOException {
+    UserGroupInformation MR_UGI = UserGroupInformation.getLoginUser();
+    JobConf conf = new JobConf();
+
+    // Enable job-level authorization
+    conf.setBoolean(JobTracker.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, true);
+
+    // Enable CompletedJobStore
+    FileSystem fs = FileSystem.getLocal(conf);
+    if (!reStart) {
+      fs.delete(TEST_DIR, true);
+    }
+    conf.set("mapred.job.tracker.persist.jobstatus.dir",
+        fs.makeQualified(TEST_DIR).toString());
+    conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", true);
+    conf.set("mapred.job.tracker.persist.jobstatus.hours", "1");
+
+    mr =
+        new MiniMRCluster(0, 0, 0, "file:///", 1, null, null, MR_UGI, conf);
+  }
+
+  /**
+   * Kill the cluster after the test is done.
+   */
+  @After
+  public void tearDown() {
+    if (mr != null) {
+      mr.shutdown();
+    }
+  }
+
+  /**
+   * Test view-job-acl, modify-job-acl and acl persistence to the
+   * completed-jobs-store.
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  @Test
+  public void testACLS() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    verifyACLViewJob();
+    verifyACLModifyJob();
+    verifyACLPersistence();
+  }
+
+  /**
+   * Verify JobContext.JOB_ACL_VIEW_JOB
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void verifyACLViewJob() throws IOException, InterruptedException {
+
+    // Set the job up.
+    final JobConf myConf = mr.createJobConf();
+    myConf.set(JobContext.JOB_ACL_VIEW_JOB, "user1,user3");
+
+    // Submit the job as user1
+    RunningJob job = submitJobAsUser(myConf, "user1");
+
+    final JobID jobId = job.getID();
+
+    // Try operations as an unauthorized user.
+    verifyViewJobAsUnauthorizedUser(myConf, jobId, "user2");
+
+    // Try operations as an authorized user.
+    verifyViewJobAsAuthorizedUser(myConf, jobId, "user3");
+
+    // Clean up the job
+    job.killJob();
+  }
+
+  private RunningJob submitJobAsUser(final JobConf clusterConf, String user)
+      throws IOException, InterruptedException {
+    UserGroupInformation ugi =
+        UserGroupInformation.createUserForTesting(user, new String[] {});
+    RunningJob job = (RunningJob) ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        JobClient jobClient = new JobClient(clusterConf);
+        SleepJob sleepJob = new SleepJob();
+        sleepJob.setConf(clusterConf);
+        JobConf jobConf = sleepJob.setupJobConf(0, 0, 1000, 1000, 1000, 1000);
+        RunningJob runningJob = jobClient.submitJob(jobConf);
+        return runningJob;
+      }
+    });
+    return job;
+  }
+
+  private void verifyViewJobAsAuthorizedUser(final JobConf myConf,
+      final JobID jobId, String authorizedUser) throws IOException,
+      InterruptedException {
+    UserGroupInformation authorizedUGI =
+        UserGroupInformation.createUserForTesting(authorizedUser,
+            new String[] {});
+    authorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
+      @SuppressWarnings("null")
+      @Override
+      public Object run() throws Exception {
+        RunningJob myJob = null;
+        JobClient client = null;
+        try {
+          client = new JobClient(myConf);
+          myJob = client.getJob(jobId);
+        } catch (Exception e) {
+          fail("Exception .." + e);
+        }
+
+        assertNotNull("Job " + jobId + " is not known to the JobTracker!",
+            myJob);
+
+        // Tests authorization with getCounters
+        try {
+          myJob.getCounters();
+        } catch (IOException ioe) {
+          fail("Unexpected.. exception.. " + ioe);
+        }
+
+        // Tests authorization  with getTaskReports
+        try {
+          client.getCleanupTaskReports(jobId);
+        } catch (IOException ioe) {
+          fail("Unexpected.. exception.. " + ioe);
+        }
+
+        return null;
+      }
+    });
+  }
+
+  private void verifyViewJobAsUnauthorizedUser(final JobConf myConf,
+      final JobID jobId, String unauthorizedUser) throws IOException,
+      InterruptedException {
+    UserGroupInformation unauthorizedUGI =
+        UserGroupInformation.createUserForTesting(unauthorizedUser,
+            new String[] {});
+    unauthorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
+      @SuppressWarnings("null")
+      @Override
+      public Object run() {
+        RunningJob myJob = null;
+        JobClient client = null;
+        try {
+          client = new JobClient(myConf);
+          myJob = client.getJob(jobId);
+        } catch (Exception e) {
+          fail("Exception .." + e);
+        }
+
+        assertNotNull("Job " + jobId + " is not known to the JobTracker!",
+            myJob);
+
+        // Tests authorization failure with getCounters
+        try {
+          myJob.getCounters();
+          fail("AccessControlException expected..");
+        } catch (IOException ioe) {
+          assertTrue(ioe.getMessage().contains("AccessControlException"));
+        }
+
+        // Tests authorization failure with getTaskReports
+        try {
+          client.getSetupTaskReports(jobId);
+          fail("AccessControlException expected..");
+        } catch (IOException ioe) {
+          assertTrue(ioe.getMessage().contains("AccessControlException"));
+        }
+
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Verify JobContext.Job_ACL_MODIFY_JOB
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  private void verifyACLModifyJob() throws IOException,
+      InterruptedException, ClassNotFoundException {
+
+    // Set the job up.
+    final JobConf myConf = mr.createJobConf();
+    myConf.set(JobContext.JOB_ACL_MODIFY_JOB, "user1,user3");
+
+    // Submit the job as user1
+    RunningJob job = submitJobAsUser(myConf, "user1");
+
+    final JobID jobId = job.getID();
+
+    // Try operations as an unauthorized user.
+    verifyModifyJobAsUnauthorizedUser(myConf, jobId, "user2");
+
+    // Try operations as an authorized user.
+    verifyModifyJobAsAuthorizedUser(myConf, jobId, "user3");
+  }
+
+  private void verifyModifyJobAsAuthorizedUser(
+      final JobConf clusterConf, final JobID jobId,
+      String authorizedUser) throws IOException, InterruptedException {
+    UserGroupInformation authorizedUGI =
+        UserGroupInformation.createUserForTesting(authorizedUser,
+            new String[] {});
+    authorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
+      @SuppressWarnings("null")
+      @Override
+      public Object run() throws Exception {
+        RunningJob myJob = null;
+        try {
+          JobClient client = new JobClient(clusterConf);
+          myJob = client.getJob(jobId);
+        } catch (Exception e) {
+          fail("Exception .." + e);
+        }
+
+        assertNotNull("Job " + jobId + " is not known to the JobTracker!",
+            myJob);
+
+        // Test authorization success with setJobPriority
+        try {
+          myJob.setJobPriority(JobPriority.HIGH.toString());
+        } catch (IOException ioe) {
+          fail("Unexpected.. exception.. " + ioe);
+        }
+
+        // Test authorization success with killJob
+        try {
+          myJob.killJob();
+        } catch (IOException ioe) {
+          fail("Unexpected.. exception.. " + ioe);
+        }
+
+        return null;
+      }
+    });
+  }
+
+  private void verifyModifyJobAsUnauthorizedUser(
+      final JobConf clusterConf, final JobID jobId,
+      String unauthorizedUser) throws IOException, InterruptedException {
+    UserGroupInformation unauthorizedUGI =
+        UserGroupInformation.createUserForTesting(unauthorizedUser,
+            new String[] {});
+    unauthorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
+      @SuppressWarnings("null")
+      @Override
+      public Object run() {
+        RunningJob myJob = null;
+        try {
+          JobClient client = new JobClient(clusterConf);
+          myJob = client.getJob(jobId);
+        } catch (Exception e) {
+          fail("Exception .." + e);
+        }
+
+        assertNotNull("Job " + jobId + " is not known to the JobTracker!",
+            myJob);
+
+        // Tests authorization failure with killJob
+        try {
+          myJob.killJob();
+          fail("AccessControlException expected..");
+        } catch (IOException ioe) {
+          assertTrue(ioe.getMessage().contains("AccessControlException"));
+        }
+
+
+        // Tests authorization failure with setJobPriority
+        try {
+          myJob.setJobPriority(JobPriority.HIGH.toString());
+          fail("AccessControlException expected..");
+        } catch (IOException ioe) {
+          assertTrue(ioe.getMessage().contains("AccessControlException"));
+        }
+
+        return null;
+      }
+    });
+  }
+
+  private void verifyACLPersistence() throws IOException,
+      InterruptedException {
+
+    // Set the job up.
+    final JobConf myConf = mr.createJobConf();
+    myConf.set(JobContext.JOB_ACL_VIEW_JOB, "user1,user2");
+
+    // Submit the job as user1
+    RunningJob job = submitJobAsUser(myConf, "user1");
+
+    final JobID jobId = job.getID();
+
+    // Kill the job and wait till it is actually killed so that it is written to
+    // CompletedJobStore
+    job.killJob();
+    while (job.getJobState() != JobStatus.KILLED) {
+      LOG.info("Waiting for the job to be killed successfully..");
+      Thread.sleep(200);
+    }
+
+    // Now kill the cluster, so that the job is 'forgotten'
+    tearDown();
+
+    // Re-start the cluster
+    startCluster(true);
+
+    final JobConf myNewJobConf = mr.createJobConf();
+    // Now verify view-job works off CompletedJobStore
+    verifyViewJobAsAuthorizedUser(myNewJobConf, jobId, "user2");
+
+    // Only JobCounters is persisted on the JobStore. So test counters only.
+    UserGroupInformation unauthorizedUGI =
+        UserGroupInformation.createUserForTesting("user3", new String[] {});
+    unauthorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
+      @SuppressWarnings("null")
+      @Override
+      public Object run() {
+        RunningJob myJob = null;
+        try {
+          JobClient client = new JobClient(myNewJobConf);
+          myJob = client.getJob(jobId);
+        } catch (Exception e) {
+          fail("Exception .." + e);
+        }
+
+        assertNotNull("Job " + jobId + " is not known to the JobTracker!",
+            myJob);
+
+        // Tests authorization failure with getCounters
+        try {
+          myJob.getCounters();
+          fail("AccessControlException expected..");
+        } catch (IOException ioe) {
+          assertTrue(ioe.getMessage().contains("AccessControlException"));
+        }
+
+        return null;
+      }
+    });
+
+  }
+}

+ 51 - 6
src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
@@ -31,7 +32,12 @@ public class TestJobStatusPersistency extends ClusterMapReduceTestCase {
   static final Path TEST_DIR = 
     new Path(System.getProperty("test.build.data","/tmp"), 
              "job-status-persistence");
-  
+
+  @Override
+  protected void setUp() throws Exception {
+    // Don't start anything by default
+  }
+
   private JobID runJob() throws Exception {
     OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
     Writer wr = new OutputStreamWriter(os);
@@ -64,12 +70,13 @@ public class TestJobStatusPersistency extends ClusterMapReduceTestCase {
   }
 
   public void testNonPersistency() throws Exception {
+    startCluster(true, null);
     JobID jobId = runJob();
     JobClient jc = new JobClient(createJobConf());
     RunningJob rj = jc.getJob(jobId);
     assertNotNull(rj);
     stopCluster();
-    startCluster(false, null);
+    startCluster(true, null);
     jc = new JobClient(createJobConf());
     rj = jc.getJob(jobId);
     assertNull(rj);
@@ -79,7 +86,6 @@ public class TestJobStatusPersistency extends ClusterMapReduceTestCase {
     Properties config = new Properties();
     config.setProperty("mapred.job.tracker.persist.jobstatus.active", "true");
     config.setProperty("mapred.job.tracker.persist.jobstatus.hours", "1");
-    stopCluster();
     startCluster(false, config);
     JobID jobId = runJob();
     JobClient jc = new JobClient(createJobConf());
@@ -112,7 +118,7 @@ public class TestJobStatusPersistency extends ClusterMapReduceTestCase {
    * Test if the completed job status is persisted to localfs.
    */
   public void testLocalPersistency() throws Exception {
-    FileSystem fs = FileSystem.getLocal(createJobConf());
+    FileSystem fs = FileSystem.getLocal(new JobConf());
     
     fs.delete(TEST_DIR, true);
     
@@ -121,8 +127,7 @@ public class TestJobStatusPersistency extends ClusterMapReduceTestCase {
     config.setProperty("mapred.job.tracker.persist.jobstatus.hours", "1");
     config.setProperty("mapred.job.tracker.persist.jobstatus.dir", 
                        fs.makeQualified(TEST_DIR).toString());
-    stopCluster();
-    startCluster(false, config);
+    startCluster(true, config);
     JobID jobId = runJob();
     JobClient jc = new JobClient(createJobConf());
     RunningJob rj = jc.getJob(jobId);
@@ -133,4 +138,44 @@ public class TestJobStatusPersistency extends ClusterMapReduceTestCase {
     assertTrue("Missing job info from the local fs", fs.exists(jobInfo));
     fs.delete(TEST_DIR, true);
   }
+
+  /**
+   * Verify that completed-job store is inactive if the jobinfo path is not
+   * writable.
+   * 
+   * @throws Exception
+   */
+  public void testJobStoreDisablingWithInvalidPath() throws Exception {
+    MiniMRCluster mr = null;
+    Path parent = new Path(TEST_DIR, "parent");
+    try {
+      FileSystem fs = FileSystem.getLocal(new JobConf());
+
+      if (fs.exists(TEST_DIR) && !fs.delete(TEST_DIR, true)) {
+        fail("Cannot delete TEST_DIR!");
+      }
+
+      if (fs.mkdirs(new Path(TEST_DIR, parent))) {
+        if (!(new File(parent.toUri().getPath()).setWritable(false, false))) {
+          fail("Cannot chmod parent!");
+        }
+      } else {
+        fail("Cannot create parent dir!");
+      }
+      JobConf config = new JobConf();
+      config.set("mapred.job.tracker.persist.jobstatus.active", "true");
+      config.set("mapred.job.tracker.persist.jobstatus.hours", "1");
+      config.set("mapred.job.tracker.persist.jobstatus.dir", new Path(parent,
+          "child").toUri().getPath());
+      mr = new MiniMRCluster(0, "file:///", 4, null, null, config);
+      assertFalse(
+          "CompletedJobStore is unexpectly active!",
+          mr.getJobTrackerRunner().getJobTracker().completedJobStatusStore.isActive());
+    } finally {
+      new File(parent.toUri().getPath()).setWritable(true, false);
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
 }