Browse Source

HADOOP-5913. Provide ability to an administrator to stop and start job queues. Contributed by Rahul Kumar Singh and Hemanth Yamijala.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@785643 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala 16 years ago
parent
commit
d2d141d625

+ 2 - 0
.gitignore

@@ -29,6 +29,8 @@ conf/hdfs-site.xml
 conf/hadoop-policy.xml
 conf/capacity-scheduler.xml
 conf/mapred-queue-acls.xml
+conf/mapred-queue-acls.xml.template
+conf/mapred-queues.xml
 docs/api/
 logs/
 src/contrib/ec2/bin/hadoop-ec2-env.sh

+ 3 - 0
CHANGES.txt

@@ -68,6 +68,9 @@ Trunk (unreleased changes)
     HADOOP-5698. Change org.apache.hadoop.examples.MultiFileWordCount to 
     use new mapreduce api. (Amareshwari Sriramadasu via sharad)
 
+    HADOOP-5913. Provide ability to an administrator to stop and start
+    job queues. (Rahul Kumar Singh and Hemanth Yamijala via yhemanth)
+
   NEW FEATURES
 
     HADOOP-4268. Change fsck to use ClientProtocol methods so that the

+ 0 - 31
conf/mapred-queue-acls.xml.template

@@ -1,31 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!-- This is a template file for queue acls configuration properties -->
-
-<configuration>
-
-<property>
-  <name>mapred.queue.default.acl-submit-job</name>
-  <value>*</value>
-  <description> Comma separated list of user and group names that are allowed
-    to submit jobs to the 'default' queue. The user list and the group list
-    are separated by a blank. For e.g. alice,bob group1,group2. 
-    If set to the special value '*', it means all users are allowed to 
-    submit jobs. 
-  </description>
-</property>
-
-<property>
-  <name>mapred.queue.default.acl-administer-jobs</name>
-  <value>*</value>
-  <description> Comma separated list of user and group names that are allowed
-    to delete jobs or modify job's priority for jobs not owned by the current
-    user in the 'default' queue. The user list and the group list
-    are separated by a blank. For e.g. alice,bob group1,group2. 
-    If set to the special value '*', it means all users are allowed to do 
-    this operation.
-  </description>
-</property>
-
-</configuration>

+ 68 - 0
conf/mapred-queues.xml.template

@@ -0,0 +1,68 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- This is a template file for queue acls configuration properties -->
+
+<configuration>
+
+<property>
+  <name>mapred.queue.names</name>
+  <value>default</value>
+  <description> Comma separated list of queues configured for this jobtracker.
+    Jobs are added to queues and schedulers can configure different 
+    scheduling properties for the various queues. To configure a property 
+    for a queue, the name of the queue must match the name specified in this 
+    value. Queue properties that are common to all schedulers are configured 
+    here with the naming convention, mapred.queue.$QUEUE-NAME.$PROPERTY-NAME,
+    for e.g. mapred.queue.default.submit-job-acl.
+    The number of queues configured in this parameter could depend on the
+    type of scheduler being used, as specified in 
+    mapred.jobtracker.taskScheduler. For example, the JobQueueTaskScheduler
+    supports only a single queue, which is the default configured here.
+    Before adding more queues, ensure that the scheduler you've configured
+    supports multiple queues.
+  </description>
+</property>
+
+<property>
+  <name>mapred.acls.enabled</name>
+  <value>false</value>
+  <description> Specifies whether ACLs are enabled, and should be checked
+    for various operations.
+  </description>
+</property>
+
+<property>
+  <name>mapred.queue.default.acl-submit-job</name>
+  <value>*</value>
+  <description> Comma separated list of user and group names that are allowed
+    to submit jobs to the 'default' queue. The user list and the group list
+    are separated by a blank. For e.g. alice,bob group1,group2. 
+    If set to the special value '*', it means all users are allowed to 
+    submit jobs. 
+  </description>
+</property>
+
+<property>
+  <name>mapred.queue.default.acl-administer-jobs</name>
+  <value>*</value>
+  <description> Comma separated list of user and group names that are allowed
+    to delete jobs or modify job's priority for jobs not owned by the current
+    user in the 'default' queue. The user list and the group list
+    are separated by a blank. For e.g. alice,bob group1,group2. 
+    If set to the special value '*', it means all users are allowed to do 
+    this operation.
+  </description>
+</property>
+
+<property>
+  <name>mapred.queue.default.state</name>
+  <value>running</value>
+  <description>
+   This values defines the state , default queue is in.
+   the values can be either "stopped" or "running"
+   This value can be changed at runtime.
+  </description>
+</property>
+
+</configuration>

+ 24 - 4
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil.AccessControlList;
 
 public class TestCapacityScheduler extends TestCase {
 
@@ -399,15 +400,34 @@ public class TestCapacityScheduler extends TestCase {
   }
   
   static class FakeQueueManager extends QueueManager {
-    private Set<String> queues = null;
+    private Set<String> queueNames = null;
+    private static final AccessControlList allEnabledAcl = new AccessControlList("*");
+    
     FakeQueueManager() {
       super(new Configuration());
     }
-    void setQueues(Set<String> queues) {
-      this.queues = queues;
+    
+    void setQueues(Set<String> queueNames) {
+      this.queueNames = queueNames;
+      
+      // sync up queues with the parent class.
+      Queue[] queues = new Queue[queueNames.size()];
+      int i = 0;
+      for (String queueName : queueNames) {
+        HashMap<String, AccessControlList> aclsMap 
+          = new HashMap<String, AccessControlList>();
+        for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
+          String key = QueueManager.toFullPropertyName(queueName,
+                                                        oper.getAclName());
+          aclsMap.put(key, allEnabledAcl);
+        }
+        queues[i++] = new Queue(queueName, aclsMap, Queue.QueueState.RUNNING);
+      }
+      super.setQueues(queues);
     }
+    
     public synchronized Set<String> getQueues() {
-      return queues;
+      return queueNames;
     }
   }
   

+ 19 - 10
src/docs/src/documentation/content/xdocs/cluster_setup.xml

@@ -271,6 +271,16 @@
 		        TaskTrackers.
 		      </td>
   		    </tr>
+		  </table>
+      
+      <p><br/><code> conf/mapred-queues.xml</code></p>
+      
+      <table>
+       <tr>
+          <th>Parameter</th>
+          <th>Value</th> 
+          <th>Notes</th>
+       </tr>
         <tr>
           <td>mapred.queue.names</td>
           <td>Comma separated list of queues to which jobs can be submitted.</td>
@@ -303,16 +313,6 @@
             <em>mapred.queue.queue-name.acl-name</em>, defined below.
           </td>
         </tr>
-		  </table>
-      
-      <p><br/><code> conf/mapred-queue-acls.xml</code></p>
-      
-      <table>
-       <tr>
-          <th>Parameter</th>
-          <th>Value</th> 
-          <th>Notes</th>
-       </tr>
         <tr>
           <td>mapred.queue.<em>queue-name</em>.acl-submit-job</td>
           <td>List of users and groups that can submit jobs to the
@@ -340,6 +340,15 @@
             his/her own job, irrespective of the ACLs.
           </td>
         </tr>
+        <tr>
+          <td>mapred.queue.<em>queue-name</em>.state</td>
+          <td>Specifies whether <em>queue-name</em> is running or stopped</td> 
+          <td>
+            Jobs can be submitted to a queue only if it is in the 
+            <em>running</em> state. However, jobs which are already running
+            when a queue is stopped will be allowed to finish.
+          </td>
+        </tr>
       </table>
       
 

+ 8 - 5
src/docs/src/documentation/content/xdocs/commands_manual.xml

@@ -577,16 +577,19 @@
         <p>Runs MR admin client</p>
         <p><code>Usage: hadoop mradmin  [</code>
         <a href="commands_manual.html#Generic+Options">GENERIC_OPTIONS</a>
-        <code>] [-refreshQueueAcls] </code></p>
+        <code>] [-refreshQueues] </code></p>
         <table>
         <tr>
         <th> COMMAND_OPTION </th><th> Description </th>
         </tr>
         <tr>
