瀏覽代碼

HADOOP-16455. ABFS: Implement FileSystem.access() method.

Contributed by Bilahari T H.
bilaharith 5 年之前
父節點
當前提交
9e69628f55

+ 8 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

@@ -178,6 +178,10 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_USE_UPN)
       DefaultValue = DEFAULT_USE_UPN)
   private boolean useUpn;
   private boolean useUpn;
 
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_ENABLE_CHECK_ACCESS, DefaultValue = DEFAULT_ENABLE_CHECK_ACCESS)
+  private boolean isCheckAccessEnabled;
+
   @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_LATENCY_TRACK,
   @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_LATENCY_TRACK,
           DefaultValue = DEFAULT_ABFS_LATENCY_TRACK)
           DefaultValue = DEFAULT_ABFS_LATENCY_TRACK)
   private boolean trackLatency;
   private boolean trackLatency;
@@ -403,6 +407,10 @@ public class AbfsConfiguration{
     return this.azureBlockSize;
     return this.azureBlockSize;
   }
   }
 
 
+  public boolean isCheckAccessEnabled() {
+    return this.isCheckAccessEnabled;
+  }
+
   public String getAzureBlockLocationHost() {
   public String getAzureBlockLocationHost() {
     return this.azureBlockLocationHost;
     return this.azureBlockLocationHost;
   }
   }

+ 20 - 3
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

@@ -861,9 +861,14 @@ public class AzureBlobFileSystem extends FileSystem {
    * @throws IOException                   see specific implementation
    * @throws IOException                   see specific implementation
    */
    */
   @Override
   @Override
-  public void access(final Path path, FsAction mode) throws IOException {
-    // TODO: make it no-op to unblock hive permission issue for now.
-    // Will add a long term fix similar to the implementation in AdlFileSystem.
+  public void access(final Path path, final FsAction mode) throws IOException {
+    LOG.debug("AzureBlobFileSystem.access path : {}, mode : {}", path, mode);
+    Path qualifiedPath = makeQualified(path);
+    try {
+      this.abfsStore.access(qualifiedPath, mode);
+    } catch (AzureBlobFileSystemException ex) {
+      checkCheckAccessException(path, ex);
+    }
   }
   }
 
 
   private FileStatus tryGetFileStatus(final Path f) {
   private FileStatus tryGetFileStatus(final Path f) {
@@ -976,6 +981,18 @@ public class AzureBlobFileSystem extends FileSystem {
     }
     }
   }
   }
 
 
