
HADOOP-3698. Add access control to control who is allowed to submit or
modify jobs in the JobTracker. (Hemanth Yamijala via omalley)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@692248 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 17 years ago
parent
commit
82e1175d61

+ 3 - 0
CHANGES.txt

@@ -123,6 +123,9 @@ Trunk (unreleased changes)
     HADOOP-3866. Added sort and multi-job updates in the JobTracker web ui.
     (Craig Weisenfluh via omalley)
 
+    HADOOP-3698. Add access control to control who is allowed to submit or 
+    modify jobs in the JobTracker. (Hemanth Yamijala via omalley)
+
   IMPROVEMENTS
 
     HADOOP-3908. Fuse-dfs: better error message if libhdfs.so doesn't exist.

+ 61 - 0
conf/hadoop-default.xml

@@ -1380,4 +1380,65 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>mapred.queue.names</name>
+  <value>default</value>
+  <description> Comma separated list of queues configured for this jobtracker.
+    Jobs are added to queues and schedulers can configure different 
+    scheduling properties for the various queues. To configure a property 
+    for a queue, the name of the queue must match the name specified in this 
+    value. Queue properties that are common to all schedulers are configured 
+    here with the naming convention, mapred.queue.$QUEUE-NAME.$PROPERTY-NAME,
+    e.g. mapred.queue.default.acl-submit-job.
+    The number of queues configured in this parameter could depend on the
+    type of scheduler being used, as specified in 
+    mapred.jobtracker.taskScheduler. For example, the JobQueueTaskScheduler
+    supports only a single queue, which is the default configured here.
+    Before adding more queues, ensure that the scheduler you've configured
+    supports multiple queues.
+  </description>
+</property>
+
+<property>
+  <name>mapred.acls.enabled</name>
+  <value>false</value>
+  <description> Specifies whether ACLs are enabled, and should be checked
+    for various operations.
+  </description>
+</property>
+
+<property>
+  <name>mapred.queue.default.acl-submit-job</name>
+  <value>*</value>
+  <description> Comma separated list of user and group names that are allowed
+    to submit jobs to the 'default' queue. The user list and the group list
+    are separated by a blank, e.g. alice,bob group1,group2.
+    If set to the special value '*', it means all users are allowed to 
+    submit jobs. 
+  </description>
+</property>
+
+<property>
+  <name>mapred.queue.default.acl-administer-jobs</name>
+  <value>*</value>
+  <description> Comma separated list of user and group names that are allowed
+    to delete jobs or modify the priority of jobs not owned by the current
+    user in the 'default' queue. The user list and the group list
+    are separated by a blank, e.g. alice,bob group1,group2.
+    If set to the special value '*', it means all users are allowed to do 
+    this operation.
+  </description>
+</property>
+
+<property>
+  <name>queue.name</name>
+  <value>default</value>
+  <description> Queue to which a job is submitted. This must match one of the
+    queues defined in mapred.queue.names for the system. Also, the ACL setup
+    for the queue must allow the current user to submit a job to the queue.
+    Before specifying a queue, ensure that the system is configured with 
+    the queue, and access is allowed for submitting jobs to the queue.
+  </description>
+</property>
+
 </configuration>
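
For reference, the ACL format defined above is "users groups": a comma separated
user list, then a single blank, then a comma separated group list, with '*'
meaning everyone. A minimal sketch of wiring these properties programmatically,
the way the new TestQueueManager below does (queue, user and group names are
illustrative):

    JobConf conf = new JobConf();
    conf.setBoolean("mapred.acls.enabled", true);        // turn ACL checks on
    conf.set("mapred.queue.names", "default,research");  // scheduler must support > 1 queue
    // user list first, then a blank, then the group list
    conf.set("mapred.queue.research.acl-submit-job", "alice,bob engineers");
    conf.set("mapred.queue.research.acl-administer-jobs", "ops");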

+ 5 - 0
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

@@ -130,6 +130,11 @@ public class TestFairScheduler extends TestCase {
           JobTracker.State.RUNNING);
     }
 
+    @Override
+    public QueueManager getQueueManager() {
+      return null;
+    }
+    
     @Override
     public int getNumberOfUniqueHosts() {
       return 0;

+ 45 - 0
src/core/org/apache/hadoop/security/AccessControlIOException.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import java.io.IOException;
+
+/**
+ * An exception indicating access control violations.  
+ */
+public class AccessControlIOException extends IOException {
+
+  private static final long serialVersionUID = -1874018786480045420L;
+  
+  /**
+   * Default constructor is needed for unwrapping from 
+   * {@link org.apache.hadoop.ipc.RemoteException}.
+   */
+  public AccessControlIOException() {
+    super("Permission denied.");
+  }
+
+  /**
+   * Constructs an {@link AccessControlIOException}
+   * with the specified detail message.
+   * @param s the detail message.
+   */
+  public AccessControlIOException(String s) {
+    super(s);
+  }
+}
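
Clients see this exception when an ACL check fails on the JobTracker; because it
travels over RPC it may arrive wrapped in a RemoteException, which is why the
default constructor exists. A minimal sketch of handling a denied submission,
matching on the message text the way the new tests do (job setup omitted):

    try {
      JobClient.runJob(job);
    } catch (IOException ioe) {
      // The JobTracker reports denials as
      // "User <name> cannot perform operation SUBMIT_JOB on queue <queue>"
      if (ioe.getMessage() != null
          && ioe.getMessage().contains("cannot perform operation")) {
        System.err.println("denied by queue ACL: " + ioe.getMessage());
      } else {
        throw ioe;
      }
    }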

+ 10 - 3
src/examples/org/apache/hadoop/examples/SleepJob.java

@@ -169,6 +169,15 @@ public class SleepJob extends Configured implements Tool,
   public int run(int numMapper, int numReducer, long mapSleepTime,
       int mapSleepCount, long reduceSleepTime,
       int reduceSleepCount) throws IOException {
+    JobConf job = setupJobConf(numMapper, numReducer, mapSleepTime, 
+                  mapSleepCount, reduceSleepTime, reduceSleepCount);
+    JobClient.runJob(job);
+    return 0;
+  }
+
+  public JobConf setupJobConf(int numMapper, int numReducer, 
+                                long mapSleepTime, int mapSleepCount, 
+                                long reduceSleepTime, int reduceSleepCount) {
     JobConf job = new JobConf(getConf(), SleepJob.class);
     job.setNumMapTasks(numMapper);
     job.setNumReduceTasks(numReducer);
@@ -186,9 +195,7 @@ public class SleepJob extends Configured implements Tool,
     job.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
     job.setInt("sleep.job.map.sleep.count", mapSleepCount);
     job.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);
-
-    JobClient.runJob(job);
-    return 0;
+    return job;
   }
 
   public int run(String[] args) throws Exception {
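
Splitting setupJobConf out of run() lets callers build a configured JobConf
without blocking on job completion. A sketch of submitting a sleep job
asynchronously, as TestQueueManager below does (sleep times and counts are
illustrative):

    SleepJob sleep = new SleepJob();
    sleep.setConf(new JobConf());   // or a conf pointing at a live cluster
    JobConf job = sleep.setupJobConf(1, 1, 1000, 10, 1000, 10);
    job.setQueueName("default");                             // optional
    RunningJob running = new JobClient(job).submitJob(job);  // returns immediately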

+ 14 - 8
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -61,6 +61,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Group;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
@@ -396,7 +397,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
       Configuration conf) throws IOException {
     return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
-        JobSubmissionProtocol.versionID, addr, conf,
+        JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,
         NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
   }
 
@@ -552,13 +553,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
      * set this user's id in job configuration, so later job files can be
      * accessed using this user's id
      */
-    UnixUserGroupInformation ugi = null;
-    try {
-      ugi = UnixUserGroupInformation.login(job, true);
-    } catch (LoginException e) {
-      throw (IOException)(new IOException(
-          "Failed to get the current user's information.").initCause(e));
-    }
+    UnixUserGroupInformation ugi = getUGI(job);
       
     //
     // Figure out what fs the JobTracker is using.  Copy the
@@ -677,6 +672,17 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     }
 
   }
+
+  private UnixUserGroupInformation getUGI(Configuration job) throws IOException {
+    UnixUserGroupInformation ugi = null;
+    try {
+      ugi = UnixUserGroupInformation.login(job, true);
+    } catch (LoginException e) {
+      throw (IOException)(new IOException(
+          "Failed to get the current user's information.").initCause(e));
+    }
+    return ugi;
+  }
   
   /**
    * Submit a job to the MR system.
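
With getUGI() factored out, both the RPC proxy and job submission derive the
caller's identity from the configuration via
UnixUserGroupInformation.login(conf, true). One consequence, used by the new
TestQueueManager: presetting the UGI property on the submitted job's
configuration makes the job owned by that user. A sketch, given a client
configuration clientConf (user and group names illustrative):

    JobConf asOtherUser = new JobConf(clientConf);
    asOtherUser.set(UnixUserGroupInformation.UGI_PROPERTY_NAME,
                    "dummy-user,dummy-user-group");   // "user,group1,group2,..."
    RunningJob rjob = new JobClient(clientConf).submitJob(asOtherUser);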

+ 26 - 1
src/mapred/org/apache/hadoop/mapred/JobConf.java

@@ -110,6 +110,12 @@ public class JobConf extends Configuration {
    */
   public static final long DISABLED_VIRTUAL_MEMORY_LIMIT = -1L;
   
+  /**
+   * Name of the queue to which jobs will be submitted, if no queue
+   * name is mentioned.
+   */
+  public static final String DEFAULT_QUEUE_NAME = "default";
+  
   /**
    * Construct a map/reduce job configuration.
    */
@@ -1351,7 +1357,26 @@ public class JobConf extends Configuration {
   public void setMaxVirtualMemoryForTask(long vmem) {
     setLong("mapred.task.maxmemory", vmem);
   }
-    
+  
+  /**
+   * Return the name of the queue to which this job is submitted.
+   * Defaults to 'default'.
+   * 
+   * @return name of the queue
+   */
+  public String getQueueName() {
+    return get("queue.name", DEFAULT_QUEUE_NAME);
+  }
+  
+  /**
+   * Set the name of the queue to which this job should be submitted.
+   * 
+   * @param queueName Name of the queue
+   */
+  public void setQueueName(String queueName) {
+    set("queue.name", queueName);
+  }
+  
   /** 
    * Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing
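
The accessors store the queue under the bare key "queue.name" (note: not under
the mapred.queue.* prefix used for per-queue properties) and fall back to
DEFAULT_QUEUE_NAME when unset. A two-line usage sketch:

    JobConf conf = new JobConf();
    assert "default".equals(conf.getQueueName());  // unset => DEFAULT_QUEUE_NAME
    conf.setQueueName("research");                 // must appear in mapred.queue.names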

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -193,7 +193,8 @@ class JobInProgress {
     conf = new JobConf(localJobFile);
     this.priority = conf.getJobPriority();
     this.profile = new JobProfile(conf.getUser(), jobid, 
-                                  jobFile.toString(), url, conf.getJobName());
+                                  jobFile.toString(), url, conf.getJobName(),
+                                  conf.getQueueName());
     String jarFile = conf.getJar();
     if (jarFile != null) {
       fs.copyToLocalFile(new Path(jarFile), localJarFile);

+ 31 - 3
src/mapred/org/apache/hadoop/mapred/JobProfile.java

@@ -47,7 +47,8 @@ public class JobProfile implements Writable {
   String jobFile;
   String url;
   String name;
-
+  String queueName;
+  
   /**
    * Construct an empty {@link JobProfile}.
    */
@@ -66,13 +67,30 @@ public class JobProfile implements Writable {
    */
   public JobProfile(String user, JobID jobid, String jobFile, String url,
                     String name) {
+    this(user, jobid, jobFile, url, name, JobConf.DEFAULT_QUEUE_NAME);
+  }
+
+  /**
+   * Construct a {@link JobProfile} with the userid, jobid,
+   * job config-file, job-details url, job name and queue name.
+   * 
+   * @param user userid of the person who submitted the job.
+   * @param jobid id of the job.
+   * @param jobFile job configuration file. 
+   * @param url link to the web-ui for details of the job.
+   * @param name user-specified job name.
+   * @param queueName name of the queue to which the job is submitted
+   */
+  public JobProfile(String user, JobID jobid, String jobFile, String url,
+                      String name, String queueName) {
     this.user = user;
     this.jobid = jobid;
     this.jobFile = jobFile;
     this.url = url;
     this.name = name;
+    this.queueName = queueName;
   }
-
+  
   /**
    * @deprecated use JobProfile(String, JobID, String, String, String) instead
    */
@@ -128,7 +146,15 @@ public class JobProfile implements Writable {
   public String getJobName() {
     return name;
   }
-    
+  
+  /**
+   * Get the name of the queue to which the job is submitted.
+   * @return name of the queue.
+   */
+  public String getQueueName() {
+    return queueName;
+  }
+  
   ///////////////////////////////////////
   // Writable
   ///////////////////////////////////////
@@ -138,6 +164,7 @@ public class JobProfile implements Writable {
     Text.writeString(out, url);
     Text.writeString(out, user);
     Text.writeString(out, name);
+    Text.writeString(out, queueName);
   }
   public void readFields(DataInput in) throws IOException {
     this.jobid = JobID.read(in);
@@ -145,6 +172,7 @@ public class JobProfile implements Writable {
     this.url = Text.readString(in);
     this.user = Text.readString(in);
     this.name = Text.readString(in);
+    this.queueName = Text.readString(in);
   }
 }
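
Because queueName is now part of the Writable format, old and new JobProfile
serializations are incompatible; that incompatibility is what forces the
JobSubmissionProtocol version bump below. A round-trip sketch using the
serialization buffers in org.apache.hadoop.io (jobid, URL and names are
illustrative):

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.JobProfile;

    DataOutputBuffer out = new DataOutputBuffer();
    JobProfile profile = new JobProfile("alice",
        JobID.forName("job_200809040000_0001"), "job.xml",
        "http://jt:50030/jobdetails.jsp", "demo", "research");
    profile.write(out);

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    JobProfile copy = new JobProfile();
    copy.readFields(in);
    // copy.getQueueName() is "research"; a version-10 reader would fail here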
 

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -40,8 +40,9 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    * Version 8: change {job|task}id's to use corresponding objects rather than strings.
    * Version 9: change the counter representation for HADOOP-1915
    * Version 10: added getSystemDir for HADOOP-3135
+   * Version 11: changed JobProfile to include the queue name for HADOOP-3698
    */
-  public static final long versionID = 10L;
+  public static final long versionID = 11L;
 
   /**
    * Allocate a name for the job.

+ 39 - 1
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -60,6 +60,9 @@ import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.security.AccessControlIOException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -511,6 +514,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
   private Thread taskCommitThread;
   
+  private QueueManager queueManager;
+
   /**
    * Start the JobTracker process, listen on the indicated port
    */
@@ -533,6 +538,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
                                            conf.get("mapred.hosts.exclude", ""));
     
+    queueManager = new QueueManager(this.conf);
+    
     // Create the scheduler
     Class<? extends TaskScheduler> schedulerClass
       = conf.getClass("mapred.jobtracker.taskScheduler",
@@ -1134,6 +1141,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     jobInProgressListeners.remove(listener);
   }
   
+  /**
+   * Return the {@link QueueManager} associated with the JobTracker.
+   */
+  public QueueManager getQueueManager() {
+    return queueManager;
+  }
+  
   ////////////////////////////////////////////////////
   // InterTrackerProtocol
   ////////////////////////////////////////////////////
@@ -1490,6 +1504,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     
     totalSubmissions++;
     JobInProgress job = new JobInProgress(jobId, this, this.conf);
+    checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
+
     synchronized (jobs) {
       synchronized (taskScheduler) {
         jobs.put(job.getProfile().getJobID(), job);
@@ -1502,6 +1518,26 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     return job.getStatus();
   }
 
+  // Check whether the specified operation can be performed on the
+  // queue to which the job was submitted. For operations that allow
+  // job owners (see QueueManager.QueueOperation), the owner of the
+  // job may perform the operation irrespective of the ACLs.
+  private void checkAccess(JobInProgress job, 
+                                QueueManager.QueueOperation oper) 
+                                  throws IOException {
+    // get the user group info
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+
+    // get the queue
+    String queue = job.getProfile().getQueueName();
+    if (!queueManager.hasAccess(queue, job, oper, ugi)) {
+      throw new AccessControlIOException("User " 
+                            + ugi.getUserName() 
+                            + " cannot perform "
+                            + "operation " + oper + " on queue " + queue);
+    }
+  }
+
   public synchronized ClusterStatus getClusterStatus() {
     synchronized (taskTrackers) {
       return new ClusterStatus(taskTrackers.size(),
@@ -1513,8 +1549,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
   }
     
-  public synchronized void killJob(JobID jobid) {
+  public synchronized void killJob(JobID jobid) throws IOException {
     JobInProgress job = jobs.get(jobid);
+    checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
     if (job.inited()) {
       job.kill();
     }
@@ -1674,6 +1711,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail) throws IOException{
     TaskInProgress tip = taskidToTIPMap.get(taskid);
     if(tip != null) {
+      checkAccess(tip.getJob(), QueueManager.QueueOperation.ADMINISTER_JOBS);
       return tip.killTask(taskid, shouldFail);
     }
     else {
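
From a client's point of view, the checkAccess calls above mean killJob and
killTask on another user's job now fail unless the caller passes the queue's
acl-administer-jobs check (owners always pass, since ADMINISTER_JOBS sets
jobOwnerAllowed). A fragmentary sketch mirroring verifyJobKillAsOtherUser in
the new test, given a RunningJob rjob and a commons-logging LOG:

    try {
      rjob.killJob();   // rjob was submitted by a different user
    } catch (IOException ioe) {
      // e.g. "User alice cannot perform operation ADMINISTER_JOBS on queue default"
      LOG.info("kill denied: " + ioe.getMessage());
    }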

+ 318 - 0
src/mapred/org/apache/hadoop/mapred/QueueManager.java

@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.HashMap;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Class that exposes information about queues maintained by the Hadoop
+ * Map/Reduce framework.
+ * 
+ * The Map/Reduce framework can be configured with one or more queues,
+ * depending on the scheduler it is configured with. While some 
+ * schedulers work only with one queue, some schedulers support multiple 
+ * queues.
+ *  
+ * Queues can be configured with various properties. Some of these
+ * properties are common to all schedulers, and those are handled by this
+ * class. Schedulers might also associate several custom properties with 
+ * queues. Where such a case exists, the queue name must be used to link 
+ * the common properties with the scheduler specific ones.  
+ */
+public class QueueManager {
+  
+  private static final Log LOG = LogFactory.getLog(QueueManager.class);
+  
+  // Prefix in configuration for queue related keys
+  private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX 
+                                                        = "mapred.queue.";
+  // Indicates an ACL string that represents access to all users
+  private static final String ALL_ALLOWED_ACL_VALUE = "*";
+  // Configured queues
+  private Set<String> queueNames;
+  // Map of a queue and ACL property name with an ACL
+  private HashMap<String, ACL> aclsMap;
+  // Map of a queue name to any generic object that represents 
+  // scheduler information 
+  private HashMap<String, Object> schedulerInfoObjects;
+  // Whether ACLs are enabled in the system or not.
+  private boolean aclsEnabled;
+  
+  /**
+   * Enum representing an operation that can be performed on a queue.
+   */
+  static enum QueueOperation {
+    SUBMIT_JOB ("acl-submit-job", false),
+    ADMINISTER_JOBS ("acl-administer-jobs", true);
+    // TODO: Add ACL for LIST_JOBS when we have ability to authenticate 
+    //       users in UI
+    // TODO: Add ACL for CHANGE_ACL when we have an admin tool for 
+    //       configuring queues.
+    
+    private final String aclName;
+    private final boolean jobOwnerAllowed;
+    
+    QueueOperation(String aclName, boolean jobOwnerAllowed) {
+      this.aclName = aclName;
+      this.jobOwnerAllowed = jobOwnerAllowed;
+    }
+
+    final String getAclName() {
+      return aclName;
+    }
+    
+    final boolean isJobOwnerAllowed() {
+      return jobOwnerAllowed;
+    }
+  }
+  
+  /**
+   * Class representing an access control that is configured.
+   */
+  private static class ACL {
+    
+    // Set of users who are granted access.
+    private Set<String> users;
+    // Set of groups which are granted access
+    private Set<String> groups;
+    // Whether all users are granted access.
+    private boolean allAllowed;
+    
+    /**
+     * Construct a new ACL from a String representation of the same.
+     * 
+     * The String is a comma separated list of users and groups.
+     * The user list comes first and is separated by a space followed 
+     * by the group list. For e.g. "user1,user2 group1,group2"
+     * 
+     * @param aclString String representation of the ACL
+     */
+    ACL(String aclString) {
+      users = new TreeSet<String>();
+      groups = new TreeSet<String>();
+      if (aclString.equals(ALL_ALLOWED_ACL_VALUE)) {
+        allAllowed = true;
+      } else {
+        String[] userGroupStrings = aclString.split(" ", 2);
+        
+        if (userGroupStrings.length >= 1) {
+          String[] usersStr = userGroupStrings[0].split(",");
+          if (usersStr.length >= 1) {
+            addToSet(users, usersStr);
+          }
+        }
+        
+        if (userGroupStrings.length == 2) {
+          String[] groupsStr = userGroupStrings[1].split(",");
+          if (groupsStr.length >= 1) {
+            addToSet(groups, groupsStr);
+          }
+        }
+      }
+    }
+    
+    boolean allUsersAllowed() {
+      return allAllowed;
+    }
+    
+    boolean isUserAllowed(String user) {
+      return users.contains(user);
+    }
+    
+    boolean isAnyGroupAllowed(String[] otherGroups) {
+      for (String g : otherGroups) {
+        if (groups.contains(g)) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+  
+  /**
+   * Construct a new QueueManager using configuration specified in the passed
+   * in {@link org.apache.hadoop.conf.Configuration} object.
+   * 
+   * @param conf Configuration object where queue configuration is specified.
+   */
+  public QueueManager(Configuration conf) {
+    queueNames = new TreeSet<String>();
+    aclsMap = new HashMap<String, ACL>();
+    schedulerInfoObjects = new HashMap<String, Object>();
+    initialize(conf);
+  }
+  
+  /**
+   * Return the set of queues configured in the system.
+   * 
+   * The number of queues configured depends on the scheduler in use.
+   * Note that some schedulers work with only one queue, whereas
+   * others can support multiple queues.
+   *  
+   * @return Set of queue names.
+   */
+  public synchronized Set<String> getQueues() {
+    return queueNames;
+  }
+  
+  /**
+   * Return true if the given {@link QueueManager.QueueOperation} can be 
+   * performed by the specified user on the given queue.
+   * 
+   * An operation is allowed if all users are provided access for this
+   * operation, or if either the user or any of the groups specified is
+   * provided access.
+   * 
+   * @param queueName Queue on which the operation needs to be performed. 
+   * @param oper The operation to perform
+   * @param ugi The user and groups who wish to perform the operation.
+   * 
+   * @return true if the operation is allowed, false otherwise.
+   */
+  public synchronized boolean hasAccess(String queueName, QueueOperation oper,
+                                UserGroupInformation ugi) {
+    return hasAccess(queueName, null, oper, ugi);
+  }
+  
+  /**
+   * Return true if the given {@link QueueManager.QueueOperation} can be 
+   * performed by the specified user on the specified job in the given queue.
+   * 
+   * An operation is allowed either if the owner of the job is the user 
+   * performing the task, all users are provided access for this
+   * operation, or if either the user or any of the groups specified is
+   * provided access.
+   * 
+   * If the {@link QueueManager.QueueOperation} is not job specific then the 
+   * job parameter is ignored.
+   * 
+   * @param queueName Queue on which the operation needs to be performed.
+   * @param job The {@link JobInProgress} on which the operation is being
+   *            performed. 
+   * @param oper The operation to perform
+   * @param ugi The user and groups who wish to perform the operation.
+   * 
+   * @return true if the operation is allowed, false otherwise.
+   */
+  public synchronized boolean hasAccess(String queueName, JobInProgress job, 
+                                QueueOperation oper, 
+                                UserGroupInformation ugi) {
+    if (!aclsEnabled) {
+      return true;
+    }
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("checking access for : " + toFullPropertyName(queueName, 
+                                            oper.getAclName()));      
+    }
+    
+    if (oper.isJobOwnerAllowed()) {
+      if (job.getJobConf().getUser().equals(ugi.getUserName())) {
+        return true;
+      }
+    }
+    
+    ACL acl = aclsMap.get(toFullPropertyName(queueName, oper.getAclName()));
+    if (acl == null) {
+      return false;
+    }
+    return ((acl.allUsersAllowed()) ||
+              (acl.isUserAllowed(ugi.getUserName())) ||
+              (acl.isAnyGroupAllowed(ugi.getGroupNames())));    
+  }
+  
+  /**
+   * Set a generic Object that represents scheduling information relevant
+   * to a queue.
+   * 
+   * A string representation of this Object will be used by the framework
+   * to display in user facing applications like the JobTracker web UI and
+   * the hadoop CLI.
+   * 
+   * @param queueName queue for which the scheduling information is to be set. 
+   * @param queueInfo scheduling information for this queue.
+   */
+  public synchronized void setSchedulerInfo(String queueName, 
+                                              Object queueInfo) {
+    schedulerInfoObjects.put(queueName, queueInfo);
+  }
+  
+  /**
+   * Return the scheduler information configured for this queue.
+   * 
+   * @param queueName queue for which the scheduling information is required.
+   * @return The scheduling information for this queue.
+   * 
+   * @see #setSchedulerInfo(String, Object)
+   */
+  public synchronized Object getSchedulerInfo(String queueName) {
+    return schedulerInfoObjects.get(queueName);
+  }
+  
+  /**
+   * Refresh information configured for queues in the system by reading
+   * it from the passed in {@link org.apache.hadoop.conf.Configuration}.
+   *
+   * Previously stored information about queues is removed and new
+   * information populated from the configuration.
+   * 
+   * @param conf New configuration for the queues. 
+   */
+  public synchronized void refresh(Configuration conf) {
+    queueNames.clear();
+    aclsMap.clear();
+    initialize(conf);
+  }
+  
+  private void initialize(Configuration conf) {
+    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
+    String[] queues = conf.getStrings("mapred.queue.names", 
+                                  new String[] {JobConf.DEFAULT_QUEUE_NAME});
+    addToSet(queueNames, queues);
+    
+    // for every queue, and every operation, get the ACL
+    // if any is specified and store in aclsMap.
+    for (String queue : queues) {
+      for (QueueOperation oper : QueueOperation.values()) {
+        String key = toFullPropertyName(queue, oper.getAclName());
+        String aclString = conf.get(key, "*");
+        aclsMap.put(key, new ACL(aclString));
+      }
+    }
+  }
+  
+  private static final String toFullPropertyName(String queue, 
+      String property) {
+    return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
+  }
+  
+  private static final void addToSet(Set<String> set, String[] elems) {
+    for (String elem : elems) {
+      set.add(elem);
+    }
+  }
+}
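
A minimal sketch of exercising QueueManager in isolation. QueueOperation is
package-private, so this assumes a class placed in org.apache.hadoop.mapred;
the ACL string grants alice plus anyone in the devs group (all names
illustrative):

    package org.apache.hadoop.mapred;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UnixUserGroupInformation;
    import org.apache.hadoop.security.UserGroupInformation;

    public class QueueManagerSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("mapred.acls.enabled", true);
        conf.set("mapred.queue.names", "default");
        conf.set("mapred.queue.default.acl-submit-job", "alice devs");

        QueueManager mgr = new QueueManager(conf);
        // login() resolves the current Unix user and groups, as the tests do
        UserGroupInformation ugi = UnixUserGroupInformation.login();
        boolean allowed = mgr.hasAccess("default",
            QueueManager.QueueOperation.SUBMIT_JOB, ugi);
        System.out.println("submit allowed for " + ugi.getUserName()
            + ": " + allowed);
      }
    }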

+ 7 - 0
src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java

@@ -56,4 +56,11 @@ interface TaskTrackerManager {
    */
   public void removeJobInProgressListener(JobInProgressListener listener);
 
+  /**
+   * Return the {@link QueueManager} which manages the queues in this
+   * {@link TaskTrackerManager}.
+   *
+   * @return the {@link QueueManager}
+   */
+  public QueueManager getQueueManager();
 }
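
This is the hook a TaskScheduler uses to reach queue state. For instance, a
scheduler can publish per-queue information for the web UI and CLI through
setSchedulerInfo. A hedged sketch, assuming a scheduler holding the usual
taskTrackerManager reference (the info string is illustrative):

    QueueManager queueMgr = taskTrackerManager.getQueueManager();
    for (String queue : queueMgr.getQueues()) {
      // any object works; the framework only needs its toString()
      queueMgr.setSchedulerInfo(queue, "FIFO, no per-queue limits");
    }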

+ 5 - 0
src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java

@@ -139,6 +139,11 @@ public class TestJobQueueTaskScheduler extends TestCase {
       listeners.remove(listener);
     }
     
+    @Override
+    public QueueManager getQueueManager() {
+      return null;
+    }
+    
     // Test methods
     
     public void submitJob(JobInProgress job) {

+ 349 - 0
src/test/org/apache/hadoop/mapred/TestQueueManager.java

@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
+
+import javax.security.auth.login.LoginException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+
+public class TestQueueManager extends TestCase {
+
+  private static final Log LOG = LogFactory.getLog(TestQueueManager.class);
+  
+  private MiniDFSCluster miniDFSCluster;
+  private MiniMRCluster miniMRCluster;
+
+  public void testDefaultQueueConfiguration() {
+    JobConf conf = new JobConf();
+    QueueManager qMgr = new QueueManager(conf);
+    Set<String> expQueues = new TreeSet<String>();
+    expQueues.add("default");
+    verifyQueues(expQueues, qMgr.getQueues());
+    // pass true so it will fail if the key is not found.
+    assertFalse(conf.getBoolean("mapred.acls.enabled", true));
+  }
+  
+  public void testMultipleQueues() {
+    JobConf conf = new JobConf();
+    conf.set("mapred.queue.names", "q1,q2,Q3");
+    QueueManager qMgr = new QueueManager(conf);
+    Set<String> expQueues = new TreeSet<String>();
+    expQueues.add("q1");
+    expQueues.add("q2");
+    expQueues.add("Q3");
+    verifyQueues(expQueues, qMgr.getQueues());
+  }
+  
+  public void testSchedulerInfo() {
+    JobConf conf = new JobConf();
+    conf.set("mapred.queue.names", "qq1,qq2");
+    QueueManager qMgr = new QueueManager(conf);
+    qMgr.setSchedulerInfo("qq1", "queueInfoForqq1");
+    qMgr.setSchedulerInfo("qq2", "queueInfoForqq2");
+    assertEquals(qMgr.getSchedulerInfo("qq2"), "queueInfoForqq2");
+    assertEquals(qMgr.getSchedulerInfo("qq1"), "queueInfoForqq1");
+  }
+  
+  public void testAllEnabledACLForJobSubmission() throws IOException {
+    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
+    verifyJobSubmission(conf, true);
+  }
+  
+  public void testAllDisabledACLForJobSubmission() throws IOException {
+    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "");
+    verifyJobSubmission(conf, false);
+  }
+  
+  public void testUserDisabledACLForJobSubmission() throws IOException {
+    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", 
+                                "3698-non-existent-user");
+    verifyJobSubmission(conf, false);
+  }
+  
+  public void testDisabledACLForNonDefaultQueue() throws IOException {
+    // allow everyone in default queue
+    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
+    // setup a different queue
+    conf.set("mapred.queue.names", "default,q1");
+    // setup a different acl for this queue.
+    conf.set("mapred.queue.q1.acl-submit-job", "dummy-user");
+    // verify job submission to other queue fails.
+    verifyJobSubmission(conf, false, "q1");
+  }
+  
+  public void testEnabledACLForNonDefaultQueue() throws IOException,
+                                                          LoginException {
+    // login as self...
+    UserGroupInformation ugi = UnixUserGroupInformation.login();
+    String userName = ugi.getUserName();
+    // allow everyone in default queue
+    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
+    // setup a different queue
+    conf.set("mapred.queue.names", "default,q2");
+    // setup a different acl for this queue.
+    conf.set("mapred.queue.q2.acl-submit-job", userName);
+    // verify job submission to other queue fails.
+    verifyJobSubmission(conf, true, "q2");
+  }
+  
+  public void testUserEnabledACLForJobSubmission() 
+                                    throws IOException, LoginException {
+    // login as self...
+    UserGroupInformation ugi = UnixUserGroupInformation.login();
+    String userName = ugi.getUserName();
+    JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
+                                  "3698-junk-user," + userName 
+                                    + " 3698-junk-group1,3698-junk-group2");
+    verifyJobSubmission(conf, true);
+  }
+  
+  public void testGroupsEnabledACLForJobSubmission() 
+                                    throws IOException, LoginException {
+    // login as self, get one group, and add in allowed list.
+    UserGroupInformation ugi = UnixUserGroupInformation.login();
+    String[] groups = ugi.getGroupNames();
+    assertTrue(groups.length > 0);
+    JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
+                                "3698-junk-user1,3698-junk-user2 " 
+                                  + groups[groups.length-1] 
+                                           + ",3698-junk-group");
+    verifyJobSubmission(conf, true);
+  }
+  
+  public void testAllEnabledACLForJobKill() throws IOException {
+    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "*");
+    verifyJobKill(conf, true);
+  }
+
+  public void testAllDisabledACLForJobKill() throws IOException {
+    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "");
+    verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
+  }
+  
+  public void testOwnerAllowedForJobKill() throws IOException {
+    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", 
+                                              "junk-user");
+    verifyJobKill(conf, true);
+  }
+  
+  public void testUserDisabledACLForJobKill() throws IOException {
+    //setup a cluster allowing a user to submit
+    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", 
+                                              "dummy-user");
+    verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
+  }
+  
+  public void testUserEnabledACLForJobKill() throws IOException, 
+                                                    LoginException {
+    // login as self...
+    UserGroupInformation ugi = UnixUserGroupInformation.login();
+    String userName = ugi.getUserName();
+    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
+                                              "dummy-user,"+userName);
+    verifyJobKillAsOtherUser(conf, true, "dummy-user,dummy-user-group");
+  }
+  
+  private JobConf setupConf(String aclName, String aclValue) {
+    JobConf conf = new JobConf();
+    conf.setBoolean("mapred.acls.enabled", true);
+    conf.set(aclName, aclValue);
+    return conf;
+  }
+  
+  private void verifyQueues(Set<String> expectedQueues, 
+                                          Set<String> actualQueues) {
+    assertEquals(expectedQueues.size(), actualQueues.size());
+    for (String queue : expectedQueues) {
+      assertTrue(actualQueues.contains(queue));
+    }
+  }
+  
+  private void verifyJobSubmission(JobConf conf, boolean shouldSucceed) 
+                                              throws IOException {
+    verifyJobSubmission(conf, shouldSucceed, "default");
+  }
+
+  private void verifyJobSubmission(JobConf conf, boolean shouldSucceed, 
+                                    String queue) throws IOException {
+    setUpCluster(conf);
+    try {
+      RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, null, queue);
+      if (shouldSucceed) {
+        assertTrue(rjob.isSuccessful());
+      } else {
+        fail("Job submission should have failed.");
+      }
+    } catch (IOException ioe) {
+      if (shouldSucceed) {
+        throw ioe;
+      } else {
+        LOG.info("exception while submitting job: " + ioe.getMessage());
+        assertTrue(ioe.getMessage().
+            contains("cannot perform operation " +
+            "SUBMIT_JOB on queue " + queue));
+      }
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  private void verifyJobKill(JobConf conf, boolean shouldSucceed) 
+                                      throws IOException {
+    setUpCluster(conf);
+    try {
+      RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false);
+      assertFalse(rjob.isComplete());
+      while(rjob.mapProgress() == 0.0f) {
+        try {
+          Thread.sleep(10);  
+        } catch (InterruptedException ie) {
+          break;
+        }
+      }
+      rjob.killJob();
+      if (shouldSucceed) {
+        assertTrue(rjob.isComplete());
+      } else {
+        fail("Job kill should have failed.");
+      }
+    } catch (IOException ioe) {
+      if (shouldSucceed) {
+        throw ioe;
+      } else {
+        LOG.info("exception while killing job: " + ioe.getMessage());
+        assertTrue(ioe.getMessage().
+                        contains("cannot perform operation " +
+                                    "ADMINISTER_JOBS on queue default"));
+      }
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  private void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
+                                        String otherUserInfo) 
+                        throws IOException {
+    setUpCluster(conf);
+    try {
+      // submit a job as another user.
+      String userInfo = otherUserInfo;
+      RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
+      assertFalse(rjob.isComplete());
+
+      //try to kill as self
+      try {
+        rjob.killJob();
+        if (!shouldSucceed) {
+          fail("should fail kill operation");  
+        }
+      } catch (IOException ioe) {
+        if (shouldSucceed) {
+          throw ioe;
+        }
+        //verify it fails
+        LOG.info("exception while killing job: " + ioe.getMessage());
+        assertTrue(ioe.getMessage().
+                        contains("cannot perform operation " +
+                                    "ADMINISTER_JOBS on queue default"));
+      }
+      //wait for job to complete on its own
+      while (!rjob.isComplete()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          break;
+        }
+      }
+    } finally {
+      tearDownCluster();
+    }
+  }
+  
+  private void setUpCluster(JobConf conf) throws IOException {
+    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fileSys = miniDFSCluster.getFileSystem();
+    String namenode = fileSys.getUri().toString();
+    miniMRCluster = new MiniMRCluster(1, namenode, 3, 
+                      null, null, conf);
+  }
+  
+  private void tearDownCluster() throws IOException {
+    if (miniMRCluster != null) { miniMRCluster.shutdown(); }
+    if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
+  }
+  
+  private RunningJob submitSleepJob(int numMappers, int numReducers, 
+                            long mapSleepTime, long reduceSleepTime,
+                            boolean shouldComplete) 
+                              throws IOException {
+    return submitSleepJob(numMappers, numReducers, mapSleepTime,
+                          reduceSleepTime, shouldComplete, null);
+  }
+  
+  private RunningJob submitSleepJob(int numMappers, int numReducers, 
+                                      long mapSleepTime, long reduceSleepTime,
+                                      boolean shouldComplete, String userInfo) 
+                                            throws IOException {
+    return submitSleepJob(numMappers, numReducers, mapSleepTime, 
+                          reduceSleepTime, shouldComplete, userInfo, null);
+  }
+
+  private RunningJob submitSleepJob(int numMappers, int numReducers, 
+                                    long mapSleepTime, long reduceSleepTime,
+                                    boolean shouldComplete, String userInfo,
+                                    String queueName) 
+                                      throws IOException {
+    JobConf clientConf = new JobConf();
+    clientConf.set("mapred.job.tracker", "localhost:"
+        + miniMRCluster.getJobTrackerPort());
+    SleepJob job = new SleepJob();
+    job.setConf(clientConf);
+    clientConf = job.setupJobConf(numMappers, numReducers, 
+        mapSleepTime, (int)mapSleepTime/100,
+        reduceSleepTime, (int)reduceSleepTime/100);
+    if (queueName != null) {
+      clientConf.setQueueName(queueName);
+    }
+    RunningJob rJob = null;
+    if (shouldComplete) {
+      rJob = JobClient.runJob(clientConf);  
+    } else {
+      JobConf jc = new JobConf(clientConf);
+      if (userInfo != null) {
+        jc.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
+      }
+      rJob = new JobClient(clientConf).submitJob(jc);
+    }
+    return rJob;
+  }
+
+}