-        <td><code>-refreshQueueAcls</code></td>
-        <td> Refresh the queue acls used by hadoop, to check access during submissions
-        and administration of the job by the user. The properties present in
-        <code>mapred-queue-acls.xml</code> is reloaded by the queue manager.</td>
+        <td><code>-refreshQueues</code></td>
+        <td>Refresh the access control lists and state of queues configured in 
+        the system. These properties are loaded from 
+        <code>mapred-queues.xml</code>. If the file is malformed, then the
+        existing properties are not disturbed. New operations on jobs in the
+        queues will be subjected to checks against the refreshed ACLs. Likewise,
+        new jobs will be accepted to queues only if the queue state is running.
         </tr>
         </table>
       </section>

+ 6 - 3
src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java

@@ -31,13 +31,16 @@ public interface AdminOperationsProtocol extends VersionedProtocol {
   /**
    * Version 1: Initial version. Added refreshQueueAcls.
    * Version 2: Added node refresh facility
+   * Version 3: Changed refreshQueueAcls to refreshQueues
    */
-  public static final long versionID = 2L;
+  public static final long versionID = 3L;
 
   /**
-   * Refresh the queue acls in use currently.
+   * Refresh the queues used by the jobtracker and scheduler.
+   * 
+   * Access control lists and queue states are refreshed.
    */
-  void refreshQueueAcls() throws IOException;
+  void refreshQueues() throws IOException;
   
   /**
    * Refresh the node list at the {@link JobTracker} 

+ 11 - 6
src/mapred/org/apache/hadoop/mapred/JobQueueClient.java

@@ -112,12 +112,11 @@ class JobQueueClient extends Configured implements  Tool {
    */
 
   private void displayQueueInfo(String queue, boolean showJobs) throws IOException {
-    JobQueueInfo schedInfo = jc.getQueueInfo(queue);
-    if (schedInfo == null) {
+    JobQueueInfo jobQueueInfo = jc.getQueueInfo(queue);
+    if (jobQueueInfo == null) {
       System.out.printf("Queue Name : %s has no scheduling information \n", queue);
     } else {
-      System.out.printf("Queue Name : %s \n", schedInfo.getQueueName());
-      System.out.printf("Scheduling Info : %s \n",schedInfo.getSchedulingInfo());
+      printJobQueueInfo(jobQueueInfo);
     }
     if (showJobs) {
       System.out.printf("Job List\n");
@@ -128,6 +127,13 @@ class JobQueueClient extends Configured implements  Tool {
     }
   }
 
+  // format and print information about the passed in job queue.
+  private void printJobQueueInfo(JobQueueInfo jobQueueInfo) {
+    System.out.printf("Queue Name : %s \n", jobQueueInfo.getQueueName()); 
+    System.out.printf("Queue State : %s \n", jobQueueInfo.getQueueState());
+    System.out.printf("Scheduling Info : %s \n",jobQueueInfo.getSchedulingInfo());
+  }
+
   /**
    * Method used to display the list of the JobQueues registered
    * with the {@link QueueManager}
@@ -141,8 +147,7 @@ class JobQueueClient extends Configured implements  Tool {
       if(schedInfo.trim().equals("")){
         schedInfo = "N/A";
       }
-      System.out.printf("Queue Name : %s \n", queue.getQueueName());
-      System.out.printf("Scheduling Info : %s \n",queue.getSchedulingInfo());
+      printJobQueueInfo(queue);
     }
   }
 

+ 21 - 0
src/mapred/org/apache/hadoop/mapred/JobQueueInfo.java

@@ -37,6 +37,7 @@ public class JobQueueInfo implements Writable {
   //Once the scheduling information is set there is no way to recover it.
   private String schedulingInfo; 
   
+  private String queueState;
   
   /**
    * Default constructor for Job Queue Info.
@@ -56,6 +57,8 @@ public class JobQueueInfo implements Writable {
   public JobQueueInfo(String queueName, String schedulingInfo) {
     this.queueName = queueName;
     this.schedulingInfo = schedulingInfo;
+    // make it running by default.
+    this.queueState = Queue.QueueState.RUNNING.getStateName();
   }
   
   
@@ -100,15 +103,33 @@ public class JobQueueInfo implements Writable {
     }
   }
   
+  /**
+   * Set the state of the queue
+   * @param state state of the queue.
+   */
+  public void setQueueState(String state) {
+    queueState = state;
+  }
+  
+  /**
+   * Return the queue state
+   * @return the queue state.
+   */
+  public String getQueueState() {
+    return queueState;
+  }
+  
   @Override
   public void readFields(DataInput in) throws IOException {
     queueName = Text.readString(in);
+    queueState = Text.readString(in);
     schedulingInfo = Text.readString(in);
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, queueName);
+    Text.writeString(out, queueState);
     if(schedulingInfo!= null) {
       Text.writeString(out, schedulingInfo);
     }else {

+ 3 - 1
src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -61,8 +61,10 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    * Version 21: Modified TaskID to be aware of the new TaskTypes                                 
    * Version 22: Added method getQueueAclsForCurrentUser to get queue acls info
    *             for a user
+   * Version 23: Modified the JobQueueInfo class to inlucde queue state.
+   *             Part of HADOOP-5913.            
    */
-  public static final long versionID = 22L;
+  public static final long versionID = 23L;
 
   /**
    * Allocate a name for the job.

+ 18 - 13
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -753,7 +753,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         
         TaskID id = TaskID.forName(taskId);
         TaskInProgress tip = getTip(id);
-        
+
         updateTip(tip, task);
       }
 
@@ -766,7 +766,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         
         // Check if the transaction for this attempt can be committed
         String taskStatus = attempt.get(Keys.TASK_STATUS);
-        
+
         if (taskStatus.length() > 0) {
           // This means this is an update event
           if (taskStatus.equals(Values.SUCCESS.name())) {
@@ -1282,7 +1282,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
           // check the access
           try {
-            checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB, ugi);
+            checkAccess(job, Queue.QueueOperation.SUBMIT_JOB, ugi);
           } catch (Throwable t) {
             LOG.warn("Access denied for user " + ugi.getUserName() 
                      + " in groups : [" 
@@ -3081,10 +3081,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
       throw new IOException("Queue \"" + queue + "\" does not exist");        
     }
-
+    
+    //check if queue is RUNNING
+    if(!queueManager.isRunning(queue)) {
+      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));      
+      throw new IOException("Queue \"" + queue + "\" is not running");
+    }
     // check for access
     try {
-      checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
+      checkAccess(job, Queue.QueueOperation.SUBMIT_JOB);
     } catch (IOException ioe) {
        LOG.warn("Access denied for user " + job.getJobConf().getUser() 
                 + ". Ignoring job " + jobId, ioe);
@@ -3132,7 +3137,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   // Check whether the specified operation can be performed
   // related to the job.
   private void checkAccess(JobInProgress job, 
-                                QueueManager.QueueOperation oper) 
+                                Queue.QueueOperation oper) 
                                   throws IOException {
     // get the user group info
     UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
@@ -3140,7 +3145,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   }
 
   // use the passed ugi for checking the access
-  private void checkAccess(JobInProgress job, QueueManager.QueueOperation oper,
+  private void checkAccess(JobInProgress job, Queue.QueueOperation oper,
                            UserGroupInformation ugi) throws IOException {
     // get the queue
     String queue = job.getProfile().getQueueName();
@@ -3202,7 +3207,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
         
     JobStatus prevStatus = (JobStatus)job.getStatus().clone();
-    checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
+    checkAccess(job, Queue.QueueOperation.ADMINISTER_JOBS);
     job.kill();
     
     // Inform the listeners if the job is killed
@@ -3235,7 +3240,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
             + " is not a valid job");
         return;
     }
-    checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
+    checkAccess(job, Queue.QueueOperation.ADMINISTER_JOBS);
     JobPriority newPriority = JobPriority.valueOf(priority);
     setJobPriority(jobid, newPriority);
   }
@@ -3460,7 +3465,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail) throws IOException{
     TaskInProgress tip = taskidToTIPMap.get(taskid);
     if(tip != null) {
-      checkAccess(tip.getJob(), QueueManager.QueueOperation.ADMINISTER_JOBS);
+      checkAccess(tip.getJob(), Queue.QueueOperation.ADMINISTER_JOBS);
       return tip.killTask(taskid, shouldFail);
     }
     else {
@@ -3857,10 +3862,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   }
 
   @Override
-  public void refreshQueueAcls() throws IOException{
-    LOG.info("Refreshing queue acls. requested by : " + 
+  public void refreshQueues() throws IOException{
+    LOG.info("Refreshing queue information. requested by : " + 
         UserGroupInformation.getCurrentUGI().getUserName());
-    this.queueManager.refreshAcls(new Configuration(this.conf));
+    this.queueManager.refreshQueues(new Configuration(this.conf));
   }
 
   private void initializeTaskMemoryRelatedConfig() {

+ 185 - 0
src/mapred/org/apache/hadoop/mapred/Queue.java

@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A class for storing the properties of a job queue.
+ */
+class Queue {
+
+  private static final Log LOG = LogFactory.getLog(Queue.class);
+
+  //Queue name
+  private String name;
+
+  //acls list
+  private Map<String, AccessControlList> acls;
+
+  //Queue State
+  private QueueState state;
+
+  // An Object that can be used by schedulers to fill in
+  // arbitrary scheduling information. The toString method
+  // of these objects will be called by the framework to
+  // get a String that can be displayed on UI.
+  private Object schedulingInfo;
+
+  /**
+   * Enum representing queue state
+   */
+  static enum QueueState {
+
+    STOPPED("stopped"), RUNNING("running");
+    private final String stateName;
+
+    QueueState(String stateName) {
+      this.stateName = stateName;
+    }
+
+    /**
+     * @return the stateName
+     */
+    public String getStateName() {
+      return stateName;
+    }
+
+  }
+
+  /**
+   * Enum representing an operation that can be performed on a queue.
+   */
+  static enum QueueOperation {
+    SUBMIT_JOB ("acl-submit-job", false),
+    ADMINISTER_JOBS ("acl-administer-jobs", true);
+    // TODO: Add ACL for LIST_JOBS when we have ability to authenticate
+    //       users in UI
+    // TODO: Add ACL for CHANGE_ACL when we have an admin tool for
+    //       configuring queues.
+
+    private final String aclName;
+    private final boolean jobOwnerAllowed;
+
+    QueueOperation(String aclName, boolean jobOwnerAllowed) {
+      this.aclName = aclName;
+      this.jobOwnerAllowed = jobOwnerAllowed;
+    }
+
+    final String getAclName() {
+      return aclName;
+    }
+
+    final boolean isJobOwnerAllowed() {
+      return jobOwnerAllowed;
+    }
+  }
+
+  /**
+   * Create a job queue
+   * @param name name of the queue
+   * @param acls ACLs for the queue
+   * @param state state of the queue
+   */
+  Queue(String name, Map<String, AccessControlList> acls, QueueState state) {
+	  this.name = name;
+	  this.acls = acls;
+	  this.state = state;
+  }
+  
+  /**
+   * Return the name of the queue
+   * 
+   * @return name of the queue
+   */
+  String getName() {
+    return name;
+  }
+  
+  /**
+   * Set the name of the queue
+   * @param name name of the queue
+   */
+  void setName(String name) {
+    this.name = name;
+  }
+  
+  /**
+   * Return the ACLs for the queue
+   * 
+   * The keys in the map indicate the operations that can be performed,
+   * and the values indicate the list of users/groups who can perform
+   * the operation.
+   * 
+   * @return Map containing the operations that can be performed and
+   *          who can perform the operations.
+   */
+  Map<String, AccessControlList> getAcls() {
+    return acls;
+  }
+  
+  /**
+   * Set the ACLs for the queue
+   * @param acls Map containing the operations that can be performed and
+   *          who can perform the operations.
+   */
+  void setAcls(Map<String, AccessControlList> acls) {
+    this.acls = acls;
+  }
+  
+  /**
+   * Return the state of the queue.
+   * @return state of the queue
+   */
+  QueueState getState() {
+    return state;
+  }
+  
+  /**
+   * Set the state of the queue.
+   * @param state state of the queue.
+   */
+  void setState(QueueState state) {
+    this.state = state;
+  }
+  
+  /**
+   * Return the scheduling information for the queue
+   * @return scheduling information for the queue.
+   */
+  Object getSchedulingInfo() {
+    return schedulingInfo;
+  }
+  
+  /**
+   * Set the scheduling information from the queue.
+   * @param schedulingInfo scheduling information for the queue.
+   */
+  void setSchedulingInfo(Object schedulingInfo) {
+    this.schedulingInfo = schedulingInfo;
+  }
+}

+ 209 - 132
src/mapred/org/apache/hadoop/mapred/QueueManager.java

@@ -21,12 +21,14 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
-import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Queue.QueueState;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.SecurityUtil.AccessControlList;
 import org.apache.hadoop.util.StringUtils;
@@ -49,51 +51,24 @@ import org.apache.hadoop.util.StringUtils;
 class QueueManager {
   
   private static final Log LOG = LogFactory.getLog(QueueManager.class);
-  
-  // Prefix in configuration for queue related keys
-  private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX 
-                                                        = "mapred.queue.";
-  // Configured queues
+
+  // Configured queues this is backed by queues Map , mentioned below
   private Set<String> queueNames;
-  // Map of a queue and ACL property name with an ACL
-  private HashMap<String, AccessControlList> aclsMap;
-  // Map of a queue name to any generic object that represents 
-  // scheduler information 
-  private HashMap<String, Object> schedulerInfoObjects;
+
+  // Map of a queue name and Queue object
+  private HashMap<String, Queue> queues;
+
   // Whether ACLs are enabled in the system or not.
   private boolean aclsEnabled;
-  
-  //Resource in which queue acls are configured.
-  static final String QUEUE_ACLS_FILE_NAME = "mapred-queue-acls.xml";
-  
-  /**
-   * Enum representing an operation that can be performed on a queue.
-   */
-  static enum QueueOperation {
-    SUBMIT_JOB ("acl-submit-job", false),
-    ADMINISTER_JOBS ("acl-administer-jobs", true);
-    // TODO: Add ACL for LIST_JOBS when we have ability to authenticate 
-    //       users in UI
-    // TODO: Add ACL for CHANGE_ACL when we have an admin tool for 
-    //       configuring queues.
-    
-    private final String aclName;
-    private final boolean jobOwnerAllowed;
-    
-    QueueOperation(String aclName, boolean jobOwnerAllowed) {
-      this.aclName = aclName;
-      this.jobOwnerAllowed = jobOwnerAllowed;
-    }
 
-    final String getAclName() {
-      return aclName;
-    }
-    
-    final boolean isJobOwnerAllowed() {
-      return jobOwnerAllowed;
-    }
-  }
-  
+  static final String QUEUE_STATE_SUFFIX = "state";
+
+  static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml";
+
+  // Prefix in configuration for queue related keys
+  static final String QUEUE_CONF_PROPERTY_NAME_PREFIX
+                          = "mapred.queue.";//Resource in which queue acls are configured.
+
   /**
    * Construct a new QueueManager using configuration specified in the passed
    * in {@link org.apache.hadoop.conf.Configuration} object.
@@ -101,10 +76,28 @@ class QueueManager {
    * @param conf Configuration object where queue configuration is specified.
    */
   public QueueManager(Configuration conf) {
-    queueNames = new TreeSet<String>();
-    aclsMap = new HashMap<String, AccessControlList>();
-    schedulerInfoObjects = new HashMap<String, Object>();
-    initialize(conf);
+    checkDeprecation(conf);
+    conf.addResource(QUEUE_CONF_FILE_NAME);
+
+    queues = new HashMap<String, Queue>();
+
+    // First get the queue names 
+    String[] queueNameValues = conf.getStrings("mapred.queue.names",
+        new String[]{JobConf.DEFAULT_QUEUE_NAME});
+
+    // Get configured ACLs and state for each queue
+    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
+    for (String name : queueNameValues) {
+      try {
+        Map<String, AccessControlList> acls = getQueueAcls(name, conf);
+        QueueState state = getQueueState(name, conf);
+        queues.put(name, new Queue(name, acls, state));
+      } catch (Throwable t) {
+        LOG.warn("Not able to initialize queue " + name);
+      }
+    }
+    // Sync queue names with the configured queues.
+    queueNames = queues.keySet();
   }
   
   /**
@@ -121,7 +114,7 @@ class QueueManager {
   }
   
   /**
-   * Return true if the given {@link QueueManager.QueueOperation} can be 
+   * Return true if the given {@link Queue.QueueOperation} can be 
    * performed by the specified user on the given queue.
    * 
    * An operation is allowed if all users are provided access for this
@@ -134,13 +127,14 @@ class QueueManager {
    * 
    * @return true if the operation is allowed, false otherwise.
    */
-  public synchronized boolean hasAccess(String queueName, QueueOperation oper,
+  public synchronized boolean hasAccess(String queueName, 
+                                Queue.QueueOperation oper,
                                 UserGroupInformation ugi) {
     return hasAccess(queueName, null, oper, ugi);
   }
   
   /**
-   * Return true if the given {@link QueueManager.QueueOperation} can be 
+   * Return true if the given {@link Queue.QueueOperation} can be 
    * performed by the specified user on the specified job in the given queue.
    * 
    * An operation is allowed either if the owner of the job is the user 
@@ -148,7 +142,7 @@ class QueueManager {
    * operation, or if either the user or any of the groups specified is
    * provided access.
    * 
-   * If the {@link QueueManager.QueueOperation} is not job specific then the 
+   * If the {@link Queue.QueueOperation} is not job specific then the 
    * job parameter is ignored.
    * 
    * @param queueName Queue on which the operation needs to be performed.
@@ -160,28 +154,37 @@ class QueueManager {
    * @return true if the operation is allowed, false otherwise.
    */
   public synchronized boolean hasAccess(String queueName, JobInProgress job, 
-                                QueueOperation oper, 
+                                Queue.QueueOperation oper, 
                                 UserGroupInformation ugi) {
     if (!aclsEnabled) {
       return true;
     }
     
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("checking access for : " + toFullPropertyName(queueName, 
-                                            oper.getAclName()));      
+    Queue q = queues.get(queueName);
+    if (q == null) {
+      LOG.info("Queue " + queueName + " is not present");
+      return false;
     }
     
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("checking access for : " 
+          + QueueManager.toFullPropertyName(queueName, oper.getAclName()));
+    }
+
     if (oper.isJobOwnerAllowed()) {
-      if (job != null && job.getJobConf().getUser().equals(ugi.getUserName())) {
+      if (job != null 
+          && job.getJobConf().getUser().equals(ugi.getUserName())) {
         return true;
       }
     }
     
-    AccessControlList acl = aclsMap.get(toFullPropertyName(queueName, oper.getAclName()));
+    AccessControlList acl = q.getAcls().get(
+                                toFullPropertyName(queueName, 
+                                    oper.getAclName()));
     if (acl == null) {
       return false;
     }
-    
+
     // Check the ACL list
     boolean allowed = acl.allAllowed();
     if (!allowed) {
@@ -199,8 +202,21 @@ class QueueManager {
         }
       }
     }
-    
-    return allowed;    
+
+    return allowed;
+  }
+
+  /**
+   * Checks whether the given queue is running or not.
+   * @param queueName name of the queue
+   * @return true, if the queue is running.
+   */
+  synchronized boolean isRunning(String queueName) {
+    Queue q = queues.get(queueName);
+    if (q != null) {
+      return q.getState().equals(QueueState.RUNNING);
+    }
+    return false;
   }
   
   /**
@@ -216,7 +232,8 @@ class QueueManager {
    */
   public synchronized void setSchedulerInfo(String queueName, 
                                               Object queueInfo) {
-    schedulerInfoObjects.put(queueName, queueInfo);
+    if (queues.get(queueName) != null)
+      queues.get(queueName).setSchedulingInfo(queueInfo);
   }
   
   /**
@@ -225,125 +242,179 @@ class QueueManager {
    * @param queueName queue for which the scheduling information is required.
    * @return The scheduling information for this queue.
    * 
-   * @see #setSchedulerInfo(String, Object)
+   * @see #setSchedulingInfo(String, Object)
    */
   public synchronized Object getSchedulerInfo(String queueName) {
-    return schedulerInfoObjects.get(queueName);
+    if (queues.get(queueName) != null)
+      return queues.get(queueName).getSchedulingInfo();
+    return null;
   }
   
   /**
-   * Refresh the acls for the configured queues in the system by reading
-   * it from mapred-queue-acls.xml.
+   * Refresh the acls and state for the configured queues in the system 
+   * by reading it from mapred-queues.xml.
    * 
-   * The previous acls are removed. Previously configured queues and
-   * if or not acl is disabled is retained.
+   * Previously configured queues and if or not acls are disabled is retained.
    * 
-   * @throws IOException when queue ACL configuration file is invalid.
+   * @throws IOException when queue configuration file is invalid.
    */
-  synchronized void refreshAcls(Configuration conf) throws IOException {
+  synchronized void refreshQueues(Configuration conf) throws IOException {
+
+    // First check if things are configured in mapred-site.xml,
+    // so we can print out a deprecation warning. 
+    // This check is needed only until we support the configuration
+    // in mapred-site.xml
+    checkDeprecation(conf);
+    
+    // Add the queue configuration file. Values from mapred-site.xml
+    // will be overridden.
+    conf.addResource(QUEUE_CONF_FILE_NAME);
+    
+    // Now we refresh the properties of the queues. Note that we
+    // do *not* refresh the queue names or the acls flag. Instead
+    // we use the older values configured for them.
+    LOG.info("Refreshing acls and state for configured queues.");
     try {
-      HashMap<String, AccessControlList> newAclsMap = 
-        getQueueAcls(conf);
-      aclsMap = newAclsMap;
+      Iterator<String> itr = queueNames.iterator();
+      while(itr.hasNext()) {
+        String name = itr.next();
+        Queue q = queues.get(name);
+        Map<String, AccessControlList> newAcls = getQueueAcls(name, conf);
+        QueueState newState = getQueueState(name, conf);
+        q.setAcls(newAcls);
+        q.setState(newState);
+      }
     } catch (Throwable t) {
       String exceptionString = StringUtils.stringifyException(t);
-      LOG.warn("Queue ACLs could not be refreshed because there was an " +
+      LOG.warn("Queues could not be refreshed because there was an " +
       		"exception in parsing the configuration: "+ exceptionString +
-      		". Existing ACLs are retained.");
+      		". Existing ACLs/state is retained.");
       throw new IOException(exceptionString);
     }
-
   }
   
+  // Check if queue properties are configured in the passed in
+  // configuration. If yes, print out deprecation warning messages.
   private void checkDeprecation(Configuration conf) {
-    for(String queue: queueNames) {
-      for (QueueOperation oper : QueueOperation.values()) {
-        String key = toFullPropertyName(queue, oper.getAclName());
-        String aclString = conf.get(key);
-        if(aclString != null) {
-          LOG.warn("Configuring queue ACLs in mapred-site.xml or " +
-          		"hadoop-site.xml is deprecated. Configure queue ACLs in " + 
-          		QUEUE_ACLS_FILE_NAME);
-          return;
+
+    // check if queues are defined.
+    String[] queues = null;
+    String queueNameValues = conf.get("mapred.queue.names");
+    if (queueNameValues != null) {
+      LOG.warn("Configuring \"mapred.queue.names\" in mapred-site.xml or " +
+          		"hadoop-site.xml is deprecated. Configure " +
+              "\"mapred.queue.names\" in " +
+          		QUEUE_CONF_FILE_NAME);
+      // store queues so we can check if ACLs are also configured
+      // in the deprecated files.
+      queues = conf.getStrings("mapred.queue.names");
+    }
+    
+    // check if the acls flag is defined
+    String aclsEnable = conf.get("mapred.acls.enabled");
+    if (aclsEnable != null) {
+      LOG.warn("Configuring \"mapred.acls.enabled\" in mapred-site.xml or " +
+          		"hadoop-site.xml is deprecated. Configure " +
+              "\"mapred.acls.enabled\" in " +
+          		QUEUE_CONF_FILE_NAME);
+    }
+    
+    // check if acls are defined
+    if (queues != null) {
+      for (String queue : queues) {
+        for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
+          String key = toFullPropertyName(queue, oper.getAclName());
+          String aclString = conf.get(key);
+          if (aclString != null) {
+            LOG.warn("Configuring queue ACLs in mapred-site.xml or " +
+          	  "hadoop-site.xml is deprecated. Configure queue ACLs in " + 
+          		QUEUE_CONF_FILE_NAME);
+            // even if one string is configured, it is enough for printing
+            // the warning. so we can return from here.
+            return;
+          }
         }
       }
     }
   }
   
-  private HashMap<String, AccessControlList> getQueueAcls(Configuration conf)  {
-    checkDeprecation(conf);
-    conf.addResource(QUEUE_ACLS_FILE_NAME);
-    HashMap<String, AccessControlList> aclsMap = 
-      new HashMap<String, AccessControlList>();
-    for (String queue : queueNames) {
-      for (QueueOperation oper : QueueOperation.values()) {
-        String key = toFullPropertyName(queue, oper.getAclName());
-        String aclString = conf.get(key, "*");
-        aclsMap.put(key, new AccessControlList(aclString));
-      }
-    } 
-    return aclsMap;
+  // Parse ACLs for the queue from the configuration.
+  private Map<String, AccessControlList> getQueueAcls(String name,
+                                                        Configuration conf) {
+    HashMap<String, AccessControlList> map = 
+        new HashMap<String, AccessControlList>();
+    for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
+      String aclKey = toFullPropertyName(name, oper.getAclName());
+      map.put(aclKey, new AccessControlList(conf.get(aclKey, "*")));
+    }
+    return map;
   }
-  
-  private void initialize(Configuration conf) {
-    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
-    String[] queues = conf.getStrings("mapred.queue.names", 
-        new String[] {JobConf.DEFAULT_QUEUE_NAME});
-    addToSet(queueNames, queues);
-    aclsMap = getQueueAcls(conf);
+
+  // Parse ACLs for the queue from the configuration.
+  private QueueState getQueueState(String name, Configuration conf) {
+    QueueState retState = QueueState.RUNNING;
+    String stateVal = conf.get(toFullPropertyName(name,
+                                                  QueueManager.QUEUE_STATE_SUFFIX),
+                               QueueState.RUNNING.getStateName());
+    for (QueueState state : QueueState.values()) {
+      if (state.getStateName().equalsIgnoreCase(stateVal)) {
+        retState = state;
+        break;
+      }
+    }
+    return retState;
   }
-  
-  private static final String toFullPropertyName(String queue, 
+ 
+  public static final String toFullPropertyName(String queue,
       String property) {
     return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
   }
-  
-  private static final void addToSet(Set<String> set, String[] elems) {
-    for (String elem : elems) {
-      set.add(elem);
-    }
-  }
-  
+
   synchronized JobQueueInfo[] getJobQueueInfos() {
     ArrayList<JobQueueInfo> queueInfoList = new ArrayList<JobQueueInfo>();
-    for(String queue : queueNames) {
-      Object schedulerInfo = schedulerInfoObjects.get(queue);
-      if(schedulerInfo != null) {
-        queueInfoList.add(new JobQueueInfo(queue,schedulerInfo.toString()));
-      }else {
-        queueInfoList.add(new JobQueueInfo(queue,null));
+    for (String queue : queueNames) {
+      JobQueueInfo queueInfo = getJobQueueInfo(queue);
+      if (queueInfo != null) {
+        queueInfoList.add(getJobQueueInfo(queue));  
       }
     }
-    return (JobQueueInfo[]) queueInfoList.toArray(new JobQueueInfo[queueInfoList
-        .size()]);
+    return (JobQueueInfo[]) queueInfoList.toArray(
+            new JobQueueInfo[queueInfoList.size()]);
   }
 
-  JobQueueInfo getJobQueueInfo(String queue) {
-    Object schedulingInfo = schedulerInfoObjects.get(queue);
-    if(schedulingInfo!=null){
-      return new JobQueueInfo(queue,schedulingInfo.toString());
-    }else {
-      return new JobQueueInfo(queue,null);
+  synchronized JobQueueInfo getJobQueueInfo(String queue) {
+    if (queues.get(queue) != null) {
+      Object schedulingInfo = queues.get(queue).getSchedulingInfo();
+      JobQueueInfo qInfo;
+      if (schedulingInfo != null) {
+        qInfo = new JobQueueInfo(queue, schedulingInfo.toString());
+      } else {
+        qInfo = new JobQueueInfo(queue, null);
+      }
+      qInfo.setQueueState(queues.get(queue).getState().getStateName());
+      return qInfo;
     }
+    return null;
   }
 
   /**
-   * Generates the array of QueueAclsInfo object. The array consists of only those queues
-   * for which user <ugi.getUserName()> has acls
+   * Generates the array of QueueAclsInfo object. 
+   * 
+   * The array consists of only those queues for which user has acls.
    *
    * @return QueueAclsInfo[]
    * @throws java.io.IOException
    */
-  synchronized QueueAclsInfo[] getQueueAcls(UserGroupInformation
-          ugi) throws IOException {
+  synchronized QueueAclsInfo[] getQueueAcls(UserGroupInformation ugi)
+                                            throws IOException {
     //List of all QueueAclsInfo objects , this list is returned
     ArrayList<QueueAclsInfo> queueAclsInfolist =
             new ArrayList<QueueAclsInfo>();
-    QueueOperation[] operations = QueueOperation.values();
+    Queue.QueueOperation[] operations = Queue.QueueOperation.values();
     for (String queueName : queueNames) {
       QueueAclsInfo queueAclsInfo = null;
       ArrayList<String> operationsAllowed = null;
-      for (QueueOperation operation : operations) {
+      for (Queue.QueueOperation operation : operations) {
         if (hasAccess(queueName, operation, ugi)) {
           if (operationsAllowed == null) {
             operationsAllowed = new ArrayList<String>();
@@ -363,4 +434,10 @@ class QueueManager {
             queueAclsInfolist.size()]);
   }
 
+  // ONLY FOR TESTING - Do not use in production code.
+  synchronized void setQueues(Queue[] queues) {
+    for (Queue queue : queues) {
+      this.queues.put(queue.getName(), queue);
+    }
+  }
 }

+ 15 - 16
src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java

@@ -30,7 +30,6 @@ import org.apache.hadoop.mapred.AdminOperationsProtocol;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -54,15 +53,15 @@ public class MRAdmin extends Configured implements Tool {
   private static void printHelp(String cmd) {
     String summary = "hadoop mradmin is the command to execute Map-Reduce administrative commands.\n" +
     "The full syntax is: \n\n" +
-    "hadoop mradmin [-refreshServiceAcl] [-refreshQueueAcls] [-help [cmd]] "
+    "hadoop mradmin [-refreshServiceAcl] [-refreshQueues] [-help [cmd]] "
     + "[-refreshNodes]\n"; 
 
   String refreshServiceAcl = "-refreshServiceAcl: Reload the service-level authorization policy file\n" +
     "\t\tJobtracker will reload the authorization policy file.\n";
 
-  String refreshQueueAcls =
-        "-refreshQueueAcls: Reload the queue acls\n"
-            + "\t\tJobTracker will reload the mapred-queue-acls.xml file.\n";
+  String refreshQueues =
+        "-refreshQueues: Reload the queue acls and state.\n"
+            + "\t\tJobTracker will reload the mapred-queues.xml file.\n";
 
   String refreshNodes =
     "-refreshNodes: Refresh the hosts information at the jobtracker.\n";
@@ -72,8 +71,8 @@ public class MRAdmin extends Configured implements Tool {
 
   if ("refreshServiceAcl".equals(cmd)) {
     System.out.println(refreshServiceAcl);
-  } else if ("refreshQueueAcls".equals(cmd)) {
-    System.out.println(refreshQueueAcls);
+  } else if ("refreshQueues".equals(cmd)) {
+    System.out.println(refreshQueues);
   }  else if ("refreshNodes".equals(cmd)) {
     System.out.println(refreshNodes);
   } else if ("help".equals(cmd)) {
@@ -81,7 +80,7 @@ public class MRAdmin extends Configured implements Tool {
   } else {
     System.out.println(summary);
     System.out.println(refreshServiceAcl);
-    System.out.println(refreshQueueAcls);
+    System.out.println(refreshQueues);
     System.out.println(refreshNodes);
     System.out.println(help);
     System.out.println();
@@ -97,14 +96,14 @@ public class MRAdmin extends Configured implements Tool {
   private static void printUsage(String cmd) {
     if ("-refreshServiceAcl".equals(cmd)) {
       System.err.println("Usage: java MRAdmin" + " [-refreshServiceAcl]");
-    } else if ("-refreshQueueAcls".equals(cmd)) {
-      System.err.println("Usage: java MRAdmin" + " [-refreshQueueAcls]");
+    } else if ("-refreshQueues".equals(cmd)) {
+      System.err.println("Usage: java MRAdmin" + " [-refreshQueues]");
     } else if ("-refreshNodes".equals(cmd)) {
       System.err.println("Usage: java MRAdmin" + " [-refreshNodes]");
     } else {
       System.err.println("Usage: java MRAdmin");
       System.err.println("           [-refreshServiceAcl]");
-      System.err.println("           [-refreshQueueAcls]");
+      System.err.println("           [-refreshQueues]");
       System.err.println("           [-refreshNodes]");
       System.err.println("           [-help [cmd]]");
       System.err.println();
@@ -143,7 +142,7 @@ public class MRAdmin extends Configured implements Tool {
     return 0;
   }
 
-  private int refreshQueueAcls() throws IOException {
+  private int refreshQueues() throws IOException {
     // Get the current configuration
     Configuration conf = getConf();
     
@@ -157,7 +156,7 @@ public class MRAdmin extends Configured implements Tool {
                                              AdminOperationsProtocol.class));
     
     // Refresh the queue properties
-    adminOperationsProtocol.refreshQueueAcls();
+    adminOperationsProtocol.refreshQueues();
     
     return 0;
   }
@@ -201,7 +200,7 @@ public class MRAdmin extends Configured implements Tool {
     //
     // verify that we have enough command line parameters
     //
-    if ("-refreshServiceAcl".equals(cmd) || "-refreshQueueAcls".equals(cmd)
+    if ("-refreshServiceAcl".equals(cmd) || "-refreshQueues".equals(cmd)
         || "-refreshNodes".equals(cmd)) {
       if (args.length != 1) {
         printUsage(cmd);
@@ -213,8 +212,8 @@ public class MRAdmin extends Configured implements Tool {
     try {
       if ("-refreshServiceAcl".equals(cmd)) {
         exitCode = refreshAuthorizationPolicy();
-      } else if ("-refreshQueueAcls".equals(cmd)) {
-        exitCode = refreshQueueAcls();
+      } else if ("-refreshQueues".equals(cmd)) {
+        exitCode = refreshQueues();
       } else if ("-refreshNodes".equals(cmd)) {
         exitCode = refreshNodes();
       } else if ("-help".equals(cmd)) {

+ 2 - 0
src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java

@@ -108,6 +108,8 @@ public class TestJobQueueInformation extends TestCase {
     assertNotNull(queueInfos);
     assertEquals(1, queueInfos.length);
     assertEquals("default", queueInfos[0].getQueueName());
+    assertEquals(Queue.QueueState.RUNNING.getStateName(),
+                  queueInfos[0].getQueueState());
     JobConf conf = mrCluster.createJobConf();
     FileSystem fileSys = dfsCluster.getFileSystem();
     

+ 2 - 2
src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java

@@ -32,8 +32,8 @@ public class TestQueueAclsForCurrentUser extends TestCase {
   private QueueManager queueManager;
   private JobConf conf = null;
   UserGroupInformation currentUGI = null;
-  String submitAcl = QueueManager.QueueOperation.SUBMIT_JOB.getAclName();
-  String adminAcl  = QueueManager.QueueOperation.ADMINISTER_JOBS.getAclName();
+  String submitAcl = Queue.QueueOperation.SUBMIT_JOB.getAclName();
+  String adminAcl  = Queue.QueueOperation.ADMINISTER_JOBS.getAclName();
 
   private void setupConfForNoAccess() throws IOException,LoginException {
     currentUGI = UnixUserGroupInformation.login();

+ 107 - 20
src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java

@@ -222,7 +222,7 @@ public class TestQueueManager extends TestCase {
     String queueConfigPath =
         System.getProperty("test.build.extraconf", "build/test/extraconf");
     File queueConfigFile =
-        new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME);
+        new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME);
     File hadoopConfigFile = new File(queueConfigPath, "mapred-site.xml");
     try {
       //Setting up default mapred-site.xml
@@ -248,13 +248,13 @@ public class TestQueueManager extends TestCase {
       UserGroupInformation ugi = UnixUserGroupInformation.getCurrentUGI();
       //Job Submission should fail because ugi to be used is set to blank.
       assertFalse("User Job Submission Succeeded before refresh.",
-          queueManager.hasAccess("default", QueueManager.QueueOperation.
+          queueManager.hasAccess("default", Queue.QueueOperation.
               SUBMIT_JOB, ugi));
       assertFalse("User Job Submission Succeeded before refresh.",
-          queueManager.hasAccess("q1", QueueManager.QueueOperation.
+          queueManager.hasAccess("q1", Queue.QueueOperation.
               SUBMIT_JOB, ugi));
       assertFalse("User Job Submission Succeeded before refresh.",
-          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+          queueManager.hasAccess("q2", Queue.QueueOperation.
               SUBMIT_JOB, ugi));
       
       //Test job submission as alternate user.
@@ -263,7 +263,7 @@ public class TestQueueManager extends TestCase {
       UserGroupInformation alternateUgi = 
         UserGroupInformation.readFrom(alternateUserConfig);
       assertTrue("Alternate User Job Submission failed before refresh.",
-          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+          queueManager.hasAccess("q2", Queue.QueueOperation.
               SUBMIT_JOB, alternateUgi));
       
       //Set acl for the current user.
@@ -273,19 +273,19 @@ public class TestQueueManager extends TestCase {
       //write out queue-acls.xml.
       UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
       //refresh configuration
-      queueManager.refreshAcls(conf);
+      queueManager.refreshQueues(conf);
       //Submission should succeed
       assertTrue("User Job Submission failed after refresh.",
-          queueManager.hasAccess("default", QueueManager.QueueOperation.
+          queueManager.hasAccess("default", Queue.QueueOperation.
               SUBMIT_JOB, ugi));
       assertTrue("User Job Submission failed after refresh.",
-          queueManager.hasAccess("q1", QueueManager.QueueOperation.
+          queueManager.hasAccess("q1", Queue.QueueOperation.
               SUBMIT_JOB, ugi));
       assertTrue("User Job Submission failed after refresh.",
-          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+          queueManager.hasAccess("q2", Queue.QueueOperation.
               SUBMIT_JOB, ugi));
       assertFalse("Alternate User Job Submission succeeded after refresh.",
-          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+          queueManager.hasAccess("q2", Queue.QueueOperation.
               SUBMIT_JOB, alternateUgi));
       //delete the ACL file.
       queueConfigFile.delete();
@@ -294,9 +294,9 @@ public class TestQueueManager extends TestCase {
       hadoopConfProps.put("mapred.acls.enabled", "true");
       hadoopConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
       UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
-      queueManager.refreshAcls(conf);
+      queueManager.refreshQueues(conf);
       assertTrue("User Job Submission failed after refresh and no queue acls file.",
-          queueManager.hasAccess("default", QueueManager.QueueOperation.
+          queueManager.hasAccess("default", Queue.QueueOperation.
               SUBMIT_JOB, ugi));
     } finally{
       if(queueConfigFile.exists()) {
@@ -308,11 +308,98 @@ public class TestQueueManager extends TestCase {
     }
   }
 
+
+
+  /**
+   * Test to verify refreshing of queue properties by using MRAdmin tool.
+   *
+   * @throws Exception
+   */
+  public void testStateRefresh() throws Exception {
+    String queueConfigPath =
+        System.getProperty("test.build.extraconf", "build/test/extraconf");
+    File queueConfigFile =
+        new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME);
+    try {
+      //Setting up default mapred-site.xml
+      Properties queueConfProps = new Properties();
+      //these properties should be retained.
+      queueConfProps.put("mapred.queue.names", "default,qu1");
+      queueConfProps.put("mapred.acls.enabled", "true");
+      //These property should always be overridden
+      queueConfProps.put("mapred.queue.default.state", "running");
+      queueConfProps.put("mapred.queue.qu1.state", "stopped");
+      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
+
+      //Create a new configuration to be used with QueueManager
+      JobConf conf = new JobConf();
+      setUpCluster(conf);
+      QueueManager queueManager = 
+        this.miniMRCluster.getJobTrackerRunner().getJobTracker().getQueueManager();
+
+      try{
+        Job job = submitSleepJob(10, 2, 10, 10, true,null, "default" );
+        assert(job.isSuccessful());
+      }catch(Exception e){
+        fail("submit job in default queue should be sucessful ");
+      }
+
+      try{
+        submitSleepJob(10, 2, 10, 10, true,null, "qu1" );
+        fail("submit job in default queue should be failed ");
+      }catch(Exception e){
+        assert(e.getMessage().contains("Queue \"" + "qu1" + "\" is not running"));
+      }
+
+      // verify state of queues before refresh
+      JobQueueInfo queueInfo = queueManager.getJobQueueInfo("default");
+      assertEquals(Queue.QueueState.RUNNING.getStateName(), 
+                    queueInfo.getQueueState());
+      queueInfo = queueManager.getJobQueueInfo("qu1");
+      assertEquals(Queue.QueueState.STOPPED.getStateName(),
+                    queueInfo.getQueueState());
+
+      queueConfProps.put("mapred.queue.default.state", "stopped");
+      queueConfProps.put("mapred.queue.qu1.state", "running");
+      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
+
+      //refresh configuration
+      queueManager.refreshQueues(conf);
+
+      //Job Submission should pass now because ugi to be used is set to blank.
+      try{
+        submitSleepJob(10, 2, 10, 10, true,null,"qu1");
+      }catch(Exception e){
+        fail("submit job in qu1 queue should be sucessful ");
+      }
+
+      try{
+        submitSleepJob(10, 2, 10, 10, true,null, "default" );
+        fail("submit job in default queue should be failed ");
+      }catch(Exception e){
+        assert(e.getMessage().contains("Queue \"" + "default" + "\" is not running"));
+      }
+      
+      // verify state of queues after refresh
+      queueInfo = queueManager.getJobQueueInfo("default");
+      assertEquals(Queue.QueueState.STOPPED.getStateName(), 
+                    queueInfo.getQueueState());
+      queueInfo = queueManager.getJobQueueInfo("qu1");
+      assertEquals(Queue.QueueState.RUNNING.getStateName(),
+                    queueInfo.getQueueState());
+    } finally{
+      if(queueConfigFile.exists()) {
+        queueConfigFile.delete();
+      }
+      this.tearDownCluster();
+    }
+  }
+
   public void testQueueAclRefreshWithInvalidConfFile() throws IOException {
     String queueConfigPath =
       System.getProperty("test.build.extraconf", "build/test/extraconf");
     File queueConfigFile =
-      new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME);
+      new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME);
     File hadoopConfigFile = new File(queueConfigPath, "hadoop-site.xml");
     try {
       // queue properties with which the cluster is started.
@@ -333,13 +420,13 @@ public class TestQueueManager extends TestCase {
       QueueManager queueManager = new QueueManager(conf);
       //Testing access to queue.
       assertTrue("User Job Submission failed.",
-          queueManager.hasAccess("default", QueueManager.QueueOperation.
+          queueManager.hasAccess("default", Queue.QueueOperation.
               SUBMIT_JOB, ugi));
       assertTrue("User Job Submission failed.",
-          queueManager.hasAccess("q1", QueueManager.QueueOperation.
+          queueManager.hasAccess("q1", Queue.QueueOperation.
               SUBMIT_JOB, ugi));
       assertTrue("User Job Submission failed.",
-          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+          queueManager.hasAccess("q2", Queue.QueueOperation.
               SUBMIT_JOB, ugi));
       
       //Write out a new incomplete invalid configuration file.
@@ -351,18 +438,18 @@ public class TestQueueManager extends TestCase {
       try {
         //Exception to be thrown by queue manager because configuration passed
         //is invalid.
-        queueManager.refreshAcls(conf);
+        queueManager.refreshQueues(conf);
         fail("Refresh of ACLs should have failed with invalid conf file.");
       } catch (Exception e) {
       }
       assertTrue("User Job Submission failed after invalid conf file refresh.",
-          queueManager.hasAccess("default", QueueManager.QueueOperation.
+          queueManager.hasAccess("default", Queue.QueueOperation.
               SUBMIT_JOB, ugi));
       assertTrue("User Job Submission failed after invalid conf file refresh.",
-          queueManager.hasAccess("q1", QueueManager.QueueOperation.
+          queueManager.hasAccess("q1", Queue.QueueOperation.
               SUBMIT_JOB, ugi));
       assertTrue("User Job Submission failed after invalid conf file refresh.",
-          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+          queueManager.hasAccess("q2", Queue.QueueOperation.
               SUBMIT_JOB, ugi));
     } finally {
       //Cleanup the configuration files in all cases

+ 1 - 0
src/webapps/job/jobqueue_details.jsp

@@ -56,6 +56,7 @@ private static final long serialVersionUID = 526456771152222127L;
   <a href="jobtracker.jsp"><%=trackerName%></a>
 </h1>
 <div>
+State : <%= schedInfo.getQueueState() %> <br/>
 Scheduling Information : <%= schedulingInfoString.replaceAll("\n","<br/>") %>
 </div>
 <hr/>

+ 3 - 0
src/webapps/job/jobtracker.jsp

@@ -124,6 +124,7 @@
 <thead style="font-weight: bold">
 <tr>
 <td> Queue Name </td>
+<td> State </td>
 <td> Scheduling Information</td>
 </tr>
 </thead>
@@ -131,6 +132,7 @@
 <%
 for(JobQueueInfo queue: queues) {
   String queueName = queue.getQueueName();
+  String state = queue.getQueueState();
   String schedulingInformation = queue.getSchedulingInfo();
   if(schedulingInformation == null || schedulingInformation.trim().equals("")) {
     schedulingInformation = "NA";
@@ -138,6 +140,7 @@ for(JobQueueInfo queue: queues) {
 %>
 <tr>
 <td><a href="jobqueue_details.jsp?queueName=<%=queueName%>"><%=queueName%></a></td>
+<td><%=state%></td>
 <td><%=schedulingInformation.replaceAll("\n","<br/>") %>
 </td>
 </tr>