+  private void checkCheckAccessException(final Path path,
+      final AzureBlobFileSystemException exception) throws IOException {
+    if (exception instanceof AbfsRestOperationException) {
+      AbfsRestOperationException ere = (AbfsRestOperationException) exception;
+      if (ere.getStatusCode() == HttpURLConnection.HTTP_FORBIDDEN) {
+        throw (IOException) new AccessControlException(ere.getMessage())
+            .initCause(exception);
+      }
+    }
+    checkException(path, exception);
+  }
+
   /**
   /**
    * Given a path and exception, choose which IOException subclass
    * Given a path and exception, choose which IOException subclass
    * to create.
    * to create.

+ 19 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -1072,6 +1072,25 @@ public class AzureBlobFileSystemStore implements Closeable {
     }
     }
   }
   }
 
 
+  public void access(final Path path, final FsAction mode)
+      throws AzureBlobFileSystemException {
+    LOG.debug("access for filesystem: {}, path: {}, mode: {}",
+        this.client.getFileSystem(), path, mode);
+    if (!this.abfsConfiguration.isCheckAccessEnabled()
+        || !getIsNamespaceEnabled()) {
+      LOG.debug("Returning; either check access is not enabled or the account"
+          + " used is not namespace enabled");
+      return;
+    }
+    try (AbfsPerfInfo perfInfo = startTracking("access", "checkAccess")) {
+      String relativePath =
+          AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true);
+      final AbfsRestOperation op = this.client
+          .checkAccess(relativePath, mode.SYMBOL);
+      perfInfo.registerResult(op.getResult()).registerSuccess(true);
+    }
+  }
+
   public boolean isAtomicRenameKey(String key) {
   public boolean isAtomicRenameKey(String key) {
     return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
     return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
   }
   }

+ 1 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java

@@ -37,6 +37,7 @@ public final class AbfsHttpConstants {
   public static final String SET_PROPERTIES_ACTION = "setProperties";
   public static final String SET_PROPERTIES_ACTION = "setProperties";
   public static final String SET_ACCESS_CONTROL = "setAccessControl";
   public static final String SET_ACCESS_CONTROL = "setAccessControl";
   public static final String GET_ACCESS_CONTROL = "getAccessControl";
   public static final String GET_ACCESS_CONTROL = "getAccessControl";
+  public static final String CHECK_ACCESS = "checkAccess";
   public static final String GET_STATUS = "getStatus";
   public static final String GET_STATUS = "getStatus";
   public static final String DEFAULT_TIMEOUT = "90";
   public static final String DEFAULT_TIMEOUT = "90";
   public static final String TOKEN_VERSION = "2";
   public static final String TOKEN_VERSION = "2";

+ 4 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

@@ -62,6 +62,10 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_DISABLE_OUTPUTSTREAM_FLUSH = "fs.azure.disable.outputstream.flush";
   public static final String FS_AZURE_DISABLE_OUTPUTSTREAM_FLUSH = "fs.azure.disable.outputstream.flush";
   public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix";
   public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix";
   public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";
   public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";
+  /** Provides a config to enable/disable the checkAccess API.
+   *  By default this will be
+   *  FileSystemConfigurations.DEFAULT_ENABLE_CHECK_ACCESS. **/
+  public static final String FS_AZURE_ENABLE_CHECK_ACCESS = "fs.azure.enable.check.access";
   public static final String FS_AZURE_USE_UPN = "fs.azure.use.upn";
   public static final String FS_AZURE_USE_UPN = "fs.azure.use.upn";
   /** User principal names (UPNs) have the format “{alias}@{domain}”. If true,
   /** User principal names (UPNs) have the format “{alias}@{domain}”. If true,
    *  only {alias} is included when a UPN would otherwise appear in the output
    *  only {alias} is included when a UPN would otherwise appear in the output

+ 2 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

@@ -67,7 +67,8 @@ public final class FileSystemConfigurations {
   public static final boolean DEFAULT_ENABLE_HTTPS = true;
   public static final boolean DEFAULT_ENABLE_HTTPS = true;
 
 
   public static final boolean DEFAULT_USE_UPN = false;
   public static final boolean DEFAULT_USE_UPN = false;
+  public static final boolean DEFAULT_ENABLE_CHECK_ACCESS = false;
   public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
   public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
 
 
   private FileSystemConfigurations() {}
   private FileSystemConfigurations() {}
-}
+}

+ 1 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java

@@ -32,6 +32,7 @@ public final class HttpQueryParams {
   public static final String QUERY_PARAM_RECURSIVE = "recursive";
   public static final String QUERY_PARAM_RECURSIVE = "recursive";
   public static final String QUERY_PARAM_MAXRESULTS = "maxResults";
   public static final String QUERY_PARAM_MAXRESULTS = "maxResults";
   public static final String QUERY_PARAM_ACTION = "action";
   public static final String QUERY_PARAM_ACTION = "action";
+  public static final String QUERY_FS_ACTION = "fsAction";
   public static final String QUERY_PARAM_POSITION = "position";
   public static final String QUERY_PARAM_POSITION = "position";
   public static final String QUERY_PARAM_TIMEOUT = "timeout";
   public static final String QUERY_PARAM_TIMEOUT = "timeout";
   public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";
   public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";

+ 22 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

@@ -524,6 +524,28 @@ public class AbfsClient implements Closeable {
     return op;
     return op;
   }
   }
 
 
+  /**
+   * Talks to the server to check whether the permission specified in
+   * the rwx parameter is present for the path specified in the path parameter.
+   *
+   * @param path  Path for which access check needs to be performed
+   * @param rwx   The permission to be checked on the path
+   * @return      The {@link AbfsRestOperation} object for the operation
+   * @throws AzureBlobFileSystemException in case of bad requests
+   */
+  public AbfsRestOperation checkAccess(String path, String rwx)
+      throws AzureBlobFileSystemException {
+    AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, CHECK_ACCESS);
+    abfsUriQueryBuilder.addQuery(QUERY_FS_ACTION, rwx);
+    URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    AbfsRestOperation op = new AbfsRestOperation(
+        AbfsRestOperationType.CheckAccess, this,
+        AbfsHttpConstants.HTTP_METHOD_HEAD, url, createDefaultHeaders());
+    op.execute();
+    return op;
+  }
+
   private URL createRequestUrl(final String query) throws AzureBlobFileSystemException {
   private URL createRequestUrl(final String query) throws AzureBlobFileSystemException {
     return createRequestUrl(EMPTY_STRING, query);
     return createRequestUrl(EMPTY_STRING, query);
   }
   }

