
HADOOP-5396. Provide ability to refresh queue ACLs in the JobTracker without having to restart the daemon. Contributed by Sreekanth Ramakrishnan and Vinod Kumar Vavilapalli.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@765706 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala, 16 years ago
commit 681c0a0a32

+ 4 - 0
CHANGES.txt

@@ -233,6 +233,10 @@ Trunk (unreleased changes)
     rather than buffering at least one record for each segment. (Devaraj Das
     via cdouglas)
 
+    HADOOP-5396. Provide ability to refresh queue ACLs in the JobTracker
+    without having to restart the daemon.
+    (Sreekanth Ramakrishnan and Vinod Kumar Vavilapalli via yhemanth)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

+ 31 - 0
conf/mapred-queue-acls.xml.template

@@ -0,0 +1,31 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- This is a template file for queue acls configuration properties -->
+
+<configuration>
+
+<property>
+  <name>mapred.queue.default.acl-submit-job</name>
+  <value>*</value>
+  <description> Comma separated list of user and group names that are allowed
+    to submit jobs to the 'default' queue. The user list and the group list
+    are separated by a blank. For e.g. alice,bob group1,group2. 
+    If set to the special value '*', it means all users are allowed to 
+    submit jobs. 
+  </description>
+</property>
+
+<property>
+  <name>mapred.queue.default.acl-administer-jobs</name>
+  <value>*</value>
+  <description> Comma separated list of user and group names that are allowed
+    to delete jobs or modify job's priority for jobs not owned by the current
+    user in the 'default' queue. The user list and the group list
+    are separated by a blank. For e.g. alice,bob group1,group2. 
+    If set to the special value '*', it means all users are allowed to do 
+    this operation.
+  </description>
+</property>
+
+</configuration>
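For reference, these keys follow the pattern mapred.queue.<queue-name>.<acl-name>, which is how the QueueManager changes below compose property names when reading ACLs. A minimal sketch of that lookup, using a hypothetical standalone class (not part of this patch) against a Configuration that has the template above on its classpath:

import org.apache.hadoop.conf.Configuration;

// Hypothetical helper, for illustration only; the real lookup lives in
// QueueManager.getQueueAcls() further down in this change.
public class QueueAclLookupSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Pull in the queue ACL file from the classpath, as the patch does.
    conf.addResource("mapred-queue-acls.xml");

    // Key pattern: mapred.queue.<queue-name>.<acl-name>
    String key = "mapred.queue.default.acl-submit-job";

    // A missing key falls back to "*", i.e. all users are allowed.
    System.out.println(key + " = " + conf.get(key, "*"));
  }
}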

+ 12 - 1
src/docs/src/documentation/content/xdocs/cluster_setup.xml

@@ -303,6 +303,16 @@
            <em>mapred.queue.queue-name.acl-name</em>, defined below.
          </td>
        </tr>
+		  </table>
+      
+      <p><br/><code> conf/mapred-queue-acls.xml</code></p>
+      
+      <table>
+       <tr>
+          <th>Parameter</th>
+          <th>Value</th> 
+          <th>Notes</th>
+       </tr>
        <tr>
          <td>mapred.queue.<em>queue-name</em>.acl-submit-job</td>
          <td>List of users and groups that can submit jobs to the
@@ -330,7 +340,8 @@
            his/her own job, irrespective of the ACLs.
          </td>
        </tr>
-		  </table>
+      </table>
+      
 
          <p>Typically all the above parameters are marked as 
          <a href="ext:api/org/apache/hadoop/conf/configuration/final_parameters">

+ 18 - 1
src/docs/src/documentation/content/xdocs/commands_manual.xml

@@ -566,7 +566,24 @@
 			           </tr>
 			     </table>
 			</section>
-			
+			<section>
+        <title>mradmin</title>
+        <p>Runs MR admin client</p>
+        <p><code>Usage: hadoop mradmin  [</code>
+        <a href="commands_manual.html#Generic+Options">GENERIC_OPTIONS</a>
+        <code>] [-refreshQueueAcls] </code></p>
+        <table>
+        <tr>
+        <th> COMMAND_OPTION </th><th> Description </th>
+        </tr>
+        <tr>
+        <td><code>-refreshQueueAcls</code></td>
+        <td> Refresh the queue acls used by hadoop, to check access during submissions
+        and administration of the job by the user. The properties present in
+        <code>mapred-queue-acls.xml</code> are reloaded by the queue manager.</td>
+        </tr>
+        </table>
+      </section>
 			<section>
 				<title> jobtracker </title>
 				<p>

+ 0 - 23
src/mapred/mapred-default.xml

@@ -935,29 +935,6 @@
   </description>
 </property>
 
-<property>
-  <name>mapred.queue.default.acl-submit-job</name>
-  <value>*</value>
-  <description> Comma separated list of user and group names that are allowed
-    to submit jobs to the 'default' queue. The user list and the group list
-    are separated by a blank. For e.g. alice,bob group1,group2. 
-    If set to the special value '*', it means all users are allowed to 
-    submit jobs. 
-  </description>
-</property>
-
-<property>
-  <name>mapred.queue.default.acl-administer-jobs</name>
-  <value>*</value>
-  <description> Comma separated list of user and group names that are allowed
-    to delete jobs or modify job's priority for jobs not owned by the current
-    user in the 'default' queue. The user list and the group list
-    are separated by a blank. For e.g. alice,bob group1,group2. 
-    If set to the special value '*', it means all users are allowed to do 
-    this operation.
-  </description>
-</property>
-
 <property>
   <name>mapred.job.queue.name</name>
   <value>default</value>

+ 40 - 0
src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java

@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/**
+ * Protocol for admin operations. This is a framework-public interface and is
+ * NOT_TO_BE_USED_BY_USERS_DIRECTLY.
+ */
+public interface AdminOperationsProtocol extends VersionedProtocol {
+  
+  /**
+   * Version 1: Initial version. Added refreshQueueAcls.
+   */
+  public static final long versionID = 1L;
+
+  /**
+   * Refresh the queue acls in use currently.
+   */
+  void refreshQueueAcls() throws IOException;
+}
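The MRAdmin changes later in this diff obtain a client-side proxy for this protocol over Hadoop RPC and invoke refreshQueueAcls() as a single round trip to the JobTracker. A condensed sketch of that call path, assuming the caller already has a Configuration and a UserGroupInformation the way MRAdmin's getConf()/getUGI() provide them:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.AdminOperationsProtocol;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;

// Sketch only: mirrors what MRAdmin.refreshQueueAcls() does in this patch.
class RefreshQueueAclsClient {
  static void refresh(Configuration conf, UserGroupInformation ugi)
      throws IOException {
    AdminOperationsProtocol admin = (AdminOperationsProtocol)
        RPC.getProxy(AdminOperationsProtocol.class,
                     AdminOperationsProtocol.versionID,
                     JobTracker.getAddress(conf),   // JobTracker RPC address
                     ugi, conf,
                     NetUtils.getSocketFactory(conf,
                                               AdminOperationsProtocol.class));
    // One RPC: the JobTracker re-reads mapred-queue-acls.xml via QueueManager.
    admin.refreshQueueAcls();
  }
}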

+ 16 - 3
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -86,7 +86,8 @@ import org.apache.hadoop.util.VersionInfo;
  *
  *******************************************************/
 public class JobTracker implements MRConstants, InterTrackerProtocol,
-    JobSubmissionProtocol, TaskTrackerManager, RefreshAuthorizationPolicyProtocol {
+    JobSubmissionProtocol, TaskTrackerManager,
+    RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol {
 
   static{
     Configuration.addDefaultResource("mapred-default.xml");
@@ -96,6 +97,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   static long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
   static long RETIRE_JOB_INTERVAL;
   static long RETIRE_JOB_CHECK_INTERVAL;
+
+  
   // The interval after which one fault of a tracker will be discarded,
   // if there are no faults during this. 
   private static long UPDATE_FAULTY_TRACKER_INTERVAL = 24 * 60 * 60 * 1000;
@@ -205,6 +208,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       return JobSubmissionProtocol.versionID;
     } else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){
       return RefreshAuthorizationPolicyProtocol.versionID;
+    } else if (protocol.equals(AdminOperationsProtocol.class.getName())){
+      return AdminOperationsProtocol.versionID;
     } else {
       throw new IOException("Unknown protocol to job tracker: " + protocol);
     }
@@ -1508,8 +1513,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     // Read the hosts/exclude files to restrict access to the jobtracker.
     this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
                                            conf.get("mapred.hosts.exclude", ""));
-    
-    queueManager = new QueueManager(this.conf);
+
+    Configuration queuesConf = new Configuration(this.conf);
+    queueManager = new QueueManager(queuesConf);
     
     // Create the scheduler
     Class<? extends TaskScheduler> schedulerClass
@@ -3585,4 +3591,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
     SecurityUtil.getPolicy().refresh();
   }
+
+  @Override
+  public void refreshQueueAcls() throws IOException{
+    LOG.info("Refreshing queue acls. requested by : " + 
+        UserGroupInformation.getCurrentUGI().getUserName());
+    this.queueManager.refreshAcls(new Configuration(this.conf));
+  }
 }
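One detail worth noting in the constructor and in refreshQueueAcls() above: the JobTracker hands QueueManager a copy of its configuration (new Configuration(this.conf)) rather than the live object, presumably so the refresh path can add mapred-queue-acls.xml as a resource and re-read it without mutating the daemon's primary conf. A tiny sketch of that copy-then-augment idiom; "some.key" is a hypothetical property, not one the patch defines:

import org.apache.hadoop.conf.Configuration;

// Illustration only of the copy-then-augment idiom used above.
class ConfCopySketch {
  static void refreshFrom(Configuration live) {
    Configuration copy = new Configuration(live); // snapshot of current settings
    copy.addResource("mapred-queue-acls.xml");    // re-read ACLs into the copy only
    String value = copy.get("some.key", "*");
    System.out.println("refreshed value: " + value);
    // 'live' is untouched; only the copy saw mapred-queue-acls.xml.
  }
}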

+ 55 - 22
src/mapred/org/apache/hadoop/mapred/QueueManager.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Set;
@@ -25,10 +26,10 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Class that exposes information about queues maintained by the Hadoop
@@ -62,6 +63,9 @@ class QueueManager {
   // Whether ACLs are enabled in the system or not.
   private boolean aclsEnabled;
   
+  //Resource in which queue acls are configured.
+  static final String QUEUE_ACLS_FILE_NAME = "mapred-queue-acls.xml";
+  
   /**
    * Enum representing an operation that can be performed on a queue.
    */
@@ -228,36 +232,65 @@ class QueueManager {
   }
   
   /**
-   * Refresh information configured for queues in the system by reading
-   * it from the passed in {@link org.apache.hadoop.conf.Configuration}.
-   *
-   * Previously stored information about queues is removed and new
-   * information populated from the configuration.
+   * Refresh the acls for the configured queues in the system by reading
+   * it from mapred-queue-acls.xml.
    * 
-   * @param conf New configuration for the queues. 
+   * The previous acls are removed. The previously configured queues and
+   * the acls-enabled setting are retained.
+   * 
+   * @throws IOException when queue ACL configuration file is invalid.
    */
-  public synchronized void refresh(Configuration conf) {
-    queueNames.clear();
-    aclsMap.clear();
-    schedulerInfoObjects.clear();
-    initialize(conf);
+  synchronized void refreshAcls(Configuration conf) throws IOException {
+    try {
+      HashMap<String, AccessControlList> newAclsMap = 
+        getQueueAcls(conf);
+      aclsMap = newAclsMap;
+    } catch (Throwable t) {
+      String exceptionString = StringUtils.stringifyException(t);
+      LOG.warn("Queue ACLs could not be refreshed because there was an " +
+      		"exception in parsing the configuration: "+ exceptionString +
+      		". Existing ACLs are retained.");
+      throw new IOException(exceptionString);
+    }
+
   }
   
-  private void initialize(Configuration conf) {
-    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
-    String[] queues = conf.getStrings("mapred.queue.names", 
-                                  new String[] {JobConf.DEFAULT_QUEUE_NAME});
-    addToSet(queueNames, queues);
-    
-    // for every queue, and every operation, get the ACL
-    // if any is specified and store in aclsMap.
-    for (String queue : queues) {
+  private void checkDeprecation(Configuration conf) {
+    for(String queue: queueNames) {
+      for (QueueOperation oper : QueueOperation.values()) {
+        String key = toFullPropertyName(queue, oper.getAclName());
+        String aclString = conf.get(key);
+        if(aclString != null) {
+          LOG.warn("Configuring queue ACLs in mapred-site.xml or " +
+          		"hadoop-site.xml is deprecated. Configure queue ACLs in " + 
+          		QUEUE_ACLS_FILE_NAME);
+          return;
+        }
+      }
+    }
+  }
+  
+  private HashMap<String, AccessControlList> getQueueAcls(Configuration conf)  {
+    checkDeprecation(conf);
+    conf.addResource(QUEUE_ACLS_FILE_NAME);
+    HashMap<String, AccessControlList> aclsMap = 
+      new HashMap<String, AccessControlList>();
+    for (String queue : queueNames) {
       for (QueueOperation oper : QueueOperation.values()) {
         String key = toFullPropertyName(queue, oper.getAclName());
         String aclString = conf.get(key, "*");
         aclsMap.put(key, new AccessControlList(aclString));
       }
-    }
+    } 
+    return aclsMap;
+  }
+  
+  private void initialize(Configuration conf) {
+    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
+    String[] queues = conf.getStrings("mapred.queue.names", 
+        new String[] {JobConf.DEFAULT_QUEUE_NAME});
+    addToSet(queueNames, queues);
+    aclsMap = getQueueAcls(conf);
   }
   
   private static final String toFullPropertyName(String queue, 
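The interesting part of refreshAcls() above is its build-then-swap shape: the replacement map is fully constructed from the re-read configuration before the live aclsMap reference is replaced, so a malformed mapred-queue-acls.xml leaves the ACLs currently in force untouched. A simplified sketch of that pattern, with hypothetical names and plain String values standing in for AccessControlList:

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;

// Sketch of the build-then-swap refresh used by QueueManager.refreshAcls().
class QueueAclRefreshSketch {
  private HashMap<String, String> aclsMap = new HashMap<String, String>();
  private final String[] queueNames = {"default"};   // hypothetical queue set

  synchronized void refreshAcls(Configuration conf) throws IOException {
    try {
      conf.addResource("mapred-queue-acls.xml");      // re-read the ACL file
      HashMap<String, String> newMap = new HashMap<String, String>();
      for (String queue : queueNames) {
        String key = "mapred.queue." + queue + ".acl-submit-job";
        newMap.put(key, conf.get(key, "*"));          // "*" means everyone
      }
      aclsMap = newMap;                               // swap only on success
    } catch (Throwable t) {
      // Existing ACLs are retained; surface the parse error to the caller.
      throw new IOException("Queue ACL refresh failed: " + t.getMessage());
    }
  }

  synchronized String getAcl(String key) {
    return aclsMap.get(key);
  }
}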

+ 39 - 9
src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.AdminOperationsProtocol;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
@@ -34,9 +35,10 @@ import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Administrative access to Hadoop Map-Reduce.
- *
+ * 
  * Currently it only provides the ability to connect to the {@link JobTracker}
- * and refresh the service-level authorization policy.
+ * and 1) refresh the service-level authorization policy, 2) refresh queue acl
+ * properties.
  */
 public class MRAdmin extends Configured implements Tool {
 
@@ -51,21 +53,28 @@ public class MRAdmin extends Configured implements Tool {
   private static void printHelp(String cmd) {
     String summary = "hadoop mradmin is the command to execute Map-Reduce administrative commands.\n" +
     "The full syntax is: \n\n" +
-    "hadoop mradmin [-refreshServiceAcl] [-help [cmd]]\n"; 
+    "hadoop mradmin [-refreshServiceAcl] [-refreshQueueAcls] [-help [cmd]]\n"; 
 
   String refreshServiceAcl = "-refreshServiceAcl: Reload the service-level authorization policy file\n" +
     "\t\tJobtracker will reload the authorization policy file.\n";
-  
+
+  String refreshQueueAcls =
+        "-refreshQueueAcls: Reload the queue acls\n"
+            + "\t\tJobTracker will reload the mapred-queue-acls.xml file.\n";
+
   String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
     "\t\tis specified.\n";
 
   if ("refreshServiceAcl".equals(cmd)) {
     System.out.println(refreshServiceAcl);
+  } else if ("refreshQueueAcls".equals(cmd)) {
+    System.out.println(refreshQueueAcls);
   } else if ("help".equals(cmd)) {
     System.out.println(help);
   } else {
     System.out.println(summary);
     System.out.println(refreshServiceAcl);
+    System.out.println(refreshQueueAcls);
     System.out.println(help);
     System.out.println();
     ToolRunner.printGenericCommandUsage(System.out);
@@ -79,11 +88,13 @@ public class MRAdmin extends Configured implements Tool {
    */
   private static void printUsage(String cmd) {
     if ("-refreshServiceAcl".equals(cmd)) {
-      System.err.println("Usage: java MRAdmin"
-                         + " [-refreshServiceAcl]");
+      System.err.println("Usage: java MRAdmin" + " [-refreshServiceAcl]");
+    } else if ("-refreshQueueAcls".equals(cmd)) {
+      System.err.println("Usage: java MRAdmin" + " [-refreshQueueAcls]");
     } else {
       System.err.println("Usage: java MRAdmin");
       System.err.println("           [-refreshServiceAcl]");
+      System.err.println("           [-refreshQueueAcls]");
       System.err.println("           [-help [cmd]]");
       System.err.println();
       ToolRunner.printGenericCommandUsage(System.err);
@@ -120,7 +131,25 @@ public class MRAdmin extends Configured implements Tool {
     
     return 0;
   }
-  
+
+  private int refreshQueueAcls() throws IOException {
+    // Get the current configuration
+    Configuration conf = getConf();
+    
+    // Create the client
+    AdminOperationsProtocol adminOperationsProtocol = 
+      (AdminOperationsProtocol) 
+      RPC.getProxy(AdminOperationsProtocol.class, 
+                   AdminOperationsProtocol.versionID, 
+                   JobTracker.getAddress(conf), getUGI(conf), conf,
+                   NetUtils.getSocketFactory(conf, 
+                                             AdminOperationsProtocol.class));
+    
+    // Refresh the queue properties
+    adminOperationsProtocol.refreshQueueAcls();
+    
+    return 0;
+  }
 
   @Override
   public int run(String[] args) throws Exception {
@@ -136,7 +165,7 @@ public class MRAdmin extends Configured implements Tool {
     //
     // verify that we have enough command line parameters
     //
-    if ("-refreshServiceAcl".equals(cmd)) {
+    if ("-refreshServiceAcl".equals(cmd) || "-refreshQueueAcls".equals(cmd)) {
       if (args.length != 1) {
         printUsage(cmd);
         return exitCode;
@@ -147,6 +176,8 @@ public class MRAdmin extends Configured implements Tool {
     try {
       if ("-refreshServiceAcl".equals(cmd)) {
         exitCode = refreshAuthorizationPolicy();
+      } else if ("-refreshQueueAcls".equals(cmd)) {
+        exitCode = refreshQueueAcls();
       } else if ("-help".equals(cmd)) {
         if (i < args.length) {
           printUsage(args[i]);
@@ -189,5 +220,4 @@ public class MRAdmin extends Configured implements Tool {
     int result = ToolRunner.run(new MRAdmin(), args);
     System.exit(result);
   }
-
 }
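Because MRAdmin implements Tool, the new option can also be driven programmatically, which is exactly what the command-line entry point above does. A minimal sketch, assuming the cluster configuration is on the classpath:

import org.apache.hadoop.mapred.tools.MRAdmin;
import org.apache.hadoop.util.ToolRunner;

// Sketch: exercises the same path as `hadoop mradmin -refreshQueueAcls`.
public class RefreshQueueAclsDriver {
  public static void main(String[] args) throws Exception {
    int exitCode =
        ToolRunner.run(new MRAdmin(), new String[] {"-refreshQueueAcls"});
    System.exit(exitCode);
  }
}

On a deployed cluster the usual flow is to edit conf/mapred-queue-acls.xml on the JobTracker node and then run hadoop mradmin -refreshQueueAcls, as documented in the commands_manual.xml change above.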

+ 186 - 9
src/test/org/apache/hadoop/mapred/TestQueueManager.java

@@ -18,7 +18,11 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -28,13 +32,13 @@ import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.SleepJob;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
 
 public class TestQueueManager extends TestCase {
 
@@ -195,6 +199,169 @@ public class TestQueueManager extends TestCase {
     verifyJobPriorityChangeAsOtherUser(conf, false, 
                               "junk-user,junk-user-group");
   }
+
+  /**
+   * Test to verify refreshing of queue properties by using MRAdmin tool.
+   * 
+   * @throws Exception
+   */
+  public void testACLRefresh() throws Exception {
+    String queueConfigPath =
+        System.getProperty("test.build.extraconf", "build/test/extraconf");
+    File queueConfigFile =
+        new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME);
+    File hadoopConfigFile = new File(queueConfigPath, "mapred-site.xml");
+    try {
+      //Setting up default mapred-site.xml
+      Properties hadoopConfProps = new Properties();
+      //these properties should be retained.
+      hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
+      hadoopConfProps.put("mapred.acls.enabled", "true");
+      //These property should always be overridden
+      hadoopConfProps.put("mapred.queue.default.acl-submit-job", "u1");
+      hadoopConfProps.put("mapred.queue.q1.acl-submit-job", "u2");
+      hadoopConfProps.put("mapred.queue.q2.acl-submit-job", "u1");
+      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+      
+      //Actual property which would be used.
+      Properties queueConfProps = new Properties();
+      queueConfProps.put("mapred.queue.default.acl-submit-job", " ");
+      //Writing out the queue configuration file.
+      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
+      
+      //Create a new configuration to be used with QueueManager
+      JobConf conf = new JobConf();
+      QueueManager queueManager = new QueueManager(conf);
+      UserGroupInformation ugi = UnixUserGroupInformation.getCurrentUGI();
+      //Job Submission should fail because ugi to be used is set to blank.
+      assertFalse("User Job Submission Succeeded before refresh.",
+          queueManager.hasAccess("default", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertFalse("User Job Submission Succeeded before refresh.",
+          queueManager.hasAccess("q1", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertFalse("User Job Submission Succeeded before refresh.",
+          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      
+      //Test job submission as alternate user.
+      Configuration alternateUserConfig = new Configuration();
+      alternateUserConfig.set("hadoop.job.ugi","u1,users");
+      UserGroupInformation alternateUgi = 
+        UserGroupInformation.readFrom(alternateUserConfig);
+      assertTrue("Alternate User Job Submission failed before refresh.",
+          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+              SUBMIT_JOB, alternateUgi));
+      
+      //Set acl for the current user.
+      queueConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
+      queueConfProps.put("mapred.queue.q1.acl-submit-job", ugi.getUserName());
+      queueConfProps.put("mapred.queue.q2.acl-submit-job", ugi.getUserName());
+      //write out queue-acls.xml.
+      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
+      //refresh configuration
+      queueManager.refreshAcls(conf);
+      //Submission should succeed
+      assertTrue("User Job Submission failed after refresh.",
+          queueManager.hasAccess("default", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue("User Job Submission failed after refresh.",
+          queueManager.hasAccess("q1", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue("User Job Submission failed after refresh.",
+          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertFalse("Alternate User Job Submission succeeded after refresh.",
+          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+              SUBMIT_JOB, alternateUgi));
+      //delete the ACL file.
+      queueConfigFile.delete();
+      
+      //rewrite the mapred-site.xml
+      hadoopConfProps.put("mapred.acls.enabled", "true");
+      hadoopConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
+      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+      queueManager.refreshAcls(conf);
+      assertTrue("User Job Submission failed after refresh and no queue acls file.",
+          queueManager.hasAccess("default", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+    } finally{
+      if(queueConfigFile.exists()) {
+        queueConfigFile.delete();
+      }
+      if(hadoopConfigFile.exists()) {
+        hadoopConfigFile.delete();
+      }
+    }
+  }
+
+  public void testQueueAclRefreshWithInvalidConfFile() throws IOException {
+    String queueConfigPath =
+      System.getProperty("test.build.extraconf", "build/test/extraconf");
+    File queueConfigFile =
+      new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME);
+    File hadoopConfigFile = new File(queueConfigPath, "hadoop-site.xml");
+    try {
+      // queue properties with which the cluster is started.
+      Properties hadoopConfProps = new Properties();
+      hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
+      hadoopConfProps.put("mapred.acls.enabled", "true");
+      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+      
+      //properties for mapred-queue-acls.xml
+      Properties queueConfProps = new Properties();
+      UserGroupInformation ugi = UnixUserGroupInformation.getCurrentUGI();
+      queueConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
+      queueConfProps.put("mapred.queue.q1.acl-submit-job", ugi.getUserName());
+      queueConfProps.put("mapred.queue.q2.acl-submit-job", ugi.getUserName());
+      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
+      
+      Configuration conf = new JobConf();
+      QueueManager queueManager = new QueueManager(conf);
+      //Testing access to queue.
+      assertTrue("User Job Submission failed.",
+          queueManager.hasAccess("default", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue("User Job Submission failed.",
+          queueManager.hasAccess("q1", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue("User Job Submission failed.",
+          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      
+      //Write out a new incomplete invalid configuration file.
+      PrintWriter writer = new PrintWriter(new FileOutputStream(queueConfigFile));
+      writer.println("<configuration>");
+      writer.println("<property>");
+      writer.flush();
+      writer.close();
+      try {
+        //Exception to be thrown by queue manager because configuration passed
+        //is invalid.
+        queueManager.refreshAcls(conf);
+        fail("Refresh of ACLs should have failed with invalid conf file.");
+      } catch (Exception e) {
+      }
+      assertTrue("User Job Submission failed after invalid conf file refresh.",
+          queueManager.hasAccess("default", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue("User Job Submission failed after invalid conf file refresh.",
+          queueManager.hasAccess("q1", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue("User Job Submission failed after invalid conf file refresh.",
+          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+    } finally {
+      //Cleanup the configuration files in all cases
+      if(hadoopConfigFile.exists()) {
+        hadoopConfigFile.delete();
+      }
+      if(queueConfigFile.exists()) {
+        queueConfigFile.delete();
+      }
+    }
+  }
+  
   
   private JobConf setupConf(String aclName, String aclValue) {
     JobConf conf = new JobConf();
@@ -217,10 +384,20 @@ public class TestQueueManager extends TestCase {
   }
 
   private void verifyJobSubmission(JobConf conf, boolean shouldSucceed, 
-                                    String queue) throws IOException {
+      String queue) throws IOException {
     setUpCluster(conf);
     try {
-      RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, null, queue);
+      runAndVerifySubmission(conf, shouldSucceed, queue, null);
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  private void runAndVerifySubmission(JobConf conf, boolean shouldSucceed,
+      String queue, String userInfo)
+      throws IOException {
+    try {
+      RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, userInfo, queue);
       if (shouldSucceed) {
         assertTrue(rjob.isSuccessful());
       } else {
@@ -411,14 +588,14 @@ public class TestQueueManager extends TestCase {
     if (queueName != null) {
       clientConf.setQueueName(queueName);
     }
+    JobConf jc = new JobConf(clientConf);
+    if (userInfo != null) {
+      jc.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
+    }
     RunningJob rJob = null;
     if (shouldComplete) {
-      rJob = JobClient.runJob(clientConf);  
+      rJob = JobClient.runJob(jc);  
     } else {
-      JobConf jc = new JobConf(clientConf);
-      if (userInfo != null) {
-        jc.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
-      }
       rJob = new JobClient(clientConf).submitJob(jc);
       rJob = new JobClient(clientConf).submitJob(jc);
     }
     }
     return rJob;
     return rJob;

+ 22 - 1
src/test/org/apache/hadoop/mapred/UtilsForTests.java

@@ -21,12 +21,16 @@ package org.apache.hadoop.mapred;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.DataOutputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.text.DecimalFormat;
 import java.util.Arrays;
+import java.util.Enumeration;
 import java.util.Iterator;
+import java.util.Properties;
 
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.RandomWriter;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,7 +42,6 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
@@ -46,12 +49,16 @@ import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSeq
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 
+import org.apache.commons.logging.Log;
+
 /** 
  * Utilities used in unit test.
  *  
  */
 public class UtilsForTests {
 
+  static final Log LOG = LogFactory.getLog(UtilsForTests.class);
+
   final static long KB = 1024L * 1;
   final static long MB = 1024L * KB;
   final static long GB = 1024L * MB;
@@ -659,4 +666,18 @@ public class UtilsForTests {
       }
     }
   }
+
+  static void setUpConfigFile(Properties confProps, File configFile)
+      throws IOException {
+    Configuration config = new Configuration(false);
+    FileOutputStream fos = new FileOutputStream(configFile);
+
+    for (Enumeration<?> e = confProps.propertyNames(); e.hasMoreElements();) {
+      String key = (String) e.nextElement();
+      config.set(key, confProps.getProperty(key));
+    }
+
+    config.writeXml(fos);
+    fos.close();
+  }
 }
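The tests above lean on this new helper to materialize a throwaway mapred-queue-acls.xml from a Properties object. A condensed sketch of that usage; the output directory below is hypothetical (the tests write into test.build.extraconf so the file ends up on the test classpath):

package org.apache.hadoop.mapred;  // setUpConfigFile is package-private

import java.io.File;
import java.util.Properties;

// Sketch of how TestQueueManager drives the new helper: a Properties object is
// written out as a Hadoop-style XML config file that QueueManager then picks up
// on the next refresh.
class QueueAclsFixtureSketch {
  static void writeFixture() throws Exception {
    Properties acls = new Properties();
    acls.put("mapred.queue.default.acl-submit-job", "alice,bob group1");
    File dir = new File("/tmp/extraconf");   // hypothetical directory
    dir.mkdirs();
    UtilsForTests.setUpConfigFile(acls, new File(dir, "mapred-queue-acls.xml"));
  }
}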