
HADOOP-4348. Add service-level authorization for Hadoop.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@725603 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy, 16 years ago
parent
commit
71b2ed323a
49 changed files with 2694 additions and 225 deletions
  1. .gitignore (+1 -0)
  2. CHANGES.txt (+2 -0)
  3. bin/hadoop (+10 -0)
  4. build.xml (+7 -0)
  5. conf/hadoop-default.xml (+6 -0)
  6. conf/hadoop-policy.xml.template (+97 -0)
  7. src/core/org/apache/hadoop/fs/permission/AccessControlException.java (+13 -0)
  8. src/core/org/apache/hadoop/ipc/Client.java (+92 -31)
  9. src/core/org/apache/hadoop/ipc/ConnectionHeader.java (+93 -0)
  10. src/core/org/apache/hadoop/ipc/RPC.java (+57 -13)
  11. src/core/org/apache/hadoop/ipc/Server.java (+152 -34)
  12. src/core/org/apache/hadoop/ipc/Status.java (+32 -0)
  13. src/core/org/apache/hadoop/security/AccessControlException.java (+13 -0)
  14. src/core/org/apache/hadoop/security/Group.java (+70 -0)
  15. src/core/org/apache/hadoop/security/SecurityUtil.java (+159 -0)
  16. src/core/org/apache/hadoop/security/UnixUserGroupInformation.java (+5 -0)
  17. src/core/org/apache/hadoop/security/User.java (+70 -0)
  18. src/core/org/apache/hadoop/security/UserGroupInformation.java (+58 -11)
  19. src/core/org/apache/hadoop/security/authorize/AuthorizationException.java (+76 -0)
  20. src/core/org/apache/hadoop/security/authorize/ConfiguredPolicy.java (+156 -0)
  21. src/core/org/apache/hadoop/security/authorize/ConnectionPermission.java (+74 -0)
  22. src/core/org/apache/hadoop/security/authorize/PolicyProvider.java (+50 -0)
  23. src/core/org/apache/hadoop/security/authorize/RefreshAuthorizationPolicyProtocol.java (+39 -0)
  24. src/core/org/apache/hadoop/security/authorize/Service.java (+53 -0)
  25. src/core/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java (+105 -0)
  26. src/hdfs/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (+50 -0)
  27. src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (+16 -0)
  28. src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (+37 -2)
  29. src/hdfs/org/apache/hadoop/hdfs/tools/DFSAdmin.java (+61 -1)
  30. src/mapred/org/apache/hadoop/mapred/JobTracker.java (+31 -2)
  31. src/mapred/org/apache/hadoop/mapred/MapReducePolicyProvider.java (+45 -0)
  32. src/mapred/org/apache/hadoop/mapred/QueueManager.java (+26 -74)
  33. src/mapred/org/apache/hadoop/mapred/TaskTracker.java (+15 -0)
  34. src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java (+193 -0)
  35. src/test/hadoop-policy.xml (+97 -0)
  36. src/test/org/apache/hadoop/cli/TestCLI.java (+57 -30)
  37. src/test/org/apache/hadoop/cli/testConf.xml (+76 -7)
  38. src/test/org/apache/hadoop/cli/util/CLITestData.java (+3 -3)
  39. src/test/org/apache/hadoop/cli/util/CommandExecutor.java (+45 -9)
  40. src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (+1 -1)
  41. src/test/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (+2 -2)
  42. src/test/org/apache/hadoop/ipc/TestIPC.java (+3 -1)
  43. src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java (+2 -2)
  44. src/test/org/apache/hadoop/ipc/TestRPC.java (+64 -0)
  45. src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (+3 -2)
  46. src/test/org/apache/hadoop/security/TestAccessControlList.java (+104 -0)
  47. src/test/org/apache/hadoop/security/authorize/HadoopPolicyProvider.java (+39 -0)
  48. src/test/org/apache/hadoop/security/authorize/TestConfiguredPolicy.java (+82 -0)
  49. src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java (+152 -0)

+ 1 - 0
.gitignore

@@ -23,6 +23,7 @@ conf/masters
 conf/slaves
 conf/hadoop-env.sh
 conf/hadoop-site.xml
+conf/hadoop-policy.xml
 conf/capacity-scheduler.xml
 docs/api/
 logs/

+ 2 - 0
CHANGES.txt

@@ -82,6 +82,8 @@ Trunk (unreleased changes)
     move DataNode information to a separated page. (Boris Shkolnik via
     szetszwo)
 
+    HADOOP-4348. Add service-level authorization for Hadoop. (acmurthy) 
+
   IMPROVEMENTS
 
     HADOOP-4749. Added a new counter REDUCE_INPUT_BYTES. (Yongqiang He via 

+ 10 - 0
bin/hadoop

@@ -60,6 +60,7 @@ if [ $# = 0 ]; then
   echo "  namenode             run the DFS namenode"
   echo "  datanode             run a DFS datanode"
   echo "  dfsadmin             run a DFS admin client"
+  echo "  mradmin              run a Map-Reduce admin client"
   echo "  fsck                 run a DFS filesystem checking utility"
   echo "  fs                   run a generic filesystem user client"
   echo "  balancer             run a cluster balancing utility"
@@ -166,6 +167,11 @@ if [ "$HADOOP_LOGFILE" = "" ]; then
   HADOOP_LOGFILE='hadoop.log'
 fi
 
+# default policy file for service-level authorization
+if [ "$HADOOP_POLICYFILE" = "" ]; then
+  HADOOP_POLICYFILE="hadoop-policy.xml"
+fi
+
 # restore ordinary behaviour
 unset IFS
 
@@ -188,6 +194,9 @@ elif [ "$COMMAND" = "dfs" ] ; then
 elif [ "$COMMAND" = "dfsadmin" ] ; then
   CLASS=org.apache.hadoop.hdfs.tools.DFSAdmin
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
+elif [ "$COMMAND" = "mradmin" ] ; then
+  CLASS=org.apache.hadoop.mapred.tools.MRAdmin
+  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
 elif [ "$COMMAND" = "fsck" ] ; then
   CLASS=org.apache.hadoop.hdfs.tools.DFSck
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
@@ -268,6 +277,7 @@ HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.root.logger=${HADOOP_ROOT_LOGGER:-INFO,consol
 if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
   HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$JAVA_LIBRARY_PATH"
 fi  
+HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.policy.file=$HADOOP_POLICYFILE"
 
 # run it
 exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS -classpath "$CLASSPATH" $CLASS "$@"

+ 7 - 0
build.xml

@@ -96,6 +96,7 @@
   <property name="test.build.classes" value="${test.build.dir}/classes"/>
   <property name="test.build.testjar" value="${test.build.dir}/testjar"/>
   <property name="test.build.testshell" value="${test.build.dir}/testshell"/>
+  <property name="test.build.extraconf" value="${test.build.dir}/extraconf"/>
   <property name="test.build.javadoc" value="${test.build.dir}/docs/api"/>
   <property name="test.build.javadoc.dev" value="${test.build.dir}/docs/dev-api"/>
   <property name="test.include" value="Test*"/>
@@ -168,6 +169,7 @@
 
   <!-- the unit test classpath: uses test.src.dir for configuration -->
   <path id="test.classpath">
+    <pathelement location="${test.build.extraconf}"/>
     <pathelement location="${test.build.classes}" />
     <pathelement location="${test.src.dir}"/>
     <pathelement location="${build.dir}"/>
@@ -226,6 +228,7 @@
     <mkdir dir="${test.build.classes}"/>
     <mkdir dir="${test.build.testjar}"/>
     <mkdir dir="${test.build.testshell}"/>
+    <mkdir dir="${test.build.extraconf}"/>
     <tempfile property="touch.temp.file" destDir="${java.io.tmpdir}"/>
     <touch millis="0" file="${touch.temp.file}">
       <fileset dir="${conf.dir}" includes="**/*.template"/>
@@ -685,6 +688,8 @@
     <mkdir dir="${test.build.data}"/>
     <delete dir="${test.log.dir}"/>
     <mkdir dir="${test.log.dir}"/>
+  	<copy file="${test.src.dir}/hadoop-policy.xml" 
+  	  todir="${test.build.extraconf}" />
     <junit showoutput="${test.output}"
       printsummary="${test.junit.printsummary}"
       haltonfailure="${test.junit.haltonfailure}"
@@ -698,6 +703,8 @@
       <sysproperty key="test.debug.data" value="${test.debug.data}"/>
       <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
       <sysproperty key="test.src.dir" value="${test.src.dir}"/>
+      <sysproperty key="test.build.extraconf" value="${test.build.extraconf}" />
+      <sysproperty key="hadoop.policy.file" value="hadoop-policy.xml"/>
       <sysproperty key="java.library.path"
        value="${build.native}/lib:${lib.dir}/native/${build.platform}"/>
       <sysproperty key="install.c++.examples" value="${install.c++.examples}"/>

+ 6 - 0
conf/hadoop-default.xml

@@ -31,6 +31,12 @@
   ordering of the filters.</description>
 </property>
 
+<property>
+  <name>hadoop.security.authorization</name>
+  <value>false</value>
+  <description>Is service-level authorization enabled?</description>
+</property>
+
 <!--- logging properties -->
 
 <property>
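
Note: a minimal sketch of how a daemon consumes this flag. The literal key string below simply mirrors the property above; within the patch itself RPC.Server reads it through the ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG constant (shown further down), so treat this standalone helper as illustrative only.

    import org.apache.hadoop.conf.Configuration;

    public class AuthorizationFlag {
      // Service-level authorization stays off unless explicitly enabled.
      public static boolean isEnabled(Configuration conf) {
        return conf.getBoolean("hadoop.security.authorization", false);
      }
    }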

+ 97 - 0
conf/hadoop-policy.xml.template

@@ -0,0 +1,97 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+  <property>
+    <name>security.client.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for ClientProtocol, which is used by user code 
+    via the DistributedFileSystem. 
+    The ACL is a comma-separated list of user and group names. The user and 
+    group list is separated by a blank. For e.g. "alice,bob users,wheel". 
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.client.datanode.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for ClientDatanodeProtocol, the client-to-datanode protocol 
+    for block recovery.
+    The ACL is a comma-separated list of user and group names. The user and 
+    group list is separated by a blank. For e.g. "alice,bob users,wheel". 
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.datanode.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for DatanodeProtocol, which is used by datanodes to 
+    communicate with the namenode.
+    The ACL is a comma-separated list of user and group names. The user and 
+    group list is separated by a blank. For e.g. "alice,bob users,wheel". 
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.inter.datanode.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for InterDatanodeProtocol, the inter-datanode protocol
+    for updating generation timestamp.
+    The ACL is a comma-separated list of user and group names. The user and 
+    group list is separated by a blank. For e.g. "alice,bob users,wheel". 
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.namenode.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for NamenodeProtocol, the protocol used by the secondary
+    namenode to communicate with the namenode.
+    The ACL is a comma-separated list of user and group names. The user and 
+    group list is separated by a blank. For e.g. "alice,bob users,wheel". 
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.inter.tracker.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for InterTrackerProtocol, used by the tasktrackers to 
+    communicate with the jobtracker.
+    The ACL is a comma-separated list of user and group names. The user and 
+    group list is separated by a blank. For e.g. "alice,bob users,wheel". 
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.job.submission.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for JobSubmissionProtocol, used by job clients to 
+    communciate with the jobtracker for job submission, querying job status etc.
+    The ACL is a comma-separated list of user and group names. The user and 
+    group list is separated by a blank. For e.g. "alice,bob users,wheel". 
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.task.umbilical.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for TaskUmbilicalProtocol, used by the map and reduce 
+    tasks to communicate with the parent tasktracker. 
+    The ACL is a comma-separated list of user and group names. The user and 
+    group list is separated by a blank. For e.g. "alice,bob users,wheel". 
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.refresh.policy.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for RefreshAuthorizationPolicyProtocol, used by the 
+    dfsadmin and mradmin commands to refresh the security policy in-effect. 
+    The ACL is a comma-separated list of user and group names. The user and 
+    group list is separated by a blank. For e.g. "alice,bob users,wheel". 
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+</configuration>
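
For illustration, a minimal sketch of reading one of the keys above and turning it into the user/group ACL the property descriptions talk about. The class name is hypothetical; the real consumer of this file in the patch is ConfiguredPolicy (not reproduced in this excerpt), and the AccessControlList used here is the one added to SecurityUtil later in the same patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.SecurityUtil.AccessControlList;

    public class PolicyFileExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);  // skip the normal defaults
        conf.addResource("hadoop-policy.xml");          // the file templated above

        // "*" (the template default) grants access to all users
        String value = conf.get("security.client.protocol.acl", "*");
        AccessControlList acl = new AccessControlList(value);
        System.out.println("all allowed: " + acl.allAllowed());
        System.out.println("users:  " + acl.getUsers());
        System.out.println("groups: " + acl.getGroups());
      }
    }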

+ 13 - 0
src/core/org/apache/hadoop/fs/permission/AccessControlException.java

@@ -40,4 +40,17 @@ public class AccessControlException extends IOException {
   public AccessControlException(String s) {
     super(s);
   }
+  
+  /**
+   * Constructs a new exception with the specified cause and a detail
+   * message of <tt>(cause==null ? null : cause.toString())</tt> (which
+   * typically contains the class and detail message of <tt>cause</tt>).
+   * @param  cause the cause (which is saved for later retrieval by the
+   *         {@link #getCause()} method).  (A <tt>null</tt> value is
+   *         permitted, and indicates that the cause is nonexistent or
+   *         unknown.)
+   */
+  public AccessControlException(Throwable cause) {
+    super(cause);
+  }
 }

+ 92 - 31
src/core/org/apache/hadoop/ipc/Client.java

@@ -44,7 +44,6 @@ import org.apache.commons.logging.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -175,7 +174,10 @@ public class Client {
    * socket connected to a remote address.  Calls are multiplexed through this
    * socket: responses may be delivered out of order. */
   private class Connection extends Thread {
-    private ConnectionId remoteId;
+    private InetSocketAddress server;             // server ip:port
+    private ConnectionHeader header;              // connection header
+    private ConnectionId remoteId;                // connection id
+    
     private Socket socket = null;                 // connected socket
     private DataInputStream in;
     private DataOutputStream out;
@@ -186,17 +188,19 @@ public class Client {
     private AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
     private IOException closeException; // close reason
 
-    public Connection(InetSocketAddress address) throws IOException {
-      this(new ConnectionId(address, null));
-    }
-    
     public Connection(ConnectionId remoteId) throws IOException {
-      if (remoteId.getAddress().isUnresolved()) {
+      this.remoteId = remoteId;
+      this.server = remoteId.getAddress();
+      if (server.isUnresolved()) {
         throw new UnknownHostException("unknown host: " + 
                                        remoteId.getAddress().getHostName());
       }
-      this.remoteId = remoteId;
+      
       UserGroupInformation ticket = remoteId.getTicket();
+      Class<?> protocol = remoteId.getProtocol();
+      header = 
+        new ConnectionHeader(protocol == null ? null : protocol.getName(), ticket);
+      
       this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
           remoteId.getAddress().toString() +
           " from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
@@ -290,7 +294,7 @@ public class Client {
       short timeoutFailures = 0;
       try {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Connecting to "+remoteId.getAddress());
+          LOG.debug("Connecting to "+server);
         }
         while (true) {
           try {
@@ -362,7 +366,7 @@ public class Client {
         Thread.sleep(1000);
       } catch (InterruptedException ignored) {}
       
-      LOG.info("Retrying connect to server: " + remoteId.getAddress() + 
+      LOG.info("Retrying connect to server: " + server + 
           ". Already tried " + curRetries + " time(s).");
     }
 
@@ -370,12 +374,15 @@ public class Client {
      * Out is not synchronized because only the first thread does this.
      */
     private void writeHeader() throws IOException {
+      // Write out the header and version
       out.write(Server.HEADER.array());
       out.write(Server.CURRENT_VERSION);
-      //When there are more fields we can have ConnectionHeader Writable.
+
+      // Write out the ConnectionHeader
       DataOutputBuffer buf = new DataOutputBuffer();
-      ObjectWritable.writeObject(buf, remoteId.getTicket(), 
-                                 UserGroupInformation.class, conf);
+      header.write(buf);
+      
+      // Write out the payload length
       int bufLen = buf.getLength();
       out.writeInt(bufLen);
       out.write(buf.getData(), 0, bufLen);
@@ -413,7 +420,7 @@ public class Client {
     }
 
     public InetSocketAddress getRemoteAddress() {
-      return remoteId.getAddress();
+      return server;
     }
 
     /* Send a ping to the server if the time elapsed 
@@ -498,14 +505,18 @@ public class Client {
 
         Call call = calls.remove(id);
 
-        boolean isError = in.readBoolean();     // read if error
-        if (isError) {
-          call.setException(new RemoteException( WritableUtils.readString(in),
-              WritableUtils.readString(in)));
-        } else {
+        int state = in.readInt();     // read call status
+        if (state == Status.SUCCESS.state) {
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
           value.readFields(in);                 // read value
           call.setValue(value);
+        } else if (state == Status.ERROR.state) {
+          call.setException(new RemoteException(WritableUtils.readString(in),
+                                                WritableUtils.readString(in)));
+        } else if (state == Status.FATAL.state) {
+          // Close the connection
+          markClosed(new RemoteException(WritableUtils.readString(in), 
+                                         WritableUtils.readString(in)));
         }
       } catch (IOException e) {
         markClosed(e);
@@ -551,7 +562,7 @@ public class Client {
       } else {
         // log the info
         if (LOG.isDebugEnabled()) {
-          LOG.debug("closing ipc connection to " + remoteId.address + ": " +
+          LOG.debug("closing ipc connection to " + server + ": " +
               closeException.getMessage(),closeException);
         }
 
@@ -673,17 +684,39 @@ public class Client {
 
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code>, returning the value.  Throws exceptions if there are
-   * network problems or if the remote code threw an exception. */
+   * network problems or if the remote code threw an exception.
+   * @deprecated Use {@link #call(Writable, InetSocketAddress, Class, UserGroupInformation)} instead 
+   */
+  @Deprecated
   public Writable call(Writable param, InetSocketAddress address)
   throws InterruptedException, IOException {
       return call(param, address, null);
   }
   
+  /** Make a call, passing <code>param</code>, to the IPC server running at
+   * <code>address</code> with the <code>ticket</code> credentials, returning 
+   * the value.  
+   * Throws exceptions if there are network problems or if the remote code 
+   * threw an exception.
+   * @deprecated Use {@link #call(Writable, InetSocketAddress, Class, UserGroupInformation)} instead 
+   */
+  @Deprecated
   public Writable call(Writable param, InetSocketAddress addr, 
-                       UserGroupInformation ticket)  
+      UserGroupInformation ticket)  
+      throws InterruptedException, IOException {
+    return call(param, addr, null, ticket);
+  }
+  
+  /** Make a call, passing <code>param</code>, to the IPC server running at
+   * <code>address</code> which is servicing the <code>protocol</code> protocol, 
+   * with the <code>ticket</code> credentials, returning the value.  
+   * Throws exceptions if there are network problems or if the remote code 
+   * threw an exception. */
+  public Writable call(Writable param, InetSocketAddress addr, 
+                       Class<?> protocol, UserGroupInformation ticket)  
                        throws InterruptedException, IOException {
     Call call = new Call(param);
-    Connection connection = getConnection(addr, ticket, call);
+    Connection connection = getConnection(addr, protocol, ticket, call);
     connection.sendParam(call);                 // send the parameter
     synchronized (call) {
       while (!call.done) {
@@ -736,11 +769,25 @@ public class Client {
     }
   }
 
+  /** 
+   * Makes a set of calls in parallel.  Each parameter is sent to the
+   * corresponding address.  When all values are available, or have timed out
+   * or errored, the collected results are returned in an array.  The array
+   * contains nulls for calls that timed out or errored.
+   * @deprecated Use {@link #call(Writable[], InetSocketAddress[], Class, UserGroupInformation)} instead 
+   */
+  @Deprecated
+  public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
+    throws IOException {
+    return call(params, addresses, null, null);
+  }
+  
   /** Makes a set of calls in parallel.  Each parameter is sent to the
    * corresponding address.  When all values are available, or have timed out
    * or errored, the collected results are returned in an array.  The array
    * contains nulls for calls that timed out or errored.  */
-  public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
+  public Writable[] call(Writable[] params, InetSocketAddress[] addresses, 
+                         Class<?> protocol, UserGroupInformation ticket)
     throws IOException {
     if (addresses.length == 0) return new Writable[0];
 
@@ -749,7 +796,8 @@ public class Client {
       for (int i = 0; i < params.length; i++) {
         ParallelCall call = new ParallelCall(params[i], results, i);
         try {
-          Connection connection = getConnection(addresses[i], null, call);
+          Connection connection = 
+            getConnection(addresses[i], protocol, ticket, call);
           connection.sendParam(call);             // send each parameter
         } catch (IOException e) {
           // log errors
@@ -770,7 +818,8 @@ public class Client {
 
   /** Get a connection from the pool, or create a new one and add it to the
    * pool.  Connections to a given host/port are reused. */
-  private Connection getConnection(InetSocketAddress addr, 
+  private Connection getConnection(InetSocketAddress addr,
+                                   Class<?> protocol,
                                    UserGroupInformation ticket,
                                    Call call)
                                    throws IOException {
@@ -783,7 +832,7 @@ public class Client {
      * connectionsId object and with set() method. We need to manage the
      * refs for keys in HashMap properly. For now its ok.
      */
-    ConnectionId remoteId = new ConnectionId(addr, ticket);
+    ConnectionId remoteId = new ConnectionId(addr, protocol, ticket);
     do {
       synchronized (connections) {
         connection = connections.get(remoteId);
@@ -804,13 +853,17 @@ public class Client {
 
   /**
    * This class holds the address and the user ticket. The client connections
-   * to servers are uniquely identified by <remoteAddress, ticket>
+   * to servers are uniquely identified by <remoteAddress, protocol, ticket>
    */
   private static class ConnectionId {
     InetSocketAddress address;
     UserGroupInformation ticket;
+    Class<?> protocol;
+    private static final int PRIME = 16777619;
     
-    ConnectionId(InetSocketAddress address, UserGroupInformation ticket) {
+    ConnectionId(InetSocketAddress address, Class<?> protocol, 
+                 UserGroupInformation ticket) {
+      this.protocol = protocol;
       this.address = address;
       this.ticket = ticket;
     }
@@ -818,15 +871,22 @@ public class Client {
     InetSocketAddress getAddress() {
       return address;
     }
+    
+    Class<?> getProtocol() {
+      return protocol;
+    }
+    
     UserGroupInformation getTicket() {
       return ticket;
     }
     
+    
     @Override
     public boolean equals(Object obj) {
      if (obj instanceof ConnectionId) {
        ConnectionId id = (ConnectionId) obj;
-       return address.equals(id.address) && ticket == id.ticket;
+       return address.equals(id.address) && protocol == id.protocol && 
+              ticket == id.ticket;
        //Note : ticket is a ref comparision.
      }
      return false;
@@ -834,7 +894,8 @@ public class Client {
     
     @Override
     public int hashCode() {
-      return address.hashCode() ^ System.identityHashCode(ticket);
+      return (address.hashCode() + PRIME * System.identityHashCode(protocol)) ^ 
+             System.identityHashCode(ticket);
     }
   }  
 }

+ 93 - 0
src/core/org/apache/hadoop/ipc/ConnectionHeader.java

@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * The IPC connection header sent by the client to the server
+ * on connection establishment.
+ */
+class ConnectionHeader implements Writable {
+  public static final Log LOG = LogFactory.getLog(ConnectionHeader.class);
+  
+  private String protocol;
+  private UserGroupInformation ugi = new UnixUserGroupInformation();
+  
+  public ConnectionHeader() {}
+  
+  /**
+   * Create a new {@link ConnectionHeader} with the given <code>protocol</code>
+   * and {@link UserGroupInformation}. 
+   * @param protocol protocol used for communication between the IPC client
+   *                 and the server
+   * @param ugi {@link UserGroupInformation} of the client communicating with
+   *            the server
+   */
+  public ConnectionHeader(String protocol, UserGroupInformation ugi) {
+    this.protocol = protocol;
+    this.ugi = ugi;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    protocol = Text.readString(in);
+    if (protocol.isEmpty()) {
+      protocol = null;
+    }
+    
+    boolean ugiPresent = in.readBoolean();
+    if (ugiPresent) {
+      ugi.readFields(in);
+    } else {
+      ugi = null;
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, (protocol == null) ? "" : protocol);
+    if (ugi != null) {
+      out.writeBoolean(true);
+      ugi.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  public String getProtocol() {
+    return protocol;
+  }
+
+  public UserGroupInformation getUgi() {
+    return ugi;
+  }
+
+  public String toString() {
+    return protocol + "-" + ugi;
+  }
+}
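
A small round-trip sketch of the new header, mirroring what Client.writeHeader() and Server.Connection.processHeader() do on either end of the wire. It is not part of the patch: it has to live in the org.apache.hadoop.ipc package because ConnectionHeader is package-private, and it assumes the existing UnixUserGroupInformation(String, String[]) constructor.

    package org.apache.hadoop.ipc;

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;

    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.security.UnixUserGroupInformation;

    public class ConnectionHeaderRoundTrip {
      public static void main(String[] args) throws Exception {
        UnixUserGroupInformation ugi =
          new UnixUserGroupInformation("alice", new String[] {"users"});
        ConnectionHeader sent =
          new ConnectionHeader("org.apache.hadoop.hdfs.protocol.ClientProtocol", ugi);

        DataOutputBuffer buf = new DataOutputBuffer();
        sent.write(buf);                               // client side: serialize

        ConnectionHeader received = new ConnectionHeader();
        received.readFields(new DataInputStream(       // server side: deserialize
            new ByteArrayInputStream(buf.getData(), 0, buf.getLength())));

        System.out.println(received);                  // protocol + "-" + ugi
      }
    }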

+ 57 - 13
src/core/org/apache/hadoop/ipc/RPC.java

@@ -30,15 +30,18 @@ import java.net.SocketTimeoutException;
 import java.io.*;
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collection;
 
 import javax.net.SocketFactory;
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginException;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 
@@ -213,8 +216,10 @@ public class RPC {
       if (logDebug) {
         startTime = System.currentTimeMillis();
       }
+
       ObjectWritable value = (ObjectWritable)
-        client.call(new Invocation(method, args), address, ticket);
+        client.call(new Invocation(method, args), address, 
+                    method.getDeclaringClass(), ticket);
       if (logDebug) {
         long callTime = System.currentTimeMillis() - startTime;
         LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -332,7 +337,13 @@ public class RPC {
   public static VersionedProtocol getProxy(Class<?> protocol,
       long clientVersion, InetSocketAddress addr, Configuration conf,
       SocketFactory factory) throws IOException {
-    return getProxy(protocol, clientVersion, addr, null, conf, factory);
+    UserGroupInformation ugi = null;
+    try {
+      ugi = UserGroupInformation.login(conf);
+    } catch (LoginException le) {
+      throw new RuntimeException("Couldn't login!");
+    }
+    return getProxy(protocol, clientVersion, addr, ugi, conf, factory);
   }
   
   /** Construct a client-side proxy object that implements the named protocol,
@@ -383,17 +394,29 @@ public class RPC {
     }
   }
 
-  /** Expert: Make multiple, parallel calls to a set of servers. */
+  /** 
+   * Expert: Make multiple, parallel calls to a set of servers.
+   * @deprecated Use {@link #call(Method, Object[][], InetSocketAddress[], UserGroupInformation, Configuration)} instead 
+   */
   public static Object[] call(Method method, Object[][] params,
                               InetSocketAddress[] addrs, Configuration conf)
     throws IOException {
+    return call(method, params, addrs, null, conf);
+  }
+  
+  /** Expert: Make multiple, parallel calls to a set of servers. */
+  public static Object[] call(Method method, Object[][] params,
+                              InetSocketAddress[] addrs, 
+                              UserGroupInformation ticket, Configuration conf)
+    throws IOException {
 
     Invocation[] invocations = new Invocation[params.length];
     for (int i = 0; i < params.length; i++)
       invocations[i] = new Invocation(method, params[i]);
     Client client = CLIENTS.getClient(conf);
     try {
-    Writable[] wrappedValues = client.call(invocations, addrs);
+    Writable[] wrappedValues = 
+      client.call(invocations, addrs, method.getDeclaringClass(), ticket);
     
     if (method.getReturnType() == Void.TYPE) {
       return null;
@@ -430,8 +453,8 @@ public class RPC {
   /** An RPC Server. */
   public static class Server extends org.apache.hadoop.ipc.Server {
     private Object instance;
-    private Class<?> implementation;
     private boolean verbose;
+    private boolean authorize = false;
 
     /** Construct an RPC server.
      * @param instance the instance whose methods will be called
@@ -464,26 +487,32 @@ public class RPC {
                   int numHandlers, boolean verbose) throws IOException {
       super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()));
       this.instance = instance;
-      this.implementation = instance.getClass();
       this.verbose = verbose;
+      this.authorize = 
+        conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, 
+                        false);
     }
 
-    public Writable call(Writable param, long receivedTime) throws IOException {
+    public Writable call(Class<?> protocol, Writable param, long receivedTime) 
+    throws IOException {
       try {
         Invocation call = (Invocation)param;
         if (verbose) log("Call: " + call);
-        
+
         Method method =
-          implementation.getMethod(call.getMethodName(),
+          protocol.getMethod(call.getMethodName(),
                                    call.getParameterClasses());
+        method.setAccessible(true);
 
         long startTime = System.currentTimeMillis();
         Object value = method.invoke(instance, call.getParameters());
         int processingTime = (int) (System.currentTimeMillis() - startTime);
         int qTime = (int) (startTime-receivedTime);
-        LOG.debug("Served: " + call.getMethodName() +
-            " queueTime= " + qTime +
-            " procesingTime= " + processingTime);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Served: " + call.getMethodName() +
+                    " queueTime= " + qTime +
+                    " procesingTime= " + processingTime);
+        }
         rpcMetrics.rpcQueueTime.inc(qTime);
         rpcMetrics.rpcProcessingTime.inc(processingTime);
 
@@ -517,6 +546,21 @@ public class RPC {
         throw ioe;
       }
     }
+
+    @Override
+    public void authorize(Subject user, ConnectionHeader connection) 
+    throws AuthorizationException {
+      if (authorize) {
+        Class<?> protocol = null;
+        try {
+          protocol = getProtocolClass(connection.getProtocol(), getConf());
+        } catch (ClassNotFoundException cfne) {
+          throw new AuthorizationException("Unknown protocol: " + 
+                                           connection.getProtocol());
+        }
+        ServiceAuthorizationManager.authorize(user, protocol);
+      }
+    }
   }
 
   private static void log(String value) {

+ 152 - 34
src/core/org/apache/hadoop/ipc/Server.java

@@ -40,25 +40,31 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.net.UnknownHostException;
 
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import javax.security.auth.Subject;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -74,18 +80,31 @@ public abstract class Server {
   public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
   
   // 1 : Introduce ping and server does not throw away RPCs
-  public static final byte CURRENT_VERSION = 2;
+  // 3 : Introduce the protocol into the RPC connection header
+  public static final byte CURRENT_VERSION = 3;
   
   /**
    * How many calls/handler are allowed in the queue.
    */
   private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
   
-  public static final Log LOG =
-    LogFactory.getLog(Server.class);
+  public static final Log LOG = LogFactory.getLog(Server.class);
 
   private static final ThreadLocal<Server> SERVER = new ThreadLocal<Server>();
 
+  private static final Map<String, Class<?>> PROTOCOL_CACHE = 
+    new ConcurrentHashMap<String, Class<?>>();
+  
+  static Class<?> getProtocolClass(String protocolName, Configuration conf) 
+  throws ClassNotFoundException {
+    Class<?> protocol = PROTOCOL_CACHE.get(protocolName);
+    if (protocol == null) {
+      protocol = conf.getClassByName(protocolName);
+      PROTOCOL_CACHE.put(protocolName, protocol);
+    }
+    return protocol;
+  }
+  
   /** Returns the server instance called under or null.  May be called under
    * {@link #call(Writable, long)} implementations, and under {@link Writable}
    * methods of paramters and return values.  Permits applications to access
@@ -191,7 +210,7 @@ public abstract class Server {
                                    // the time served when response is not null
     private ByteBuffer response;                      // the response for this call
 
-    public Call(int id, Writable param, Connection connection) {
+    public Call(int id, Writable param, Connection connection) { 
       this.id = id;
       this.param = param;
       this.connection = connection;
@@ -397,9 +416,10 @@ public abstract class Server {
       try {
         count = c.readAndProcess();
       } catch (InterruptedException ieo) {
+        LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
         throw ieo;
       } catch (Exception e) {
-        LOG.debug(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
+        LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
         count = -1; //so that the (count < 0) block is executed
       }
       if (count < 0) {
@@ -679,6 +699,7 @@ public abstract class Server {
                                          //version are read
     private boolean headerRead = false;  //if the connection header that
                                          //follows version is read.
+
     private SocketChannel channel;
     private ByteBuffer data;
     private ByteBuffer dataLengthBuffer;
@@ -691,8 +712,18 @@ public abstract class Server {
     // disconnected, we can say where it used to connect to.
     private String hostAddress;
     private int remotePort;
-    private UserGroupInformation ticket = null;
+    
+    ConnectionHeader header = new ConnectionHeader();
+    Class<?> protocol;
+    
+    Subject user = null;
 
+    // Fake 'call' for failed authorization response
+    private final int AUTHROIZATION_FAILED_CALLID = -1;
+    private final Call authFailedCall = 
+      new Call(AUTHROIZATION_FAILED_CALLID, null, null);
+    private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
+    
     public Connection(SelectionKey key, SocketChannel channel, 
                       long lastContact) {
       this.channel = channel;
@@ -816,6 +847,25 @@ public abstract class Server {
             processHeader();
             headerRead = true;
             data = null;
+            
+            // Authorize the connection
+            try {
+              authorize(user, header);
+              
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Successfully authorized " + header);
+              }
+            } catch (AuthorizationException ae) {
+              authFailedCall.connection = this;
+              setupResponse(authFailedResponse, authFailedCall, 
+                            Status.FATAL, null, 
+                            ae.getClass().getName(), ae.getMessage());
+              responder.doRespond(authFailedCall);
+              
+              // Close this connection
+              return -1;
+            }
+
             continue;
           }
         } 
@@ -823,14 +873,23 @@ public abstract class Server {
       }
     }
 
-    /// Reads the header following version
+    /// Reads the connection header following version
     private void processHeader() throws IOException {
-      /* In the current version, it is just a ticket.
-       * Later we could introduce a "ConnectionHeader" class.
-       */
       DataInputStream in =
         new DataInputStream(new ByteArrayInputStream(data.array()));
-      ticket = (UserGroupInformation) ObjectWritable.readObject(in, conf);
+      header.readFields(in);
+      try {
+        String protocolClassName = header.getProtocol();
+        if (protocolClassName != null) {
+          protocol = getProtocolClass(header.getProtocol(), conf);
+        }
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException("Unknown protocol: " + header.getProtocol());
+      }
+      
+      // TODO: Get the user name from the GSS API for Kerberbos-based security
+      // Create the user subject
+      user = SecurityUtil.getSubject(header.getUgi());
     }
     
     private void processData() throws  IOException, InterruptedException {
@@ -840,7 +899,7 @@ public abstract class Server {
         
       if (LOG.isDebugEnabled())
         LOG.debug(" got #" + id);
-            
+
       Writable param = ReflectionUtils.newInstance(paramClass, conf);           // read param
       param.readFields(dis);        
         
@@ -875,7 +934,7 @@ public abstract class Server {
       ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
       while (running) {
         try {
-          Call call = callQueue.take(); // pop the queue; maybe blocked here
+          final Call call = callQueue.take(); // pop the queue; maybe blocked here
 
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + ": has #" + call.id + " from " +
@@ -884,32 +943,39 @@ public abstract class Server {
           String errorClass = null;
           String error = null;
           Writable value = null;
-          
+
           CurCall.set(call);
-          UserGroupInformation previous = UserGroupInformation.getCurrentUGI();
-          UserGroupInformation.setCurrentUGI(call.connection.ticket);
           try {
-            value = call(call.param, call.timestamp);             // make the call
+            // Make the call as the user via Subject.doAs, thus associating
+            // the call with the Subject
+            value = 
+              Subject.doAs(call.connection.user, 
+                           new PrivilegedExceptionAction<Writable>() {
+                              @Override
+                              public Writable run() throws Exception {
+                                // make the call
+                                return call(call.connection.protocol, 
+                                            call.param, call.timestamp);
+
+                              }
+                           }
+                          );
+              
+          } catch (PrivilegedActionException pae) {
+            Exception e = pae.getException();
+            LOG.info(getName()+", call "+call+": error: " + e, e);
+            errorClass = e.getClass().getName();
+            error = StringUtils.stringifyException(e);
           } catch (Throwable e) {
             LOG.info(getName()+", call "+call+": error: " + e, e);
             errorClass = e.getClass().getName();
             error = StringUtils.stringifyException(e);
           }
-          UserGroupInformation.setCurrentUGI(previous);
           CurCall.set(null);
 
-          buf.reset();
-          DataOutputStream out = new DataOutputStream(buf);
-          out.writeInt(call.id);                // write call id
-          out.writeBoolean(error != null);      // write error flag
-
-          if (error == null) {
-            value.write(out);
-          } else {
-            WritableUtils.writeString(out, errorClass);
-            WritableUtils.writeString(out, error);
-          }
-          call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
+          setupResponse(buf, call, 
+                        (error == null) ? Status.SUCCESS : Status.ERROR, 
+                        value, errorClass, error);
           responder.doRespond(call);
         } catch (InterruptedException e) {
           if (running) {                          // unexpected -- log it
@@ -977,6 +1043,39 @@ public abstract class Server {
     }
   }
   
+  /**
+   * Setup response for the IPC Call.
+   * 
+   * @param response buffer to serialize the response into
+   * @param call {@link Call} to which we are setting up the response
+   * @param status {@link Status} of the IPC call
+   * @param rv return value for the IPC Call, if the call was successful
+   * @param errorClass error class, if the the call failed
+   * @param error error message, if the call failed
+   * @throws IOException
+   */
+  private void setupResponse(ByteArrayOutputStream response, 
+                             Call call, Status status, 
+                             Writable rv, String errorClass, String error) 
+  throws IOException {
+    response.reset();
+    DataOutputStream out = new DataOutputStream(response);
+    out.writeInt(call.id);                // write call id
+    out.writeInt(status.state);           // write status
+
+    if (status == Status.SUCCESS) {
+      rv.write(out);
+    } else {
+      WritableUtils.writeString(out, errorClass);
+      WritableUtils.writeString(out, error);
+    }
+    call.setResponse(ByteBuffer.wrap(response.toByteArray()));
+  }
+  
+  Configuration getConf() {
+    return conf;
+  }
+  
   /** Sets the socket buffer size used for responding to RPCs */
   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
 
@@ -1030,10 +1129,29 @@ public abstract class Server {
     return listener.getAddress();
   }
   
+  /** 
+   * Called for each call. 
+   * @deprecated Use {@link #call(Class, Writable, long)} instead
+   */
+  @Deprecated
+  public Writable call(Writable param, long receiveTime) throws IOException {
+    return call(null, param, receiveTime);
+  }
+  
   /** Called for each call. */
-  public abstract Writable call(Writable param, long receiveTime)
-                                                throws IOException;
+  public abstract Writable call(Class<?> protocol,
+                               Writable param, long receiveTime)
+  throws IOException;
   
+  /**
+   * Authorize the incoming client connection.
+   * 
+   * @param user client user
+   * @param connection incoming connection
+   * @throws AuthorizationException when the client isn't authorized to talk the protocol
+   */
+  public void authorize(Subject user, ConnectionHeader connection) 
+  throws AuthorizationException {}
   
   /**
    * The number of open RPC conections

+ 32 - 0
src/core/org/apache/hadoop/ipc/Status.java

@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+/**
+ * Status of a Hadoop IPC call.
+ */
+enum Status {
+  SUCCESS (0),
+  ERROR (1),
+  FATAL (-1);
+  
+  int state;
+  private Status(int state) {
+    this.state = state;
+  }
+}
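
This enum backs the change in response framing: the server now writes an int status after the call id instead of a boolean error flag (see setupResponse() in Server.java and receiveResponse() in Client.java in this patch). Below is a compilable sketch of decoding one framed response; the class is hypothetical and sits in org.apache.hadoop.ipc because Status is package-private.

    package org.apache.hadoop.ipc;

    import java.io.DataInputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableUtils;

    class ResponseDecoder {
      // value is a pre-allocated Writable of the expected response type
      static Writable decode(DataInputStream in, Writable value) throws IOException {
        int id = in.readInt();                   // call id (unused in this sketch)
        int state = in.readInt();                // Status.SUCCESS, ERROR or FATAL
        if (state == Status.SUCCESS.state) {
          value.readFields(in);                  // successful return value
          return value;
        }
        // ERROR and FATAL both carry the exception class name and message;
        // on FATAL the client additionally closes the connection.
        String errorClass = WritableUtils.readString(in);
        String error = WritableUtils.readString(in);
        throw new RemoteException(errorClass, error);
      }
    }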

+ 13 - 0
src/core/org/apache/hadoop/security/AccessControlException.java

@@ -40,4 +40,17 @@ public class AccessControlException
    * @param s the detail message.
    */
   public AccessControlException(String s) {super(s);}
+  
+  /**
+   * Constructs a new exception with the specified cause and a detail
+   * message of <tt>(cause==null ? null : cause.toString())</tt> (which
+   * typically contains the class and detail message of <tt>cause</tt>).
+   * @param  cause the cause (which is saved for later retrieval by the
+   *         {@link #getCause()} method).  (A <tt>null</tt> value is
+   *         permitted, and indicates that the cause is nonexistent or
+   *         unknown.)
+   */
+  public AccessControlException(Throwable cause) {
+    super(cause);
+  }
 }

+ 70 - 0
src/core/org/apache/hadoop/security/Group.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import java.security.Principal;
+
+/**
+ * A group to which a user belongs to.
+ */
+public class Group implements Principal {
+  final String group;
+
+  /**
+   * Create a new <code>Group</code> with the given groupname.
+   * @param group group name
+   */
+  public Group(String group) {
+    this.group = group;
+  }
+
+  @Override
+  public String getName() {
+    return group;
+  }
+
+  @Override
+  public String toString() {
+    return group;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((group == null) ? 0 : group.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    Group other = (Group) obj;
+    if (group == null) {
+      if (other.group != null)
+        return false;
+    } else if (!group.equals(other.group))
+      return false;
+    return true;
+  }
+}

+ 159 - 0
src/core/org/apache/hadoop/security/SecurityUtil.java

@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import java.security.Policy;
+import java.security.Principal;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import javax.security.auth.Subject;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authorize.ConfiguredPolicy;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+
+public class SecurityUtil {
+
+  private static final Log LOG = LogFactory.getLog(SecurityUtil.class);
+  
+  static {
+    // Set an empty default policy
+    setPolicy(new ConfiguredPolicy(new Configuration(), 
+                                   PolicyProvider.DEFAULT_POLICY_PROVIDER));
+  }
+  
+  /**
+   * Set the global security policy for Hadoop.
+   * 
+   * @param policy {@link Policy} used for authorization.
+   */
+  public static void setPolicy(Policy policy) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Setting Hadoop security policy");
+    }
+    Policy.setPolicy(policy);
+  }
+
+  /**
+   * Get the current global security policy for Hadoop.
+   * @return the current {@link Policy}
+   */
+  public static Policy getPolicy() {
+    return Policy.getPolicy();
+  }
+  
+  /**
+   * Get the {@link Subject} for the user identified by <code>ugi</code>.
+   * @param ugi user
+   * @return the {@link Subject} for the user identified by <code>ugi</code>
+   */
+  public static Subject getSubject(UserGroupInformation ugi) {
+    if (ugi == null) {
+      return null;
+    }
+    
+    Set<Principal> principals =       // Number of principals = username + #groups 
+      new HashSet<Principal>(ugi.getGroupNames().length+1);
+    User userPrincipal = new User(ugi.getUserName()); 
+    principals.add(userPrincipal);
+    for (String group : ugi.getGroupNames()) {
+      Group groupPrincipal = new Group(group);
+      principals.add(groupPrincipal);
+    }
+    principals.add(ugi);
+    Subject user = 
+      new Subject(false, principals, new HashSet<Object>(), new HashSet<Object>());
+    
+    return user;
+  }
+  
+  /**
+   * Class representing a configured access control list.
+   */
+  public static class AccessControlList {
+    
+    // Indicates an ACL string that represents access to all users
+    public static final String WILDCARD_ACL_VALUE = "*";
+
+    // Set of users who are granted access.
+    private Set<String> users;
+    // Set of groups which are granted access
+    private Set<String> groups;
+    // Whether all users are granted access.
+    private boolean allAllowed;
+    
+    /**
+     * Construct a new ACL from a String representation of the same.
+     * 
+     * The String is a a comma separated list of users and groups.
+     * The user list comes first and is separated by a space followed 
+     * by the group list. For e.g. "user1,user2 group1,group2"
+     * 
+     * @param aclString String representation of the ACL
+     */
+    public AccessControlList(String aclString) {
+      users = new TreeSet<String>();
+      groups = new TreeSet<String>();
+      if (aclString.contains(WILDCARD_ACL_VALUE) && 
+          aclString.trim().equals(WILDCARD_ACL_VALUE)) {
+        allAllowed = true;
+      } else {
+        String[] userGroupStrings = aclString.split(" ", 2);
+        
+        if (userGroupStrings.length >= 1) {
+          String[] usersStr = userGroupStrings[0].split(",");
+          if (usersStr.length >= 1) {
+            addToSet(users, usersStr);
+          }
+        }
+        
+        if (userGroupStrings.length == 2) {
+          String[] groupsStr = userGroupStrings[1].split(",");
+          if (groupsStr.length >= 1) {
+            addToSet(groups, groupsStr);
+          }
+        }
+      }
+    }
+    
+    public boolean allAllowed() {
+      return allAllowed;
+    }
+    
+    public Set<String> getUsers() {
+      return users;
+    }
+    
+    public Set<String> getGroups() {
+      return groups;
+    }
+    
+    private static final void addToSet(Set<String> set, String[] strings) {
+      for (String s : strings) {
+        s = s.trim();
+        if (s.length() > 0) {
+          set.add(s);
+        }
+      }
+    }
+  }
+}
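
A short usage sketch of the ACL parsing above (the example class name is hypothetical): the value lists users first, then a single blank, then groups, and the wildcard only takes effect when it is the entire trimmed string.

    import java.util.Set;

    import org.apache.hadoop.security.SecurityUtil.AccessControlList;

    public class AclParsingExample {
      public static void main(String[] args) {
        AccessControlList acl = new AccessControlList("alice,bob users,wheel");
        Set<String> users = acl.getUsers();      // [alice, bob]
        Set<String> groups = acl.getGroups();    // [users, wheel]
        System.out.println(users + " " + groups + " " + acl.allAllowed());  // ... false

        AccessControlList everyone = new AccessControlList("*");
        System.out.println(everyone.allAllowed());  // true
      }
    }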

+ 5 - 0
src/core/org/apache/hadoop/security/UnixUserGroupInformation.java

@@ -424,4 +424,9 @@ public class UnixUserGroupInformation extends UserGroupInformation {
     }
     return buf.toString();
   }
+
+  @Override
+  public String getName() {
+    return toString();
+  }
 }

+ 70 - 0
src/core/org/apache/hadoop/security/User.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import java.security.Principal;
+
+/**
+ * The username of a user.
+ */
+public class User implements Principal {
+  final String user;
+
+  /**
+   * Create a new <code>User</code> with the given username.
+   * @param user user name
+   */
+  public User(String user) {
+    this.user = user;
+  }
+  
+  @Override
+  public String getName() {
+    return user;
+  }
+
+  @Override
+  public String toString() {
+    return user;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((user == null) ? 0 : user.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    User other = (User) obj;
+    if (user == null) {
+      if (other.user != null)
+        return false;
+    } else if (!user.equals(other.user))
+      return false;
+    return true;
+  }
+}

+ 58 - 11
src/core/org/apache/hadoop/security/UserGroupInformation.java

@@ -18,7 +18,11 @@
 package org.apache.hadoop.security;
 
 import java.io.IOException;
+import java.security.AccessController;
+import java.security.Principal;
+import java.util.Set;
 
+import javax.security.auth.Subject;
 import javax.security.auth.login.LoginException;
 
 import org.apache.commons.logging.Log;
@@ -28,26 +32,69 @@ import org.apache.hadoop.io.Writable;
 
 /** A {@link Writable} abstract class for storing user and groups information.
  */
-public abstract class UserGroupInformation implements Writable {
+public abstract class UserGroupInformation implements Writable, Principal {
   public static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
   private static UserGroupInformation LOGIN_UGI = null;
-
-  private static final ThreadLocal<UserGroupInformation> currentUGI
-    = new ThreadLocal<UserGroupInformation>();
-
+  
+  private static final ThreadLocal<Subject> currentUser =
+    new ThreadLocal<Subject>();
+  
   /** @return the {@link UserGroupInformation} for the current thread */ 
   public static UserGroupInformation getCurrentUGI() {
-    return currentUGI.get();
+    Subject user = getCurrentUser();
+    
+    if (user == null) {
+      user = currentUser.get();
+      if (user == null) {
+        return null;
+      }
+    }
+    
+    Set<UserGroupInformation> ugiPrincipals = 
+      user.getPrincipals(UserGroupInformation.class);
+    
+    UserGroupInformation ugi = null;
+    if (ugiPrincipals != null && ugiPrincipals.size() == 1) {
+      ugi = ugiPrincipals.iterator().next();
+      if (ugi == null) {
+        throw new RuntimeException("Cannot find _current user_ UGI in the Subject!");
+      }
+    } else {
+      throw new RuntimeException("Cannot resolve current user from subject, " +
+      		                       "which had " + ugiPrincipals.size() + 
+      		                       " UGI principals!");
+    }
+    return ugi;
   }
 
-  /** Set the {@link UserGroupInformation} for the current thread */ 
+  /** 
+   * Set the {@link UserGroupInformation} for the current thread
+   * @deprecated Use {@link #setCurrentUser(UserGroupInformation)} 
+   */ 
+  @Deprecated
   public static void setCurrentUGI(UserGroupInformation ugi) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(Thread.currentThread().getName() + ", ugi=" + ugi);
-    }
-    currentUGI.set(ugi);
+    setCurrentUser(ugi);
   }
 
+  /**
+   * Return the current user <code>Subject</code>.
+   * @return the current user <code>Subject</code>
+   */
+  static Subject getCurrentUser() {
+    return Subject.getSubject(AccessController.getContext());
+  }
+  
+  /**
+   * Set the {@link UserGroupInformation} for the current thread
+   * WARNING - This method should be used only in test cases and other exceptional
+   * cases!
+   * @param ugi {@link UserGroupInformation} for the current thread
+   */
+  public static void setCurrentUser(UserGroupInformation ugi) {
+    Subject user = SecurityUtil.getSubject(ugi);
+    currentUser.set(user);
+  }
+  
   /** Get username
    * 
    * @return the user's name

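With getCurrentUGI() now resolving the caller from the JAAS Subject attached to the AccessControlContext, code that runs inside Subject.doAs sees the right user automatically. A rough sketch follows; the user and group names are hypothetical, and it assumes the SecurityUtil.getSubject helper introduced in this patch is callable from client code.

    import java.security.PrivilegedExceptionAction;

    import javax.security.auth.Subject;

    import org.apache.hadoop.security.SecurityUtil;
    import org.apache.hadoop.security.UnixUserGroupInformation;
    import org.apache.hadoop.security.UserGroupInformation;

    public class CurrentUgiSketch {
      public static void main(String[] args) throws Exception {
        UserGroupInformation ugi =
            new UnixUserGroupInformation("alice", new String[] {"datascience"});

        // Wrap the UGI in a Subject carrying User, Group and UGI principals.
        Subject subject = SecurityUtil.getSubject(ugi);

        // Inside doAs the Subject is bound to the AccessControlContext, so
        // getCurrentUGI() finds the caller without any thread-local plumbing.
        Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
          public Void run() {
            System.out.println(UserGroupInformation.getCurrentUGI().getUserName());
            return null;
          }
        });
      }
    }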
+ 76 - 0
src/core/org/apache/hadoop/security/authorize/AuthorizationException.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.authorize;
+
+import java.io.PrintStream;
+import java.io.PrintWriter;
+
+import org.apache.hadoop.security.AccessControlException;
+
+/**
+ * An exception class for authorization-related issues.
+ * 
+ * This class <em>does not</em> provide the stack trace for security purposes.
+ */
+public class AuthorizationException extends AccessControlException {
+  private static final long serialVersionUID = 1L;
+
+  public AuthorizationException() {
+    super();
+  }
+
+  public AuthorizationException(String message) {
+    super(message);
+  }
+  
+  /**
+   * Constructs a new exception with the specified cause and a detail
+   * message of <tt>(cause==null ? null : cause.toString())</tt> (which
+   * typically contains the class and detail message of <tt>cause</tt>).
+   * @param  cause the cause (which is saved for later retrieval by the
+   *         {@link #getCause()} method).  (A <tt>null</tt> value is
+   *         permitted, and indicates that the cause is nonexistent or
+   *         unknown.)
+   */
+  public AuthorizationException(Throwable cause) {
+    super(cause);
+  }
+  
+  private static StackTraceElement[] stackTrace = new StackTraceElement[0];
+  @Override
+  public StackTraceElement[] getStackTrace() {
+    // Do not provide the stack-trace
+    return stackTrace;
+  }
+
+  @Override
+  public void printStackTrace() {
+    // Do not provide the stack-trace
+  }
+
+  @Override
+  public void printStackTrace(PrintStream s) {
+    // Do not provide the stack-trace
+  }
+
+  @Override
+  public void printStackTrace(PrintWriter s) {
+    // Do not provide the stack-trace
+  }
+  
+}

+ 156 - 0
src/core/org/apache/hadoop/security/authorize/ConfiguredPolicy.java

@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.authorize;
+
+import java.security.Permission;
+import java.security.PermissionCollection;
+import java.security.Policy;
+import java.security.Principal;
+import java.security.ProtectionDomain;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Group;
+import org.apache.hadoop.security.User;
+import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+
+/**
+ * A {@link Configuration} based security {@link Policy} for Hadoop.
+ *
+ * {@link ConfiguredPolicy} works in conjunction with a {@link PolicyProvider}
+ * for providing service-level authorization for Hadoop.
+ */
+public class ConfiguredPolicy extends Policy implements Configurable {
+  public static final String HADOOP_POLICY_FILE = "hadoop-policy.xml";
+  private static final Log LOG = LogFactory.getLog(ConfiguredPolicy.class);
+      
+  private Configuration conf;
+  private PolicyProvider policyProvider;
+  private volatile Map<Principal, Set<Permission>> permissions;
+  private volatile Set<Permission> allowedPermissions;
+
+  public ConfiguredPolicy(Configuration conf, PolicyProvider policyProvider) {
+    this.conf = conf;      
+    this.policyProvider = policyProvider;
+    refresh();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    refresh();
+  }
+
+  @Override
+  public boolean implies(ProtectionDomain domain, Permission permission) {
+    // Only make checks for domains having principals 
+    if(domain.getPrincipals().length == 0) {
+      return true; 
+    }
+
+    return super.implies(domain, permission);
+  }
+
+  @Override
+  public PermissionCollection getPermissions(ProtectionDomain domain) {
+    PermissionCollection permissionCollection = super.getPermissions(domain);
+    for (Principal principal : domain.getPrincipals()) {
+      Set<Permission> principalPermissions = permissions.get(principal);
+      if (principalPermissions != null) {
+        for (Permission permission : principalPermissions) {
+          permissionCollection.add(permission);
+        }
+      }
+
+      for (Permission permission : allowedPermissions) {
+        permissionCollection.add(permission);
+      }
+    }
+    return permissionCollection;
+  }
+
+  @Override
+  public void refresh() {
+    // Get the system property 'hadoop.policy.file'
+    String policyFile = 
+      System.getProperty("hadoop.policy.file", HADOOP_POLICY_FILE);
+    
+    // Make a copy of the original config, and load the policy file
+    Configuration policyConf = new Configuration(conf);
+    policyConf.addResource(policyFile);
+    
+    Map<Principal, Set<Permission>> newPermissions = 
+      new HashMap<Principal, Set<Permission>>();
+    Set<Permission> newAllowPermissions = new HashSet<Permission>();
+
+    // Parse the config file
+    Service[] services = policyProvider.getServices();
+    if (services != null) {
+      for (Service service : services) {
+        AccessControlList acl = 
+          new AccessControlList(
+              policyConf.get(service.getServiceKey(), 
+                             AccessControlList.WILDCARD_ACL_VALUE)
+              );
+        
+        if (acl.allAllowed()) {
+          newAllowPermissions.add(service.getPermission());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Policy - " + service.getPermission() + " * ");
+          }
+        } else {
+          for (String user : acl.getUsers()) {
+            addPermission(newPermissions, new User(user), service.getPermission());
+          }
+
+          for (String group : acl.getGroups()) {
+            addPermission(newPermissions, new Group(group), service.getPermission());
+          }
+        }
+      }
+    }
+
+    // Flip to the newly parsed permissions
+    allowedPermissions = newAllowPermissions;
+    permissions = newPermissions;
+  }
+
+  private void addPermission(Map<Principal, Set<Permission>> permissions,
+                             Principal principal, Permission permission) {
+    Set<Permission> principalPermissions = permissions.get(principal);
+    if (principalPermissions == null) {
+      principalPermissions = new HashSet<Permission>();
+      permissions.put(principal, principalPermissions);
+    }
+    principalPermissions.add(permission);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Policy - Adding  " + permission + " to " + principal);
+    }
+  }
+}
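
To make the flow above concrete: a daemon builds a ConfiguredPolicy from its Configuration and a PolicyProvider, installs it, and can later re-read the policy file via refresh(). This is a hedged sketch only; the protocol interface and ACL key are invented for illustration.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.SecurityUtil;
    import org.apache.hadoop.security.authorize.ConfiguredPolicy;
    import org.apache.hadoop.security.authorize.PolicyProvider;
    import org.apache.hadoop.security.authorize.Service;

    public class PolicySetupSketch {
      // Hypothetical protocol guarded by a hypothetical ACL key.
      public interface MyProtocol {}

      public static void main(String[] args) {
        Configuration conf = new Configuration();

        PolicyProvider provider = new PolicyProvider() {
          @Override
          public Service[] getServices() {
            return new Service[] {
              new Service("security.my.protocol.acl", MyProtocol.class)
            };
          }
        };

        // Reads hadoop-policy.xml (or the file named by the hadoop.policy.file
        // system property) on construction and again on every refresh().
        ConfiguredPolicy policy = new ConfiguredPolicy(conf, provider);
        SecurityUtil.setPolicy(policy);

        // Re-parse the policy file after it has been edited on disk.
        SecurityUtil.getPolicy().refresh();
      }
    }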

+ 74 - 0
src/core/org/apache/hadoop/security/authorize/ConnectionPermission.java

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.authorize;
+
+import java.security.Permission;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/**
+ * {@link Permission} to initiate a connection to a given service.
+ */
+public class ConnectionPermission extends Permission {
+
+  private static final long serialVersionUID = 1L;
+  private final Class<?> protocol;
+
+  /**
+   * {@link ConnectionPermission} for a given service.
+   * @param protocol service to be accessed
+   */
+  public ConnectionPermission(Class<?> protocol) {
+    super(protocol.getName());
+    this.protocol = protocol;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof ConnectionPermission) {
+      return protocol == ((ConnectionPermission)obj).protocol;
+    }
+    return false;
+  }
+
+  @Override
+  public String getActions() {
+    return "ALLOW";
+  }
+
+  @Override
+  public int hashCode() {
+    return protocol.hashCode();
+  }
+
+  @Override
+  public boolean implies(Permission permission) {
+    if (permission instanceof ConnectionPermission) {
+      ConnectionPermission that = (ConnectionPermission)permission;
+      if (that.protocol.equals(VersionedProtocol.class)) {
+        return true;
+      }
+      return this.protocol.equals(that.protocol);
+    }
+    return false;
+  }
+
+  public String toString() {
+    return "ConnectionPermission(" + protocol.getName() + ")";
+  }
+}
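
The implies() rule above has one special case worth spelling out: a request expressed against the base VersionedProtocol is satisfied by any ConnectionPermission, while every other request requires an exact protocol match. A small illustrative check:

    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.ipc.VersionedProtocol;
    import org.apache.hadoop.security.authorize.ConnectionPermission;

    public class ConnectionPermissionSketch {
      public static void main(String[] args) {
        ConnectionPermission clientAcl = new ConnectionPermission(ClientProtocol.class);

        // Exact protocol match in the common case.
        System.out.println(clientAcl.implies(
            new ConnectionPermission(ClientProtocol.class)));    // true

        // VersionedProtocol, the root of all Hadoop IPC protocols, is implied
        // by every ConnectionPermission.
        System.out.println(clientAcl.implies(
            new ConnectionPermission(VersionedProtocol.class))); // true
      }
    }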

+ 50 - 0
src/core/org/apache/hadoop/security/authorize/PolicyProvider.java

@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.authorize;
+
+import java.security.Policy;
+
+/**
+ * {@link PolicyProvider} provides the {@link Service} definitions to the
+ * security {@link Policy} in effect for Hadoop.
+ *
+ */
+public abstract class PolicyProvider {
+
+  /**
+   * Configuration key for the {@link PolicyProvider} implementation.
+   */
+  public static final String POLICY_PROVIDER_CONFIG = 
+    "hadoop.security.authorization.policyprovider";
+  
+  /**
+   * A default {@link PolicyProvider} without any defined services.
+   */
+  public static final PolicyProvider DEFAULT_POLICY_PROVIDER =
+    new PolicyProvider() {
+    public Service[] getServices() {
+      return null;
+    }
+  };
+  
+  /**
+   * Get the {@link Service} definitions from the {@link PolicyProvider}.
+   * @return the {@link Service} definitions
+   */
+  public abstract Service[] getServices();
+}

+ 39 - 0
src/core/org/apache/hadoop/security/authorize/RefreshAuthorizationPolicyProtocol.java

@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.authorize;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/**
+ * Protocol used to refresh the authorization policy currently in use.
+ */
+public interface RefreshAuthorizationPolicyProtocol extends VersionedProtocol {
+  
+  /**
+   * Version 1: Initial version
+   */
+  public static final long versionID = 1L;
+
+  /**
+   * Refresh the service-level authorization policy in-effect.
+   * @throws IOException
+   */
+  void refreshServiceAcl() throws IOException;
+}

+ 53 - 0
src/core/org/apache/hadoop/security/authorize/Service.java

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.authorize;
+
+import java.security.Permission;
+
+/**
+ * An abstract definition of <em>service</em> as related to 
+ * Service Level Authorization for Hadoop.
+ * 
+ * Each service defines its configuration key and also the necessary
+ * {@link Permission} required to access the service.
+ */
+public class Service {
+  private String key;
+  private Permission permission;
+  
+  public Service(String key, Class<?> protocol) {
+    this.key = key;
+    this.permission = new ConnectionPermission(protocol);
+  }
+  
+  /**
+   * Get the configuration key for the service.
+   * @return the configuration key for the service
+   */
+  public String getServiceKey() {
+    return key;
+  }
+  
+  /**
+   * Get the {@link Permission} required to access the service.
+   * @return the {@link Permission} required to access the service
+   */
+  public Permission getPermission() {
+    return permission;
+  }
+}

+ 105 - 0
src/core/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java

@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.authorize;
+
+import java.security.AccessControlException;
+import java.security.AccessController;
+import java.security.Permission;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.auth.Subject;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * An authorization manager which handles service-level authorization
+ * for incoming service requests.
+ */
+public class ServiceAuthorizationManager {
+
+  private static final Log LOG = 
+    LogFactory.getLog(ServiceAuthorizationManager.class);
+  
+  /**
+   * Configuration key for controlling service-level authorization for Hadoop.
+   */
+  public static final String SERVICE_AUTHORIZATION_CONFIG = 
+    "hadoop.security.authorization";
+  
+  private static Map<Class<?>, Permission> protocolToPermissionMap = 
+    Collections.synchronizedMap(new HashMap<Class<?>, Permission>());
+
+  /**
+   * Authorize the user to access the protocol being used.
+   * 
+   * @param user user accessing the service 
+   * @param protocol service being accessed
+   * @throws AuthorizationException on authorization failure
+   */
+  public static void authorize(Subject user, Class<?> protocol) 
+  throws AuthorizationException {
+    Permission permission = protocolToPermissionMap.get(protocol);
+    if (permission == null) {
+      permission = new ConnectionPermission(protocol);
+      protocolToPermissionMap.put(protocol, permission);
+    }
+    
+    checkPermission(user, permission);
+  }
+  
+  /**
+   * Check if the given {@link Subject} has all of the necessary 
+   * {@link Permission}s.
+   * 
+   * @param user <code>Subject</code> to be authorized
+   * @param permissions <code>Permission</code> set
+   * @throws AuthorizationException if the authorization failed
+   */
+  private static void checkPermission(final Subject user, 
+                                      final Permission... permissions) 
+  throws AuthorizationException {
+    try{
+      Subject.doAs(user, 
+                   new PrivilegedExceptionAction<Void>() {
+                     @Override
+                     public Void run() throws Exception {
+                       try {
+                         for(Permission permission : permissions) {
+                           AccessController.checkPermission(permission);
+                         }
+                       } catch (AccessControlException ace) {
+                         LOG.info("Authorization failed for " + 
+                                  UserGroupInformation.getCurrentUGI(), ace);
+                         throw new AuthorizationException(ace);
+                       }
+                      return null;
+                     }
+                   }
+                  );
+    } catch (PrivilegedActionException e) {
+      throw new AuthorizationException(e.getException());
+    }
+  }
+  
+}
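
The IPC server changes elsewhere in this patch call authorize() once per incoming connection. A rough, self-contained sketch of that check follows; the user and group are made up, and whether access is granted depends on the policy installed via SecurityUtil.setPolicy.

    import javax.security.auth.Subject;

    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.security.SecurityUtil;
    import org.apache.hadoop.security.UnixUserGroupInformation;
    import org.apache.hadoop.security.authorize.AuthorizationException;
    import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;

    public class AuthorizeSketch {
      public static void main(String[] args) {
        // Subject for the (hypothetical) calling user.
        Subject caller = SecurityUtil.getSubject(
            new UnixUserGroupInformation("alice", new String[] {"datascience"}));

        try {
          // Throws AuthorizationException unless the installed policy grants
          // alice (or one of her groups) the ConnectionPermission for
          // ClientProtocol, i.e. the security.client.protocol.acl entry.
          ServiceAuthorizationManager.authorize(caller, ClientProtocol.class);
        } catch (AuthorizationException ae) {
          System.err.println("access denied: " + ae.getMessage());
        }
      }
    }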

+ 50 - 0
src/hdfs/org/apache/hadoop/hdfs/HDFSPolicyProvider.java

@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.security.authorize.Service;
+
+/**
+ * {@link PolicyProvider} for HDFS protocols.
+ */
+public class HDFSPolicyProvider extends PolicyProvider {
+  private static final Service[] hdfsServices =
+    new Service[] {
+    new Service("security.client.protocol.acl", ClientProtocol.class),
+    new Service("security.client.datanode.protocol.acl", 
+                ClientDatanodeProtocol.class),
+    new Service("security.datanode.protocol.acl", DatanodeProtocol.class),
+    new Service("security.inter.datanode.protocol.acl", 
+                InterDatanodeProtocol.class),
+    new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
+    new Service("security.refresh.policy.protocol.acl", 
+                RefreshAuthorizationPolicyProtocol.class),
+  };
+  
+  @Override
+  public Service[] getServices() {
+    return hdfsServices;
+  }
+}
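
Each Service above pairs a hadoop-policy.xml key with the ConnectionPermission guarding the corresponding protocol. A trivial sketch that prints the mapping, handy for checking which keys a provider expects:

    import org.apache.hadoop.hdfs.HDFSPolicyProvider;
    import org.apache.hadoop.security.authorize.Service;

    public class ListHdfsAclKeys {
      public static void main(String[] args) {
        for (Service service : new HDFSPolicyProvider().getServices()) {
          // e.g. security.client.protocol.acl ->
          //      ConnectionPermission(org.apache.hadoop.hdfs.protocol.ClientProtocol)
          System.out.println(service.getServiceKey() + " -> " + service.getPermission());
        }
      }
    }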

+ 16 - 0
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -44,6 +44,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -80,6 +81,10 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.authorize.ConfiguredPolicy;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -367,6 +372,17 @@ public class DataNode extends Configured
     this.dnRegistration.setInfoPort(this.infoServer.getPort());
     myMetrics = new DataNodeMetrics(conf, dnRegistration.getStorageID());
     
+    // set service-level authorization security policy
+    if (conf.getBoolean(
+          ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
+      PolicyProvider policyProvider = 
+        (PolicyProvider)(ReflectionUtils.newInstance(
+            conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
+                HDFSPolicyProvider.class, PolicyProvider.class), 
+            conf));
+      SecurityUtil.setPolicy(new ConfiguredPolicy(conf, policyProvider));
+    }
+
     //init ipc server
     InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
         conf.get("dfs.datanode.ipc.address"));

+ 37 - 2
src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
@@ -41,10 +42,17 @@ import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ConfiguredPolicy;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 
 import java.io.*;
 import java.net.*;
@@ -86,7 +94,8 @@ import java.util.Iterator;
  * state, for example partial blocksMap etc.
  **********************************************************/
 public class NameNode implements ClientProtocol, DatanodeProtocol,
-                                 NamenodeProtocol, FSConstants {
+                                 NamenodeProtocol, FSConstants,
+                                 RefreshAuthorizationPolicyProtocol {
   public long getProtocolVersion(String protocol, 
                                  long clientVersion) throws IOException { 
     if (protocol.equals(ClientProtocol.class.getName())) {
@@ -95,6 +104,8 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
       return DatanodeProtocol.versionID;
     } else if (protocol.equals(NamenodeProtocol.class.getName())){
       return NamenodeProtocol.versionID;
+    } else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){
+      return RefreshAuthorizationPolicyProtocol.versionID;
     } else {
       throw new IOException("Unknown protocol to name node: " + protocol);
     }
@@ -116,7 +127,9 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
   private Thread emptier;
   /** only used for testing purposes  */
   private boolean stopRequested = false;
-
+  /** Is service level authorization enabled? */
+  private boolean serviceAuthEnabled = false;
+  
   /** Format a new filesystem.  Destroys any filesystem that may already
    * exist at this location.  **/
   public static void format(Configuration conf) throws IOException {
@@ -155,6 +168,19 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
   private void initialize(Configuration conf) throws IOException {
     InetSocketAddress socAddr = NameNode.getAddress(conf);
     int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
+    
+    // set service-level authorization security policy
+    if (serviceAuthEnabled = 
+          conf.getBoolean(
+            ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
+      PolicyProvider policyProvider = 
+        (PolicyProvider)(ReflectionUtils.newInstance(
+            conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
+                HDFSPolicyProvider.class, PolicyProvider.class), 
+            conf));
+      SecurityUtil.setPolicy(new ConfiguredPolicy(conf, policyProvider));
+    }
+
     // create rpc server 
     this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(),
                                 handlerCount, false, conf);
@@ -841,6 +867,15 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
     return false;
   }
 
+  @Override
+  public void refreshServiceAcl() throws IOException {
+    if (!serviceAuthEnabled) {
+      throw new AuthorizationException("Service Level Authorization not enabled!");
+    }
+
+    SecurityUtil.getPolicy().refresh();
+  }
+
   private static void printUsage() {
     System.err.println(
       "Usage: java NameNode [" +

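The same enable-and-install pattern appears in the DataNode above and in the JobTracker and TaskTracker below. The configuration knobs involved are shown here as a hedged sketch of what a deployment or test might set before starting a daemon:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HDFSPolicyProvider;
    import org.apache.hadoop.security.authorize.PolicyProvider;
    import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;

    public class EnableAuthorizationSketch {
      public static Configuration secureConf() {
        Configuration conf = new Configuration();

        // hadoop.security.authorization: master switch for service-level ACLs.
        conf.setBoolean(
            ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, true);

        // Optional: pick an explicit provider; the daemons fall back to
        // HDFSPolicyProvider / MapReducePolicyProvider when this is unset.
        conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
                      HDFSPolicyProvider.class, PolicyProvider.class);
        return conf;
      }
    }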
+ 61 - 1
src/hdfs/org/apache/hadoop/hdfs/tools/DFSAdmin.java

@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import javax.security.auth.login.LoginException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem.DiskStatus;
@@ -29,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
@@ -36,6 +39,9 @@ import org.apache.hadoop.fs.shell.Command;
 import org.apache.hadoop.fs.shell.CommandFormat;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -387,6 +393,7 @@ public class DFSAdmin extends FsShell {
       "\t[" + ClearQuotaCommand.USAGE +"]\n" +
       "\t[" + SetSpaceQuotaCommand.USAGE + "]\n" +
       "\t[" + ClearSpaceQuotaCommand.USAGE +"]\n" +
+      "\t[-refreshServiceAcl]\n" +
       "\t[-help [cmd]]\n";
 
     String report ="-report: \tReports basic filesystem information and statistics.\n";
@@ -429,6 +436,9 @@ public class DFSAdmin extends FsShell {
       "\t\t\t3. Blocks currrently being replicated\n" +
       "\t\t\t4. Blocks waiting to be deleted\n";
 
+    String refreshServiceAcl = "-refreshServiceAcl: Reload the service-level authorization policy file\n" +
+      "\t\tNamenode will reload the authorization policy file.\n";
+    
     String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
       "\t\tis specified.\n";
 
@@ -452,6 +462,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(SetSpaceQuotaCommand.DESCRIPTION);
     } else if (ClearSpaceQuotaCommand.matches(cmd)) {
       System.out.println(ClearSpaceQuotaCommand.DESCRIPTION);
+    } else if ("refresh-auth-policy".equals(cmd)) {
+      System.out.println(refreshServiceAcl);
     } else if ("help".equals(cmd)) {
       System.out.println(help);
     } else {
@@ -466,6 +478,7 @@ public class DFSAdmin extends FsShell {
       System.out.println(ClearQuotaCommand.DESCRIPTION);
       System.out.println(SetSpaceQuotaCommand.DESCRIPTION);
       System.out.println(ClearSpaceQuotaCommand.DESCRIPTION);
+      System.out.println(refreshServiceAcl);
       System.out.println(help);
       System.out.println();
       ToolRunner.printGenericCommandUsage(System.out);
@@ -549,6 +562,42 @@ public class DFSAdmin extends FsShell {
     return 0;
   }
 
+  private static UnixUserGroupInformation getUGI(Configuration conf) 
+  throws IOException {
+    UnixUserGroupInformation ugi = null;
+    try {
+      ugi = UnixUserGroupInformation.login(conf, true);
+    } catch (LoginException e) {
+      throw (IOException)(new IOException(
+          "Failed to get the current user's information.").initCause(e));
+    }
+    return ugi;
+  }
+
+  /**
+   * Refresh the authorization policy on the {@link NameNode}.
+   * @return exitcode 0 on success, non-zero on failure
+   * @throws IOException
+   */
+  public int refreshServiceAcl() throws IOException {
+    // Get the current configuration
+    Configuration conf = getConf();
+    
+    // Create the client
+    RefreshAuthorizationPolicyProtocol refreshProtocol = 
+      (RefreshAuthorizationPolicyProtocol) 
+      RPC.getProxy(RefreshAuthorizationPolicyProtocol.class, 
+                   RefreshAuthorizationPolicyProtocol.versionID, 
+                   NameNode.getAddress(conf), getUGI(conf), conf,
+                   NetUtils.getSocketFactory(conf, 
+                                             RefreshAuthorizationPolicyProtocol.class));
+    
+    // Refresh the authorization policy in-effect
+    refreshProtocol.refreshServiceAcl();
+    
+    return 0;
+  }
+  
   /**
    * Displays format of commands.
    * @param cmd The command that is being executed.
@@ -571,7 +620,7 @@ public class DFSAdmin extends FsShell {
                          + " [-upgradeProgress status | details | force]");
     } else if ("-metasave".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
-                         + " [-metasave filename]");
+          + " [-metasave filename]");
     } else if (SetQuotaCommand.matches(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [" + SetQuotaCommand.USAGE+"]");
@@ -584,6 +633,9 @@ public class DFSAdmin extends FsShell {
     } else if (ClearSpaceQuotaCommand.matches(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " ["+ClearSpaceQuotaCommand.USAGE+"]");
+    } else if ("-refreshServiceAcl".equals(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+                         + " [-refreshServiceAcl]");
     } else {
       System.err.println("Usage: java DFSAdmin");
       System.err.println("           [-report]");
@@ -592,6 +644,7 @@ public class DFSAdmin extends FsShell {
       System.err.println("           [-finalizeUpgrade]");
       System.err.println("           [-upgradeProgress status | details | force]");
       System.err.println("           [-metasave filename]");
+      System.err.println("           [-refreshServiceAcl]");
       System.err.println("           ["+SetQuotaCommand.USAGE+"]");
       System.err.println("           ["+ClearQuotaCommand.USAGE+"]");
       System.err.println("           ["+SetSpaceQuotaCommand.USAGE+"]");
@@ -652,6 +705,11 @@ public class DFSAdmin extends FsShell {
         printUsage(cmd);
         return exitCode;
       }
+    } else if ("-refreshServiceAcl".equals(cmd)) {
+      if (argv.length != 1) {
+        printUsage(cmd);
+        return exitCode;
+      }
     }
     
     // initialize DFSAdmin
@@ -688,6 +746,8 @@ public class DFSAdmin extends FsShell {
         exitCode = new ClearSpaceQuotaCommand(argv, i, fs).runAll();
       } else if (SetSpaceQuotaCommand.matches(cmd)) {
         exitCode = new SetSpaceQuotaCommand(argv, i, fs).runAll();
+      } else if ("-refreshServiceAcl".equals(cmd)) {
+        exitCode = refreshServiceAcl();
       } else if ("-help".equals(cmd)) {
         if (i < argv.length) {
           printHelp(argv[i]);

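Programmatically, the new admin command can be driven through the usual Tool plumbing. This assumes DFSAdmin keeps its existing FsShell/Tool constructors; it is equivalent to running "hadoop dfsadmin -refreshServiceAcl" from the shell.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.tools.DFSAdmin;
    import org.apache.hadoop.util.ToolRunner;

    public class RefreshAclSketch {
      public static void main(String[] args) throws Exception {
        // Asks the NameNode to re-read hadoop-policy.xml via
        // RefreshAuthorizationPolicyProtocol.refreshServiceAcl().
        int exitCode = ToolRunner.run(new Configuration(), new DFSAdmin(),
                                      new String[] {"-refreshServiceAcl"});
        System.exit(exitCode);
      }
    }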
+ 31 - 2
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -66,7 +66,13 @@ import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ConfiguredPolicy;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -78,7 +84,7 @@ import org.apache.hadoop.util.VersionInfo;
  *
  *******************************************************/
 public class JobTracker implements MRConstants, InterTrackerProtocol,
-    JobSubmissionProtocol, TaskTrackerManager {
+    JobSubmissionProtocol, TaskTrackerManager, RefreshAuthorizationPolicyProtocol {
 
   static long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
   static long RETIRE_JOB_INTERVAL;
@@ -186,10 +192,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       return InterTrackerProtocol.versionID;
     } else if (protocol.equals(JobSubmissionProtocol.class.getName())){
       return JobSubmissionProtocol.versionID;
+    } else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){
+      return RefreshAuthorizationPolicyProtocol.versionID;
     } else {
       throw new IOException("Unknown protocol to job tracker: " + protocol);
     }
   }
+  
   /**
    * A thread to timeout tasks that have been assigned to task trackers,
    * but that haven't reported back yet.
@@ -1335,10 +1344,22 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
           JobQueueTaskScheduler.class, TaskScheduler.class);
     taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
                                            
-    // Set ports, start RPC servers, etc.
+    // Set ports, start RPC servers, setup security policy etc.
     InetSocketAddress addr = getAddress(conf);
     this.localMachine = addr.getHostName();
     this.port = addr.getPort();
+    
+    // Set service-level authorization security policy
+    if (conf.getBoolean(
+          ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
+      PolicyProvider policyProvider = 
+        (PolicyProvider)(ReflectionUtils.newInstance(
+            conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
+                MapReducePolicyProvider.class, PolicyProvider.class), 
+            conf));
+      SecurityUtil.setPolicy(new ConfiguredPolicy(conf, policyProvider));
+    }
+    
     int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
     this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), handlerCount, false, conf);
     if (LOG.isDebugEnabled()) {
@@ -3163,4 +3184,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     return conf.getInt("mapred.jobtracker.maxtasks.per.job", -1);
   }
   
+  @Override
+  public void refreshServiceAcl() throws IOException {
+    if (!conf.getBoolean(
+            ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
+      throw new AuthorizationException("Service Level Authorization not enabled!");
+    }
+    SecurityUtil.getPolicy().refresh();
+  }
 }

+ 45 - 0
src/mapred/org/apache/hadoop/mapred/MapReducePolicyProvider.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.security.authorize.Service;
+
+/**
+ * {@link PolicyProvider} for Map-Reduce protocols.
+ */
+public class MapReducePolicyProvider extends PolicyProvider {
+  private static final Service[] mapReduceServices = 
+    new Service[] {
+      new Service("security.inter.tracker.protocol.acl", 
+                  InterTrackerProtocol.class),
+      new Service("security.job.submission.protocol.acl",
+                  JobSubmissionProtocol.class),
+      new Service("security.task.umbilical.protocol.acl", 
+                  TaskUmbilicalProtocol.class),
+      new Service("security.refresh.policy.protocol.acl", 
+                  RefreshAuthorizationPolicyProtocol.class),
+  };
+  
+  @Override
+  public Service[] getServices() {
+    return mapReduceServices;
+  }
+
+}

+ 26 - 74
src/mapred/org/apache/hadoop/mapred/QueueManager.java

@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.SecurityUtil.AccessControlList;
 
 /**
  * Class that exposes information about queues maintained by the Hadoop
@@ -51,12 +52,10 @@ class QueueManager {
   // Prefix in configuration for queue related keys
   private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX 
                                                         = "mapred.queue.";
-  // Indicates an ACL string that represents access to all users
-  private static final String ALL_ALLOWED_ACL_VALUE = "*";
   // Configured queues
   private Set<String> queueNames;
   // Map of a queue and ACL property name with an ACL
-  private HashMap<String, ACL> aclsMap;
+  private HashMap<String, AccessControlList> aclsMap;
   // Map of a queue name to any generic object that represents 
   // scheduler information 
   private HashMap<String, Object> schedulerInfoObjects;
@@ -91,69 +90,6 @@ class QueueManager {
     }
   }
   
-  /**
-   * Class representing an access control that is configured.
-   */
-  private static class ACL {
-    
-    // Set of users who are granted access.
-    private Set<String> users;
-    // Set of groups which are granted access
-    private Set<String> groups;
-    // Whether all users are granted access.
-    private boolean allAllowed;
-    
-    /**
-     * Construct a new ACL from a String representation of the same.
-     * 
-     * The String is a a comma separated list of users and groups.
-     * The user list comes first and is separated by a space followed 
-     * by the group list. For e.g. "user1,user2 group1,group2"
-     * 
-     * @param aclString String representation of the ACL
-     */
-    ACL (String aclString) {
-      users = new TreeSet<String>();
-      groups = new TreeSet<String>();
-      if (aclString.equals(ALL_ALLOWED_ACL_VALUE)) {
-        allAllowed = true;
-      } else {
-        String[] userGroupStrings = aclString.split(" ", 2);
-        
-        if (userGroupStrings.length >= 1) {
-          String[] usersStr = userGroupStrings[0].split(",");
-          if (usersStr.length >= 1) {
-            addToSet(users, usersStr);
-          }
-        }
-        
-        if (userGroupStrings.length == 2) {
-          String[] groupsStr = userGroupStrings[1].split(",");
-          if (groupsStr.length >= 1) {
-            addToSet(groups, groupsStr);
-          }
-        }
-      }
-    }
-    
-    boolean allUsersAllowed() {
-      return allAllowed;
-    }
-    
-    boolean isUserAllowed(String user) {
-      return users.contains(user);
-    }
-    
-    boolean isAnyGroupAllowed(String[] otherGroups) {
-      for (String g : otherGroups) {
-        if (groups.contains(g)) {
-          return true;
-        }
-      }
-      return false;
-    }
-  }
-  
   /**
    * Construct a new QueueManager using configuration specified in the passed
    * in {@link org.apache.hadoop.conf.Configuration} object.
@@ -162,7 +98,7 @@ class QueueManager {
    */
   public QueueManager(Configuration conf) {
     queueNames = new TreeSet<String>();
-    aclsMap = new HashMap<String, ACL>();
+    aclsMap = new HashMap<String, AccessControlList>();
     schedulerInfoObjects = new HashMap<String, Object>();
     initialize(conf);
   }
@@ -237,13 +173,30 @@ class QueueManager {
       }
     }
     
-    ACL acl = aclsMap.get(toFullPropertyName(queueName, oper.getAclName()));
+    AccessControlList acl = aclsMap.get(toFullPropertyName(queueName, oper.getAclName()));
     if (acl == null) {
       return false;
     }
-    return ((acl.allUsersAllowed()) ||
-              (acl.isUserAllowed(ugi.getUserName())) ||
-              (acl.isAnyGroupAllowed(ugi.getGroupNames())));    
+    
+    // Check the ACL list
+    boolean allowed = acl.allAllowed();
+    if (!allowed) {
+      // Check the allowed users list
+      if (acl.getUsers().contains(ugi.getUserName())) {
+        allowed = true;
+      } else {
+        // Check the allowed groups list
+        Set<String> allowedGroups = acl.getGroups();
+        for (String group : ugi.getGroupNames()) {
+          if (allowedGroups.contains(group)) {
+            allowed = true;
+            break;
+          }
+        }
+      }
+    }
+    
+    return allowed;    
   }
   
   /**
@@ -302,7 +255,7 @@ class QueueManager {
       for (QueueOperation oper : QueueOperation.values()) {
         String key = toFullPropertyName(queue, oper.getAclName());
         String aclString = conf.get(key, "*");
-        aclsMap.put(key, new ACL(aclString));
+        aclsMap.put(key, new AccessControlList(aclString));
       }
     }
   }
@@ -317,8 +270,7 @@ class QueueManager {
       set.add(elem);
     }
   }
-
-
+  
   synchronized JobQueueInfo[] getJobQueueInfos() {
     ArrayList<JobQueueInfo> queueInfoList = new ArrayList<JobQueueInfo>();
     for(String queue : queueNames) {

+ 15 - 0
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -75,6 +75,10 @@ import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.authorize.ConfiguredPolicy;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
@@ -480,6 +484,17 @@ public class TaskTracker
     
     this.jvmManager = new JvmManager(this);
 
+    // Set service-level authorization security policy
+    if (this.fConf.getBoolean(
+          ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
+      PolicyProvider policyProvider = 
+        (PolicyProvider)(ReflectionUtils.newInstance(
+            this.fConf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
+                MapReducePolicyProvider.class, PolicyProvider.class), 
+            this.fConf));
+      SecurityUtil.setPolicy(new ConfiguredPolicy(this.fConf, policyProvider));
+    }
+    
     // RPC initialization
     int max = maxCurrentMapTasks > maxCurrentReduceTasks ? 
                        maxCurrentMapTasks : maxCurrentReduceTasks;

+ 193 - 0
src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java

@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.tools;
+
+import java.io.IOException;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Administrative access to Hadoop Map-Reduce.
+ *
+ * Currently it only provides the ability to connect to the {@link JobTracker}
+ * and refresh the service-level authorization policy.
+ */
+public class MRAdmin extends Configured implements Tool {
+
+  public MRAdmin() {
+    super();
+  }
+
+  public MRAdmin(Configuration conf) {
+    super(conf);
+  }
+
+  private static void printHelp(String cmd) {
+    String summary = "hadoop mradmin is the command to execute Map-Reduce administrative commands.\n" +
+      "The full syntax is: \n\n" +
+      "hadoop mradmin [-refreshServiceAcl] [-help [cmd]]\n";
+
+    String refreshServiceAcl = "-refreshServiceAcl: Reload the service-level authorization policy file\n" +
+      "\t\tJobtracker will reload the authorization policy file.\n";
+
+    String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
+      "\t\tis specified.\n";
+
+    if ("refreshServiceAcl".equals(cmd)) {
+      System.out.println(refreshServiceAcl);
+    } else if ("help".equals(cmd)) {
+      System.out.println(help);
+    } else {
+      System.out.println(summary);
+      System.out.println(refreshServiceAcl);
+      System.out.println(help);
+      System.out.println();
+      ToolRunner.printGenericCommandUsage(System.out);
+    }
+  }
+  
+  /**
+   * Displays format of commands.
+   * @param cmd The command that is being executed.
+   */
+  private static void printUsage(String cmd) {
+    if ("-refreshServiceAcl".equals(cmd)) {
+      System.err.println("Usage: java MRAdmin"
+                         + " [-refreshServiceAcl]");
+    } else {
+      System.err.println("Usage: java MRAdmin");
+      System.err.println("           [-refreshServiceAcl]");
+      System.err.println("           [-help [cmd]]");
+      System.err.println();
+      ToolRunner.printGenericCommandUsage(System.err);
+    }
+  }
+  
+  private static UnixUserGroupInformation getUGI(Configuration conf) 
+  throws IOException {
+    UnixUserGroupInformation ugi = null;
+    try {
+      ugi = UnixUserGroupInformation.login(conf, true);
+    } catch (LoginException e) {
+      throw (IOException)(new IOException(
+          "Failed to get the current user's information.").initCause(e));
+    }
+    return ugi;
+  }
+
+  private int refreshAuthorizationPolicy() throws IOException {
+    // Get the current configuration
+    Configuration conf = getConf();
+    
+    // Create the client
+    RefreshAuthorizationPolicyProtocol refreshProtocol = 
+      (RefreshAuthorizationPolicyProtocol) 
+      RPC.getProxy(RefreshAuthorizationPolicyProtocol.class, 
+                   RefreshAuthorizationPolicyProtocol.versionID, 
+                   JobTracker.getAddress(conf), getUGI(conf), conf,
+                   NetUtils.getSocketFactory(conf, 
+                                             RefreshAuthorizationPolicyProtocol.class));
+    
+    // Refresh the authorization policy in-effect
+    refreshProtocol.refreshServiceAcl();
+    
+    return 0;
+  }
+  
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 1) {
+      printUsage("");
+      return -1;
+    }
+
+    int exitCode = -1;
+    int i = 0;
+    String cmd = args[i++];
+
+    //
+    // verify that we have enough command line parameters
+    //
+    if ("-refreshServiceAcl".equals(cmd)) {
+      if (args.length != 1) {
+        printUsage(cmd);
+        return exitCode;
+      }
+    }
+    
+    exitCode = 0;
+    try {
+      if ("-refreshServiceAcl".equals(cmd)) {
+        exitCode = refreshAuthorizationPolicy();
+      } else if ("-help".equals(cmd)) {
+        if (i < args.length) {
+          printUsage(args[i]);
+        } else {
+          printHelp("");
+        }
+      } else {
+        exitCode = -1;
+        System.err.println(cmd.substring(1) + ": Unknown command");
+        printUsage("");
+      }
+
+    } catch (IllegalArgumentException arge) {
+      exitCode = -1;
+      System.err.println(cmd.substring(1) + ": " + arge.getLocalizedMessage());
+      printUsage(cmd);
+    } catch (RemoteException e) {
+      //
+      // This is an error returned by the hadoop server. Print
+      // out the first line of the error message; ignore the stack trace.
+      exitCode = -1;
+      try {
+        String[] content;
+        content = e.getLocalizedMessage().split("\n");
+        System.err.println(cmd.substring(1) + ": "
+                           + content[0]);
+      } catch (Exception ex) {
+        System.err.println(cmd.substring(1) + ": "
+                           + ex.getLocalizedMessage());
+      }
+    } catch (Exception e) {
+      exitCode = -1;
+      System.err.println(cmd.substring(1) + ": "
+                         + e.getLocalizedMessage());
+    } 
+    return exitCode;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int result = ToolRunner.run(new MRAdmin(), args);
+    System.exit(result);
+  }
+
+}
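
For quick reference, the new tool can also be driven programmatically through ToolRunner, exactly as its main() does; a minimal sketch, where the jobtracker address is an assumed example value, not something defined by this change:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.tools.MRAdmin;
    import org.apache.hadoop.util.ToolRunner;

    public class RefreshMRPolicy {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed jobtracker address; a real deployment picks this up from its site configuration.
        conf.set("mapred.job.tracker", "jthost:9001");
        // Equivalent to running: hadoop mradmin -refreshServiceAcl
        int exit = ToolRunner.run(new MRAdmin(conf), new String[] { "-refreshServiceAcl" });
        System.exit(exit);
      }
    }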

+ 97 - 0
src/test/hadoop-policy.xml

@@ -0,0 +1,97 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+  <property>
+    <name>security.client.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for ClientProtocol, which is used by user code 
+    via the DistributedFileSystem. 
+    The ACL is a comma-separated list of user and group names. The user list
+    and group list are separated by a space. For example, "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.client.datanode.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for ClientDatanodeProtocol, the client-to-datanode protocol 
+    for block recovery.
+    The ACL is a comma-separated list of user and group names. The user list
+    and group list are separated by a space. For example, "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.datanode.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for DatanodeProtocol, which is used by datanodes to 
+    communicate with the namenode.
+    The ACL is a comma-separated list of user and group names. The user list
+    and group list are separated by a space. For example, "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.inter.datanode.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for InterDatanodeProtocol, the inter-datanode protocol
+    for updating generation timestamp.
+    The ACL is a comma-separated list of user and group names. The user list
+    and group list are separated by a space. For example, "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.namenode.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for NamenodeProtocol, the protocol used by the secondary
+    namenode to communicate with the namenode.
+    The ACL is a comma-separated list of user and group names. The user list
+    and group list are separated by a space. For example, "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.inter.tracker.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for InterTrackerProtocol, used by the tasktrackers to 
+    communicate with the jobtracker.
+    The ACL is a comma-separated list of user and group names. The user list
+    and group list are separated by a space. For example, "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.job.submission.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for JobSubmissionProtocol, used by job clients to 
+    communicate with the jobtracker for job submission, querying job status, etc.
+    The ACL is a comma-separated list of user and group names. The user list
+    and group list are separated by a space. For example, "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.task.umbilical.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for TaskUmbilicalProtocol, used by the map and reduce 
+    tasks to communicate with the parent tasktracker. 
+    The ACL is a comma-separated list of user and group names. The user list
+    and group list are separated by a space. For example, "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.refresh.policy.protocol.acl</name>
+    <value>${user.name}</value>
+    <description>ACL for RefreshAuthorizationPolicyProtocol, used by the 
+    dfsadmin and mradmin commands to refresh the security policy in-effect. 
+    The ACL is a comma-separated list of user and group names. The user list
+    and group list are separated by a space. For example, "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+</configuration>
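
Each ACL value above follows the "user,user group,group" format parsed by SecurityUtil.AccessControlList (exercised by TestAccessControlList later in this change); a small sketch of how such a value is interpreted:

    import org.apache.hadoop.security.SecurityUtil.AccessControlList;

    public class AclFormatDemo {
      public static void main(String[] args) {
        // Users "alice" and "bob", plus anyone in group "users" or "wheel".
        AccessControlList acl = new AccessControlList("alice,bob users,wheel");
        System.out.println(acl.getUsers());     // alice, bob
        System.out.println(acl.getGroups());    // users, wheel

        // The wildcard used throughout this template: everybody is allowed.
        AccessControlList open = new AccessControlList("*");
        System.out.println(open.allAllowed());  // true
      }
    }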

+ 57 - 30
src/test/org/apache/hadoop/cli/TestCLI.java

@@ -38,6 +38,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.security.authorize.HadoopPolicyProvider;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.StringUtils;
 import org.xml.sax.Attributes;
 import org.xml.sax.SAXException;
@@ -73,9 +78,11 @@ public class TestCLI extends TestCase {
   static ComparatorData comparatorData = null;
   
   private static Configuration conf = null;
-  private static MiniDFSCluster cluster = null;
+  private static MiniDFSCluster dfsCluster = null;
   private static DistributedFileSystem dfs = null;
+  private static MiniMRCluster mrCluster = null;
   private static String namenode = null;
+  private static String jobtracker = null;
   private static String clitestDataDir = null;
   private static String username = null;
   
@@ -109,19 +116,31 @@ public class TestCLI extends TestCase {
     // Start up the mini dfs cluster
     boolean success = false;
     conf = new Configuration();
-    cluster = new MiniDFSCluster(conf, 1, true, null);
+    conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
+                  HadoopPolicyProvider.class, PolicyProvider.class);
+    conf.setBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, 
+                    true);
+
+    dfsCluster = new MiniDFSCluster(conf, 1, true, null);
     namenode = conf.get("fs.default.name", "file:///");
     clitestDataDir = new File(TEST_CACHE_DATA_DIR).
       toURI().toString().replace(' ', '+');
     username = System.getProperty("user.name");
 
-    FileSystem fs = cluster.getFileSystem();
+    FileSystem fs = dfsCluster.getFileSystem();
     assertTrue("Not a HDFS: "+fs.getUri(),
                fs instanceof DistributedFileSystem);
     dfs = (DistributedFileSystem) fs;
+    
+    // Start up the mini MR cluster
+    JobConf mrConf = new JobConf(conf);
+    mrCluster = new MiniMRCluster(1, dfsCluster.getFileSystem().getUri().toString(), 1, 
+                           null, null, mrConf);
+    jobtracker = mrCluster.createJobConf().get("mapred.job.tracker", "local");
+
     success = true;
 
-    assertTrue("Error setting up Mini DFS cluster", success);
+    assertTrue("Error setting up Mini DFS & MR clusters", success);
   }
   
   /**
@@ -129,12 +148,14 @@ public class TestCLI extends TestCase {
    */
   public void tearDown() throws Exception {
     boolean success = false;
+    mrCluster.shutdown();
+    
     dfs.close();
-    cluster.shutdown();
+    dfsCluster.shutdown();
     success = true;
     Thread.sleep(2000);
 
-    assertTrue("Error tearing down Mini DFS cluster", success);
+    assertTrue("Error tearing down Mini DFS & MR clusters", success);
     
     displayResults();
   }
@@ -147,6 +168,7 @@ public class TestCLI extends TestCase {
   private String expandCommand(final String cmd) {
     String expCmd = cmd;
     expCmd = expCmd.replaceAll("NAMENODE", namenode);
+    expCmd = expCmd.replaceAll("JOBTRACKER", jobtracker);
     expCmd = expCmd.replaceAll("CLITEST_DATA", clitestDataDir);
     expCmd = expCmd.replaceAll("USERNAME", username);
     
@@ -173,30 +195,30 @@ public class TestCLI extends TestCase {
         LOG.info("");
 
         ArrayList<TestCmd> testCommands = td.getTestCommands();
-        for (int j = 0; j < testCommands.size(); j++) {
+        for (TestCmd cmd : testCommands) {
           LOG.info("              Test Commands: [" + 
-              expandCommand(testCommands.get(j).getCmd()) + "]");
+                   expandCommand(cmd.getCmd()) + "]");
         }
 
         LOG.info("");
         ArrayList<TestCmd> cleanupCommands = td.getCleanupCommands();
-        for (int j = 0; j < cleanupCommands.size(); j++) {
+        for (TestCmd cmd : cleanupCommands) {
           LOG.info("           Cleanup Commands: [" +
-              expandCommand(cleanupCommands.get(j).getCmd()) + "]");
+                   expandCommand(cmd.getCmd()) + "]");
         }
 
         LOG.info("");
         ArrayList<ComparatorData> compdata = td.getComparatorData();
-        for (int j = 0; j < compdata.size(); j++) {
-          boolean resultBoolean = compdata.get(j).getTestResult();
+        for (ComparatorData cd : compdata) {
+          boolean resultBoolean = cd.getTestResult();
           LOG.info("                 Comparator: [" + 
-              compdata.get(j).getComparatorType() + "]");
+                   cd.getComparatorType() + "]");
           LOG.info("         Comparision result:   [" + 
-              (resultBoolean ? "pass" : "fail") + "]");
+                   (resultBoolean ? "pass" : "fail") + "]");
           LOG.info("            Expected output:   [" + 
-              compdata.get(j).getExpectedOutput() + "]");
+                   cd.getExpectedOutput() + "]");
           LOG.info("              Actual output:   [" + 
-              compdata.get(j).getActualOutput() + "]");
+                   cd.getActualOutput() + "]");
         }
         LOG.info("");
       }
@@ -319,9 +341,9 @@ public class TestCLI extends TestCase {
    
       // Execute the test commands
       ArrayList<TestCmd> testCommands = testdata.getTestCommands();
-      for (int i = 0; i < testCommands.size(); i++) {
+      for (TestCmd cmd : testCommands) {
       try {
-        CommandExecutor.executeCommand(testCommands.get(i), namenode);
+        CommandExecutor.executeCommand(cmd, namenode, jobtracker);
       } catch (Exception e) {
         fail(StringUtils.stringifyException(e));
       }
@@ -330,28 +352,27 @@ public class TestCLI extends TestCase {
       boolean overallTCResult = true;
       // Run comparators
       ArrayList<ComparatorData> compdata = testdata.getComparatorData();
-      for (int i = 0; i < compdata.size(); i++) {
-        final String comptype = compdata.get(i).getComparatorType();
+      for (ComparatorData cd : compdata) {
+        final String comptype = cd.getComparatorType();
         
         boolean compareOutput = false;
         
         if (! comptype.equalsIgnoreCase("none")) {
-          compareOutput = compareTestOutput(compdata.get(i));
+          compareOutput = compareTestOutput(cd);
           overallTCResult &= compareOutput;
         }
         
-        compdata.get(i).setExitCode(CommandExecutor.getLastExitCode());
-        compdata.get(i).setActualOutput(
-          CommandExecutor.getLastCommandOutput());
-        compdata.get(i).setTestResult(compareOutput);
+        cd.setExitCode(CommandExecutor.getLastExitCode());
+        cd.setActualOutput(CommandExecutor.getLastCommandOutput());
+        cd.setTestResult(compareOutput);
       }
       testdata.setTestResult(overallTCResult);
       
       // Execute the cleanup commands
       ArrayList<TestCmd> cleanupCommands = testdata.getCleanupCommands();
-      for (int i = 0; i < cleanupCommands.size(); i++) {
+      for (TestCmd cmd : cleanupCommands) {
       try { 
-        CommandExecutor.executeCommand(cleanupCommands.get(i), namenode);
+        CommandExecutor.executeCommand(cmd, namenode, jobtracker);
       } catch (Exception e) {
         fail(StringUtils.stringifyException(e));
       }
@@ -410,12 +431,18 @@ public class TestCLI extends TestCase {
         } else if (cleanupCommands != null) {
           cleanupCommands.add(new TestCmd(charString, CommandType.FS));
         }
-      } else if (qName.equals("admin-command")) {
+      } else if (qName.equals("dfs-admin-command")) {
           if (testCommands != null) {
-              testCommands.add(new TestCmd(charString,CommandType.ADMIN));
+              testCommands.add(new TestCmd(charString,CommandType.DFSADMIN));
             } else if (cleanupCommands != null) {
-              cleanupCommands.add(new TestCmd(charString, CommandType.ADMIN));
+              cleanupCommands.add(new TestCmd(charString, CommandType.DFSADMIN));
             } 
+      } else if (qName.equals("mr-admin-command")) {
+        if (testCommands != null) {
+            testCommands.add(new TestCmd(charString,CommandType.MRADMIN));
+          } else if (cleanupCommands != null) {
+            cleanupCommands.add(new TestCmd(charString, CommandType.MRADMIN));
+          } 
       } else if (qName.equals("comparators")) {
         td.setComparatorData(testComparators);
       } else if (qName.equals("comparator")) {

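The two conf.setClass()/conf.setBoolean() calls added to setUp() above are the entire switch for service-level authorization; the same pair reappears in TestRPC and TestServiceLevelAuthorization further down. A condensed sketch (HadoopPolicyProvider is the test-only provider that merges the HDFS and Map-Reduce providers; a real daemon would instead be pointed at HDFSPolicyProvider or MapReducePolicyProvider):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.authorize.HadoopPolicyProvider;
    import org.apache.hadoop.security.authorize.PolicyProvider;
    import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;

    public class EnableServiceAuthorization {
      static Configuration secureConf() {
        Configuration conf = new Configuration();
        // Turn the feature on (the boolean switch added to hadoop-default.xml in this change).
        conf.setBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, true);
        // Tell the servers which protocol-to-ACL mapping to enforce.
        conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
                      HadoopPolicyProvider.class, PolicyProvider.class);
        return conf;
      }
    }
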
+ 76 - 7
src/test/org/apache/hadoop/cli/testConf.xml

@@ -3171,10 +3171,10 @@
       <test-commands>
         <command>-fs NAMENODE -mkdir /test </command>
         <command>-fs NAMENODE -touchz /test/file1 </command>
-        <admin-command>-fs NAMENODE -setQuota 1 /test/file1 </admin-command>
+        <dfs-admin-command>-fs NAMENODE -setQuota 1 /test/file1 </dfs-admin-command>
       </test-commands>
       <cleanup-commands>
-      	<admin-command>-fs NAMENODE -setQuota 5 /test </admin-command>
+      	<dfs-admin-command>-fs NAMENODE -setQuota 5 /test </dfs-admin-command>
         <!-- Same directory will be used in the next test -->
       </cleanup-commands>
       <comparators>
@@ -3188,7 +3188,7 @@
     <test> <!--Tested -->
       <description>verifying error messages for quota commands - setting quota on non-existing file</description>
       <test-commands>
-        <admin-command>-fs NAMENODE -setSpaceQuota 1g /test1 </admin-command>
+        <dfs-admin-command>-fs NAMENODE -setSpaceQuota 1g /test1 </dfs-admin-command>
       </test-commands>
       <cleanup-commands>
              <!-- Same directory will be used in the next test -->   
@@ -3204,7 +3204,7 @@
     <test> <!--Tested -->
       <description>verifying error messages for quota commands - exceeding quota</description>
       <test-commands>
-        <admin-command>-fs NAMENODE -setQuota 3 /test </admin-command>
+        <dfs-admin-command>-fs NAMENODE -setQuota 3 /test </dfs-admin-command>
         <command>-fs NAMENODE -touchz /test/file0 </command>
         <command>-fs NAMENODE -mkdir /test/test1 </command>
       </test-commands>
@@ -3222,7 +3222,7 @@
     <test> <!--Tested -->
       <description>verifying error messages for quota commands - setting not valid quota</description>
       <test-commands>
-        <admin-command>-fs NAMENODE -setQuota 0 /test </admin-command>
+        <dfs-admin-command>-fs NAMENODE -setQuota 0 /test </dfs-admin-command>
       </test-commands>
       <cleanup-commands>
              <!-- Same directory will be used in the next test -->   
@@ -3238,7 +3238,7 @@
     <test> <!--Tested -->
       <description>verifying error messages for quota commands - setting not valid space quota</description>
       <test-commands>
-        <admin-command>-fs NAMENODE -setSpaceQuota a5 /test </admin-command>
+        <dfs-admin-command>-fs NAMENODE -setSpaceQuota a5 /test </dfs-admin-command>
       </test-commands>
       <cleanup-commands>
              <!-- Same directory will be used in the next test -->   
@@ -3254,7 +3254,7 @@
     <test> <!--Tested -->
       <description>verifying error messages for quota commands - clearQuota on non existing file</description>
       <test-commands>
-        <admin-command>-fs NAMENODE -clrQuota /test1 </admin-command>
+        <dfs-admin-command>-fs NAMENODE -clrQuota /test1 </dfs-admin-command>
       </test-commands>
       <cleanup-commands>
       	<command>-fs NAMENODE -rmr /test </command>
@@ -3266,5 +3266,74 @@
         </comparator>
       </comparators>
     </test>
+    
+    <test> <!--Tested -->
+      <description>refreshServiceAcl: refreshing security authorization policy for namenode</description>
+      <test-commands>
+        <dfs-admin-command>-fs NAMENODE -refreshServiceAcl </dfs-admin-command>
+      </test-commands>
+      <cleanup-commands>
+        <!-- No cleanup -->
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>ExactComparator</type>
+          <expected-output></expected-output>
+        </comparator>
+      </comparators>
+    </test>
+    
+    <test> <!--Tested -->
+      <description>refreshServiceAcl: verifying error message while refreshing security authorization policy for namenode</description>
+      <test-commands>
+        <!-- hadoop-policy.xml for tests has 
+             security.refresh.policy.protocol.acl = ${user.name} -->
+        <dfs-admin-command>-fs NAMENODE -Dhadoop.job.ugi=blah,blah -refreshServiceAcl </dfs-admin-command>
+      </test-commands>
+      <cleanup-commands>
+        <!-- No cleanup -->
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>access denied</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+    
+    <test> <!--Tested -->
+      <description>refreshServiceAcl: refreshing security authorization policy for jobtracker</description>
+      <test-commands>
+        <mr-admin-command>-jt JOBTRACKER -refreshServiceAcl </mr-admin-command>
+      </test-commands>
+      <cleanup-commands>
+        <!-- No cleanup -->
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>ExactComparator</type>
+          <expected-output></expected-output>
+        </comparator>
+      </comparators>
+    </test>
+    
+    <test> <!--Tested -->
+      <description>refreshServiceAcl: verifying error message while refreshing security authorization policy for jobtracker</description>
+      <test-commands>
+        <!-- hadoop-policy.xml for tests has 
+             security.refresh.policy.protocol.acl = ${user.name} -->
+        <mr-admin-command>-jt JOBTRACKER -Dhadoop.job.ugi=blah,blah -refreshServiceAcl </mr-admin-command>
+      </test-commands>
+      <cleanup-commands>
+        <!-- No cleanup -->
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>access denied</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+    
   </tests>
 </configuration>

+ 3 - 3
src/test/org/apache/hadoop/cli/util/CLITestData.java

@@ -37,13 +37,13 @@ public class CLITestData {
 
   /**
    * Class to define Test Command. includes type of the command and command itself
-   * Valid types FS and Admin (for dfsadmin commands)
-   *
+   * Valid types are FS, DFSADMIN and MRADMIN.
    */
   static public class TestCmd {
     public enum CommandType {
         FS,
-        ADMIN
+        DFSADMIN,
+        MRADMIN
     }
     private final CommandType type;
     private final String cmd;

+ 45 - 9
src/test/org/apache/hadoop/cli/util/CommandExecutor.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.cli.util.CLITestData.TestCmd;
 import org.apache.hadoop.cli.util.CLITestData.TestCmd.CommandType;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.mapred.tools.MRAdmin;
 import org.apache.hadoop.util.ToolRunner;
 
 /**
@@ -40,8 +41,8 @@ public class CommandExecutor {
   private static Exception lastException = null;
   private static String cmdExecuted = null;
   
-  private static String[] getFSCommandAsArgs(final String cmd, 
-		  final String namenode) {
+  private static String[] getCommandAsArgs(final String cmd, final String masterKey,
+		                                       final String master) {
     StringTokenizer tokenizer = new StringTokenizer(cmd, " ");
     String[] args = new String[tokenizer.countTokens()];
     
@@ -49,7 +50,7 @@ public class CommandExecutor {
     while (tokenizer.hasMoreTokens()) {
       args[i] = tokenizer.nextToken();
 
-      args[i] = args[i].replaceAll("NAMENODE", namenode);
+      args[i] = args[i].replaceAll(masterKey, master);
       args[i] = args[i].replaceAll("CLITEST_DATA", 
         new File(TestCLI.TEST_CACHE_DATA_DIR).
         toURI().toString().replace(' ', '+'));
@@ -61,12 +62,16 @@ public class CommandExecutor {
     return args;
   }
   
-  public static int executeCommand(final TestCmd cmd, final String namenode) throws Exception {
+  public static int executeCommand(final TestCmd cmd, 
+                                   final String namenode, final String jobtracker) 
+  throws Exception {
     switch(cmd.getType()) {
-    case ADMIN:
-      return CommandExecutor.executeDFSAdminCommand(cmd.getCmd(),namenode);
+    case DFSADMIN:
+      return CommandExecutor.executeDFSAdminCommand(cmd.getCmd(), namenode);
+    case MRADMIN:
+      return CommandExecutor.executeMRAdminCommand(cmd.getCmd(), jobtracker);
     case FS:
-      return CommandExecutor.executeFSCommand(cmd.getCmd(),namenode);
+      return CommandExecutor.executeFSCommand(cmd.getCmd(), namenode);
     default:
       throw new Exception("Unknow type of Test command:"+ cmd.getType()); 
     }
@@ -83,7 +88,7 @@ public class CommandExecutor {
       System.setErr(new PrintStream(bao));
       
       DFSAdmin shell = new DFSAdmin();
-      String[] args = getFSCommandAsArgs(cmd, namenode);
+      String[] args = getCommandAsArgs(cmd, "NAMENODE", namenode);
       cmdExecuted = cmd;
      
       try {
@@ -102,6 +107,37 @@ public class CommandExecutor {
       return exitCode;
   }
   
+  public static int executeMRAdminCommand(final String cmd, 
+                                          final String jobtracker) {
+    exitCode = 0;
+    
+    ByteArrayOutputStream bao = new ByteArrayOutputStream();
+    PrintStream origOut = System.out;
+    PrintStream origErr = System.err;
+    
+    System.setOut(new PrintStream(bao));
+    System.setErr(new PrintStream(bao));
+    
+    MRAdmin mradmin = new MRAdmin();
+    String[] args = getCommandAsArgs(cmd, "JOBTRACKER", jobtracker);
+    cmdExecuted = cmd;
+   
+    try {
+      ToolRunner.run(mradmin, args);
+    } catch (Exception e) {
+      e.printStackTrace();
+      lastException = e;
+      exitCode = -1;
+    } finally {
+      System.setOut(origOut);
+      System.setErr(origErr);
+    }
+    
+    commandOutput = bao.toString();
+    
+    return exitCode;
+  }
+
   public static int executeFSCommand(final String cmd, final String namenode) {
     exitCode = 0;
     
@@ -113,7 +149,7 @@ public class CommandExecutor {
     System.setErr(new PrintStream(bao));
     
     FsShell shell = new FsShell();
-    String[] args = getFSCommandAsArgs(cmd, namenode);
+    String[] args = getCommandAsArgs(cmd, "NAMENODE", namenode);
     cmdExecuted = cmd;
     
     try {

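The generalized getCommandAsArgs() above only tokenizes the command string and substitutes a single master placeholder (NAMENODE or JOBTRACKER) per call; an illustrative, self-contained restatement of that behavior (simplified: the real helper also expands CLITEST_DATA and USERNAME and is private to CommandExecutor):

    import java.util.StringTokenizer;

    public class TokenSubstitutionDemo {
      // Simplified restatement of CommandExecutor.getCommandAsArgs().
      static String[] toArgs(String cmd, String masterKey, String master) {
        StringTokenizer tokenizer = new StringTokenizer(cmd, " ");
        String[] args = new String[tokenizer.countTokens()];
        int i = 0;
        while (tokenizer.hasMoreTokens()) {
          args[i++] = tokenizer.nextToken().replaceAll(masterKey, master);
        }
        return args;
      }

      public static void main(String[] args) {
        // "-jt JOBTRACKER -refreshServiceAcl" -> "-jt", "localhost:9001", "-refreshServiceAcl"
        for (String a : toArgs("-jt JOBTRACKER -refreshServiceAcl", "JOBTRACKER", "localhost:9001")) {
          System.out.println(a);
        }
      }
    }
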
+ 1 - 1
src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -233,7 +233,7 @@ public class MiniDFSCluster {
                         long[] simulatedCapacities) throws IOException {
     this.conf = conf;
     try {
-      UserGroupInformation.setCurrentUGI(UnixUserGroupInformation.login(conf));
+      UserGroupInformation.setCurrentUser(UnixUserGroupInformation.login(conf));
     } catch (LoginException e) {
       IOException ioe = new IOException();
       ioe.initCause(e);

+ 2 - 2
src/test/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java

@@ -87,7 +87,7 @@ public class NNThroughputBenchmark {
   NNThroughputBenchmark(Configuration conf) throws IOException, LoginException {
     config = conf;
     ugi = UnixUserGroupInformation.login(config);
-    UserGroupInformation.setCurrentUGI(ugi);
+    UserGroupInformation.setCurrentUser(ugi);
 
     // We do not need many handlers, since each thread simulates a handler
     // by calling name-node methods directly
@@ -337,7 +337,7 @@ public class NNThroughputBenchmark {
     }
 
     public void run() {
-      UserGroupInformation.setCurrentUGI(ugi);
+      UserGroupInformation.setCurrentUser(ugi);
       localNumOpsExecuted = 0;
       localCumulativeTime = 0;
       arg1 = statsOp.getExecutionArgument(daemonId);

+ 3 - 1
src/test/org/apache/hadoop/ipc/TestIPC.java

@@ -59,7 +59,9 @@ public class TestIPC extends TestCase {
       this.sleep = sleep;
     }
 
-    public Writable call(Writable param, long receivedTime) throws IOException {
+    @Override
+    public Writable call(Class<?> protocol, Writable param, long receiveTime)
+        throws IOException {
       if (sleep) {
         try {
           Thread.sleep(RANDOM.nextInt(2*PING_INTERVAL));      // sleep a bit

+ 2 - 2
src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java

@@ -71,8 +71,8 @@ public class TestIPCServerResponder extends TestCase {
     }
 
     @Override
-    public Writable call(final Writable param, final long receivedTime) 
-                                               throws IOException {
+    public Writable call(Class<?> protocol, Writable param, long receiveTime)
+        throws IOException {
       if (sleep) {
         try {
           Thread.sleep(RANDOM.nextInt(20)); // sleep a bit

+ 64 - 0
src/test/org/apache/hadoop/ipc/TestRPC.java

@@ -34,6 +34,12 @@ import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
 
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ConfiguredPolicy;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 
 /** Unit tests for RPC. */
 public class TestRPC extends TestCase {
@@ -319,6 +325,64 @@ public class TestRPC extends TestCase {
     }
   }
   
+  private static final String ACL_CONFIG = "test.protocol.acl";
+  
+  private static class TestPolicyProvider extends PolicyProvider {
+
+    @Override
+    public Service[] getServices() {
+      return new Service[] { new Service(ACL_CONFIG, TestProtocol.class) };
+    }
+    
+  }
+  
+  private void doRPCs(Configuration conf, boolean expectFailure) throws Exception {
+    SecurityUtil.setPolicy(new ConfiguredPolicy(conf, new TestPolicyProvider()));
+    
+    Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, 5, true, conf);
+
+    TestProtocol proxy = null;
+
+    server.start();
+
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    
+    try {
+      proxy = (TestProtocol)RPC.getProxy(
+          TestProtocol.class, TestProtocol.versionID, addr, conf);
+      proxy.ping();
+
+      if (expectFailure) {
+        fail("Expect RPC.getProxy to fail with AuthorizationException!");
+      }
+    } catch (RemoteException e) {
+      if (expectFailure) {
+        assertTrue(e.unwrapRemoteException() instanceof AuthorizationException);
+      } else {
+        throw e;
+      }
+    } finally {
+      server.stop();
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+    }
+  }
+  
+  public void testAuthorization() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(
+        ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, true);
+    
+    // Expect to succeed
+    conf.set(ACL_CONFIG, "*");
+    doRPCs(conf, false);
+    
+    // Restrict the ACL to an unknown user and group; the RPC should now be rejected
+    conf.set(ACL_CONFIG, "invalid invalid");
+    doRPCs(conf, true);
+  }
+  
   public static void main(String[] args) throws Exception {
 
     new TestRPC("test").testCalls();

+ 3 - 2
src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

@@ -170,7 +170,7 @@ public class TestMiniMRWithDFS extends TestCase {
     }
   }
 
-  static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
+  public static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
     LOG.info("runPI");
     double estimate = org.apache.hadoop.examples.PiEstimator.estimate(
         NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
@@ -179,7 +179,8 @@ public class TestMiniMRWithDFS extends TestCase {
     checkTaskDirectories(mr, new String[]{}, new String[]{});
   }
 
-  static void runWordCount(MiniMRCluster mr, JobConf jobConf) throws IOException {
+  public static void runWordCount(MiniMRCluster mr, JobConf jobConf) 
+  throws IOException {
     LOG.info("runWordCount");
     // Run a word count example
     // Keeping tasks that match this pattern

+ 104 - 0
src/test/org/apache/hadoop/security/TestAccessControlList.java

@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+
+import junit.framework.TestCase;
+
+public class TestAccessControlList extends TestCase {
+  
+  public void testWildCardAccessControlList() throws Exception {
+    AccessControlList acl;
+    
+    acl = new AccessControlList("*");
+    assertTrue(acl.allAllowed());
+    
+    acl = new AccessControlList("  * ");
+    assertTrue(acl.allAllowed());
+    
+    acl = new AccessControlList(" *");
+    assertTrue(acl.allAllowed());
+    
+    acl = new AccessControlList("*  ");
+    assertTrue(acl.allAllowed());
+  }
+  
+  public void testAccessControlList() throws Exception {
+    AccessControlList acl;
+    Set<String> users;
+    Set<String> groups;
+    
+    acl = new AccessControlList("drwho tardis");
+    users = acl.getUsers();
+    assertEquals(users.size(), 1);
+    assertEquals(users.iterator().next(), "drwho");
+    groups = acl.getGroups();
+    assertEquals(groups.size(), 1);
+    assertEquals(groups.iterator().next(), "tardis");
+    
+    acl = new AccessControlList("drwho");
+    users = acl.getUsers();
+    assertEquals(users.size(), 1);
+    assertEquals(users.iterator().next(), "drwho");
+    groups = acl.getGroups();
+    assertEquals(groups.size(), 0);
+    
+    acl = new AccessControlList("drwho ");
+    users = acl.getUsers();
+    assertEquals(users.size(), 1);
+    assertEquals(users.iterator().next(), "drwho");
+    groups = acl.getGroups();
+    assertEquals(groups.size(), 0);
+    
+    acl = new AccessControlList(" tardis");
+    users = acl.getUsers();
+    assertEquals(users.size(), 0);
+    groups = acl.getGroups();
+    assertEquals(groups.size(), 1);
+    assertEquals(groups.iterator().next(), "tardis");
+
+    Iterator<String> iter;
+    acl = new AccessControlList("drwho,joe tardis,users");
+    users = acl.getUsers();
+    assertEquals(users.size(), 2);
+    iter = users.iterator();
+    assertEquals(iter.next(), "drwho");
+    assertEquals(iter.next(), "joe");
+    groups = acl.getGroups();
+    assertEquals(groups.size(), 2);
+    iter = groups.iterator();
+    assertEquals(iter.next(), "tardis");
+    assertEquals(iter.next(), "users");
+    
+    acl = new AccessControlList("drwho,joe tardis, users");
+    users = acl.getUsers();
+    assertEquals(users.size(), 2);
+    iter = users.iterator();
+    assertEquals(iter.next(), "drwho");
+    assertEquals(iter.next(), "joe");
+    groups = acl.getGroups();
+    assertEquals(groups.size(), 2);
+    iter = groups.iterator();
+    assertEquals(iter.next(), "tardis");
+    assertEquals(iter.next(), "users");
+  }
+}

+ 39 - 0
src/test/org/apache/hadoop/security/authorize/HadoopPolicyProvider.java

@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.authorize;
+
+import org.apache.hadoop.hdfs.HDFSPolicyProvider;
+import org.apache.hadoop.mapred.MapReducePolicyProvider;
+
+public class HadoopPolicyProvider extends PolicyProvider {
+
+  @Override
+  public Service[] getServices() {
+    Service[] hdfsServices = new HDFSPolicyProvider().getServices();
+    Service[] mrServices = new MapReducePolicyProvider().getServices();
+    
+    Service[] hadoopServices = 
+      new Service[hdfsServices.length + mrServices.length];
+    System.arraycopy(hdfsServices, 0, hadoopServices, 0, hdfsServices.length);
+    System.arraycopy(mrServices, 0, hadoopServices, hdfsServices.length, 
+                     mrServices.length);
+
+    return hadoopServices;
+  }
+
+}

+ 82 - 0
src/test/org/apache/hadoop/security/authorize/TestConfiguredPolicy.java

@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.authorize;
+
+import java.security.Permission;
+
+import javax.security.auth.Subject;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+
+import junit.framework.TestCase;
+
+public class TestConfiguredPolicy extends TestCase {
+  private static final String USER1 = "drwho";
+  private static final String USER2 = "joe";
+  private static final String[] GROUPS1 = new String[]{"tardis"};
+  private static final String[] GROUPS2 = new String[]{"users"};
+  
+  private static final String KEY_1 = "test.policy.1";
+  private static final String KEY_2 = "test.policy.2";
+  
+  public static class Protocol1 {
+    int i;
+  }
+  public static class Protocol2 {
+    int j;
+  }
+  
+  private static class TestPolicyProvider extends PolicyProvider {
+    @Override
+    public Service[] getServices() {
+      return new Service[] {
+          new Service(KEY_1, Protocol1.class),
+          new Service(KEY_2, Protocol2.class),
+          };
+    }
+  }
+  
+  public void testConfiguredPolicy() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(KEY_1, AccessControlList.WILDCARD_ACL_VALUE);
+    conf.set(KEY_2, USER1 + " " + GROUPS1[0]);
+    
+    ConfiguredPolicy policy = new ConfiguredPolicy(conf, new TestPolicyProvider());
+    SecurityUtil.setPolicy(policy);
+    
+    Subject user1 = 
+      SecurityUtil.getSubject(new UnixUserGroupInformation(USER1, GROUPS1));
+
+    // Should succeed
+    ServiceAuthorizationManager.authorize(user1, Protocol1.class);
+    
+    // Should fail
+    Subject user2 = 
+      SecurityUtil.getSubject(new UnixUserGroupInformation(USER2, GROUPS2));
+    boolean failed = false;
+    try {
+      ServiceAuthorizationManager.authorize(user2, Protocol2.class);
+    } catch (AuthorizationException ae) {
+      failed = true;
+    }
+    assertTrue(failed);
+  }
+}
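
TestConfiguredPolicy above walks the whole authorization path; condensed, the server-side check introduced by this change is: install a ConfiguredPolicy, derive the caller's Subject from its UGI, and authorize it against the protocol class. A sketch with hypothetical names (MyProtocol and my.protocol.acl are placeholders for illustration, not part of this change):

    import javax.security.auth.Subject;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.SecurityUtil;
    import org.apache.hadoop.security.UnixUserGroupInformation;
    import org.apache.hadoop.security.authorize.AuthorizationException;
    import org.apache.hadoop.security.authorize.ConfiguredPolicy;
    import org.apache.hadoop.security.authorize.PolicyProvider;
    import org.apache.hadoop.security.authorize.Service;
    import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;

    public class AuthorizeSketch {
      interface MyProtocol {}                           // hypothetical protocol
      static final String MY_ACL = "my.protocol.acl";   // hypothetical ACL key

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(MY_ACL, "alice admins");               // user alice, group admins

        // Install the policy, exactly as the tests above do.
        SecurityUtil.setPolicy(new ConfiguredPolicy(conf, new PolicyProvider() {
          public Service[] getServices() {
            return new Service[] { new Service(MY_ACL, MyProtocol.class) };
          }
        }));

        // Build the caller's Subject from a UGI and check it against the protocol.
        Subject caller =
          SecurityUtil.getSubject(new UnixUserGroupInformation("bob", new String[] {"users"}));
        try {
          ServiceAuthorizationManager.authorize(caller, MyProtocol.class);
        } catch (AuthorizationException ae) {
          System.out.println("connection refused: " + ae.getMessage());   // expected for "bob"
        }
      }
    }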

+ 152 - 0
src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java

@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.authorize;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HDFSPolicyProvider;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+import junit.framework.TestCase;
+
+public class TestServiceLevelAuthorization extends TestCase {
+  public void testServiceLevelAuthorization() throws Exception {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    try {
+      final int slaves = 4;
+
+      // Turn on service-level authorization
+      Configuration conf = new Configuration();
+      conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
+                    HadoopPolicyProvider.class, PolicyProvider.class);
+      conf.setBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, 
+                      true);
+      
+      // Start the mini clusters
+      dfs = new MiniDFSCluster(conf, slaves, true, null);
+      fileSys = dfs.getFileSystem();
+      JobConf mrConf = new JobConf(conf);
+      mr = new MiniMRCluster(slaves, fileSys.getUri().toString(), 1, 
+                             null, null, mrConf);
+
+      // Run examples
+      TestMiniMRWithDFS.runPI(mr, mr.createJobConf(mrConf));
+      TestMiniMRWithDFS.runWordCount(mr, mr.createJobConf(mrConf));
+    } finally {
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown(); }
+    }
+  }
+  
+  private static final String DUMMY_ACL = "nouser nogroup";
+  private static final String UNKNOWN_USER = "dev,null";
+  
+  private void rewriteHadoopPolicyFile(File policyFile) throws IOException {
+    FileWriter fos = new FileWriter(policyFile);
+    PolicyProvider policyProvider = new HDFSPolicyProvider();
+    fos.write("<configuration>\n");
+    for (Service service : policyProvider.getServices()) {
+      String key = service.getServiceKey();
+      String value = "*";
+      if (key.equals("security.refresh.policy.protocol.acl")) {
+        value = DUMMY_ACL;
+      }
+      fos.write("<property><name>"+ key + "</name><value>" + value + 
+                "</value></property>\n");
+      System.err.println("<property><name>"+ key + "</name><value>" + value + 
+          "</value></property>\n");
+    }
+    fos.write("</configuration>\n");
+    fos.close();
+  }
+  
+  private void refreshPolicy(Configuration conf)  throws IOException {
+    DFSAdmin dfsAdmin = new DFSAdmin(conf);
+    dfsAdmin.refreshServiceAcl();
+  }
+  
+  public void testRefresh() throws Exception {
+    MiniDFSCluster dfs = null;
+    try {
+      final int slaves = 4;
+
+      // Turn on service-level authorization
+      Configuration conf = new Configuration();
+      conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
+                    HDFSPolicyProvider.class, PolicyProvider.class);
+      conf.setBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, 
+                      true);
+      
+      // Start the mini dfs cluster
+      dfs = new MiniDFSCluster(conf, slaves, true, null);
+
+      // Refresh the service level authorization policy
+      refreshPolicy(conf);
+      
+      // Simulate an 'edit' of hadoop-policy.xml
+      String confDir = System.getProperty("test.build.extraconf", 
+                                          "build/test/extraconf");
+      File policyFile = new File(confDir, ConfiguredPolicy.HADOOP_POLICY_FILE);
+      String policyFileCopy = ConfiguredPolicy.HADOOP_POLICY_FILE + ".orig";
+      FileUtil.copy(policyFile, FileSystem.getLocal(conf),   // first save original 
+                    new Path(confDir, policyFileCopy), false, conf);
+      rewriteHadoopPolicyFile(                               // rewrite the file
+          new File(confDir, ConfiguredPolicy.HADOOP_POLICY_FILE));
+      
+      // Refresh the service level authorization policy
+      refreshPolicy(conf);
+      
+      // Refresh the service level authorization policy once again, 
+      // this time it should fail!
+      try {
+        // Note: hadoop-policy.xml for tests has 
+        // security.refresh.policy.protocol.acl = ${user.name}
+        conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, UNKNOWN_USER);
+        refreshPolicy(conf);
+        fail("Refresh of NameNode's policy file cannot be successful!");
+      } catch (RemoteException re) {
+        System.out.println("Good, refresh worked... refresh failed with: " + 
+                           StringUtils.stringifyException(re.unwrapRemoteException()));
+      } finally {
+        // Reset to original hadoop-policy.xml
+        FileUtil.fullyDelete(new File(confDir, 
+            ConfiguredPolicy.HADOOP_POLICY_FILE));
+        FileUtil.replaceFile(new File(confDir, policyFileCopy), 
+                             new File(confDir, ConfiguredPolicy.HADOOP_POLICY_FILE));
+      }
+    } finally {
+      if (dfs != null) { dfs.shutdown(); }
+    }
+  }
+
+}
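
The refresh path exercised by testRefresh() is the same one the new admin commands use: build the admin tool over a Configuration and call refreshServiceAcl(). A minimal sketch for the HDFS side (the NameNode URI is an assumed example value):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.tools.DFSAdmin;

    public class RefreshNameNodePolicy {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://nnhost:8020");   // assumed NameNode address
        // Same call as refreshPolicy() above; equivalent to: hadoop dfsadmin -refreshServiceAcl
        new DFSAdmin(conf).refreshServiceAcl();
      }
    }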