+ 2 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java

@@ -39,5 +39,6 @@ public enum AbfsRestOperationType {
     Append,
     Append,
     Flush,
     Flush,
     ReadFile,
     ReadFile,
-    DeletePath
+    DeletePath,
+    CheckAccess
 }
 }

+ 4 - 0
hadoop-tools/hadoop-azure/src/site/markdown/abfs.md

@@ -661,6 +661,10 @@ Hflush() being the only documented API that can provide persistent data
 transfer, Flush() also attempting to persist buffered data will lead to
 transfer, Flush() also attempting to persist buffered data will lead to
 performance issues.
 performance issues.
 
 
+### <a name="flushconfigoptions"></a> Access Options
+Config `fs.azure.enable.check.access` needs to be set true to enable
+ the AzureBlobFileSystem.access().
+
 ### <a name="perfoptions"></a> Perf Options
 ### <a name="perfoptions"></a> Perf Options
 
 
 #### <a name="abfstracklatencyoptions"></a> 1. HTTP Request Tracking Options
 #### <a name="abfstracklatencyoptions"></a> 1. HTTP Request Tracking Options

+ 314 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java

@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs;
+
+import com.google.common.collect.Lists;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.Assume;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.security.AccessControlException;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CHECK_ACCESS;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_SECRET;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CLIENT_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CLIENT_SECRET;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
+
+/**
+ * Test cases for AzureBlobFileSystem.access()
+ */
+public class ITestAzureBlobFileSystemCheckAccess
+    extends AbstractAbfsIntegrationTest {
+
+  private static final String TEST_FOLDER_PATH = "CheckAccessTestFolder";
+  private final FileSystem superUserFs;
+  private final FileSystem testUserFs;
+  private final String testUserGuid;
+  private final boolean isCheckAccessEnabled;
+  private final boolean isHNSEnabled;
+
+  public ITestAzureBlobFileSystemCheckAccess() throws Exception {
+    super.setup();
+    this.superUserFs = getFileSystem();
+    testUserGuid = getConfiguration()
+        .get(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID);
+    this.testUserFs = getTestUserFs();
+    this.isCheckAccessEnabled = getConfiguration().isCheckAccessEnabled();
+    this.isHNSEnabled = getConfiguration()
+        .getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
+  }
+
+  private FileSystem getTestUserFs() throws Exception {
+    String orgClientId = getConfiguration().get(FS_AZURE_BLOB_FS_CLIENT_ID);
+    String orgClientSecret = getConfiguration()
+        .get(FS_AZURE_BLOB_FS_CLIENT_SECRET);
+    Boolean orgCreateFileSystemDurungInit = getConfiguration()
+        .getBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
+    getRawConfiguration().set(FS_AZURE_BLOB_FS_CLIENT_ID,
+        getConfiguration().get(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_ID));
+    getRawConfiguration().set(FS_AZURE_BLOB_FS_CLIENT_SECRET, getConfiguration()
+        .get(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_SECRET));
+    getRawConfiguration()
+        .setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
+            false);
+    FileSystem fs = FileSystem.newInstance(getRawConfiguration());
+    getRawConfiguration().set(FS_AZURE_BLOB_FS_CLIENT_ID, orgClientId);
+    getRawConfiguration().set(FS_AZURE_BLOB_FS_CLIENT_SECRET, orgClientSecret);
+    getRawConfiguration()
+        .setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
+            orgCreateFileSystemDurungInit);
+    return fs;
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCheckAccessWithNullPath() throws IOException {
+    superUserFs.access(null, FsAction.READ);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testCheckAccessForFileWithNullFsAction() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    //  NPE when trying to convert null FsAction enum
+    superUserFs.access(new Path("test.txt"), null);
+  }
+
+  @Test(expected = FileNotFoundException.class)
+  public void testCheckAccessForNonExistentFile() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    Path nonExistentFile = setupTestDirectoryAndUserAccess(
+        "/nonExistentFile1.txt", FsAction.ALL);
+    superUserFs.delete(nonExistentFile, true);
+    testUserFs.access(nonExistentFile, FsAction.READ);
+  }
+
+  @Test
+  public void testWhenCheckAccessConfigIsOff() throws Exception {
+    Assume.assumeTrue(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT + " is false",
+        isHNSEnabled);
+    Configuration conf = getRawConfiguration();
+    conf.setBoolean(FS_AZURE_ENABLE_CHECK_ACCESS, false);
+    FileSystem fs = FileSystem.newInstance(conf);
+    Path testFilePath = setupTestDirectoryAndUserAccess("/test1.txt",
+        FsAction.NONE);
+    fs.access(testFilePath, FsAction.EXECUTE);
+    fs.access(testFilePath, FsAction.READ);
+    fs.access(testFilePath, FsAction.WRITE);
+    fs.access(testFilePath, FsAction.READ_EXECUTE);
+    fs.access(testFilePath, FsAction.WRITE_EXECUTE);
+    fs.access(testFilePath, FsAction.READ_WRITE);
+    fs.access(testFilePath, FsAction.ALL);
+    testFilePath = setupTestDirectoryAndUserAccess("/test1.txt", FsAction.ALL);
+    fs.access(testFilePath, FsAction.EXECUTE);
+    fs.access(testFilePath, FsAction.READ);
+    fs.access(testFilePath, FsAction.WRITE);
+    fs.access(testFilePath, FsAction.READ_EXECUTE);
+    fs.access(testFilePath, FsAction.WRITE_EXECUTE);
+    fs.access(testFilePath, FsAction.READ_WRITE);
+    fs.access(testFilePath, FsAction.ALL);
+    fs.access(testFilePath, null);
+
+    Path nonExistentFile = setupTestDirectoryAndUserAccess(
+        "/nonExistentFile2" + ".txt", FsAction.NONE);
+    superUserFs.delete(nonExistentFile, true);
+    fs.access(nonExistentFile, FsAction.READ);
+  }
+
+  @Test
+  public void testCheckAccessForAccountWithoutNS() throws Exception {
+    Assume.assumeFalse(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT + " is true",
+        getConfiguration()
+            .getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, true));
+    testUserFs.access(new Path("/"), FsAction.READ);
+  }
+
+  @Test
+  public void testFsActionNONE() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    Path testFilePath = setupTestDirectoryAndUserAccess("/test2.txt",
+        FsAction.NONE);
+    assertInaccessible(testFilePath, FsAction.EXECUTE);
+    assertInaccessible(testFilePath, FsAction.READ);
+    assertInaccessible(testFilePath, FsAction.WRITE);
+    assertInaccessible(testFilePath, FsAction.READ_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.WRITE_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.READ_WRITE);
+    assertInaccessible(testFilePath, FsAction.ALL);
+  }
+
+  @Test
+  public void testFsActionEXECUTE() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    Path testFilePath = setupTestDirectoryAndUserAccess("/test3.txt",
+        FsAction.EXECUTE);
+    assertAccessible(testFilePath, FsAction.EXECUTE);
+
+    assertInaccessible(testFilePath, FsAction.READ);
+    assertInaccessible(testFilePath, FsAction.WRITE);
+    assertInaccessible(testFilePath, FsAction.READ_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.WRITE_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.READ_WRITE);
+    assertInaccessible(testFilePath, FsAction.ALL);
+  }
+
+  @Test
+  public void testFsActionREAD() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    Path testFilePath = setupTestDirectoryAndUserAccess("/test4.txt",
+        FsAction.READ);
+    assertAccessible(testFilePath, FsAction.READ);
+
+    assertInaccessible(testFilePath, FsAction.EXECUTE);
+    assertInaccessible(testFilePath, FsAction.WRITE);
+    assertInaccessible(testFilePath, FsAction.READ_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.WRITE_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.READ_WRITE);
+    assertInaccessible(testFilePath, FsAction.ALL);
+  }
+
+  @Test
+  public void testFsActionWRITE() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    Path testFilePath = setupTestDirectoryAndUserAccess("/test5.txt",
+        FsAction.WRITE);
+    assertAccessible(testFilePath, FsAction.WRITE);
+
+    assertInaccessible(testFilePath, FsAction.EXECUTE);
+    assertInaccessible(testFilePath, FsAction.READ);
+    assertInaccessible(testFilePath, FsAction.READ_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.WRITE_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.READ_WRITE);
+    assertInaccessible(testFilePath, FsAction.ALL);
+  }
+
+  @Test
+  public void testFsActionREADEXECUTE() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    Path testFilePath = setupTestDirectoryAndUserAccess("/test6.txt",
+        FsAction.READ_EXECUTE);
+    assertAccessible(testFilePath, FsAction.EXECUTE);
+    assertAccessible(testFilePath, FsAction.READ);
+    assertAccessible(testFilePath, FsAction.READ_EXECUTE);
+
+    assertInaccessible(testFilePath, FsAction.WRITE);
+    assertInaccessible(testFilePath, FsAction.WRITE_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.READ_WRITE);
+    assertInaccessible(testFilePath, FsAction.ALL);
+  }
+
+  @Test
+  public void testFsActionWRITEEXECUTE() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    Path testFilePath = setupTestDirectoryAndUserAccess("/test7.txt",
+        FsAction.WRITE_EXECUTE);
+    assertAccessible(testFilePath, FsAction.EXECUTE);
+    assertAccessible(testFilePath, FsAction.WRITE);
+    assertAccessible(testFilePath, FsAction.WRITE_EXECUTE);
+
+    assertInaccessible(testFilePath, FsAction.READ);
+    assertInaccessible(testFilePath, FsAction.READ_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.READ_WRITE);
+    assertInaccessible(testFilePath, FsAction.ALL);
+  }
+
+  @Test
+  public void testFsActionALL() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    Path testFilePath = setupTestDirectoryAndUserAccess("/test8.txt",
+        FsAction.ALL);
+    assertAccessible(testFilePath, FsAction.EXECUTE);
+    assertAccessible(testFilePath, FsAction.WRITE);
+    assertAccessible(testFilePath, FsAction.WRITE_EXECUTE);
+    assertAccessible(testFilePath, FsAction.READ);
+    assertAccessible(testFilePath, FsAction.READ_EXECUTE);
+    assertAccessible(testFilePath, FsAction.READ_WRITE);
+    assertAccessible(testFilePath, FsAction.ALL);
+  }
+
+  private void assumeHNSAndCheckAccessEnabled() {
+    Assume.assumeTrue(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT + " is false",
+        isHNSEnabled);
+    Assume.assumeTrue(FS_AZURE_ENABLE_CHECK_ACCESS + " is false",
+        isCheckAccessEnabled);
+  }
+
+  private void assertAccessible(Path testFilePath, FsAction fsAction)
+      throws IOException {
+    assertTrue(
+        "Should have been given access  " + fsAction + " on " + testFilePath,
+        isAccessible(testUserFs, testFilePath, fsAction));
+  }
+
+  private void assertInaccessible(Path testFilePath, FsAction fsAction)
+      throws IOException {
+    assertFalse(
+        "Should have been denied access  " + fsAction + " on " + testFilePath,
+        isAccessible(testUserFs, testFilePath, fsAction));
+  }
+
+  private void setExecuteAccessForParentDirs(Path dir) throws IOException {
+    dir = dir.getParent();
+    while (dir != null) {
+      modifyAcl(dir, testUserGuid, FsAction.EXECUTE);
+      dir = dir.getParent();
+    }
+  }
+
+  private void modifyAcl(Path file, String uid, FsAction fsAction)
+      throws IOException {
+    List<AclEntry> aclSpec = Lists.newArrayList(AclTestHelpers
+        .aclEntry(AclEntryScope.ACCESS, AclEntryType.USER, uid, fsAction));
+    this.superUserFs.modifyAclEntries(file, aclSpec);
+  }
+
+  private Path setupTestDirectoryAndUserAccess(String testFileName,
+      FsAction fsAction) throws Exception {
+    Path file = new Path(TEST_FOLDER_PATH + testFileName);
+    file = this.superUserFs.makeQualified(file);
+    this.superUserFs.delete(file, true);
+    this.superUserFs.create(file);
+    modifyAcl(file, testUserGuid, fsAction);
+    setExecuteAccessForParentDirs(file);
+    return file;
+  }
+
+  private boolean isAccessible(FileSystem fs, Path path, FsAction fsAction)
+      throws IOException {
+    try {
+      fs.access(path, fsAction);
+    } catch (AccessControlException ace) {
+      return false;
+    }
+    return true;
+  }
+}

