HDFS-6379. HTTPFS - Implement ACLs support. (yoderme via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1602040 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur, 11 years ago
parent commit 8bfbec8cfb

+ 14 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/AclEntry.java

@@ -298,4 +298,18 @@ public class AclEntry {
     AclEntry aclEntry = builder.build();
     return aclEntry;
   }
+
+  /**
+   * Convert a List of AclEntries into a string - the reverse of parseAclSpec.
+   * @param aclSpec List of AclEntries to convert
+   * @return String representation of aclSpec
+   */
+  public static String aclSpecToString(List<AclEntry> aclSpec) {
+    StringBuilder buf = new StringBuilder();
+    for (AclEntry e : aclSpec) {
+      buf.append(e.toString());
+      buf.append(",");
+    }
+    // Guard against an empty spec before stripping the trailing ,
+    return (buf.length() == 0) ? "" : buf.substring(0, buf.length() - 1);
+  }
 }

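As a quick illustration of the new helper, this minimal sketch (class name and aclspec string are arbitrary) round-trips a spec through the existing AclEntry.parseAclSpec and the new aclSpecToString:

    import java.util.List;
    import org.apache.hadoop.fs.permission.AclEntry;

    public class AclSpecRoundTrip {
      public static void main(String[] args) {
        // Parse a comma-separated aclspec into entries; 'true' means each
        // entry carries its permission bits.
        List<AclEntry> spec = AclEntry.parseAclSpec(
            "user::rwx,user:foo:rw-,group::r--,other::---", true);
        // Serialize back with the helper added by this commit.
        System.out.println(AclEntry.aclSpecToString(spec));
        // Prints: user::rwx,user:foo:rw-,group::r--,other::---
      }
    }
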
+ 128 - 1
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java

@@ -31,6 +31,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.net.NetUtils;
@@ -86,6 +88,7 @@ public class HttpFSFileSystem extends FileSystem
   public static final String REPLICATION_PARAM = "replication";
   public static final String BLOCKSIZE_PARAM = "blocksize";
   public static final String PERMISSION_PARAM = "permission";
+  public static final String ACLSPEC_PARAM = "aclspec";
   public static final String DESTINATION_PARAM = "destination";
   public static final String RECURSIVE_PARAM = "recursive";
   public static final String SOURCES_PARAM = "sources";
@@ -95,6 +98,7 @@ public class HttpFSFileSystem extends FileSystem
   public static final String ACCESS_TIME_PARAM = "accesstime";
 
   public static final Short DEFAULT_PERMISSION = 0755;
+  public static final String ACLSPEC_DEFAULT = "";
 
   public static final String RENAME_JSON = "boolean";
 
@@ -152,6 +156,11 @@ public class HttpFSFileSystem extends FileSystem
   public static final String CONTENT_SUMMARY_SPACE_CONSUMED_JSON = "spaceConsumed";
   public static final String CONTENT_SUMMARY_SPACE_QUOTA_JSON = "spaceQuota";
 
+  public static final String ACL_STATUS_JSON = "AclStatus";
+  public static final String ACL_STICKY_BIT_JSON = "stickyBit";
+  public static final String ACL_ENTRIES_JSON = "entries";
+  public static final String ACL_BIT_JSON = "aclBit";
+
   public static final String ERROR_JSON = "RemoteException";
   public static final String ERROR_EXCEPTION_JSON = "exception";
   public static final String ERROR_CLASSNAME_JSON = "javaClassName";
@@ -169,10 +178,12 @@ public class HttpFSFileSystem extends FileSystem
     OPEN(HTTP_GET), GETFILESTATUS(HTTP_GET), LISTSTATUS(HTTP_GET),
     GETHOMEDIRECTORY(HTTP_GET), GETCONTENTSUMMARY(HTTP_GET),
     GETFILECHECKSUM(HTTP_GET),  GETFILEBLOCKLOCATIONS(HTTP_GET),
-    INSTRUMENTATION(HTTP_GET),
+    INSTRUMENTATION(HTTP_GET), GETACLSTATUS(HTTP_GET),
     APPEND(HTTP_POST), CONCAT(HTTP_POST),
     CREATE(HTTP_PUT), MKDIRS(HTTP_PUT), RENAME(HTTP_PUT), SETOWNER(HTTP_PUT),
     SETPERMISSION(HTTP_PUT), SETREPLICATION(HTTP_PUT), SETTIMES(HTTP_PUT),
+    MODIFYACLENTRIES(HTTP_PUT), REMOVEACLENTRIES(HTTP_PUT),
+    REMOVEDEFAULTACL(HTTP_PUT), REMOVEACL(HTTP_PUT), SETACL(HTTP_PUT),
     DELETE(HTTP_DELETE);
 
     private String httpMethod;
@@ -798,6 +809,105 @@ public class HttpFSFileSystem extends FileSystem
     return (Boolean) json.get(SET_REPLICATION_JSON);
   }
 
+  /**
+   * Modify the ACL entries for a file.
+   *
+   * @param path Path to modify
+   * @param aclSpec List<AclEntry> describing modifications
+   * @throws IOException
+   */
+  @Override
+  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
+          throws IOException {
+    Map<String, String> params = new HashMap<String, String>();
+    params.put(OP_PARAM, Operation.MODIFYACLENTRIES.toString());
+    params.put(ACLSPEC_PARAM, AclEntry.aclSpecToString(aclSpec));
+    HttpURLConnection conn = getConnection(
+            Operation.MODIFYACLENTRIES.getMethod(), params, path, true);
+    HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
+  }
+
+  /**
+   * Remove the specified ACL entries from a file.
+   * @param path Path to modify
+   * @param aclSpec List<AclEntry> describing entries to remove
+   * @throws IOException
+   */
+  @Override
+  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
+          throws IOException {
+    Map<String, String> params = new HashMap<String, String>();
+    params.put(OP_PARAM, Operation.REMOVEACLENTRIES.toString());
+    params.put(ACLSPEC_PARAM, AclEntry.aclSpecToString(aclSpec));
+    HttpURLConnection conn = getConnection(
+            Operation.REMOVEACLENTRIES.getMethod(), params, path, true);
+    HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
+  }
+
+  /**
+   * Remove the default ACL for the given file.
+   * @param path Path from which to remove the default ACL.
+   * @throws IOException
+   */
+  @Override
+  public void removeDefaultAcl(Path path) throws IOException {
+    Map<String, String> params = new HashMap<String, String>();
+    params.put(OP_PARAM, Operation.REMOVEDEFAULTACL.toString());
+    HttpURLConnection conn = getConnection(
+            Operation.REMOVEDEFAULTACL.getMethod(), params, path, true);
+    HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
+  }
+
+  /**
+   * Remove all ACLs from a file.
+   * @param path Path from which to remove all ACLs
+   * @throws IOException
+   */
+  @Override
+  public void removeAcl(Path path) throws IOException {
+    Map<String, String> params = new HashMap<String, String>();
+    params.put(OP_PARAM, Operation.REMOVEACL.toString());
+    HttpURLConnection conn = getConnection(Operation.REMOVEACL.getMethod(),
+            params, path, true);
+    HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
+  }
+
+  /**
+   * Set the ACLs for the given file.
+   * @param path Path to modify
+   * @param aclSpec List<AclEntry> describing modifications, must include
+   *                entries for user, group, and others for compatibility
+   *                with permission bits.
+   * @throws IOException
+   */
+  @Override
+  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
+    Map<String, String> params = new HashMap<String, String>();
+    params.put(OP_PARAM, Operation.SETACL.toString());
+    params.put(ACLSPEC_PARAM, AclEntry.aclSpecToString(aclSpec));
+    HttpURLConnection conn = getConnection(Operation.SETACL.getMethod(),
+                                           params, path, true);
+    HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
+  }
+
+  /**
+   * Get the ACL information for a given file.
+   * @param path Path to acquire ACL info for
+   * @return the AclStatus for the given file
+   * @throws IOException
+   */
+  @Override
+  public AclStatus getAclStatus(Path path) throws IOException {
+    Map<String, String> params = new HashMap<String, String>();
+    params.put(OP_PARAM, Operation.GETACLSTATUS.toString());
+    HttpURLConnection conn = getConnection(Operation.GETACLSTATUS.getMethod(),
+            params, path, true);
+    HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
+    JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
+    json = (JSONObject) json.get(ACL_STATUS_JSON);
+    return createAclStatus(json);
+  }
+
   private FileStatus createFileStatus(Path parent, JSONObject json) {
     String pathSuffix = (String) json.get(PATH_SUFFIX_JSON);
     Path path = (pathSuffix.equals("")) ? parent : new Path(parent, pathSuffix);
@@ -830,6 +940,23 @@ public class HttpFSFileSystem extends FileSystem
     return fileStatus;
   }
 
+  /**
+   * Convert the given JSON object into an AclStatus
+   * @param json Input JSON representing the ACLs
+   * @return Resulting AclStatus
+   */
+  private AclStatus createAclStatus(JSONObject json) {
+    AclStatus.Builder aclStatusBuilder = new AclStatus.Builder()
+            .owner((String) json.get(OWNER_JSON))
+            .group((String) json.get(GROUP_JSON))
+            .stickyBit((Boolean) json.get(ACL_STICKY_BIT_JSON));
+    JSONArray entries = (JSONArray) json.get(ACL_ENTRIES_JSON);
+    for ( Object e : entries ) {
+      aclStatusBuilder.addEntry(AclEntry.parseAclEntry(e.toString(), true));
+    }
+    return aclStatusBuilder.build();
+  }
+
   @Override
   public ContentSummary getContentSummary(Path f) throws IOException {
     Map<String, String> params = new HashMap<String, String>();

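Taken together, these overrides let HttpFS clients manage ACLs through the ordinary FileSystem API. A minimal sketch, assuming the webhdfs:// scheme is bound to HttpFSFileSystem the way the tests below do; host, port, and path are hypothetical:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
    import org.apache.hadoop.fs.permission.AclEntry;
    import org.apache.hadoop.fs.permission.AclStatus;

    public class HttpFSAclDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Bind the webhdfs:// scheme to the HttpFS client, as the HttpFS
        // tests do; the host and port below are placeholders.
        conf.set("fs.webhdfs.impl", HttpFSFileSystem.class.getName());
        FileSystem fs = FileSystem.get(
            URI.create("webhdfs://httpfs-host:14000"), conf);
        Path p = new Path("/tmp/acl-demo.txt");

        // Replace the whole ACL, then adjust it entry by entry.
        fs.setAcl(p, AclEntry.parseAclSpec(
            "user::rwx,user:foo:rw-,group::r--,other::---", true));
        fs.modifyAclEntries(p, AclEntry.parseAclSpec("user:bar:r--", true));
        fs.removeAclEntries(p, AclEntry.parseAclSpec("user:foo:rw-", true));

        AclStatus status = fs.getAclStatus(p);  // issues op=GETACLSTATUS
        System.out.println(status.getEntries());

        fs.removeAcl(p);  // drop all extended ACL entries
      }
    }
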
+ 385 - 55
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java

@@ -26,7 +26,10 @@ import org.apache.hadoop.fs.GlobFilter;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.lib.service.FileSystemAccess;
 import org.json.simple.JSONArray;
@@ -36,6 +39,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -44,34 +48,170 @@ import java.util.Map;
 @InterfaceAudience.Private
 public class FSOperations {
 
-  @SuppressWarnings({"unchecked", "deprecation"})
-  private static Map fileStatusToJSONRaw(FileStatus status, boolean emptyPathSuffix) {
-    Map json = new LinkedHashMap();
-    json.put(HttpFSFileSystem.PATH_SUFFIX_JSON, (emptyPathSuffix) ? "" : status.getPath().getName());
-    json.put(HttpFSFileSystem.TYPE_JSON, HttpFSFileSystem.FILE_TYPE.getType(status).toString());
-    json.put(HttpFSFileSystem.LENGTH_JSON, status.getLen());
-    json.put(HttpFSFileSystem.OWNER_JSON, status.getOwner());
-    json.put(HttpFSFileSystem.GROUP_JSON, status.getGroup());
-    json.put(HttpFSFileSystem.PERMISSION_JSON, HttpFSFileSystem.permissionToString(status.getPermission()));
-    json.put(HttpFSFileSystem.ACCESS_TIME_JSON, status.getAccessTime());
-    json.put(HttpFSFileSystem.MODIFICATION_TIME_JSON, status.getModificationTime());
-    json.put(HttpFSFileSystem.BLOCK_SIZE_JSON, status.getBlockSize());
-    json.put(HttpFSFileSystem.REPLICATION_JSON, status.getReplication());
-    return json;
+  /**
+   * This class is used to group a FileStatus and an AclStatus together.
+   * It's needed for the GETFILESTATUS and LISTSTATUS calls, which take
+   * most info from the FileStatus and a wee bit from the AclStatus.
+   */
+  private static class StatusPair {
+    private FileStatus fileStatus;
+    private AclStatus aclStatus;
+
+    /**
+     * Simple constructor
+     * @param fileStatus Existing FileStatus object
+     * @param aclStatus Existing AclStatus object
+     */
+    public StatusPair(FileStatus fileStatus, AclStatus aclStatus) {
+      this.fileStatus = fileStatus;
+      this.aclStatus = aclStatus;
+    }
+
+    /**
+     * Create one StatusPair by performing the underlying calls to
+     * fs.getFileStatus and fs.getAclStatus
+     * @param fs The FileSystem where 'path' lives
+     * @param path The file/directory to query
+     * @throws IOException
+     */
+    public StatusPair(FileSystem fs, Path path) throws IOException {
+      fileStatus = fs.getFileStatus(path);
+      aclStatus = null;
+      try {
+        aclStatus = fs.getAclStatus(path);
+      } catch (AclException e) {
+        /*
+         * The cause is almost certainly an "ACLs aren't enabled"
+         * exception, so leave aclStatus at null and carry on.
+         */
+      } catch (UnsupportedOperationException e) {
+        /* Ditto above - this is the case for a local file system */
+      }
+    }
+
+    /**
+     * Return a Map suitable for conversion into JSON format
+     * @return The JSONish Map
+     */
+    public Map<String,Object> toJson() {
+      Map<String,Object> json = new LinkedHashMap<String,Object>();
+      json.put(HttpFSFileSystem.FILE_STATUS_JSON, toJsonInner(true));
+      return json;
+    }
+
+    /**
+     * Return the inner part of the JSON for the status, used by both the
+     * GETFILESTATUS and LISTSTATUS calls.
+     * @param emptyPathSuffix Whether to emit an empty PATH_SUFFIX_JSON value
+     * @return The JSONish Map
+     */
+    public Map<String,Object> toJsonInner(boolean emptyPathSuffix) {
+      Map<String,Object> json = new LinkedHashMap<String,Object>();
+      json.put(HttpFSFileSystem.PATH_SUFFIX_JSON,
+              (emptyPathSuffix) ? "" : fileStatus.getPath().getName());
+      json.put(HttpFSFileSystem.TYPE_JSON,
+              HttpFSFileSystem.FILE_TYPE.getType(fileStatus).toString());
+      json.put(HttpFSFileSystem.LENGTH_JSON, fileStatus.getLen());
+      json.put(HttpFSFileSystem.OWNER_JSON, fileStatus.getOwner());
+      json.put(HttpFSFileSystem.GROUP_JSON, fileStatus.getGroup());
+      json.put(HttpFSFileSystem.PERMISSION_JSON,
+              HttpFSFileSystem.permissionToString(fileStatus.getPermission()));
+      json.put(HttpFSFileSystem.ACCESS_TIME_JSON, fileStatus.getAccessTime());
+      json.put(HttpFSFileSystem.MODIFICATION_TIME_JSON,
+              fileStatus.getModificationTime());
+      json.put(HttpFSFileSystem.BLOCK_SIZE_JSON, fileStatus.getBlockSize());
+      json.put(HttpFSFileSystem.REPLICATION_JSON, fileStatus.getReplication());
+      if ((aclStatus != null) && !aclStatus.getEntries().isEmpty()) {
+        json.put(HttpFSFileSystem.ACL_BIT_JSON, true);
+      }
+      return json;
+    }
   }
 
   /**
-   * Converts a FileSystemAccess <code>FileStatus</code> object into a JSON
-   * object.
+   * Simple class used to contain and operate upon a list of StatusPair
+   * objects.  Used by LISTSTATUS.
+   */
+  private static class StatusPairs {
+    private StatusPair[] statusPairs;
+
+    /**
+     * Construct a list of StatusPair objects
+     * @param fs The FileSystem where 'path' lives
+     * @param path The directory to query
+     * @param filter A possible filter for entries in the directory
+     * @throws IOException
+     */
+    public StatusPairs(FileSystem fs, Path path, PathFilter filter)
+            throws IOException {
+      /* Grab all the file statuses at once in an array */
+      FileStatus[] fileStatuses = fs.listStatus(path, filter);
+
+      /* We'll have an array of StatusPairs of the same length */
+      AclStatus aclStatus = null;
+      statusPairs = new StatusPair[fileStatuses.length];
+
+      /*
+       * For each FileStatus, attempt to acquire an AclStatus.  If the
+       * getAclStatus throws an exception, we assume that ACLs are turned
+       * off entirely and abandon the attempt.
+       */
+      boolean useAcls = true;   // Assume ACLs work until proven otherwise
+      for (int i = 0; i < fileStatuses.length; i++) {
+        if (useAcls) {
+          try {
+            aclStatus = fs.getAclStatus(fileStatuses[i].getPath());
+          } catch (AclException e) {
+            /* Almost certainly due to an "ACLs not enabled" exception */
+            aclStatus = null;
+            useAcls = false;
+          } catch (UnsupportedOperationException e) {
+            /* Ditto above - this is the case for a local file system */
+            aclStatus = null;
+            useAcls = false;
+          }
+        }
+        statusPairs[i] = new StatusPair(fileStatuses[i], aclStatus);
+      }
+    }
+
+    /**
+     * Return a Map suitable for conversion into JSON.
+     * @return A JSONish Map
+     */
+    @SuppressWarnings({"unchecked"})
+    public Map<String,Object> toJson() {
+      Map<String,Object> json = new LinkedHashMap<String,Object>();
+      Map<String,Object> inner = new LinkedHashMap<String,Object>();
+      JSONArray statuses = new JSONArray();
+      for (StatusPair s : statusPairs) {
+        statuses.add(s.toJsonInner(false));
+      }
+      inner.put(HttpFSFileSystem.FILE_STATUS_JSON, statuses);
+      json.put(HttpFSFileSystem.FILE_STATUSES_JSON, inner);
+      return json;
+    }
+  }
+
+  /** Converts an <code>AclStatus</code> object into a JSON object.
    *
-   * @param status FileSystemAccess file status.
+   * @param aclStatus AclStatus object
    *
-   * @return The JSON representation of the file status.
+   * @return The JSON representation of the ACLs for the file
    */
-  @SuppressWarnings({"unchecked", "deprecation"})
-  private static Map fileStatusToJSON(FileStatus status) {
-    Map json = new LinkedHashMap();
-    json.put(HttpFSFileSystem.FILE_STATUS_JSON, fileStatusToJSONRaw(status, true));
+  @SuppressWarnings({"unchecked"})
+  private static Map<String,Object> aclStatusToJSON(AclStatus aclStatus) {
+    Map<String,Object> json = new LinkedHashMap<String,Object>();
+    Map<String,Object> inner = new LinkedHashMap<String,Object>();
+    JSONArray entriesArray = new JSONArray();
+    inner.put(HttpFSFileSystem.OWNER_JSON, aclStatus.getOwner());
+    inner.put(HttpFSFileSystem.GROUP_JSON, aclStatus.getGroup());
+    inner.put(HttpFSFileSystem.ACL_STICKY_BIT_JSON, aclStatus.isStickyBit());
+    for ( AclEntry e : aclStatus.getEntries() ) {
+      entriesArray.add(e.toString());
+    }
+    inner.put(HttpFSFileSystem.ACL_ENTRIES_JSON, entriesArray);
+    json.put(HttpFSFileSystem.ACL_STATUS_JSON, inner);
     return json;
   }
 
@@ -117,30 +257,6 @@ public class FSOperations {
     return response;
   }
 
-  /**
-   * Converts a FileSystemAccess <code>FileStatus</code> array into a JSON array
-   * object.
-   *
-   * @param status FileSystemAccess file status array.
-   * <code>SCHEME://HOST:PORT</code> in the file status.
-   *
-   * @return The JSON representation of the file status array.
-   */
-  @SuppressWarnings("unchecked")
-  private static Map fileStatusToJSON(FileStatus[] status) {
-    JSONArray json = new JSONArray();
-    if (status != null) {
-      for (FileStatus s : status) {
-        json.add(fileStatusToJSONRaw(s, false));
-      }
-    }
-    Map response = new LinkedHashMap();
-    Map temp = new LinkedHashMap();
-    temp.put(HttpFSFileSystem.FILE_STATUS_JSON, json);
-    response.put(HttpFSFileSystem.FILE_STATUSES_JSON, temp);
-    return response;
-  }
-
   /**
   * Converts an object into a Json Map with one key-value entry.
    * <p/>
@@ -418,18 +534,19 @@ public class FSOperations {
     }
 
     /**
-     * Executes the filesystem operation.
+     * Executes the filesystem getFileStatus operation and returns the
+     * result in a JSONish Map.
      *
      * @param fs filesystem instance to use.
      *
      * @return a Map object (JSON friendly) with the file status.
      *
-     * @throws IOException thrown if an IO error occured.
+     * @throws IOException thrown if an IO error occurred.
      */
     @Override
     public Map execute(FileSystem fs) throws IOException {
-      FileStatus status = fs.getFileStatus(path);
-      return fileStatusToJSON(status);
+      StatusPair sp = new StatusPair(fs, path);
+      return sp.toJson();
     }
 
   }
@@ -482,19 +599,20 @@ public class FSOperations {
     }
 
     /**
-     * Executes the filesystem operation.
+     * Returns data for a JSON Map containing the information for
+     * the set of files in 'path' that match 'filter'.
      *
      * @param fs filesystem instance to use.
      *
      * @return a Map with the file status of the directory
-     *         contents.
+     *         contents that match the filter
      *
-     * @throws IOException thrown if an IO error occured.
+     * @throws IOException thrown if an IO error occurred.
      */
     @Override
     public Map execute(FileSystem fs) throws IOException {
-      FileStatus[] status = fs.listStatus(path, filter);
-      return fileStatusToJSON(status);
+      StatusPairs sp = new StatusPairs(fs, path, filter);
+      return sp.toJson();
     }
 
     @Override
@@ -690,6 +808,218 @@ public class FSOperations {
 
   }
 
+  /**
+   * Executor that sets the acl for a file in a FileSystem
+   */
+  @InterfaceAudience.Private
+  public static class FSSetAcl implements FileSystemAccess.FileSystemExecutor<Void> {
+
+    private Path path;
+    private List<AclEntry> aclEntries;
+
+    /**
+     * Creates a set-acl executor.
+     *
+     * @param path path to set the acl.
+     * @param aclSpec acl to set.
+     */
+    public FSSetAcl(String path, String aclSpec) {
+      this.path = new Path(path);
+      this.aclEntries = AclEntry.parseAclSpec(aclSpec, true);
+    }
+
+    /**
+     * Executes the filesystem operation.
+     *
+     * @param fs filesystem instance to use.
+     *
+     * @return void.
+     *
+     * @throws IOException thrown if an IO error occurred.
+     */
+    @Override
+    public Void execute(FileSystem fs) throws IOException {
+      fs.setAcl(path, aclEntries);
+      return null;
+    }
+
+  }
+
+  /**
+   * Executor that removes all acls from a file in a FileSystem
+   */
+  @InterfaceAudience.Private
+  public static class FSRemoveAcl implements FileSystemAccess.FileSystemExecutor<Void> {
+
+    private Path path;
+
+    /**
+     * Creates a remove-acl executor.
+     *
+     * @param path path from which to remove the acl.
+     */
+    public FSRemoveAcl(String path) {
+      this.path = new Path(path);
+    }
+
+    /**
+     * Executes the filesystem operation.
+     *
+     * @param fs filesystem instance to use.
+     *
+     * @return void.
+     *
+     * @throws IOException thrown if an IO error occurred.
+     */
+    @Override
+    public Void execute(FileSystem fs) throws IOException {
+      fs.removeAcl(path);
+      return null;
+    }
+
+  }
+
+  /**
+   * Executor that modifies acl entries for a file in a FileSystem
+   */
+  @InterfaceAudience.Private
+  public static class FSModifyAclEntries implements FileSystemAccess.FileSystemExecutor<Void> {
+
+    private Path path;
+    private List<AclEntry> aclEntries;
+
+    /**
+     * Creates a modify-acl executor.
+     *
+     * @param path path to set the acl.
+     * @param aclSpec acl to set.
+     */
+    public FSModifyAclEntries(String path, String aclSpec) {
+      this.path = new Path(path);
+      this.aclEntries = AclEntry.parseAclSpec(aclSpec, true);
+    }
+
+    /**
+     * Executes the filesystem operation.
+     *
+     * @param fs filesystem instance to use.
+     *
+     * @return void.
+     *
+     * @throws IOException thrown if an IO error occurred.
+     */
+    @Override
+    public Void execute(FileSystem fs) throws IOException {
+      fs.modifyAclEntries(path, aclEntries);
+      return null;
+    }
+
+  }
+
+  /**
+   * Executor that removes acl entries from a file in a FileSystem
+   */
+  @InterfaceAudience.Private
+  public static class FSRemoveAclEntries implements FileSystemAccess.FileSystemExecutor<Void> {
+
+    private Path path;
+    private List<AclEntry> aclEntries;
+
+    /**
+     * Creates a remove-acl-entries executor.
+     *
+     * @param path path from which to remove acl entries.
+     * @param aclSpec acl entries to remove.
+     */
+    public FSRemoveAclEntries(String path, String aclSpec) {
+      this.path = new Path(path);
+      this.aclEntries = AclEntry.parseAclSpec(aclSpec, true);
+    }
+
+    /**
+     * Executes the filesystem operation.
+     *
+     * @param fs filesystem instance to use.
+     *
+     * @return void.
+     *
+     * @throws IOException thrown if an IO error occurred.
+     */
+    @Override
+    public Void execute(FileSystem fs) throws IOException {
+      fs.removeAclEntries(path, aclEntries);
+      return null;
+    }
+
+  }
+
+  /**
+   * Executor that removes the default acl from a directory in a FileSystem
+   */
+  @InterfaceAudience.Private
+  public static class FSRemoveDefaultAcl implements FileSystemAccess.FileSystemExecutor<Void> {
+
+    private Path path;
+
+    /**
+     * Creates an executor for removing the default acl.
+     *
+     * @param path path from which to remove the default acl.
+     */
+    public FSRemoveDefaultAcl(String path) {
+      this.path = new Path(path);
+    }
+
+    /**
+     * Executes the filesystem operation.
+     *
+     * @param fs filesystem instance to use.
+     *
+     * @return void.
+     *
+     * @throws IOException thrown if an IO error occurred.
+     */
+    @Override
+    public Void execute(FileSystem fs) throws IOException {
+      fs.removeDefaultAcl(path);
+      return null;
+    }
+
+  }
+
+  /**
+   * Executor that gets the ACL information for a given file.
+   */
+  @InterfaceAudience.Private
+  public static class FSAclStatus implements FileSystemAccess.FileSystemExecutor<Map> {
+    private Path path;
+
+    /**
+     * Creates an executor for getting the ACLs for a file.
+     *
+     * @param path the path for which to retrieve the ACLs.
+     */
+    public FSAclStatus(String path) {
+      this.path = new Path(path);
+    }
+
+    /**
+     * Executes the filesystem operation.
+     *
+     * @param fs filesystem instance to use.
+     *
+     * @return a Map object (JSON friendly) with the ACL status.
+     *
+     * @throws IOException thrown if an IO error occurred.
+     */
+    @Override
+    public Map execute(FileSystem fs) throws IOException {
+      AclStatus status = fs.getAclStatus(path);
+      return aclStatusToJSON(status);
+    }
+
+  }
+
   /**
    * Executor that performs a set-replication FileSystemAccess files system operation.
    */

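For reference, aclStatusToJSON above nests its inner map under the AclStatus key, so a GETACLSTATUS response body takes this shape (the field names come from the HttpFSFileSystem constants; the values here are illustrative):

    {"AclStatus":
      {"owner":"hdfs","group":"supergroup","stickyBit":false,
       "entries":["user:foo:rw-","group::r--"]}}
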
+ 35 - 0
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java

@@ -33,12 +33,16 @@ import org.slf4j.MDC;
 import javax.ws.rs.ext.Provider;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT;
 
 /**
  * HttpFS ParametersProvider.
  */
 @Provider
 @InterfaceAudience.Private
+@SuppressWarnings("unchecked")
 public class HttpFSParametersProvider extends ParametersProvider {
 
   private static final Map<Enum, Class<Param<?>>[]> PARAMS_DEF =
@@ -55,6 +59,7 @@ public class HttpFSParametersProvider extends ParametersProvider {
     PARAMS_DEF.put(Operation.GETFILECHECKSUM, new Class[]{DoAsParam.class});
     PARAMS_DEF.put(Operation.GETFILEBLOCKLOCATIONS,
       new Class[]{DoAsParam.class});
+    PARAMS_DEF.put(Operation.GETACLSTATUS, new Class[]{DoAsParam.class});
     PARAMS_DEF.put(Operation.INSTRUMENTATION, new Class[]{DoAsParam.class});
     PARAMS_DEF.put(Operation.APPEND,
       new Class[]{DoAsParam.class, DataParam.class});
@@ -77,6 +82,16 @@ public class HttpFSParametersProvider extends ParametersProvider {
                   AccessTimeParam.class});
     PARAMS_DEF.put(Operation.DELETE,
       new Class[]{DoAsParam.class, RecursiveParam.class});
+    PARAMS_DEF.put(Operation.SETACL,
+            new Class[]{DoAsParam.class, AclPermissionParam.class});
+    PARAMS_DEF.put(Operation.REMOVEACL,
+            new Class[]{DoAsParam.class});
+    PARAMS_DEF.put(Operation.MODIFYACLENTRIES,
+            new Class[]{DoAsParam.class, AclPermissionParam.class});
+    PARAMS_DEF.put(Operation.REMOVEACLENTRIES,
+            new Class[]{DoAsParam.class, AclPermissionParam.class});
+    PARAMS_DEF.put(Operation.REMOVEDEFAULTACL,
+            new Class[]{DoAsParam.class});
   }
 
   public HttpFSParametersProvider() {
@@ -370,6 +385,26 @@ public class HttpFSParametersProvider extends ParametersProvider {
 
   }
 
+  /**
+   * Class for AclPermission parameter.
+   */
+  @InterfaceAudience.Private
+  public static class AclPermissionParam extends StringParam {
+
+    /**
+     * Parameter name.
+     */
+    public static final String NAME = HttpFSFileSystem.ACLSPEC_PARAM;
+
+    /**
+     * Constructor.
+     */
+    public AclPermissionParam() {
+      super(NAME, HttpFSFileSystem.ACLSPEC_DEFAULT,
+              Pattern.compile(DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT));
+    }
+  }
+
   /**
    * Class for replication parameter.
    */

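AclPermissionParam validates the aclspec query parameter against the same regular expression WebHDFS uses (DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT), so spec strings of the form exercised elsewhere in this patch pass validation, for example:

    aclspec=user::rwx,user:foo:rw-,group::r--,other::---
    aclspec=default:user:glarch:r-x
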
+ 55 - 0
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AccessTimeParam;
+import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AclPermissionParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.BlockSizeParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DataParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DestinationParam;
@@ -313,6 +314,14 @@ public class HttpFSServer {
         response = Response.status(Response.Status.BAD_REQUEST).build();
         break;
       }
+      case GETACLSTATUS: {
+        FSOperations.FSAclStatus command =
+                new FSOperations.FSAclStatus(path);
+        Map json = fsExecute(user, doAs, command);
+        AUDIT_LOG.info("ACL status for [{}]", path);
+        response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
+        break;
+      }
       default: {
         throw new IOException(
           MessageFormat.format("Invalid HTTP GET operation [{0}]",
@@ -579,6 +588,52 @@ public class HttpFSServer {
         response = Response.ok().build();
         break;
       }
+      case SETACL: {
+        String aclSpec = params.get(AclPermissionParam.NAME,
+                AclPermissionParam.class);
+        FSOperations.FSSetAcl command =
+                new FSOperations.FSSetAcl(path, aclSpec);
+        fsExecute(user, doAs, command);
+        AUDIT_LOG.info("[{}] to acl [{}]", path, aclSpec);
+        response = Response.ok().build();
+        break;
+      }
+      case REMOVEACL: {
+        FSOperations.FSRemoveAcl command =
+                new FSOperations.FSRemoveAcl(path);
+        fsExecute(user, doAs, command);
+        AUDIT_LOG.info("[{}] removed acl", path);
+        response = Response.ok().build();
+        break;
+      }
+      case MODIFYACLENTRIES: {
+        String aclSpec = params.get(AclPermissionParam.NAME,
+                AclPermissionParam.class);
+        FSOperations.FSModifyAclEntries command =
+                new FSOperations.FSModifyAclEntries(path, aclSpec);
+        fsExecute(user, doAs, command);
+        AUDIT_LOG.info("[{}] modify acl entry with [{}]", path, aclSpec);
+        response = Response.ok().build();
+        break;
+      }
+      case REMOVEACLENTRIES: {
+        String aclSpec = params.get(AclPermissionParam.NAME,
+                AclPermissionParam.class);
+        FSOperations.FSRemoveAclEntries command =
+                new FSOperations.FSRemoveAclEntries(path, aclSpec);
+        fsExecute(user, doAs, command);
+        AUDIT_LOG.info("[{}] remove acl entry [{}]", path, aclSpec);
+        response = Response.ok().build();
+        break;
+      }
+      case REMOVEDEFAULTACL: {
+        FSOperations.FSRemoveDefaultAcl command =
+                new FSOperations.FSRemoveDefaultAcl(path);
+        fsExecute(user, doAs, command);
+        AUDIT_LOG.info("[{}] remove default acl", path);
+        response = Response.ok().build();
+        break;
+      }
       default: {
         throw new IOException(
           MessageFormat.format("Invalid HTTP PUT operation [{0}]",

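Wired up this way, the new operations are ordinary WebHDFS-style REST calls. Illustrative requests against an HttpFS server (path and user name are hypothetical; the URL layout matches the tests below):

    PUT /webhdfs/v1/tmp/file?user.name=alice&op=SETACL&aclspec=user::rwx,user:foo:rw-,group::r--,other::---
    PUT /webhdfs/v1/tmp/file?user.name=alice&op=MODIFYACLENTRIES&aclspec=user:bar:r--
    GET /webhdfs/v1/tmp/file?user.name=alice&op=GETACLSTATUS
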
+ 113 - 1
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java

@@ -26,6 +26,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -87,6 +89,7 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
     String fsDefaultName = getProxiedFSURI();
     Configuration conf = new Configuration(false);
     conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, fsDefaultName);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
     File hdfsSite = new File(new File(homeDir, "conf"), "hdfs-site.xml");
     OutputStream os = new FileOutputStream(hdfsSite);
     conf.writeXml(os);
@@ -479,9 +482,112 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
     Assert.assertEquals(httpContentSummary.getSpaceQuota(), hdfsContentSummary.getSpaceQuota());
   }
 
+  /**
+   * Runs assertions testing that two AclStatus objects contain the same info
+   * @param a First AclStatus
+   * @param b Second AclStatus
+   * @throws Exception
+   */
+  private void assertSameAcls(AclStatus a, AclStatus b) throws Exception {
+    Assert.assertTrue(a.getOwner().equals(b.getOwner()));
+    Assert.assertTrue(a.getGroup().equals(b.getGroup()));
+    Assert.assertTrue(a.isStickyBit() == b.isStickyBit());
+    Assert.assertTrue(a.getEntries().size() == b.getEntries().size());
+    for (AclEntry e : a.getEntries()) {
+      Assert.assertTrue(b.getEntries().contains(e));
+    }
+    for (AclEntry e : b.getEntries()) {
+      Assert.assertTrue(a.getEntries().contains(e));
+    }
+  }
+
+  /**
+   * Simple ACL tests on a file:  Set an acl, add an acl, remove one acl,
+   * and remove all acls.
+   * @throws Exception
+   */
+  private void testFileAcls() throws Exception {
+    if ( isLocalFS() ) {
+      return;
+    }
+
+    final String aclUser1 = "user:foo:rw-";
+    final String aclUser2 = "user:bar:r--";
+    final String aclGroup1 = "group::r--";
+    final String aclSet = "user::rwx," + aclUser1 + ","
+            + aclGroup1 + ",other::---";
+
+    FileSystem proxyFs = FileSystem.get(getProxiedFSConf());
+    FileSystem httpfs = getHttpFSFileSystem();
+
+    Path path = new Path(getProxiedFSTestDir(), "testAclStatus.txt");
+    OutputStream os = proxyFs.create(path);
+    os.write(1);
+    os.close();
+
+    AclStatus proxyAclStat = proxyFs.getAclStatus(path);
+    AclStatus httpfsAclStat = httpfs.getAclStatus(path);
+    assertSameAcls(httpfsAclStat, proxyAclStat);
+
+    httpfs.setAcl(path, AclEntry.parseAclSpec(aclSet,true));
+    proxyAclStat = proxyFs.getAclStatus(path);
+    httpfsAclStat = httpfs.getAclStatus(path);
+    assertSameAcls(httpfsAclStat, proxyAclStat);
+
+    httpfs.modifyAclEntries(path, AclEntry.parseAclSpec(aclUser2, true));
+    proxyAclStat = proxyFs.getAclStatus(path);
+    httpfsAclStat = httpfs.getAclStatus(path);
+    assertSameAcls(httpfsAclStat, proxyAclStat);
+
+    httpfs.removeAclEntries(path, AclEntry.parseAclSpec(aclUser1, true));
+    proxyAclStat = proxyFs.getAclStatus(path);
+    httpfsAclStat = httpfs.getAclStatus(path);
+    assertSameAcls(httpfsAclStat, proxyAclStat);
+
+    httpfs.removeAcl(path);
+    proxyAclStat = proxyFs.getAclStatus(path);
+    httpfsAclStat = httpfs.getAclStatus(path);
+    assertSameAcls(httpfsAclStat, proxyAclStat);
+  }
+
+  /**
+   * Simple acl tests on a directory: set a default acl, remove default acls.
+   * @throws Exception
+   */
+  private void testDirAcls() throws Exception {
+    if ( isLocalFS() ) {
+      return;
+    }
+
+    final String defUser1 = "default:user:glarch:r-x";
+
+    FileSystem proxyFs = FileSystem.get(getProxiedFSConf());
+    FileSystem httpfs = getHttpFSFileSystem();
+
+    Path dir = getProxiedFSTestDir();
+
+    /* ACL Status on a directory */
+    AclStatus proxyAclStat = proxyFs.getAclStatus(dir);
+    AclStatus httpfsAclStat = httpfs.getAclStatus(dir);
+    assertSameAcls(httpfsAclStat, proxyAclStat);
+
+    /* Set a default ACL on the directory */
+    httpfs.setAcl(dir, (AclEntry.parseAclSpec(defUser1,true)));
+    proxyAclStat = proxyFs.getAclStatus(dir);
+    httpfsAclStat = httpfs.getAclStatus(dir);
+    assertSameAcls(httpfsAclStat, proxyAclStat);
+
+    /* Remove the default ACL */
+    httpfs.removeDefaultAcl(dir);
+    proxyAclStat = proxyFs.getAclStatus(dir);
+    httpfsAclStat = httpfs.getAclStatus(dir);
+    assertSameAcls(httpfsAclStat, proxyAclStat);
+  }
+
   protected enum Operation {
     GET, OPEN, CREATE, APPEND, CONCAT, RENAME, DELETE, LIST_STATUS, WORKING_DIRECTORY, MKDIRS,
-    SET_TIMES, SET_PERMISSION, SET_OWNER, SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY
+    SET_TIMES, SET_PERMISSION, SET_OWNER, SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY,
+    FILEACLS, DIRACLS
   }
 
   private void operation(Operation op) throws Exception {
@@ -533,6 +639,12 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
       case CONTENT_SUMMARY:
         testContentSummary();
         break;
+      case FILEACLS:
+        testFileAcls();
+        break;
+      case DIRACLS:
+        testDirAcls();
+        break;
     }
   }
 

+ 213 - 9
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSServer.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.fs.http.server;
 
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.json.simple.JSONArray;
 import org.junit.Assert;
 
 import java.io.BufferedReader;
@@ -31,6 +33,7 @@ import java.io.Writer;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.text.MessageFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -128,6 +131,7 @@ public class TestHttpFSServer extends HFSTestCase {
     String fsDefaultName = TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
     Configuration conf = new Configuration(false);
     conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, fsDefaultName);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
     File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
     OutputStream os = new FileOutputStream(hdfsSite);
     conf.writeXml(os);
@@ -241,6 +245,10 @@ public class TestHttpFSServer extends HFSTestCase {
   private void createWithHttp ( String filename, String perms )
           throws Exception {
     String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
+    // Remove leading / from filename
+    if ( filename.charAt(0) == '/' ) {
+      filename = filename.substring(1);
+    }
     String pathOps;
     if ( perms == null ) {
       pathOps = MessageFormat.format(
@@ -260,18 +268,24 @@ public class TestHttpFSServer extends HFSTestCase {
   }
 
   /**
-   * Talks to the http interface to get the json output of the GETFILESTATUS
-   * command on the given file.
+   * Talks to the http interface to get the json output of a *STATUS command
+   * on the given file.
    *
    * @param filename The file to query.
+   * @param command Either GETFILESTATUS, LISTSTATUS, or GETACLSTATUS
    * @return A string containing the JSON output describing the file.
    * @throws Exception
    */
-  private String getFileStatus ( String filename ) throws Exception {
+  private String getStatus(String filename, String command)
+          throws Exception {
     String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
+    // Remove leading / from filename
+    if ( filename.charAt(0) == '/' ) {
+      filename = filename.substring(1);
+    }
     String pathOps = MessageFormat.format(
-            "/webhdfs/v1/{0}?user.name={1}&op=GETFILESTATUS",
-            filename, user);
+            "/webhdfs/v1/{0}?user.name={1}&op={2}",
+            filename, user, command);
     URL url = new URL(TestJettyHelper.getJettyURL(), pathOps);
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     conn.connect();
@@ -283,6 +297,30 @@ public class TestHttpFSServer extends HFSTestCase {
     return reader.readLine();
   }
 
+  /**
+   * General-purpose http PUT command to the httpfs server.
+   * @param filename The file to operate upon
+   * @param command The command to perform (SETACL, etc)
+   * @param params Parameters, like "aclspec=..."
+   */
+  private void putCmd(String filename, String command,
+                      String params) throws Exception {
+    String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
+    // Remove leading / from filename
+    if ( filename.charAt(0) == '/' ) {
+      filename = filename.substring(1);
+    }
+    String pathOps = MessageFormat.format(
+            "/webhdfs/v1/{0}?user.name={1}{2}{3}&op={4}",
+            filename, user, (params == null) ? "" : "&",
+            (params == null) ? "" : params, command);
+    URL url = new URL(TestJettyHelper.getJettyURL(), pathOps);
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod("PUT");
+    conn.connect();
+    Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+  }
+
   /**
    * Given the JSON output from the GETFILESTATUS call, return the
    * 'permission' value.
@@ -298,6 +336,27 @@ public class TestHttpFSServer extends HFSTestCase {
     return (String) details.get("permission");
   }
 
+  /**
+   * Given the JSON output from the GETACLSTATUS call, return the
+   * 'entries' value as a List<String>.
+   * @param statusJson JSON from GETACLSTATUS
+   * @return A List of Strings which are the elements of the ACL entries
+   * @throws Exception
+   */
+  private List<String> getAclEntries ( String statusJson ) throws Exception {
+    List<String> entries = new ArrayList<String>();
+    JSONParser parser = new JSONParser();
+    JSONObject jsonObject = (JSONObject) parser.parse(statusJson);
+    JSONObject details = (JSONObject) jsonObject.get("AclStatus");
+    JSONArray jsonEntries = (JSONArray) details.get("entries");
+    if ( jsonEntries != null ) {
+      for (Object e : jsonEntries) {
+        entries.add(e.toString());
+      }
+    }
+    return entries;
+  }
+
   /**
    * Validate that files are created with 755 permissions when no
    * 'permissions' attribute is specified, and when 'permissions'
@@ -314,22 +373,167 @@ public class TestHttpFSServer extends HFSTestCase {
     fs.mkdirs(new Path("/perm"));
 
     createWithHttp("/perm/none", null);
-    String statusJson = getFileStatus("/perm/none");
+    String statusJson = getStatus("/perm/none", "GETFILESTATUS");
     Assert.assertTrue("755".equals(getPerms(statusJson)));
 
     createWithHttp("/perm/p-777", "777");
-    statusJson = getFileStatus("/perm/p-777");
+    statusJson = getStatus("/perm/p-777", "GETFILESTATUS");
     Assert.assertTrue("777".equals(getPerms(statusJson)));
 
     createWithHttp("/perm/p-654", "654");
-    statusJson = getFileStatus("/perm/p-654");
+    statusJson = getStatus("/perm/p-654", "GETFILESTATUS");
     Assert.assertTrue("654".equals(getPerms(statusJson)));
 
     createWithHttp("/perm/p-321", "321");
-    statusJson = getFileStatus("/perm/p-321");
+    statusJson = getStatus("/perm/p-321", "GETFILESTATUS");
     Assert.assertTrue("321".equals(getPerms(statusJson)));
   }
 
+  /**
+   * Validate the various ACL set/modify/remove calls.  General strategy is
+   * to verify each of the following steps with GETFILESTATUS, LISTSTATUS,
+   * and GETACLSTATUS:
+   * <ol>
+   *   <li>Create a file with no ACLs</li>
+   *   <li>Add a user + group ACL</li>
+   *   <li>Add another user ACL</li>
+   *   <li>Remove the first user ACL</li>
+   *   <li>Remove all ACLs</li>
+   * </ol>
+   */
+  @Test
+  @TestDir
+  @TestJetty
+  @TestHdfs
+  public void testFileAcls() throws Exception {
+    final String aclUser1 = "user:foo:rw-";
+    final String aclUser2 = "user:bar:r--";
+    final String aclGroup1 = "group::r--";
+    final String aclSpec = "aclspec=user::rwx," + aclUser1 + ","
+            + aclGroup1 + ",other::---";
+    final String modAclSpec = "aclspec=" + aclUser2;
+    final String remAclSpec = "aclspec=" + aclUser1;
+    final String dir = "/aclFileTest";
+    final String path = dir + "/test";
+    String statusJson;
+    List<String> aclEntries;
+
+    createHttpFSServer(false);
+
+    FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
+    fs.mkdirs(new Path(dir));
+
+    createWithHttp(path, null);
+
+    /* getfilestatus and liststatus don't have 'aclBit' in their reply */
+    statusJson = getStatus(path, "GETFILESTATUS");
+    Assert.assertEquals(-1, statusJson.indexOf("aclBit"));
+    statusJson = getStatus(dir, "LISTSTATUS");
+    Assert.assertEquals(-1, statusJson.indexOf("aclBit"));
+
+    /* getaclstatus works and returns no entries */
+    statusJson = getStatus(path, "GETACLSTATUS");
+    aclEntries = getAclEntries(statusJson);
+    Assert.assertTrue(aclEntries.size() == 0);
+
+    /*
+     * Now set an ACL on the file.  (getfile|list)status have aclBit,
+     * and aclstatus has entries that look familiar.
+     */
+    putCmd(path, "SETACL", aclSpec);
+    statusJson = getStatus(path, "GETFILESTATUS");
+    Assert.assertNotEquals(-1, statusJson.indexOf("aclBit"));
+    statusJson = getStatus(dir, "LISTSTATUS");
+    Assert.assertNotEquals(-1, statusJson.indexOf("aclBit"));
+    statusJson = getStatus(path, "GETACLSTATUS");
+    aclEntries = getAclEntries(statusJson);
+    Assert.assertTrue(aclEntries.size() == 2);
+    Assert.assertTrue(aclEntries.contains(aclUser1));
+    Assert.assertTrue(aclEntries.contains(aclGroup1));
+
+    /* Modify acl entries to add another user acl */
+    putCmd(path, "MODIFYACLENTRIES", modAclSpec);
+    statusJson = getStatus(path, "GETACLSTATUS");
+    aclEntries = getAclEntries(statusJson);
+    Assert.assertTrue(aclEntries.size() == 3);
+    Assert.assertTrue(aclEntries.contains(aclUser1));
+    Assert.assertTrue(aclEntries.contains(aclUser2));
+    Assert.assertTrue(aclEntries.contains(aclGroup1));
+
+    /* Remove the first user acl entry and verify */
+    putCmd(path, "REMOVEACLENTRIES", remAclSpec);
+    statusJson = getStatus(path, "GETACLSTATUS");
+    aclEntries = getAclEntries(statusJson);
+    Assert.assertTrue(aclEntries.size() == 2);
+    Assert.assertTrue(aclEntries.contains(aclUser2));
+    Assert.assertTrue(aclEntries.contains(aclGroup1));
+
+    /* Remove all acls and verify */
+    putCmd(path, "REMOVEACL", null);
+    statusJson = getStatus(path, "GETACLSTATUS");
+    aclEntries = getAclEntries(statusJson);
+    Assert.assertTrue(aclEntries.size() == 0);
+    statusJson = getStatus(path, "GETFILESTATUS");
+    Assert.assertEquals(-1, statusJson.indexOf("aclBit"));
+    statusJson = getStatus(dir, "LISTSTATUS");
+    Assert.assertEquals(-1, statusJson.indexOf("aclBit"));
+  }
+
+  /**
+   * Test ACL operations on a directory, including default ACLs.
+   * General strategy is to use GETFILESTATUS and GETACLSTATUS to verify:
+   * <ol>
+   *   <li>Initial status with no ACLs</li>
+   *   <li>The addition of a default ACL</li>
+   *   <li>The removal of default ACLs</li>
+   * </ol>
+   *
+   * @throws Exception
+   */
+  @Test
+  @TestDir
+  @TestJetty
+  @TestHdfs
+  public void testDirAcls() throws Exception {
+    final String defUser1 = "default:user:glarch:r-x";
+    final String defSpec1 = "aclspec=" + defUser1;
+    final String dir = "/aclDirTest";
+    String statusJson;
+    List<String> aclEntries;
+
+    createHttpFSServer(false);
+
+    FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
+    fs.mkdirs(new Path(dir));
+
+    /* getfilestatus and liststatus don't have 'aclBit' in their reply */
+    statusJson = getStatus(dir, "GETFILESTATUS");
+    Assert.assertEquals(-1, statusJson.indexOf("aclBit"));
+
+    /* No ACLs, either */
+    statusJson = getStatus(dir, "GETACLSTATUS");
+    aclEntries = getAclEntries(statusJson);
+    Assert.assertTrue(aclEntries.size() == 0);
+
+    /* Give it a default ACL and verify */
+    putCmd(dir, "SETACL", defSpec1);
+    statusJson = getStatus(dir, "GETFILESTATUS");
+    Assert.assertNotEquals(-1, statusJson.indexOf("aclBit"));
+    statusJson = getStatus(dir, "GETACLSTATUS");
+    aclEntries = getAclEntries(statusJson);
+    Assert.assertTrue(aclEntries.size() == 5);
+    /* 4 of the 5 entries are default:(user|group|mask|other):perm */
+    Assert.assertTrue(aclEntries.contains(defUser1));
+
+    /* Remove the default ACL and re-verify */
+    putCmd(dir, "REMOVEDEFAULTACL", null);
+    statusJson = getStatus(dir, "GETFILESTATUS");
+    Assert.assertEquals(-1, statusJson.indexOf("aclBit"));
+    statusJson = getStatus(dir, "GETACLSTATUS");
+    aclEntries = getAclEntries(statusJson);
+    Assert.assertTrue(aclEntries.size() == 0);
+  }
+
   @Test
   @TestDir
   @TestJetty

+ 283 - 0
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSServerNoACLs.java

@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.http.server;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.test.HTestCase;
+import org.apache.hadoop.test.HadoopUsersConfTestHelper;
+import org.apache.hadoop.test.TestDir;
+import org.apache.hadoop.test.TestDirHelper;
+import org.apache.hadoop.test.TestJetty;
+import org.apache.hadoop.test.TestJettyHelper;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.webapp.WebAppContext;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Writer;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.text.MessageFormat;
+
+/**
+ * This test class ensures that everything works as expected when ACL
+ * support is turned off in HDFS.  This is the default configuration.  The other
+ * tests operate with ACL support turned on.
+ */
+public class TestHttpFSServerNoACLs extends HTestCase {
+
+  private MiniDFSCluster miniDfs;
+  private Configuration nnConf;
+
+  /**
+   * Fire up our own hand-rolled MiniDFSCluster.  We do this here instead
+   * of relying on TestHdfsHelper because we don't want to turn on ACL
+   * support.
+   *
+   * @throws Exception
+   */
+  private void startMiniDFS() throws Exception {
+
+    File testDirRoot = TestDirHelper.getTestDir();
+
+    if (System.getProperty("hadoop.log.dir") == null) {
+      System.setProperty("hadoop.log.dir",
+              new File(testDirRoot, "hadoop-log").getAbsolutePath());
+    }
+    if (System.getProperty("test.build.data") == null) {
+      System.setProperty("test.build.data",
+              new File(testDirRoot, "hadoop-data").getAbsolutePath());
+    }
+
+    Configuration conf = HadoopUsersConfTestHelper.getBaseConf();
+    HadoopUsersConfTestHelper.addUserConf(conf);
+    conf.set("fs.hdfs.impl.disable.cache", "true");
+    conf.set("dfs.block.access.token.enable", "false");
+    conf.set("dfs.permissions", "true");
+    conf.set("hadoop.security.authentication", "simple");
+
+    // Explicitly turn off ACL support
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, false);
+
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+    builder.numDataNodes(2);
+    miniDfs = builder.build();
+    nnConf = miniDfs.getConfiguration(0);
+  }
+
+  /**
+   * Create an HttpFS Server to talk to the MiniDFSCluster we created.
+   * @throws Exception
+   */
+  private void createHttpFSServer() throws Exception {
+    File homeDir = TestDirHelper.getTestDir();
+    Assert.assertTrue(new File(homeDir, "conf").mkdir());
+    Assert.assertTrue(new File(homeDir, "log").mkdir());
+    Assert.assertTrue(new File(homeDir, "temp").mkdir());
+    HttpFSServerWebApp.setHomeDirForCurrentThread(homeDir.getAbsolutePath());
+
+    File secretFile = new File(new File(homeDir, "conf"), "secret");
+    Writer w = new FileWriter(secretFile);
+    w.write("secret");
+    w.close();
+
+    // HDFS configuration
+    File hadoopConfDir = new File(new File(homeDir, "conf"), "hadoop-conf");
+    if (!hadoopConfDir.mkdirs()) {
+      throw new IOException("Unable to create " + hadoopConfDir);
+    }
+
+    String fsDefaultName =
+            nnConf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
+    Configuration conf = new Configuration(false);
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, fsDefaultName);
+
+    // Explicitly turn off ACLs, just in case the default becomes true later
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, false);
+
+    File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
+    OutputStream os = new FileOutputStream(hdfsSite);
+    conf.writeXml(os);
+    os.close();
+
+    // HTTPFS configuration
+    conf = new Configuration(false);
+    conf.set("httpfs.hadoop.config.dir", hadoopConfDir.toString());
+    conf.set("httpfs.proxyuser." +
+                    HadoopUsersConfTestHelper.getHadoopProxyUser() + ".groups",
+            HadoopUsersConfTestHelper.getHadoopProxyUserGroups());
+    conf.set("httpfs.proxyuser." +
+                    HadoopUsersConfTestHelper.getHadoopProxyUser() + ".hosts",
+            HadoopUsersConfTestHelper.getHadoopProxyUserHosts());
+    conf.set("httpfs.authentication.signature.secret.file",
+            secretFile.getAbsolutePath());
+
+    File httpfsSite = new File(new File(homeDir, "conf"), "httpfs-site.xml");
+    os = new FileOutputStream(httpfsSite);
+    conf.writeXml(os);
+    os.close();
+
+    ClassLoader cl = Thread.currentThread().getContextClassLoader();
+    URL url = cl.getResource("webapp");
+    if (url == null) {
+      throw new IOException("webapp resource not found on the classpath");
+    }
+    WebAppContext context = new WebAppContext(url.getPath(), "/webhdfs");
+    Server server = TestJettyHelper.getJettyServer();
+    server.addHandler(context);
+    server.start();
+  }
+
+  /**
+   * Talks to the http interface to run a *STATUS command on the given file
+   * and verifies the response code and body against expectations.
+   *
+   * @param filename The file to query.
+   * @param command Either GETFILESTATUS, LISTSTATUS, or GETACLSTATUS
+   * @param expectOK Is this operation expected to succeed?
+   * @throws Exception
+   */
+  private void getStatus(String filename, String command, boolean expectOK)
+          throws Exception {
+    String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
+    // Remove leading / from filename
+    if ( filename.charAt(0) == '/' ) {
+      filename = filename.substring(1);
+    }
+    String pathOps = MessageFormat.format(
+            "/webhdfs/v1/{0}?user.name={1}&op={2}",
+            filename, user, command);
+    URL url = new URL(TestJettyHelper.getJettyURL(), pathOps);
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.connect();
+    int resp = conn.getResponseCode();
+    BufferedReader reader;
+    if ( expectOK ) {
+      Assert.assertEquals(HttpURLConnection.HTTP_OK, resp);
+      reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
+      String res = reader.readLine();
+      Assert.assertTrue(!res.contains("aclBit"));
+      Assert.assertTrue(res.contains("owner")); // basic sanity check
+    } else {
+      Assert.assertEquals(HttpURLConnection.HTTP_INTERNAL_ERROR, resp);
+      reader = new BufferedReader(new InputStreamReader(conn.getErrorStream()));
+      String res = reader.readLine();
+      Assert.assertTrue(res.contains("RemoteException"));
+      Assert.assertTrue(res.contains("ACL"));
+      Assert.assertTrue(res.contains("rejected"));
+    }
+  }
+
+  /**
+   * General-purpose http PUT command to the httpfs server.
+   * @param filename The file to operate upon
+   * @param command The command to perform (SETACL, etc)
+   * @param params Parameters, like "aclspec=..."
+   */
+  private void putCmd(String filename, String command,
+                      String params, boolean expectOK) throws Exception {
+    String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
+    // Remove leading / from filename
+    if ( filename.charAt(0) == '/' ) {
+      filename = filename.substring(1);
+    }
+    String pathOps = MessageFormat.format(
+            "/webhdfs/v1/{0}?user.name={1}{2}{3}&op={4}",
+            filename, user, (params == null) ? "" : "&",
+            (params == null) ? "" : params, command);
+    URL url = new URL(TestJettyHelper.getJettyURL(), pathOps);
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod("PUT");
+    conn.connect();
+    int resp = conn.getResponseCode();
+    if ( expectOK ) {
+      Assert.assertEquals(HttpURLConnection.HTTP_OK, resp);
+    } else {
+      Assert.assertEquals(HttpURLConnection.HTTP_INTERNAL_ERROR, resp);
+      BufferedReader reader;
+      reader = new BufferedReader(new InputStreamReader(conn.getErrorStream()));
+      String err = reader.readLine();
+      Assert.assertTrue(err.contains("RemoteException"));
+      Assert.assertTrue(err.contains("ACL"));
+      Assert.assertTrue(err.contains("rejected"));
+    }
+  }
+
+  /**
+   * Ensure that
+   * <ol>
+   *   <li>GETFILESTATUS and LISTSTATUS work happily</li>
+   *   <li>ACLSTATUS throws an exception</li>
+   *   <li>The ACL SET, REMOVE, etc calls all fail</li>
+   * </ol>
+   *
+   * @throws Exception
+   */
+  @Test
+  @TestDir
+  @TestJetty
+  public void testWithNoAcls() throws Exception {
+    final String aclUser1 = "user:foo:rw-";
+    final String aclUser2 = "user:bar:r--";
+    final String aclGroup1 = "group::r--";
+    final String aclSpec = "aclspec=user::rwx," + aclUser1 + ","
+            + aclGroup1 + ",other::---";
+    final String modAclSpec = "aclspec=" + aclUser2;
+    final String remAclSpec = "aclspec=" + aclUser1;
+    final String defUser1 = "default:user:glarch:r-x";
+    final String defSpec1 = "aclspec=" + defUser1;
+    final String dir = "/noACLs";
+    final String path = dir + "/foo";
+
+    startMiniDFS();
+    createHttpFSServer();
+
+    FileSystem fs = FileSystem.get(nnConf);
+    fs.mkdirs(new Path(dir));
+    OutputStream os = fs.create(new Path(path));
+    os.write(1);
+    os.close();
+
+    /* The normal status calls work as expected; GETACLSTATUS fails */
+    getStatus(path, "GETFILESTATUS", true);
+    getStatus(dir, "LISTSTATUS", true);
+    getStatus(path, "GETACLSTATUS", false);
+
+    /* All the ACL-based PUT commands fail with ACL exceptions */
+    putCmd(path, "SETACL", aclSpec, false);
+    putCmd(path, "MODIFYACLENTRIES", modAclSpec, false);
+    putCmd(path, "REMOVEACLENTRIES", remAclSpec, false);
+    putCmd(path, "REMOVEACL", null, false);
+    putCmd(dir, "SETACL", defSpec1, false);
+    putCmd(dir, "REMOVEDEFAULTACL", null, false);
+
+    miniDfs.shutdown();
+  }
+}

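The "rejected" assertions above match the error body HDFS produces when ACL support is disabled, serialized under the RemoteException/exception/javaClassName keys defined in HttpFSFileSystem. An illustrative body (exact message wording may differ by version):

    {"RemoteException":
      {"message":"The ACL operation has been rejected.  Support for ACLs has been disabled by setting dfs.namenode.acls.enabled to false.",
       "exception":"AclException",
       "javaClassName":"org.apache.hadoop.hdfs.protocol.AclException"}}
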
+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/test/TestHdfsHelper.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.Test;
 import org.junit.runners.model.FrameworkMethod;
@@ -145,6 +146,7 @@ public class TestHdfsHelper extends TestDirHelper {
       conf.set("dfs.block.access.token.enable", "false");
       conf.set("dfs.permissions", "true");
       conf.set("hadoop.security.authentication", "simple");
+      conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
       MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
       builder.numDataNodes(2);
       MiniDFSCluster miniHdfs = builder.build();

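DFS_NAMENODE_ACLS_ENABLED_KEY resolves to dfs.namenode.acls.enabled, which defaults to false; the equivalent hdfs-site.xml entry for a real cluster would be:

    <property>
      <name>dfs.namenode.acls.enabled</name>
      <value>true</value>
    </property>
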
+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -424,6 +424,8 @@ Release 2.5.0 - UNRELEASED
 
     HDFS-6315. Decouple recording edit logs from FSDirectory. (wheat9)
 
+    HDFS-6379. HTTPFS - Implement ACLs support. (yoderme via tucu)
+
   OPTIMIZATIONS
 
     HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)