Переглянути джерело

HADOOP-16734. Backport HADOOP-16455- "ABFS: Implement FileSystem.access() method" to branch-2.
Contributed by Bilahari T H.

Bilahari T H 5 роки тому
батько
коміт
48c564d5e3

+ 8 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

@@ -178,6 +178,10 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_USE_UPN)
   private boolean useUpn;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_ENABLE_CHECK_ACCESS, DefaultValue = DEFAULT_ENABLE_CHECK_ACCESS)
+  private boolean isCheckAccessEnabled;
+
   private Map<String, String> storageAccountKeys;
 
   public AbfsConfiguration(final Configuration rawConfig, String accountName)
@@ -399,6 +403,10 @@ public class AbfsConfiguration{
     return this.azureBlockSize;
   }
 
+  public boolean isCheckAccessEnabled() {
+    return this.isCheckAccessEnabled;
+  }
+
   public String getAzureBlockLocationHost() {
     return this.azureBlockLocationHost;
   }

+ 20 - 3
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

@@ -854,9 +854,14 @@ public class AzureBlobFileSystem extends FileSystem {
    * @throws IOException                   see specific implementation
    */
   @Override
-  public void access(final Path path, FsAction mode) throws IOException {
-    // TODO: make it no-op to unblock hive permission issue for now.
-    // Will add a long term fix similar to the implementation in AdlFileSystem.
+  public void access(final Path path, final FsAction mode) throws IOException {
+    LOG.debug("AzureBlobFileSystem.access path : {}, mode : {}", path, mode);
+    Path qualifiedPath = makeQualified(path);
+    try {
+      this.abfsStore.access(qualifiedPath, mode);
+    } catch (AzureBlobFileSystemException ex) {
+      checkCheckAccessException(path, ex);
+    }
   }
 
   private FileStatus tryGetFileStatus(final Path f) {
@@ -969,6 +974,18 @@ public class AzureBlobFileSystem extends FileSystem {
     }
   }
 
+  private void checkCheckAccessException(final Path path,
+      final AzureBlobFileSystemException exception) throws IOException {
+    if (exception instanceof AbfsRestOperationException) {
+      AbfsRestOperationException ere = (AbfsRestOperationException) exception;
+      if (ere.getStatusCode() == HttpURLConnection.HTTP_FORBIDDEN) {
+        throw (IOException) new AccessControlException(ere.getMessage())
+            .initCause(exception);
+      }
+    }
+    checkException(path, exception);
+  }
+
   /**
    * Given a path and exception, choose which IOException subclass
    * to create.

+ 19 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -884,6 +884,25 @@ public class AzureBlobFileSystemStore {
     return aclStatusBuilder.build();
   }
 
+  public void access(final Path path, final FsAction mode)
+      throws AzureBlobFileSystemException {
+    LOG.debug("access for filesystem: {}, path: {}, mode: {}",
+        this.client.getFileSystem(), path, mode);
+    if (!this.abfsConfiguration.isCheckAccessEnabled()
+        || !getIsNamespaceEnabled()) {
+      LOG.debug("Returning; either check access is not enabled or the account"
+          + " used is not namespace enabled");
+      return;
+    }
+    //try (AbfsPerfInfo perfInfo = startTracking("access", "checkAccess")) {
+      String relativePath =
+          AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true);
+      final AbfsRestOperation op = this.client
+          .checkAccess(relativePath, mode.SYMBOL);
+    // perfInfo.registerResult(op.getResult()).registerSuccess(true);
+    //}
+  }
+
   public boolean isAtomicRenameKey(String key) {
     return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
   }

+ 1 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java

@@ -37,6 +37,7 @@ public final class AbfsHttpConstants {
   public static final String SET_PROPERTIES_ACTION = "setProperties";
   public static final String SET_ACCESS_CONTROL = "setAccessControl";
   public static final String GET_ACCESS_CONTROL = "getAccessControl";
+  public static final String CHECK_ACCESS = "checkAccess";
   public static final String GET_STATUS = "getStatus";
   public static final String DEFAULT_TIMEOUT = "90";
   public static final String TOKEN_VERSION = "2";

+ 4 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

@@ -62,6 +62,10 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_DISABLE_OUTPUTSTREAM_FLUSH = "fs.azure.disable.outputstream.flush";
   public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix";
   public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";
+  /** Provides a config to enable/disable the checkAccess API.
+   *  By default this will be
+   *  FileSystemConfigurations.DEFAULT_ENABLE_CHECK_ACCESS. **/
+  public static final String FS_AZURE_ENABLE_CHECK_ACCESS = "fs.azure.enable.check.access";
   public static final String FS_AZURE_USE_UPN = "fs.azure.use.upn";
   /** User principal names (UPNs) have the format “{alias}@{domain}”. If true,
    *  only {alias} is included when a UPN would otherwise appear in the output

+ 3 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

@@ -68,5 +68,7 @@ public final class FileSystemConfigurations {
 
   public static final boolean DEFAULT_USE_UPN = false;
 
+  public static final boolean DEFAULT_ENABLE_CHECK_ACCESS = false;
+
   private FileSystemConfigurations() {}
-}
+}

+ 1 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java

@@ -32,6 +32,7 @@ public final class HttpQueryParams {
   public static final String QUERY_PARAM_RECURSIVE = "recursive";
   public static final String QUERY_PARAM_MAXRESULTS = "maxResults";
   public static final String QUERY_PARAM_ACTION = "action";
+  public static final String QUERY_FS_ACTION = "fsAction";
   public static final String QUERY_PARAM_POSITION = "position";
   public static final String QUERY_PARAM_TIMEOUT = "timeout";
   public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";

+ 22 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

@@ -507,6 +507,28 @@ public class AbfsClient {
     return op;
   }
 
+  /**
+   * Talks to the server to check whether the permission specified in
+   * the rwx parameter is present for the path specified in the path parameter.
+   *
+   * @param path  Path for which access check needs to be performed
+   * @param rwx   The permission to be checked on the path
+   * @return      The {@link AbfsRestOperation} object for the operation
+   * @throws AzureBlobFileSystemException in case of bad requests
+   */
+  public AbfsRestOperation checkAccess(String path, String rwx)
+      throws AzureBlobFileSystemException {
+    AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, CHECK_ACCESS);
+    abfsUriQueryBuilder.addQuery(QUERY_FS_ACTION, rwx);
+    URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    AbfsRestOperation op = new AbfsRestOperation(
+        AbfsRestOperationType.CheckAccess, this,
+        AbfsHttpConstants.HTTP_METHOD_HEAD, url, createDefaultHeaders());
+    op.execute();
+    return op;
+  }
+
   private URL createRequestUrl(final String query) throws AzureBlobFileSystemException {
     return createRequestUrl(EMPTY_STRING, query);
   }

+ 2 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java

@@ -39,5 +39,6 @@ public enum AbfsRestOperationType {
     Append,
     Flush,
     ReadFile,
-    DeletePath
+    DeletePath,
+    CheckAccess
 }

+ 4 - 0
hadoop-tools/hadoop-azure/src/site/markdown/abfs.md

@@ -92,6 +92,10 @@ performance issues.
 
 ## Testing ABFS
 
+### <a name="flushconfigoptions"></a> Access Options
+Config `fs.azure.enable.check.access` needs to be set true to enable
+ the AzureBlobFileSystem.access().
+
 See the relevant section in [Testing Azure](testing_azure.html).
 
 ## References

+ 320 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java

@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs;
+
+import com.google.common.collect.Lists;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.Assume;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.security.AccessControlException;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CHECK_ACCESS;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_SECRET;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CLIENT_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CLIENT_SECRET;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
+
+/**
+ * Test cases for AzureBlobFileSystem.access()
+ */
+public class ITestAzureBlobFileSystemCheckAccess
+    extends AbstractAbfsIntegrationTest {
+
+  private static final String TEST_FOLDER_PATH = "CheckAccessTestFolder";
+  private final FileSystem superUserFs;
+  private final FileSystem testUserFs;
+  private final String testUserGuid;
+  private final boolean isCheckAccessEnabled;
+  private final boolean isHNSEnabled;
+
+  public ITestAzureBlobFileSystemCheckAccess() throws Exception {
+    super.setup();
+    this.superUserFs = getFileSystem();
+    testUserGuid = getConfiguration()
+        .get(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID);
+    this.testUserFs = getTestUserFs();
+    this.isCheckAccessEnabled = getConfiguration().isCheckAccessEnabled();
+    this.isHNSEnabled = getConfiguration()
+        .getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
+  }
+
+  private FileSystem getTestUserFs() throws Exception {
+    String orgClientId = getConfiguration().get(FS_AZURE_BLOB_FS_CLIENT_ID);
+    String orgClientSecret = getConfiguration()
+        .get(FS_AZURE_BLOB_FS_CLIENT_SECRET);
+    Boolean orgCreateFileSystemDurungInit = getConfiguration()
+        .getBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
+    getRawConfiguration().set(FS_AZURE_BLOB_FS_CLIENT_ID,
+        getConfiguration().get(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_ID));
+    getRawConfiguration().set(FS_AZURE_BLOB_FS_CLIENT_SECRET, getConfiguration()
+        .get(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_SECRET));
+    getRawConfiguration()
+        .setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
+            false);
+    FileSystem fs = FileSystem.newInstance(getRawConfiguration());
+    getRawConfiguration().set(FS_AZURE_BLOB_FS_CLIENT_ID, orgClientId);
+    getRawConfiguration().set(FS_AZURE_BLOB_FS_CLIENT_SECRET, orgClientSecret);
+    getRawConfiguration()
+        .setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
+            orgCreateFileSystemDurungInit);
+    return fs;
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  @Ignore
+  /*  Ignoring as the FileSystem.checkPath does not check for
+      Preconditions.checkArgument(path != null, "null path");
+      hence results in NPE
+   */
+  public void testCheckAccessWithNullPath() throws IOException {
+    superUserFs.access(null, FsAction.READ);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testCheckAccessForFileWithNullFsAction() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    //  NPE when trying to convert null FsAction enum
+    superUserFs.access(new Path("test.txt"), null);
+  }
+
+  @Test(expected = FileNotFoundException.class)
+  public void testCheckAccessForNonExistentFile() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    Path nonExistentFile = setupTestDirectoryAndUserAccess(
+        "/nonExistentFile1.txt", FsAction.ALL);
+    superUserFs.delete(nonExistentFile, true);
+    testUserFs.access(nonExistentFile, FsAction.READ);
+  }
+
+  @Test
+  public void testWhenCheckAccessConfigIsOff() throws Exception {
+    Assume.assumeTrue(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT + " is false",
+        isHNSEnabled);
+    Configuration conf = getRawConfiguration();
+    conf.setBoolean(FS_AZURE_ENABLE_CHECK_ACCESS, false);
+    FileSystem fs = FileSystem.newInstance(conf);
+    Path testFilePath = setupTestDirectoryAndUserAccess("/test1.txt",
+        FsAction.NONE);
+    fs.access(testFilePath, FsAction.EXECUTE);
+    fs.access(testFilePath, FsAction.READ);
+    fs.access(testFilePath, FsAction.WRITE);
+    fs.access(testFilePath, FsAction.READ_EXECUTE);
+    fs.access(testFilePath, FsAction.WRITE_EXECUTE);
+    fs.access(testFilePath, FsAction.READ_WRITE);
+    fs.access(testFilePath, FsAction.ALL);
+    testFilePath = setupTestDirectoryAndUserAccess("/test1.txt", FsAction.ALL);
+    fs.access(testFilePath, FsAction.EXECUTE);
+    fs.access(testFilePath, FsAction.READ);
+    fs.access(testFilePath, FsAction.WRITE);
+    fs.access(testFilePath, FsAction.READ_EXECUTE);
+    fs.access(testFilePath, FsAction.WRITE_EXECUTE);
+    fs.access(testFilePath, FsAction.READ_WRITE);
+    fs.access(testFilePath, FsAction.ALL);
+    fs.access(testFilePath, null);
+
+    Path nonExistentFile = setupTestDirectoryAndUserAccess(
+        "/nonExistentFile2" + ".txt", FsAction.NONE);
+    superUserFs.delete(nonExistentFile, true);
+    fs.access(nonExistentFile, FsAction.READ);
+  }
+
+  @Test
+  public void testCheckAccessForAccountWithoutNS() throws Exception {
+    Assume.assumeFalse(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT + " is true",
+        getConfiguration()
+            .getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, true));
+    testUserFs.access(new Path("/"), FsAction.READ);
+  }
+
+  @Test
+  public void testFsActionNONE() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    Path testFilePath = setupTestDirectoryAndUserAccess("/test2.txt",
+        FsAction.NONE);
+    assertInaccessible(testFilePath, FsAction.EXECUTE);
+    assertInaccessible(testFilePath, FsAction.READ);
+    assertInaccessible(testFilePath, FsAction.WRITE);
+    assertInaccessible(testFilePath, FsAction.READ_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.WRITE_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.READ_WRITE);
+    assertInaccessible(testFilePath, FsAction.ALL);
+  }
+
+  @Test
+  public void testFsActionEXECUTE() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    Path testFilePath = setupTestDirectoryAndUserAccess("/test3.txt",
+        FsAction.EXECUTE);
+    assertAccessible(testFilePath, FsAction.EXECUTE);
+
+    assertInaccessible(testFilePath, FsAction.READ);
+    assertInaccessible(testFilePath, FsAction.WRITE);
+    assertInaccessible(testFilePath, FsAction.READ_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.WRITE_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.READ_WRITE);
+    assertInaccessible(testFilePath, FsAction.ALL);
+  }
+
+  @Test
+  public void testFsActionREAD() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    Path testFilePath = setupTestDirectoryAndUserAccess("/test4.txt",
+        FsAction.READ);
+    assertAccessible(testFilePath, FsAction.READ);
+
+    assertInaccessible(testFilePath, FsAction.EXECUTE);
+    assertInaccessible(testFilePath, FsAction.WRITE);
+    assertInaccessible(testFilePath, FsAction.READ_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.WRITE_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.READ_WRITE);
+    assertInaccessible(testFilePath, FsAction.ALL);
+  }
+
+  @Test
+  public void testFsActionWRITE() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    Path testFilePath = setupTestDirectoryAndUserAccess("/test5.txt",
+        FsAction.WRITE);
+    assertAccessible(testFilePath, FsAction.WRITE);
+
+    assertInaccessible(testFilePath, FsAction.EXECUTE);
+    assertInaccessible(testFilePath, FsAction.READ);
+    assertInaccessible(testFilePath, FsAction.READ_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.WRITE_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.READ_WRITE);
+    assertInaccessible(testFilePath, FsAction.ALL);
+  }
+
+  @Test
+  public void testFsActionREADEXECUTE() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    Path testFilePath = setupTestDirectoryAndUserAccess("/test6.txt",
+        FsAction.READ_EXECUTE);
+    assertAccessible(testFilePath, FsAction.EXECUTE);
+    assertAccessible(testFilePath, FsAction.READ);
+    assertAccessible(testFilePath, FsAction.READ_EXECUTE);
+
+    assertInaccessible(testFilePath, FsAction.WRITE);
+    assertInaccessible(testFilePath, FsAction.WRITE_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.READ_WRITE);
+    assertInaccessible(testFilePath, FsAction.ALL);
+  }
+
+  @Test
+  public void testFsActionWRITEEXECUTE() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    Path testFilePath = setupTestDirectoryAndUserAccess("/test7.txt",
+        FsAction.WRITE_EXECUTE);
+    assertAccessible(testFilePath, FsAction.EXECUTE);
+    assertAccessible(testFilePath, FsAction.WRITE);
+    assertAccessible(testFilePath, FsAction.WRITE_EXECUTE);
+
+    assertInaccessible(testFilePath, FsAction.READ);
+    assertInaccessible(testFilePath, FsAction.READ_EXECUTE);
+    assertInaccessible(testFilePath, FsAction.READ_WRITE);
+    assertInaccessible(testFilePath, FsAction.ALL);
+  }
+
+  @Test
+  public void testFsActionALL() throws Exception {
+    assumeHNSAndCheckAccessEnabled();
+    Path testFilePath = setupTestDirectoryAndUserAccess("/test8.txt",
+        FsAction.ALL);
+    assertAccessible(testFilePath, FsAction.EXECUTE);
+    assertAccessible(testFilePath, FsAction.WRITE);
+    assertAccessible(testFilePath, FsAction.WRITE_EXECUTE);
+    assertAccessible(testFilePath, FsAction.READ);
+    assertAccessible(testFilePath, FsAction.READ_EXECUTE);
+    assertAccessible(testFilePath, FsAction.READ_WRITE);
+    assertAccessible(testFilePath, FsAction.ALL);
+  }
+
+  private void assumeHNSAndCheckAccessEnabled() {
+    Assume.assumeTrue(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT + " is false",
+        isHNSEnabled);
+    Assume.assumeTrue(FS_AZURE_ENABLE_CHECK_ACCESS + " is false",
+        isCheckAccessEnabled);
+  }
+
+  private void assertAccessible(Path testFilePath, FsAction fsAction)
+      throws IOException {
+    assertTrue(
+        "Should have been given access  " + fsAction + " on " + testFilePath,
+        isAccessible(testUserFs, testFilePath, fsAction));
+  }
+
+  private void assertInaccessible(Path testFilePath, FsAction fsAction)
+      throws IOException {
+    assertFalse(
+        "Should have been denied access  " + fsAction + " on " + testFilePath,
+        isAccessible(testUserFs, testFilePath, fsAction));
+  }
+
+  private void setExecuteAccessForParentDirs(Path dir) throws IOException {
+    dir = dir.getParent();
+    while (dir != null) {
+      modifyAcl(dir, testUserGuid, FsAction.EXECUTE);
+      dir = dir.getParent();
+    }
+  }
+
+  private void modifyAcl(Path file, String uid, FsAction fsAction)
+      throws IOException {
+    List<AclEntry> aclSpec = Lists.newArrayList(AclTestHelpers
+        .aclEntry(AclEntryScope.ACCESS, AclEntryType.USER, uid, fsAction));
+    this.superUserFs.modifyAclEntries(file, aclSpec);
+  }
+
+  private Path setupTestDirectoryAndUserAccess(String testFileName,
+      FsAction fsAction) throws Exception {
+    Path file = new Path(TEST_FOLDER_PATH + testFileName);
+    file = this.superUserFs.makeQualified(file);
+    this.superUserFs.delete(file, true);
+    this.superUserFs.create(file);
+    modifyAcl(file, testUserGuid, fsAction);
+    setExecuteAccessForParentDirs(file);
+    return file;
+  }
+
+  private boolean isAccessible(FileSystem fs, Path path, FsAction fsAction)
+      throws IOException {
+    try {
+      fs.access(path, fsAction);
+    } catch (AccessControlException ace) {
+      return false;
+    }
+    return true;
+  }
+}

+ 8 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java

@@ -34,6 +34,14 @@ public final class TestConfigurationKeys {
   public static final String FS_AZURE_BLOB_DATA_READER_CLIENT_ID = "fs.azure.account.oauth2.reader.client.id";
   public static final String FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET = "fs.azure.account.oauth2.reader.client.secret";
 
+  public static final String FS_AZURE_BLOB_FS_CLIENT_ID = "fs.azure.account.oauth2.client.id";
+  public static final String FS_AZURE_BLOB_FS_CLIENT_SECRET = "fs.azure.account.oauth2.client.secret";
+
+  public static final String FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_ID = "fs.azure.account.test.oauth2.client.id";
+  public static final String FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_SECRET = "fs.azure.account.test.oauth2.client.secret";
+
+  public static final String FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID = "fs.azure.check.access.testuser.guid";
+
   public static final String TEST_CONFIGURATION_FILE_NAME = "azure-test.xml";
   public static final String TEST_CONTAINER_PREFIX = "abfs-testcontainer-";
   public static final int TEST_TIMEOUT = 15 * 60 * 1000;