+ 8 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java

@@ -34,6 +34,14 @@ public final class TestConfigurationKeys {
   public static final String FS_AZURE_BLOB_DATA_READER_CLIENT_ID = "fs.azure.account.oauth2.reader.client.id";
   public static final String FS_AZURE_BLOB_DATA_READER_CLIENT_ID = "fs.azure.account.oauth2.reader.client.id";
   public static final String FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET = "fs.azure.account.oauth2.reader.client.secret";
   public static final String FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET = "fs.azure.account.oauth2.reader.client.secret";
 
 
+  public static final String FS_AZURE_BLOB_FS_CLIENT_ID = "fs.azure.account.oauth2.client.id";
+  public static final String FS_AZURE_BLOB_FS_CLIENT_SECRET = "fs.azure.account.oauth2.client.secret";
+
+  public static final String FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_ID = "fs.azure.account.test.oauth2.client.id";
+  public static final String FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_SECRET = "fs.azure.account.test.oauth2.client.secret";
+
+  public static final String FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID = "fs.azure.check.access.testuser.guid";
+
   public static final String TEST_CONFIGURATION_FILE_NAME = "azure-test.xml";
   public static final String TEST_CONFIGURATION_FILE_NAME = "azure-test.xml";
   public static final String TEST_CONTAINER_PREFIX = "abfs-testcontainer-";
   public static final String TEST_CONTAINER_PREFIX = "abfs-testcontainer-";
   public static final int TEST_TIMEOUT = 15 * 60 * 1000;
   public static final int TEST_TIMEOUT = 15 * 60 * 1000;