Browse Source

HADOOP-15660. ABFS: Add support for OAuth
Contributed by Da Zhou, Rajeev Bansal, and Junhua Gu.

Thomas Marquardt 6 years ago
parent
commit
9149b9703e
52 changed files with 1768 additions and 249 deletions
  1. 118 31
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
  2. 17 9
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
  3. 19 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
  4. 36 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java
  5. 1 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
  6. 84 5
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java
  7. 98 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java
  8. 344 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java
  9. 47 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADToken.java
  10. 62 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/ClientCredsTokenProvider.java
  11. 75 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdaptee.java
  12. 57 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java
  13. 48 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java
  14. 69 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/QueryParams.java
  15. 57 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/RefreshTokenBasedTokenProvider.java
  16. 66 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/UserPasswordTokenProvider.java
  17. 18 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java
  18. 16 2
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
  19. 1 1
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java
  20. 12 7
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
  21. 27 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java
  22. 21 14
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
  23. 1 1
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java
  24. 4 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java
  25. 0 3
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
  26. 6 2
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFinalize.java
  27. 7 1
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
  28. 176 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemOauth.java
  29. 3 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java
  30. 4 1
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java
  31. 9 2
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemRegistration.java
  32. 2 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
  33. 6 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
  34. 9 5
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ABFSContractTestBinding.java
  35. 4 15
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractAppend.java
  36. 3 14
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractConcat.java
  37. 3 14
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractCreate.java
  38. 3 14
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractDelete.java
  39. 1 1
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractDistCp.java
  40. 3 14
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractGetFileStatus.java
  41. 3 14
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractMkdir.java
  42. 3 14
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractOpen.java
  43. 3 14
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractRename.java
  44. 3 13
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractRootDirectory.java
  45. 1 1
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSecureDistCp.java
  46. 3 14
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java
  47. 3 14
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSetTimes.java
  48. 1 1
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAzureBlobFileSystemBasics.java
  49. 3 3
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
  50. 72 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestQueryParams.java
  51. 11 2
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/CleanUpAbfsTestContainer.java
  52. 125 3
      hadoop-tools/hadoop-azure/src/test/resources/azure-bfs-test.xml

+ 118 - 31
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.azurebfs;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.Map;
 
@@ -26,7 +27,6 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
 import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
 import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
 import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
@@ -37,16 +37,26 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemExc
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TokenAccessProviderException;
 import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator;
 import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator;
 import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator;
 import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator;
 import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
+import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.CustomTokenProviderAdaptee;
+import org.apache.hadoop.fs.azurebfs.oauth2.CustomTokenProviderAdapter;
+import org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.RefreshTokenBasedTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.UserPasswordTokenProvider;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.apache.hadoop.fs.azurebfs.services.KeyProvider;
 import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider;
 import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
+import org.apache.hadoop.util.ReflectionUtils;
 
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FS_AZURE_SSL_CHANNEL_MODE;
 
 /**
@@ -58,81 +68,81 @@ public class AbfsConfiguration{
   private final Configuration configuration;
   private final boolean isSecure;
 
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE,
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_BUFFER_SIZE,
       MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE,
       MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE,
       DefaultValue = FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE)
   private int writeBufferSize;
 
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_READ_BUFFER_SIZE,
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE,
       MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE,
       MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE,
       DefaultValue = FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE)
   private int readBufferSize;
 
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL,
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MIN_BACKOFF_INTERVAL,
       DefaultValue = FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL)
   private int minBackoffInterval;
 
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL,
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_BACKOFF_INTERVAL,
       DefaultValue = FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL)
   private int maxBackoffInterval;
 
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BACKOFF_INTERVAL,
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BACKOFF_INTERVAL,
       DefaultValue = FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL)
   private int backoffInterval;
 
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_IO_RETRIES,
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_IO_RETRIES,
       MinValue = 0,
       DefaultValue = FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS)
   private int maxIoRetries;
 
-  @LongConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_SIZE_PROPERTY_NAME,
+  @LongConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_SIZE_PROPERTY_NAME,
       MinValue = 0,
       MaxValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE,
       DefaultValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE)
   private long azureBlockSize;
 
-  @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
+  @StringConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
       DefaultValue = FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT)
   private String azureBlockLocationHost;
 
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_OUT,
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CONCURRENT_CONNECTION_VALUE_OUT,
       MinValue = 1,
       DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_WRITE_THREADS)
   private int maxConcurrentWriteThreads;
 
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_IN,
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CONCURRENT_CONNECTION_VALUE_IN,
       MinValue = 1,
       DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_READ_THREADS)
   private int maxConcurrentReadThreads;
 
-  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND,
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_TOLERATE_CONCURRENT_APPEND,
       DefaultValue = FileSystemConfigurations.DEFAULT_READ_TOLERATE_CONCURRENT_APPEND)
   private boolean tolerateOobAppends;
 
-  @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ATOMIC_RENAME_KEY,
+  @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ATOMIC_RENAME_KEY,
           DefaultValue = FileSystemConfigurations.DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
   private String azureAtomicDirs;
 
-  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
       DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
   private boolean createRemoteFileSystemDuringInitialization;
 
-  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION,
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION,
           DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION)
   private boolean skipUserGroupMetadataDuringInitialization;
 
 
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH,
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_AHEAD_QUEUE_DEPTH,
       DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH)
   private int readAheadQueueDepth;
 
-  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ENABLE_FLUSH,
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_FLUSH,
           DefaultValue = FileSystemConfigurations.DEFAULT_ENABLE_FLUSH)
   private boolean enableFlush;
 
-  @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY,
+  @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_USER_AGENT_PREFIX_KEY,
           DefaultValue = "")
   private String userAgentId;
 
@@ -140,7 +150,7 @@ public class AbfsConfiguration{
 
   public AbfsConfiguration(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException {
     this.configuration = configuration;
-    this.isSecure = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_SECURE_MODE, false);
+    this.isSecure = this.configuration.getBoolean(FS_AZURE_SECURE_MODE, false);
 
     validateStorageAccountKeys();
     Field[] fields = this.getClass().getDeclaredFields();
@@ -161,17 +171,17 @@ public class AbfsConfiguration{
   }
 
   public boolean isEmulator() {
-    return this.getConfiguration().getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false);
+    return this.getConfiguration().getBoolean(FS_AZURE_EMULATOR_ENABLED, false);
   }
 
   public boolean isSecureMode() {
-    return this.isSecure;
+    return isSecure;
   }
 
   public String getStorageAccountKey(final String accountName) throws AzureBlobFileSystemException {
     String key;
     String keyProviderClass =
-            configuration.get(ConfigurationKeys.AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX + accountName);
+            configuration.get(AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX + accountName);
     KeyProvider keyProvider;
 
     if (keyProviderClass == null) {
@@ -278,19 +288,88 @@ public class AbfsConfiguration{
     return configuration.getEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, DEFAULT_FS_AZURE_SSL_CHANNEL_MODE);
   }
 
+  public AuthType getAuthType(final String accountName) {
+    return configuration.getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME + accountName, AuthType.SharedKey);
+  }
+
+  public AccessTokenProvider getTokenProvider(final String accountName) throws TokenAccessProviderException {
+    AuthType authType = configuration.getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME + accountName, AuthType.SharedKey);
+    if (authType == AuthType.OAuth) {
+      try {
+        Class<? extends AccessTokenProvider> tokenProviderClass =
+                configuration.getClass(FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME + accountName, null,
+                        AccessTokenProvider.class);
+        AccessTokenProvider tokenProvider = null;
+        if (tokenProviderClass == ClientCredsTokenProvider.class) {
+          String authEndpoint = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT + accountName);
+          String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountName);
+          String clientSecret = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET + accountName);
+          tokenProvider = new ClientCredsTokenProvider(authEndpoint, clientId, clientSecret);
+        } else if (tokenProviderClass == UserPasswordTokenProvider.class) {
+          String authEndpoint = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT + accountName);
+          String username = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_NAME + accountName);
+          String password = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD + accountName);
+          tokenProvider = new UserPasswordTokenProvider(authEndpoint, username, password);
+        } else if (tokenProviderClass == MsiTokenProvider.class) {
+          String tenantGuid = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT + accountName);
+          String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountName);
+          tokenProvider = new MsiTokenProvider(tenantGuid, clientId);
+        } else if (tokenProviderClass == RefreshTokenBasedTokenProvider.class) {
+          String refreshToken = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN + accountName);
+          String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountName);
+          tokenProvider = new RefreshTokenBasedTokenProvider(clientId, refreshToken);
+        } else {
+          throw new IllegalArgumentException("Failed to initialize " + tokenProviderClass);
+        }
+        return tokenProvider;
+      } catch(IllegalArgumentException e) {
+        throw e;
+      } catch (Exception e) {
+        throw new TokenAccessProviderException("Unable to load key provider class.", e);
+      }
+
+    } else if (authType == AuthType.Custom) {
+      try {
+        String configKey = FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME + accountName;
+        Class<? extends CustomTokenProviderAdaptee> customTokenProviderClass =
+                configuration.getClass(configKey, null,
+                        CustomTokenProviderAdaptee.class);
+        if (customTokenProviderClass == null) {
+          throw new IllegalArgumentException(
+                  String.format("The configuration value for \"%s\" is invalid.", configKey));
+        }
+        CustomTokenProviderAdaptee azureTokenProvider = ReflectionUtils
+                .newInstance(customTokenProviderClass, configuration);
+        if (azureTokenProvider == null) {
+          throw new IllegalArgumentException("Failed to initialize " + customTokenProviderClass);
+        }
+        azureTokenProvider.initialize(configuration, accountName);
+        return new CustomTokenProviderAdapter(azureTokenProvider);
+      } catch(IllegalArgumentException e) {
+        throw e;
+      } catch (Exception e) {
+        throw new TokenAccessProviderException("Unable to load custom token provider class.", e);
+      }
+
+    } else {
+      throw new TokenAccessProviderException(String.format(
+              "Invalid auth type: %s is being used, expecting OAuth", authType));
+    }
+  }
+
   void validateStorageAccountKeys() throws InvalidConfigurationValueException {
     Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator(
-        ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true);
-    this.storageAccountKeys = this.configuration.getValByRegex(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX);
+        FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true);
+    this.storageAccountKeys = configuration.getValByRegex(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX);
 
-    for (Map.Entry<String, String> account : this.storageAccountKeys.entrySet()) {
+    for (Map.Entry<String, String> account : storageAccountKeys.entrySet()) {
       validator.validate(account.getValue());
     }
   }
 
   int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
     IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class);
-    String value = this.configuration.get(validator.ConfigurationKey());
+    String value = configuration.get(validator.ConfigurationKey());
 
     // validate
     return new IntegerConfigurationBasicValidator(
@@ -303,7 +382,7 @@ public class AbfsConfiguration{
 
   long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
     LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class);
-    String value = this.configuration.get(validator.ConfigurationKey());
+    String value = configuration.get(validator.ConfigurationKey());
 
     // validate
     return new LongConfigurationBasicValidator(
@@ -316,7 +395,7 @@ public class AbfsConfiguration{
 
   String validateString(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
     StringConfigurationValidatorAnnotation validator = field.getAnnotation(StringConfigurationValidatorAnnotation.class);
-    String value = this.configuration.get(validator.ConfigurationKey());
+    String value = configuration.get(validator.ConfigurationKey());
 
     // validate
     return new StringConfigurationBasicValidator(
@@ -327,7 +406,7 @@ public class AbfsConfiguration{
 
   String validateBase64String(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
     Base64StringConfigurationValidatorAnnotation validator = field.getAnnotation((Base64StringConfigurationValidatorAnnotation.class));
-    String value = this.configuration.get(validator.ConfigurationKey());
+    String value = configuration.get(validator.ConfigurationKey());
 
     // validate
     return new Base64StringConfigurationBasicValidator(
@@ -338,7 +417,7 @@ public class AbfsConfiguration{
 
   boolean validateBoolean(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
     BooleanConfigurationValidatorAnnotation validator = field.getAnnotation(BooleanConfigurationValidatorAnnotation.class);
-    String value = this.configuration.get(validator.ConfigurationKey());
+    String value = configuration.get(validator.ConfigurationKey());
 
     // validate
     return new BooleanConfigurationBasicValidator(
@@ -347,6 +426,14 @@ public class AbfsConfiguration{
         validator.ThrowIfInvalid()).validate(value);
   }
 
+  String getPasswordString(String key) throws IOException {
+    char[] passchars = configuration.getPassword(key);
+    if (passchars != null) {
+      return new String(passchars);
+    }
+    return null;
+  }
+
   @VisibleForTesting
   void setReadBufferSize(int bufferSize) {
     this.readBufferSize = bufferSize;

+ 17 - 9
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -67,10 +67,12 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
+import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
 import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -487,16 +489,22 @@ public class AzureBlobFileSystemStore {
       throw new InvalidUriException(uri.toString());
     }
 
-    int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT);
-    if (dotIndex <= 0) {
-      throw new InvalidUriException(
-          uri.toString() + " - account name is not fully qualified.");
+    SharedKeyCredentials creds = null;
+    AccessTokenProvider tokenProvider = null;
+
+    if (abfsConfiguration.getAuthType(accountName) == AuthType.SharedKey) {
+      int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT);
+      if (dotIndex <= 0) {
+        throw new InvalidUriException(
+                uri.toString() + " - account name is not fully qualified.");
+      }
+      creds = new SharedKeyCredentials(accountName.substring(0, dotIndex),
+            abfsConfiguration.getStorageAccountKey(accountName));
+    } else {
+      tokenProvider = abfsConfiguration.getTokenProvider(accountName);
     }
-    SharedKeyCredentials creds =
-            new SharedKeyCredentials(accountName.substring(0, dotIndex),
-                    this.abfsConfiguration.getStorageAccountKey(accountName));
 
-    this.client =  new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy());
+    this.client =  new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(), tokenProvider);
   }
 
   private String getRelativePath(final Path path) {
@@ -537,7 +545,7 @@ public class AzureBlobFileSystemStore {
       Date utcDate = new SimpleDateFormat(DATE_TIME_PATTERN).parse(lastModifiedTime);
       parsedTime = utcDate.getTime();
     } catch (ParseException e) {
-      LOG.error("Failed to parse the date {0}", lastModifiedTime);
+      LOG.error("Failed to parse the date {}", lastModifiedTime);
     } finally {
       return parsedTime;
     }

+ 19 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

@@ -60,5 +60,24 @@ public final class ConfigurationKeys {
   public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX = "fs.azure.account.keyprovider.";
   public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
 
+  /** Prefix for auth type properties: {@value}. */
+  public static final String FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME = "fs.azure.account.auth.type.";
+  /** Prefix for oauth token provider type: {@value}. */
+  public static final String FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME = "fs.azure.account.oauth.provider.type.";
+  /** Prefix for oauth AAD client id: {@value}. */
+  public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID = "fs.azure.account.oauth2.client.id.";
+  /** Prefix for oauth AAD client secret: {@value}. */
+  public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET = "fs.azure.account.oauth2.client.secret.";
+  /** Prefix for oauth AAD client endpoint: {@value}. */
+  public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT = "fs.azure.account.oauth2.client.endpoint.";
+  /** Prefix for oauth msi tenant id: {@value}. */
+  public static final String FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT = "fs.azure.account.oauth2.msi.tenant.";
+  /** Prefix for oauth user name: {@value}. */
+  public static final String FS_AZURE_ACCOUNT_OAUTH_USER_NAME = "fs.azure.account.oauth2.user.name.";
+  /** Prefix for oauth user password: {@value}. */
+  public static final String FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD = "fs.azure.account.oauth2.user.password.";
+  /** Prefix for oauth refresh token: {@value}. */
+  public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN = "fs.azure.account.oauth2.refresh.token.";
+
   private ConfigurationKeys() {}
 }

+ 36 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Thrown if there is a problem instantiating a TokenAccessProvider or retrieving a configuration
+ * using a TokenAccessProvider object.
+ */
+@InterfaceAudience.Private
+public class TokenAccessProviderException extends AzureBlobFileSystemException {
+
+    public TokenAccessProviderException(String message) {
+        super(message);
+    }
+
+    public TokenAccessProviderException(String message, Throwable cause) {
+        super(message);
+    }
+}

+ 1 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java

@@ -44,6 +44,7 @@ public enum AzureServiceErrorCode {
   INGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Ingress is over the account limit."),
   EGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Egress is over the account limit."),
   INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null),
+  AUTHORIZATION_PERMISSION_MISS_MATCH("AuthorizationPermissionMismatch", HttpURLConnection.HTTP_FORBIDDEN, null),
   UNKNOWN(null, -1, null);
 
   private final String errorCode;

+ 84 - 5
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java

@@ -57,13 +57,31 @@ public class ListResultEntrySchema {
   @JsonProperty(value = "contentLength")
   private Long contentLength;
 
+  /**
+   * The owner property.
+   */
+  @JsonProperty(value = "owner")
+  private String owner;
+
+  /**
+   * The group property.
+   */
+  @JsonProperty(value = "group")
+  private String group;
+
+  /**
+   * The permissions property.
+   */
+  @JsonProperty(value = "permissions")
+  private String permissions;
+
   /**
    * Get the name value.
    *
    * @return the name value
    */
   public String name() {
-    return this.name;
+    return name;
   }
 
   /**
@@ -83,7 +101,7 @@ public class ListResultEntrySchema {
    * @return the isDirectory value
    */
   public Boolean isDirectory() {
-    return this.isDirectory;
+    return isDirectory;
   }
 
   /**
@@ -103,7 +121,7 @@ public class ListResultEntrySchema {
    * @return the lastModified value
    */
   public String lastModified() {
-    return this.lastModified;
+    return lastModified;
   }
 
   /**
@@ -123,7 +141,7 @@ public class ListResultEntrySchema {
    * @return the etag value
    */
   public String eTag() {
-    return this.eTag;
+    return eTag;
   }
 
   /**
@@ -143,7 +161,7 @@ public class ListResultEntrySchema {
    * @return the contentLength value
    */
   public Long contentLength() {
-    return this.contentLength;
+    return contentLength;
   }
 
   /**
@@ -157,4 +175,65 @@ public class ListResultEntrySchema {
     return this;
   }
 
+  /**
+   *
+   Get the owner value.
+   *
+   * @return the owner value
+   */
+  public String owner() {
+    return owner;
+  }
+
+  /**
+   * Set the owner value.
+   *
+   * @param owner the owner value to set
+   * @return the ListEntrySchema object itself.
+   */
+  public ListResultEntrySchema withOwner(final String owner) {
+    this.owner = owner;
+    return this;
+  }
+
+  /**
+   * Get the group value.
+   *
+   * @return the group value
+   */
+  public String group() {
+    return group;
+  }
+
+  /**
+   * Set the group value.
+   *
+   * @param group the group value to set
+   * @return the ListEntrySchema object itself.
+   */
+  public ListResultEntrySchema withGroup(final String group) {
+    this.group = group;
+    return this;
+  }
+
+  /**
+   * Get the permissions value.
+   *
+   * @return the permissions value
+   */
+  public String permissions() {
+    return permissions;
+  }
+
+  /**
+   * Set the permissions value.
+   *
+   * @param permissions the permissions value to set
+   * @return the ListEntrySchema object itself.
+   */
+  public ListResultEntrySchema withPermissions(final String permissions) {
+    this.permissions = permissions;
+    return this;
+  }
+
 }

+ 98 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java

@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Returns an Azure Active Directory token when requested. The provider can
+ * cache the token if it has already retrieved one. If it does, then the
+ * provider is responsible for checking expiry and refreshing as needed.
+ *
+ * In other words, this is is a token cache that fetches tokens when
+ * requested, if the cached token has expired.
+ *
+ */
+public abstract class AccessTokenProvider {
+
+  private AzureADToken token;
+  private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);
+
+  /**
+   * returns the {@link AzureADToken} cached (or retrieved) by this instance.
+   *
+   * @return {@link AzureADToken} containing the access token
+   * @throws IOException if there is an error fetching the token
+   */
+  public synchronized AzureADToken getToken() throws IOException {
+    if (isTokenAboutToExpire()) {
+      LOG.debug("AAD Token is missing or expired:"
+              + " Calling refresh-token from abstract base class");
+      token = refreshToken();
+    }
+    return token;
+  }
+
+  /**
+   * the method to fetch the access token. Derived classes should override
+   * this method to actually get the token from Azure Active Directory.
+   *
+   * This method will be called initially, and then once when the token
+   * is about to expire.
+   *
+   *
+   * @return {@link AzureADToken} containing the access token
+   * @throws IOException if there is an error fetching the token
+   */
+  protected abstract AzureADToken refreshToken() throws IOException;
+
+  /**
+   * Checks if the token is about to expire in the next 5 minutes.
+   * The 5 minute allowance is to allow for clock skew and also to
+   * allow for token to be refreshed in that much time.
+   *
+   * @return true if the token is expiring in next 5 minutes
+   */
+  private boolean isTokenAboutToExpire() {
+    if (token == null) {
+      LOG.debug("AADToken: no token. Returning expiring=true");
+      return true;   // no token should have same response as expired token
+    }
+    boolean expiring = false;
+    // allow 5 minutes for clock skew
+    long approximatelyNow = System.currentTimeMillis() + FIVE_MINUTES;
+    if (token.getExpiry().getTime() < approximatelyNow) {
+      expiring = true;
+    }
+    if (expiring) {
+      LOG.debug("AADToken: token expiring: "
+              + token.getExpiry().toString()
+              + " : Five-minute window: "
+              + new Date(approximatelyNow).toString());
+    }
+
+    return expiring;
+  }
+
+  // 5 minutes in milliseconds
+  private static final long FIVE_MINUTES = 300 * 1000;
+}

+ 344 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java

@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.Hashtable;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
+
+/**
+ * This class provides convenience methods to obtain AAD tokens.
+ * While convenient, it is not necessary to use these methods to
+ * obtain the tokens. Customers can use any other method
+ * (e.g., using the adal4j client) to obtain tokens.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class AzureADAuthenticator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AzureADAuthenticator.class);
+  private static final String RESOURCE_NAME = "https://storage.azure.com/";
+  private static final int CONNECT_TIMEOUT = 30 * 1000;
+  private static final int READ_TIMEOUT = 30 * 1000;
+
+  private AzureADAuthenticator() {
+    // no operation
+  }
+
+  /**
+   * gets Azure Active Directory token using the user ID and password of
+   * a service principal (that is, Web App in Azure Active Directory).
+   *
+   * Azure Active Directory allows users to set up a web app as a
+   * service principal. Users can optionally obtain service principal keys
+   * from AAD. This method gets a token using a service principal's client ID
+   * and keys. In addition, it needs the token endpoint associated with the
+   * user's directory.
+   *
+   *
+   * @param authEndpoint the OAuth 2.0 token endpoint associated
+   *                     with the user's directory (obtain from
+   *                     Active Directory configuration)
+   * @param clientId     the client ID (GUID) of the client web app
+   *                     btained from Azure Active Directory configuration
+   * @param clientSecret the secret key of the client web app
+   * @return {@link AzureADToken} obtained using the creds
+   * @throws IOException throws IOException if there is a failure in connecting to Azure AD
+   */
+  public static AzureADToken getTokenUsingClientCreds(String authEndpoint,
+                                                      String clientId, String clientSecret)
+          throws IOException {
+    Preconditions.checkNotNull(authEndpoint, "authEndpoint");
+    Preconditions.checkNotNull(clientId, "clientId");
+    Preconditions.checkNotNull(clientSecret, "clientSecret");
+
+    QueryParams qp = new QueryParams();
+    qp.add("resource", RESOURCE_NAME);
+    qp.add("grant_type", "client_credentials");
+    qp.add("client_id", clientId);
+    qp.add("client_secret", clientSecret);
+    LOG.debug("AADToken: starting to fetch token using client creds for client ID " + clientId);
+
+    return getTokenCall(authEndpoint, qp.serialize(), null, null);
+  }
+
+  /**
+   * Gets AAD token from the local virtual machine's VM extension. This only works on
+   * an Azure VM with MSI extension
+   * enabled.
+   *
+   * @param tenantGuid  (optional) The guid of the AAD tenant. Can be {@code null}.
+   * @param clientId    (optional) The clientId guid of the MSI service
+   *                    principal to use. Can be {@code null}.
+   * @param bypassCache {@code boolean} specifying whether a cached token is acceptable or a fresh token
+   *                    request should me made to AAD
+   * @return {@link AzureADToken} obtained using the creds
+   * @throws IOException throws IOException if there is a failure in obtaining the token
+   */
+  public static AzureADToken getTokenFromMsi(String tenantGuid, String clientId,
+                                             boolean bypassCache) throws IOException {
+    Preconditions.checkNotNull(tenantGuid, "tenantGuid");
+    Preconditions.checkNotNull(clientId, "clientId");
+
+    String authEndpoint = "http://169.254.169.254/metadata/identity/oauth2/token";
+
+    QueryParams qp = new QueryParams();
+    qp.add("api-version", "2018-02-01");
+    qp.add("resource", RESOURCE_NAME);
+
+
+    if (tenantGuid.length() > 0) {
+      String authority = "https://login.microsoftonline.com/" + tenantGuid;
+      qp.add("authority", authority);
+    }
+
+    if (clientId.length() > 0) {
+      qp.add("client_id", clientId);
+    }
+
+    if (bypassCache) {
+      qp.add("bypass_cache", "true");
+    }
+
+    Hashtable<String, String> headers = new Hashtable<>();
+    headers.put("Metadata", "true");
+
+    LOG.debug("AADToken: starting to fetch token using MSI");
+    return getTokenCall(authEndpoint, qp.serialize(), headers, "GET");
+  }
+
+  /**
+   * Gets Azure Active Directory token using refresh token.
+   *
+   * @param clientId the client ID (GUID) of the client web app obtained from Azure Active Directory configuration
+   * @param refreshToken the refresh token
+   * @return {@link AzureADToken} obtained using the refresh token
+   * @throws IOException throws IOException if there is a failure in connecting to Azure AD
+   */
+  public static AzureADToken getTokenUsingRefreshToken(String clientId,
+                                                       String refreshToken) throws IOException {
+    String authEndpoint = "https://login.microsoftonline.com/Common/oauth2/token";
+    QueryParams qp = new QueryParams();
+    qp.add("grant_type", "refresh_token");
+    qp.add("refresh_token", refreshToken);
+    if (clientId != null) {
+      qp.add("client_id", clientId);
+    }
+    LOG.debug("AADToken: starting to fetch token using refresh token for client ID " + clientId);
+    return getTokenCall(authEndpoint, qp.serialize(), null, null);
+  }
+
+  private static class HttpException extends IOException {
+    private int httpErrorCode;
+    private String requestId;
+
+    public int getHttpErrorCode() {
+      return this.httpErrorCode;
+    }
+
+    public String getRequestId() {
+      return this.requestId;
+    }
+
+    HttpException(int httpErrorCode, String requestId, String message) {
+      super(message);
+      this.httpErrorCode = httpErrorCode;
+      this.requestId = requestId;
+    }
+  }
+
+  private static AzureADToken getTokenCall(String authEndpoint, String body,
+                                           Hashtable<String, String> headers, String httpMethod)
+          throws IOException {
+    AzureADToken token = null;
+    ExponentialRetryPolicy retryPolicy
+            = new ExponentialRetryPolicy(3, 0, 1000, 2);
+
+    int httperror = 0;
+    String requestId;
+    String httpExceptionMessage = null;
+    IOException ex = null;
+    boolean succeeded = false;
+    int retryCount = 0;
+    do {
+      httperror = 0;
+      requestId = "";
+      ex = null;
+      try {
+        token = getTokenSingleCall(authEndpoint, body, headers, httpMethod);
+      } catch (HttpException e) {
+        httperror = e.httpErrorCode;
+        requestId = e.requestId;
+        httpExceptionMessage = e.getMessage();
+      } catch (IOException e) {
+        ex = e;
+      }
+      succeeded = ((httperror == 0) && (ex == null));
+      retryCount++;
+    } while (!succeeded && retryPolicy.shouldRetry(retryCount, httperror));
+    if (!succeeded) {
+      if (ex != null) {
+        throw ex;
+      }
+      if (httperror != 0) {
+        throw new IOException(httpExceptionMessage);
+      }
+    }
+    return token;
+  }
+
+  private static AzureADToken getTokenSingleCall(
+          String authEndpoint, String payload, Hashtable<String, String> headers, String httpMethod)
+          throws IOException {
+
+    AzureADToken token = null;
+    HttpURLConnection conn = null;
+    String urlString = authEndpoint;
+
+    httpMethod = (httpMethod == null) ? "POST" : httpMethod;
+    if (httpMethod.equals("GET")) {
+      urlString = urlString + "?" + payload;
+    }
+
+    try {
+      URL url = new URL(urlString);
+      conn = (HttpURLConnection) url.openConnection();
+      conn.setRequestMethod(httpMethod);
+      conn.setReadTimeout(READ_TIMEOUT);
+      conn.setConnectTimeout(CONNECT_TIMEOUT);
+
+      if (headers != null && headers.size() > 0) {
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
+          conn.setRequestProperty(entry.getKey(), entry.getValue());
+        }
+      }
+      conn.setRequestProperty("Connection", "close");
+
+      if (httpMethod.equals("POST")) {
+        conn.setDoOutput(true);
+        conn.getOutputStream().write(payload.getBytes("UTF-8"));
+      }
+
+      int httpResponseCode = conn.getResponseCode();
+      String requestId = conn.getHeaderField("x-ms-request-id");
+      String responseContentType = conn.getHeaderField("Content-Type");
+      long responseContentLength = conn.getHeaderFieldLong("Content-Length", 0);
+
+      requestId = requestId == null ? "" : requestId;
+      if (httpResponseCode == HttpURLConnection.HTTP_OK
+              && responseContentType.startsWith("application/json") && responseContentLength > 0) {
+        InputStream httpResponseStream = conn.getInputStream();
+        token = parseTokenFromStream(httpResponseStream);
+      } else {
+        String responseBody = consumeInputStream(conn.getInputStream(), 1024);
+        String proxies = "none";
+        String httpProxy = System.getProperty("http.proxy");
+        String httpsProxy = System.getProperty("https.proxy");
+        if (httpProxy != null || httpsProxy != null) {
+          proxies = "http:" + httpProxy + "; https:" + httpsProxy;
+        }
+        String logMessage =
+                "AADToken: HTTP connection failed for getting token from AzureAD. Http response: "
+                        + httpResponseCode + " " + conn.getResponseMessage()
+                        + " Content-Type: " + responseContentType
+                        + " Content-Length: " + responseContentLength
+                        + " Request ID: " + requestId.toString()
+                        + " Proxies: " + proxies
+                        + " First 1K of Body: " + responseBody;
+        LOG.debug(logMessage);
+        throw new HttpException(httpResponseCode, requestId, logMessage);
+      }
+    } finally {
+      if (conn != null) {
+        conn.disconnect();
+      }
+    }
+    return token;
+  }
+
+  private static AzureADToken parseTokenFromStream(InputStream httpResponseStream) throws IOException {
+    AzureADToken token = new AzureADToken();
+    try {
+      int expiryPeriod = 0;
+
+      JsonFactory jf = new JsonFactory();
+      JsonParser jp = jf.createJsonParser(httpResponseStream);
+      String fieldName, fieldValue;
+      jp.nextToken();
+      while (jp.hasCurrentToken()) {
+        if (jp.getCurrentToken() == JsonToken.FIELD_NAME) {
+          fieldName = jp.getCurrentName();
+          jp.nextToken();  // field value
+          fieldValue = jp.getText();
+
+          if (fieldName.equals("access_token")) {
+            token.setAccessToken(fieldValue);
+          }
+          if (fieldName.equals("expires_in")) {
+            expiryPeriod = Integer.parseInt(fieldValue);
+          }
+        }
+        jp.nextToken();
+      }
+      jp.close();
+      long expiry = System.currentTimeMillis();
+      expiry = expiry + expiryPeriod * 1000L; // convert expiryPeriod to milliseconds and add
+      token.setExpiry(new Date(expiry));
+      LOG.debug("AADToken: fetched token with expiry " + token.getExpiry().toString());
+    } catch (Exception ex) {
+      LOG.debug("AADToken: got exception when parsing json token " + ex.toString());
+      throw ex;
+    } finally {
+      httpResponseStream.close();
+    }
+    return token;
+  }
+
+  private static String consumeInputStream(InputStream inStream, int length) throws IOException {
+    byte[] b = new byte[length];
+    int totalBytesRead = 0;
+    int bytesRead = 0;
+
+    do {
+      bytesRead = inStream.read(b, totalBytesRead, length - totalBytesRead);
+      if (bytesRead > 0) {
+        totalBytesRead += bytesRead;
+      }
+    } while (bytesRead >= 0 && totalBytesRead < length);
+
+    return new String(b, 0, totalBytesRead, StandardCharsets.UTF_8);
+  }
+}

+ 47 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADToken.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.util.Date;
+
+
+/**
+ * Object representing the AAD access token to use when making HTTP requests to Azure Data Lake Storage.
+ */
+public class AzureADToken {
+  private String accessToken;
+  private Date expiry;
+
+  public String getAccessToken() {
+    return this.accessToken;
+  }
+
+  public void setAccessToken(String accessToken) {
+    this.accessToken = accessToken;
+  }
+
+  public Date getExpiry() {
+    return new Date(this.expiry.getTime());
+  }
+
+  public void setExpiry(Date expiry) {
+    this.expiry = new Date(expiry.getTime());
+  }
+
+}

+ 62 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/ClientCredsTokenProvider.java

@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Provides tokens based on client credentials.
+ */
+public class ClientCredsTokenProvider extends AccessTokenProvider {
+
+  private final String authEndpoint;
+
+  private final String clientId;
+
+  private final String clientSecret;
+
+  private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);
+
+
+  public ClientCredsTokenProvider(final String authEndpoint,
+                                  final String clientId, final String clientSecret) {
+
+    Preconditions.checkNotNull(authEndpoint, "authEndpoint");
+    Preconditions.checkNotNull(clientId, "clientId");
+    Preconditions.checkNotNull(clientSecret, "clientSecret");
+
+    this.authEndpoint = authEndpoint;
+    this.clientId = clientId;
+    this.clientSecret = clientSecret;
+  }
+
+
+  @Override
+  protected AzureADToken refreshToken() throws IOException {
+    LOG.debug("AADToken: refreshing client-credential based token");
+    return AzureADAuthenticator.getTokenUsingClientCreds(authEndpoint, clientId, clientSecret);
+  }
+
+
+}

+ 75 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdaptee.java

@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+
+/**
+ * This interface provides an extensibility model for customizing the acquisition
+ * of Azure Active Directory Access Tokens.   When "fs.azure.account.auth.type" is
+ * set to "Custom", implementors may use the
+ * "fs.azure.account.oauth.provider.type.{accountName}" configuration property
+ * to specify a class with a custom implementation of CustomTokenProviderAdaptee.
+ * This class will be dynamically loaded, initialized, and invoked to provide
+ * AAD Access Tokens and their Expiry.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface CustomTokenProviderAdaptee {
+
+  /**
+   * Initialize with supported configuration. This method is invoked when the
+   * (URI, Configuration)} method is invoked.
+   *
+   * @param configuration Configuration object
+   * @param accountName Account Name
+   * @throws IOException if instance can not be configured.
+   */
+  void initialize(Configuration configuration, String accountName)
+      throws IOException;
+
+  /**
+   * Obtain the access token that should be added to https connection's header.
+   * Will be called depending upon {@link #getExpiryTime()} expiry time is set,
+   * so implementations should be performant. Implementations are responsible
+   * for any refreshing of the token.
+   *
+   * @return String containing the access token
+   * @throws IOException if there is an error fetching the token
+   */
+  String getAccessToken() throws IOException;
+
+  /**
+   * Obtain expiry time of the token. If implementation is performant enough to
+   * maintain expiry and expect {@link #getAccessToken()} call for every
+   * connection then safe to return current or past time.
+   *
+   * However recommended to use the token expiry time received from Azure Active
+   * Directory.
+   *
+   * @return Date to expire access token retrieved from AAD.
+   */
+  Date getExpiryTime();
+}

+ 57 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java

@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Provides tokens based on custom implementation, following the Adapter Design
+ * Pattern.
+ */
+public final class CustomTokenProviderAdapter extends AccessTokenProvider {
+
+  private CustomTokenProviderAdaptee adaptee;
+  private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);
+
+  /**
+   * Constructs a token provider based on the custom token provider.
+   *
+   * @param adaptee the custom token provider
+   */
+  public CustomTokenProviderAdapter(CustomTokenProviderAdaptee adaptee) {
+    Preconditions.checkNotNull(adaptee, "adaptee");
+    this.adaptee = adaptee;
+  }
+
+  protected AzureADToken refreshToken() throws IOException {
+    LOG.debug("AADToken: refreshing custom based token");
+
+    AzureADToken azureADToken = new AzureADToken();
+    azureADToken.setAccessToken(adaptee.getAccessToken());
+    azureADToken.setExpiry(adaptee.getExpiryTime());
+
+    return azureADToken;
+  }
+}

+ 48 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides tokens based on Azure VM's Managed Service Identity.
+ */
+public class MsiTokenProvider extends AccessTokenProvider {
+
+  private final String tenantGuid;
+
+  private final String clientId;
+
+  private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);
+
+  public MsiTokenProvider(final String tenantGuid, final String clientId) {
+    this.tenantGuid = tenantGuid;
+    this.clientId = clientId;
+  }
+
+  @Override
+  protected AzureADToken refreshToken() throws IOException {
+    LOG.debug("AADToken: refreshing token from MSI");
+    AzureADToken token = AzureADAuthenticator.getTokenFromMsi(tenantGuid, clientId, false);
+    return token;
+  }
+}

+ 69 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/QueryParams.java

@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utilities class http query parameters.
+ */
+public class QueryParams {
+  private Map<String, String> params = new HashMap<>();
+  private String apiVersion = null;
+  private String separator = "";
+  private String serializedString = null;
+
+  public void add(String name, String value) {
+    params.put(name, value);
+    serializedString = null;
+  }
+
+  public void setApiVersion(String apiVersion) {
+    this.apiVersion = apiVersion;
+    serializedString = null;
+  }
+
+  public String serialize() {
+    if (serializedString == null) {
+      StringBuilder sb = new StringBuilder();
+      for (Map.Entry<String, String> entry : params.entrySet()) {
+        String name = entry.getKey();
+        try {
+          sb.append(separator);
+          sb.append(URLEncoder.encode(name, "UTF-8"));
+          sb.append('=');
+          sb.append(URLEncoder.encode(entry.getValue(), "UTF-8"));
+          separator = "&";
+        } catch (UnsupportedEncodingException ex) {
+        }
+      }
+
+      if (apiVersion != null) {
+        sb.append(separator);
+        sb.append("api-version=");
+        sb.append(apiVersion);
+        separator = "&";
+      }
+      serializedString = sb.toString();
+    }
+    return serializedString;
+  }
+}

+ 57 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/RefreshTokenBasedTokenProvider.java

@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Provides tokens based on refresh token.
+ */
+public class RefreshTokenBasedTokenProvider extends AccessTokenProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);
+
+  private final String clientId;
+
+  private final String refreshToken;
+
+  /**
+   * Constructs a token provider based on the refresh token provided.
+   *
+   * @param clientId the client ID (GUID) of the client web app obtained from Azure Active Directory configuration
+   * @param refreshToken the refresh token
+   */
+  public RefreshTokenBasedTokenProvider(String clientId, String refreshToken) {
+    Preconditions.checkNotNull(clientId, "clientId");
+    Preconditions.checkNotNull(refreshToken, "refreshToken");
+    this.clientId = clientId;
+    this.refreshToken = refreshToken;
+  }
+
+
+  @Override
+  protected AzureADToken refreshToken() throws IOException {
+    LOG.debug("AADToken: refreshing refresh-token based token");
+    return AzureADAuthenticator.getTokenUsingRefreshToken(clientId, refreshToken);
+  }
+}

+ 66 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/UserPasswordTokenProvider.java

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+/**
+ * Provides tokens based on username and password.
+ */
+public class UserPasswordTokenProvider extends AccessTokenProvider {
+
+  private final String authEndpoint;
+
+  private final String username;
+
+  private final String password;
+
+  private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);
+
+  public UserPasswordTokenProvider(final String authEndpoint,
+                                   final String username, final String password) {
+    Preconditions.checkNotNull(authEndpoint, "authEndpoint");
+    Preconditions.checkNotNull(username, "username");
+    Preconditions.checkNotNull(password, "password");
+
+    this.authEndpoint = authEndpoint;
+    this.username = username;
+    this.password = password;
+  }
+
+  @Override
+  protected AzureADToken refreshToken() throws IOException {
+    LOG.debug("AADToken: refreshing user-password based token");
+    return AzureADAuthenticator.getTokenUsingClientCreds(authEndpoint, username, password);
+  }
+
+  private static String getPasswordString(Configuration conf, String key)
+          throws IOException {
+    char[] passchars = conf.getPassword(key);
+    if (passchars == null) {
+      throw new IOException("Password " + key + " not found");
+    }
+    return new String(passchars);
+  }
+}

+ 18 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java

@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.oauth2;

+ 16 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
 
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
@@ -42,7 +43,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*
 import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
 
 /**
- * AbfsClient
+ * AbfsClient.
  */
 public class AbfsClient {
   public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
@@ -54,9 +55,13 @@ public class AbfsClient {
   private final AbfsConfiguration abfsConfiguration;
   private final String userAgent;
 
+  private final AccessTokenProvider tokenProvider;
+
+
   public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
                     final AbfsConfiguration abfsConfiguration,
-                    final ExponentialRetryPolicy exponentialRetryPolicy) {
+                    final ExponentialRetryPolicy exponentialRetryPolicy,
+                    final AccessTokenProvider tokenProvider) {
     this.baseUrl = baseUrl;
     this.sharedKeyCredentials = sharedKeyCredentials;
     String baseUrlString = baseUrl.toString();
@@ -76,6 +81,7 @@ public class AbfsClient {
     }
 
     this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
+    this.tokenProvider = tokenProvider;
   }
 
   public String getFileSystem() {
@@ -409,6 +415,14 @@ public class AbfsClient {
     return encodedString;
   }
 
+  public synchronized String getAccessToken() throws IOException {
+    if (tokenProvider != null) {
+      return "Bearer " + tokenProvider.getToken().getAccessToken();
+    } else {
+      return null;
+    }
+  }
+
   @VisibleForTesting
   String initializeUserAgent(final AbfsConfiguration abfsConfiguration,
                              final String sslProviderName) {

+ 1 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java

@@ -19,7 +19,7 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 /**
- * The Http Request / Response Headers for Rest AbfsClient
+ * The Http Request / Response Headers for Rest AbfsClient.
  */
 public class AbfsHttpHeader {
   private final String name;

+ 12 - 7
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 
 /**
  * The AbfsRestOperation for Rest AbfsClient.
@@ -48,7 +49,7 @@ public class AbfsRestOperation {
   // request body and all the download methods have a response body.
   private final boolean hasRequestBody;
 
-  private final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
 
   // For uploads, this is the request entity body.  For downloads,
   // this will hold the response entity body.
@@ -139,9 +140,15 @@ public class AbfsRestOperation {
       httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
 
       // sign the HTTP request
-      client.getSharedKeyCredentials().signRequest(
-          httpOperation.getConnection(),
-          hasRequestBody ? bufferLength : 0);
+      if (client.getAccessToken() == null) {
+        // sign the HTTP request
+        client.getSharedKeyCredentials().signRequest(
+                httpOperation.getConnection(),
+                hasRequestBody ? bufferLength : 0);
+      } else {
+        httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
+                client.getAccessToken());
+      }
 
       if (hasRequestBody) {
         // HttpUrlConnection requires
@@ -163,9 +170,7 @@ public class AbfsRestOperation {
       return false;
     }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("HttpRequest: " + httpOperation.toString());
-    }
+    LOG.debug("HttpRequest: " + httpOperation.toString());
 
     if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) {
       return false;

+ 27 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java

@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * Auth Type Enum.
+ */
+public enum AuthType {
+    SharedKey,
+    OAuth,
+    Custom
+}

+ 21 - 14
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.apache.hadoop.fs.azure.AbstractWasbTestWithTimeout;
 import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
 import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
@@ -62,7 +63,7 @@ public abstract class AbstractAbfsIntegrationTest extends
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractAbfsIntegrationTest.class);
 
-  private final boolean isEmulator;
+  private boolean isEmulator;
   private NativeAzureFileSystem wasb;
   private AzureBlobFileSystem abfs;
   private String abfsScheme;
@@ -71,20 +72,18 @@ public abstract class AbstractAbfsIntegrationTest extends
   private String fileSystemName;
   private String accountName;
   private String testUrl;
-
-  protected AbstractAbfsIntegrationTest(final boolean secure) {
-    this(secure ? FileSystemUriSchemes.ABFS_SECURE_SCHEME : FileSystemUriSchemes.ABFS_SCHEME);
-  }
+  private AuthType authType;
 
   protected AbstractAbfsIntegrationTest() {
-    this(FileSystemUriSchemes.ABFS_SCHEME);
-  }
-
-  private AbstractAbfsIntegrationTest(final String scheme) {
-    abfsScheme = scheme;
     fileSystemName = ABFS_TEST_CONTAINER_PREFIX + UUID.randomUUID().toString();
     configuration = new Configuration();
     configuration.addResource(ABFS_TEST_RESOURCE_XML);
+    this.accountName = this.configuration.get(FS_AZURE_TEST_ACCOUNT_NAME);
+
+    authType = configuration.getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME
+            + accountName, AuthType.SharedKey);
+    abfsScheme = authType == AuthType.SharedKey ? FileSystemUriSchemes.ABFS_SCHEME
+            : FileSystemUriSchemes.ABFS_SECURE_SCHEME;
 
     String accountName = configuration.get(FS_AZURE_TEST_ACCOUNT_NAME, "");
     assumeTrue("Not set: " + FS_AZURE_TEST_ACCOUNT_NAME,
@@ -94,8 +93,13 @@ public abstract class AbstractAbfsIntegrationTest extends
         accountName, containsString("dfs.core.windows.net"));
     String fullKey = FS_AZURE_TEST_ACCOUNT_KEY_PREFIX
         + accountName;
-    assumeTrue("Not set: " + fullKey,
-        configuration.get(fullKey) != null);
+
+    if (authType == AuthType.SharedKey) {
+      assumeTrue("Not set: " + fullKey, configuration.get(fullKey) != null);
+    } else {
+      String accessTokenProviderKey = FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME + accountName;
+      assumeTrue("Not set: " + accessTokenProviderKey, configuration.get(accessTokenProviderKey) != null);
+    }
 
     final String abfsUrl = this.getFileSystemName() + "@" + this.getAccountName();
     URI defaultUri = null;
@@ -110,7 +114,6 @@ public abstract class AbstractAbfsIntegrationTest extends
     configuration.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString());
     configuration.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
     this.isEmulator = this.configuration.getBoolean(FS_AZURE_EMULATOR_ENABLED, false);
-    this.accountName = this.configuration.get(FS_AZURE_TEST_ACCOUNT_NAME);
   }
 
 
@@ -119,7 +122,7 @@ public abstract class AbstractAbfsIntegrationTest extends
     //Create filesystem first to make sure getWasbFileSystem() can return an existing filesystem.
     createFileSystem();
 
-    if (!isEmulator) {
+    if (!isEmulator && authType == AuthType.SharedKey) {
       final URI wasbUri = new URI(abfsUrlToWasbUrl(getTestUrl()));
       final AzureNativeFileSystemStore azureNativeFileSystemStore =
           new AzureNativeFileSystemStore();
@@ -234,6 +237,10 @@ public abstract class AbstractAbfsIntegrationTest extends
     return isEmulator;
   }
 
+  protected AuthType getAuthType() {
+    return this.authType;
+  }
+
   /**
    * Write a buffer to a file.
    * @param path path

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java

@@ -19,7 +19,6 @@
 package org.apache.hadoop.fs.azurebfs;
 
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 import org.junit.Assert;
@@ -30,6 +29,7 @@ import org.junit.Test;
  */
 public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
   private static final int LIST_MAX_RESULTS = 5000;
+
   @Test
   public void testContinuationTokenHavingEqualSign() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();

+ 4 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java

@@ -22,8 +22,11 @@ import com.microsoft.azure.storage.CloudStorageAccount;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
+
+import org.junit.Assume;
 import org.junit.Test;
 
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
@@ -34,6 +37,7 @@ public class ITestAzureBlobFileSystemBackCompat extends
     AbstractAbfsIntegrationTest {
   public ITestAzureBlobFileSystemBackCompat() {
     super();
+    Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
   }
 
   @Test

+ 0 - 3
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java

@@ -34,9 +34,6 @@ public class ITestAzureBlobFileSystemFileStatus extends
     AbstractAbfsIntegrationTest {
   private static final Path TEST_FILE = new Path("testFile");
   private static final Path TEST_FOLDER = new Path("testDir");
-  public ITestAzureBlobFileSystemFileStatus() {
-    super();
-  }
 
   @Test
   public void testEnsureStatusWorksForRoot() throws Exception {

+ 6 - 2
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFinalize.java

@@ -25,12 +25,14 @@ import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
 
 /**
  * Test finalize() method when "fs.abfs.impl.disable.cache" is enabled.
  */
 public class ITestAzureBlobFileSystemFinalize extends AbstractAbfsScaleTest{
-  static final String DISABLE_CACHE_KEY = "fs.abfs.impl.disable.cache";
+  static final String DISABLE_ABFS_CACHE_KEY = "fs.abfs.impl.disable.cache";
+  static final String DISABLE_ABFSSS_CACHE_KEY = "fs.abfss.impl.disable.cache";
 
   public ITestAzureBlobFileSystemFinalize() throws Exception {
     super();
@@ -40,7 +42,9 @@ public class ITestAzureBlobFileSystemFinalize extends AbstractAbfsScaleTest{
   public void testFinalize() throws Exception {
     // Disable the cache for filesystem to make sure there is no reference.
     Configuration configuration = this.getConfiguration();
-    configuration.setBoolean(this.DISABLE_CACHE_KEY, true);
+    configuration.setBoolean(
+            this.getAuthType() == AuthType.SharedKey ? DISABLE_ABFS_CACHE_KEY : DISABLE_ABFSSS_CACHE_KEY,
+    true);
 
     AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration);
 

+ 7 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java

@@ -31,9 +31,9 @@ import java.io.IOException;
 import com.microsoft.azure.storage.blob.BlockEntry;
 import com.microsoft.azure.storage.blob.BlockListingFilter;
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
-import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
 import org.hamcrest.core.IsEqual;
 import org.hamcrest.core.IsNot;
+import org.junit.Assume;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -42,6 +42,9 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
+
 /**
  * Test flush operation.
  */
@@ -209,6 +212,8 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
 
   @Test
   public void testFlushWithFlushEnabled() throws Exception {
+    Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
+
     AzureBlobStorageTestAccount testAccount = createWasbTestAccount();
     String wasbUrl = testAccount.getFileSystem().getName();
     String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
@@ -228,6 +233,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
 
   @Test
   public void testFlushWithFlushDisabled() throws Exception {
+    Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
     AzureBlobStorageTestAccount testAccount = createWasbTestAccount();
     String wasbUrl = testAccount.getFileSystem().getName();
     String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);

+ 176 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemOauth.java

@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
+import org.junit.Assume;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET;
+
+/**
+ * Test Azure Oauth with Blob Data contributor role and Blob Data Reader role.
+ * The Test AAD client need to be configured manually through Azure Portal, then save their properties in
+ * configuration files.
+ */
+public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
+
+  private static final Path FILE_PATH = new Path("/testFile");
+  private static final Path EXISTED_FILE_PATH = new Path("/existedFile");
+  private static final Path EXISTED_FOLDER_PATH = new Path("/existedFolder");
+
+  public ITestAzureBlobFileSystemOauth() {
+    Assume.assumeTrue(this.getAuthType() == AuthType.OAuth);
+  }
+  /*
+  * BLOB DATA CONTRIBUTOR should have full access to the container and blobs in the container.
+  * */
+  @Test
+  public void testBlobDataContributor() throws Exception {
+    String clientId = this.getConfiguration().get(TestConfigurationKeys.FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID);
+    Assume.assumeTrue("Contributor client id not provided", clientId != null);
+    String secret = this.getConfiguration().get(TestConfigurationKeys.FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET);
+    Assume.assumeTrue("Contributor client secret not provided", secret != null);
+
+    prepareFiles();
+
+    final AzureBlobFileSystem fs = getBlobConributor();
+
+    // create and write into file in current container/fs
+    try(FSDataOutputStream stream = fs.create(FILE_PATH)) {
+      stream.write(0);
+    }
+    assertTrue(fs.exists(FILE_PATH));
+    FileStatus fileStatus = fs.getFileStatus(FILE_PATH);
+    assertEquals(1, fileStatus.getLen());
+    // delete file
+    assertTrue(fs.delete(FILE_PATH, true));
+    assertFalse(fs.exists(FILE_PATH));
+
+    // Verify Blob Data Contributor has full access to existed folder, file
+
+    // READ FOLDER
+    assertTrue(fs.exists(EXISTED_FOLDER_PATH));
+
+    //DELETE FOLDER
+    fs.delete(EXISTED_FOLDER_PATH, true);
+    assertFalse(fs.exists(EXISTED_FOLDER_PATH));
+
+    // READ FILE
+    try (FSDataInputStream stream = fs.open(EXISTED_FILE_PATH)) {
+      assertTrue(stream.read() != 0);
+    }
+
+    assertEquals(0, fs.getFileStatus(EXISTED_FILE_PATH).getLen());
+
+    // WRITE FILE
+    try (FSDataOutputStream stream = fs.append(EXISTED_FILE_PATH)) {
+      stream.write(0);
+    }
+
+    assertEquals(1, fs.getFileStatus(EXISTED_FILE_PATH).getLen());
+
+    // REMOVE FILE
+    fs.delete(EXISTED_FILE_PATH, true);
+    assertFalse(fs.exists(EXISTED_FILE_PATH));
+  }
+
+  /*
+   * BLOB DATA READER should have only READ access to the container and blobs in the container.
+   * */
+  @Test
+  public void testBlobDataReader() throws Exception {
+    String clientId = this.getConfiguration().get(TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_ID);
+    Assume.assumeTrue("Reader client id not provided", clientId != null);
+    String secret = this.getConfiguration().get(TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET);
+    Assume.assumeTrue("Reader client secret not provided", secret != null);
+
+    prepareFiles();
+    final AzureBlobFileSystem fs = getBlobReader();
+
+    // Use abfsStore in this test to verify the  ERROR code in AbfsRestOperationException
+    AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
+    // TEST READ FS
+    Map<String, String> properties = abfsStore.getFilesystemProperties();
+    // TEST READ FOLDER
+    assertTrue(fs.exists(EXISTED_FOLDER_PATH));
+
+    // TEST DELETE FOLDER
+    try {
+      abfsStore.delete(EXISTED_FOLDER_PATH, true);
+    } catch (AbfsRestOperationException e) {
+      assertEquals(AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH, e.getErrorCode());
+    }
+
+    // TEST READ  FILE
+    try (InputStream inputStream = abfsStore.openFileForRead(EXISTED_FILE_PATH, null)) {
+      assertTrue(inputStream.read() != 0);
+    }
+
+    // TEST WRITE FILE
+    try {
+      abfsStore.openFileForWrite(EXISTED_FILE_PATH, true);
+    } catch (AbfsRestOperationException e) {
+      assertEquals(AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH, e.getErrorCode());
+    }
+
+  }
+
+  private void prepareFiles() throws IOException {
+    // create test files/folders to verify access control diff between
+    // Blob data contributor and Blob data reader
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    fs.create(EXISTED_FILE_PATH);
+    assertTrue(fs.exists(EXISTED_FILE_PATH));
+    fs.mkdirs(EXISTED_FOLDER_PATH);
+    assertTrue(fs.exists(EXISTED_FOLDER_PATH));
+  }
+
+  private AzureBlobFileSystem getBlobConributor() throws Exception {
+    Configuration configuration = this.getConfiguration();
+    configuration.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + this.getAccountName(), configuration.get(FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID));
+    configuration.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET + this.getAccountName(), configuration.get(FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET));
+    return getFileSystem(configuration);
+  }
+
+  private AzureBlobFileSystem getBlobReader() throws Exception {
+    Configuration configuration = this.getConfiguration();
+    configuration.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + this.getAccountName(), configuration.get(FS_AZURE_BLOB_DATA_READER_CLIENT_ID));
+    configuration.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET + this.getAccountName(), configuration.get(FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET));
+    return getFileSystem(configuration);
+  }
+}

+ 3 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java

@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Random;
 import java.util.concurrent.Callable;
 
+import org.junit.Assume;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +35,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -66,6 +68,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
 
   public ITestAzureBlobFileSystemRandomRead() throws Exception {
     super();
+    Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
   }
 
   @Test

+ 4 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
 
 /**
  * Test AzureBlobFileSystem initialization.
@@ -41,8 +42,10 @@ public class ITestFileSystemInitialization extends AbstractAbfsIntegrationTest {
     final String accountName = getAccountName();
     final String filesystem = getFileSystemName();
 
+    String scheme = this.getAuthType() == AuthType.SharedKey ? FileSystemUriSchemes.ABFS_SCHEME
+            : FileSystemUriSchemes.ABFS_SECURE_SCHEME;
     assertEquals(fs.getUri(),
-        new URI(FileSystemUriSchemes.ABFS_SCHEME,
+        new URI(scheme,
             filesystem + "@" + accountName,
             null,
             null,

+ 9 - 2
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemRegistration.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
 
 /**
  * Test AzureBlobFileSystem registration.
@@ -79,8 +80,14 @@ public class ITestFileSystemRegistration extends AbstractAbfsIntegrationTest {
     AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(getConfiguration());
     assertNotNull("filesystem", fs);
 
-    Abfs afs = (Abfs) FileContext.getFileContext(getConfiguration()).getDefaultFileSystem();
-    assertNotNull("filecontext", afs);
+    if (this.getAuthType() == AuthType.OAuth) {
+      Abfss afs = (Abfss) FileContext.getFileContext(getConfiguration()).getDefaultFileSystem();
+      assertNotNull("filecontext", afs);
+    } else {
+      Abfs afs = (Abfs) FileContext.getFileContext(getConfiguration()).getDefaultFileSystem();
+      assertNotNull("filecontext", afs);
+    }
+
   }
 
   @Test

+ 2 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDeleted;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsDirectory;
@@ -50,6 +51,7 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
 
   public ITestWasbAbfsCompatibility() throws Exception {
     Assume.assumeFalse("Emulator is not supported", isEmulator());
+    Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
   }
 
   @Test

+ 6 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java

@@ -28,6 +28,12 @@ public final class TestConfigurationKeys {
   public static final String FS_AZURE_TEST_HOST_PORT = "fs.azure.test.host.port";
   public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs";
 
+  public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID = "fs.azure.account.oauth2.contributor.client.id";
+  public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET = "fs.azure.account.oauth2.contributor.client.secret";
+
+  public static final String FS_AZURE_BLOB_DATA_READER_CLIENT_ID = "fs.azure.account.oauth2.reader.client.id";
+  public static final String FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET = "fs.azure.account.oauth2.reader.client.secret";
+
   public static final String ABFS_TEST_RESOURCE_XML = "azure-bfs-test.xml";
 
   public static final String ABFS_TEST_CONTAINER_PREFIX = "abfs-testcontainer-";

+ 9 - 5
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ABFSContractTestBinding.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
 import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
 
 /**
  * Bind ABFS contract tests to the Azure test setup/teardown.
@@ -32,18 +33,17 @@ import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
 public class ABFSContractTestBinding extends AbstractAbfsIntegrationTest {
   private final URI testUri;
 
-  public ABFSContractTestBinding(final boolean secure) throws Exception {
-    this(secure, true);
+  public ABFSContractTestBinding() throws Exception {
+    this(true);
   }
 
-  public ABFSContractTestBinding(final boolean secure,
+  public ABFSContractTestBinding(
       final boolean useExistingFileSystem) throws Exception{
-    super(secure);
     if (useExistingFileSystem) {
       Configuration configuration = getConfiguration();
       String testUrl = configuration.get(TestConfigurationKeys.FS_AZURE_CONTRACT_TEST_URI);
 
-      if (secure) {
+      if (getAuthType() != AuthType.SharedKey) {
         testUrl = testUrl.replaceFirst(FileSystemUriSchemes.ABFS_SCHEME, FileSystemUriSchemes.ABFS_SECURE_SCHEME);
       }
       setTestUrl(testUrl);
@@ -61,4 +61,8 @@ public class ABFSContractTestBinding extends AbstractAbfsIntegrationTest {
   public Configuration getConfiguration() {
     return super.getConfiguration();
   }
+
+  public boolean isSecureMode() {
+    return this.getAuthType() == AuthType.SharedKey ? false : true;
+  }
 }

+ 4 - 15
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractAppend.java

@@ -18,34 +18,23 @@
 
 package org.apache.hadoop.fs.azurebfs.contract;
 
-import java.util.Arrays;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractAppendTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.junit.Test;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 
 /**
  * Contract test for open operation.
  */
-@RunWith(Parameterized.class)
 public class ITestAbfsFileSystemContractAppend extends AbstractContractAppendTest {
-  @Parameterized.Parameters(name = "SecureMode={0}")
-  public static Iterable<Object[]> secure() {
-    return Arrays.asList(new Object[][] { {true}, {false} });
-  }
-
   private final boolean isSecure;
   private final ABFSContractTestBinding binding;
 
-  public ITestAbfsFileSystemContractAppend(final boolean secure) throws Exception {
-    this.isSecure = secure;
-    binding = new ABFSContractTestBinding(this.isSecure);
+  public ITestAbfsFileSystemContractAppend() throws Exception {
+    binding = new ABFSContractTestBinding();
+    this.isSecure = binding.isSecureMode();
   }
 
   @Override

+ 3 - 14
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractConcat.java

@@ -17,11 +17,6 @@
  */
 package org.apache.hadoop.fs.azurebfs.contract;
 
-import java.util.Arrays;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractConcatTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
@@ -29,19 +24,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
 /**
  * Contract test for concat operation.
  */
-@RunWith(Parameterized.class)
 public class ITestAbfsFileSystemContractConcat extends AbstractContractConcatTest{
-  @Parameterized.Parameters(name = "SecureMode={0}")
-  public static Iterable<Object[]> secure() {
-    return Arrays.asList(new Object[][] { {true}, {false} });
-  }
-
   private final boolean isSecure;
   private final ABFSContractTestBinding binding;
 
-  public ITestAbfsFileSystemContractConcat(final boolean secure) throws Exception {
-    isSecure = secure;
-    binding = new ABFSContractTestBinding(isSecure);
+  public ITestAbfsFileSystemContractConcat() throws Exception {
+    binding = new ABFSContractTestBinding();
+    this.isSecure = binding.isSecureMode();
   }
 
   @Override

+ 3 - 14
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractCreate.java

@@ -18,11 +18,6 @@
 
 package org.apache.hadoop.fs.azurebfs.contract;
 
-import java.util.Arrays;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
@@ -30,19 +25,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
 /**
  * Contract test for create operation.
  */
-@RunWith(Parameterized.class)
 public class ITestAbfsFileSystemContractCreate extends AbstractContractCreateTest{
-  @Parameterized.Parameters(name = "SecureMode={0}")
-  public static Iterable<Object[]> secure() {
-    return Arrays.asList(new Object[][] { {true}, {false} });
-  }
-
   private final boolean isSecure;
   private final ABFSContractTestBinding binding;
 
-  public ITestAbfsFileSystemContractCreate(final boolean secure) throws Exception {
-    this.isSecure = secure;
-    binding = new ABFSContractTestBinding(this.isSecure);
+  public ITestAbfsFileSystemContractCreate() throws Exception {
+    binding = new ABFSContractTestBinding();
+    this.isSecure = binding.isSecureMode();
   }
 
   @Override

+ 3 - 14
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractDelete.java

@@ -18,11 +18,6 @@
 
 package org.apache.hadoop.fs.azurebfs.contract;
 
-import java.util.Arrays;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
@@ -30,19 +25,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
 /**
  * Contract test for delete operation.
  */
-@RunWith(Parameterized.class)
 public class ITestAbfsFileSystemContractDelete extends AbstractContractDeleteTest {
-  @Parameterized.Parameters(name = "SecureMode={0}")
-  public static Iterable<Object[]> secure() {
-    return Arrays.asList(new Object[][] { {true}, {false} });
-  }
-
   private final boolean isSecure;
   private final ABFSContractTestBinding binding;
 
-  public ITestAbfsFileSystemContractDelete(final boolean secure) throws Exception {
-    this.isSecure = secure;
-    binding = new ABFSContractTestBinding(isSecure);
+  public ITestAbfsFileSystemContractDelete() throws Exception {
+    binding = new ABFSContractTestBinding();
+    this.isSecure = binding.isSecureMode();
   }
 
   @Override

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractDistCp.java

@@ -28,7 +28,7 @@ public class ITestAbfsFileSystemContractDistCp extends AbstractContractDistCpTes
   private final ABFSContractTestBinding binding;
 
   public ITestAbfsFileSystemContractDistCp() throws Exception {
-    binding = new ABFSContractTestBinding(false);
+    binding = new ABFSContractTestBinding();
   }
 
   @Override

+ 3 - 14
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractGetFileStatus.java

@@ -17,11 +17,6 @@
  */
 package org.apache.hadoop.fs.azurebfs.contract;
 
-import java.util.Arrays;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
@@ -29,19 +24,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
 /**
  * Contract test for getFileStatus operation.
  */
-@RunWith(Parameterized.class)
 public class ITestAbfsFileSystemContractGetFileStatus extends AbstractContractGetFileStatusTest {
-  @Parameterized.Parameters(name = "SecureMode={0}")
-  public static Iterable<Object[]> secure() {
-    return Arrays.asList(new Object[][] { {true}, {false} });
-  }
-
   private final boolean isSecure;
   private final ABFSContractTestBinding binding;
 
-  public ITestAbfsFileSystemContractGetFileStatus(final boolean secure) throws Exception {
-    this.isSecure = secure;
-    binding = new ABFSContractTestBinding(isSecure);
+  public ITestAbfsFileSystemContractGetFileStatus() throws Exception {
+    binding = new ABFSContractTestBinding();
+    this.isSecure = binding.isSecureMode();
   }
 
   @Override

+ 3 - 14
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractMkdir.java

@@ -18,11 +18,6 @@
 
 package org.apache.hadoop.fs.azurebfs.contract;
 
-import java.util.Arrays;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
@@ -30,19 +25,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
 /**
  * Contract test for mkdir operation.
  */
-@RunWith(Parameterized.class)
 public class ITestAbfsFileSystemContractMkdir extends AbstractContractMkdirTest {
-  @Parameterized.Parameters(name = "SecureMode={0}")
-  public static Iterable<Object[]> secure() {
-    return Arrays.asList(new Object[][] { {true}, {false} });
-  }
-
   private final boolean isSecure;
   private final ABFSContractTestBinding binding;
 
-  public ITestAbfsFileSystemContractMkdir(final boolean secure) throws Exception {
-    this.isSecure = secure;
-    binding = new ABFSContractTestBinding(secure);
+  public ITestAbfsFileSystemContractMkdir() throws Exception {
+    binding = new ABFSContractTestBinding();
+    this.isSecure = binding.isSecureMode();
   }
 
   @Override

+ 3 - 14
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractOpen.java

@@ -18,11 +18,6 @@
 
 package org.apache.hadoop.fs.azurebfs.contract;
 
-import java.util.Arrays;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
@@ -30,19 +25,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
 /**
  * Contract test for open operation.
  */
-@RunWith(Parameterized.class)
 public class ITestAbfsFileSystemContractOpen extends AbstractContractOpenTest {
-  @Parameterized.Parameters(name = "SecureMode={0}")
-  public static Iterable<Object[]> secure() {
-    return Arrays.asList(new Object[][] { {true}, {false} });
-  }
-
   private final boolean isSecure;
   private final ABFSContractTestBinding binding;
 
-  public ITestAbfsFileSystemContractOpen(final boolean secure) throws Exception {
-    this.isSecure = secure;
-    binding = new ABFSContractTestBinding(this.isSecure);
+  public ITestAbfsFileSystemContractOpen() throws Exception {
+    binding = new ABFSContractTestBinding();
+    this.isSecure = binding.isSecureMode();
   }
 
   @Override

+ 3 - 14
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractRename.java

@@ -18,11 +18,6 @@
 
 package org.apache.hadoop.fs.azurebfs.contract;
 
-import java.util.Arrays;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
@@ -30,19 +25,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
 /**
  * Contract test for rename operation.
  */
-@RunWith(Parameterized.class)
 public class ITestAbfsFileSystemContractRename extends AbstractContractRenameTest {
-  @Parameterized.Parameters(name = "SecureMode={0}")
-  public static Iterable<Object[]> secure() {
-    return Arrays.asList(new Object[][] { {true}, {false} });
-  }
-
   private final boolean isSecure;
   private final ABFSContractTestBinding binding;
 
-  public ITestAbfsFileSystemContractRename(final boolean secure) throws Exception {
-    this.isSecure = secure;
-    binding = new ABFSContractTestBinding(this.isSecure);
+  public ITestAbfsFileSystemContractRename() throws Exception {
+    binding = new ABFSContractTestBinding();
+    this.isSecure = binding.isSecureMode();
   }
 
   @Override

+ 3 - 13
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractRootDirectory.java

@@ -17,31 +17,21 @@
  */
 package org.apache.hadoop.fs.azurebfs.contract;
 
-import java.util.Arrays;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.junit.Ignore;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 /**
  * Contract test for root directory operation.
  */
-@RunWith(Parameterized.class)
 public class ITestAbfsFileSystemContractRootDirectory extends AbstractContractRootDirectoryTest {
-  @Parameterized.Parameters(name = "SecureMode={0}")
-  public static Iterable<Object[]> secure() {
-    return Arrays.asList(new Object[][] { {true}, {false} });
-  }
-
   private final boolean isSecure;
   private final ABFSContractTestBinding binding;
 
-  public ITestAbfsFileSystemContractRootDirectory(final boolean secure) throws Exception {
-    this.isSecure = secure;
-    binding = new ABFSContractTestBinding(secure);
+  public ITestAbfsFileSystemContractRootDirectory() throws Exception {
+    binding = new ABFSContractTestBinding();
+    this.isSecure = binding.isSecureMode();
   }
 
   @Override

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSecureDistCp.java

@@ -28,7 +28,7 @@ public class ITestAbfsFileSystemContractSecureDistCp extends AbstractContractDis
   private final ABFSContractTestBinding binding;
 
   public ITestAbfsFileSystemContractSecureDistCp() throws Exception {
-    binding = new ABFSContractTestBinding(true);
+    binding = new ABFSContractTestBinding();
   }
 
   @Override

+ 3 - 14
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java

@@ -18,11 +18,6 @@
 
 package org.apache.hadoop.fs.azurebfs.contract;
 
-import java.util.Arrays;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
@@ -30,19 +25,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
 /**
  * Contract test for seek operation.
  */
-@RunWith(Parameterized.class)
 public class ITestAbfsFileSystemContractSeek extends AbstractContractSeekTest{
-  @Parameterized.Parameters(name = "SecureMode={0}")
-  public static Iterable<Object[]> secure() {
-    return Arrays.asList(new Object[][] { {true}, {false} });
-  }
-
   private final boolean isSecure;
   private final ABFSContractTestBinding binding;
 
-  public ITestAbfsFileSystemContractSeek(final boolean secure) throws Exception {
-    this.isSecure = secure;
-    binding = new ABFSContractTestBinding(this.isSecure);
+  public ITestAbfsFileSystemContractSeek() throws Exception {
+    binding = new ABFSContractTestBinding();
+    this.isSecure = binding.isSecureMode();
   }
 
   @Override

+ 3 - 14
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSetTimes.java

@@ -17,11 +17,6 @@
  */
 package org.apache.hadoop.fs.azurebfs.contract;
 
-import java.util.Arrays;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractSetTimesTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
@@ -29,19 +24,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
 /**
  * Contract test for setTimes operation.
  */
-@RunWith(Parameterized.class)
 public class ITestAbfsFileSystemContractSetTimes extends AbstractContractSetTimesTest {
-  @Parameterized.Parameters(name = "SecureMode={0}")
-  public static Iterable<Object[]> secure() {
-    return Arrays.asList(new Object[][] { {true}, {false} });
-  }
-
   private final boolean isSecure;
   private final ABFSContractTestBinding binding;
 
-  public ITestAbfsFileSystemContractSetTimes(final boolean secure) throws Exception {
-    this.isSecure = secure;
-    binding = new ABFSContractTestBinding(this.isSecure);
+  public ITestAbfsFileSystemContractSetTimes() throws Exception {
+    binding = new ABFSContractTestBinding();
+    this.isSecure = binding.isSecureMode();
   }
 
   @Override

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAzureBlobFileSystemBasics.java

@@ -40,7 +40,7 @@ public class ITestAzureBlobFileSystemBasics extends FileSystemContractBaseTest {
   public ITestAzureBlobFileSystemBasics() throws Exception {
     // If all contract tests are running in parallel, some root level tests in FileSystemContractBaseTest will fail
     // due to the race condition. Hence for this contract test it should be tested in different container
-    binding = new ABFSContractTestBinding(false, false);
+    binding = new ABFSContractTestBinding(false);
   }
 
 

+ 3 - 3
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java

@@ -21,11 +21,11 @@ package org.apache.hadoop.fs.azurebfs.services;
 import java.net.URL;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
-import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
 
@@ -40,7 +40,7 @@ public final class TestAbfsClient {
                                  AbfsConfiguration config,
                                  boolean includeSSLProvider) {
     AbfsClient client = new AbfsClient(baseUrl, null,
-        config, null);
+        config, null, null);
     String sslProviderName = null;
     if (includeSSLProvider) {
       sslProviderName = SSLSocketFactoryEx.getDefaultFactory().getProviderName();

+ 72 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestQueryParams.java

@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.oauth2.QueryParams;
+/**
+ * Test query params serialization.
+ */
+public class TestQueryParams {
+  private static final String SEPARATOR = "&";
+  private static final String[][] PARAM_ARRAY = {{"K0", "V0"}, {"K1", "V1"}, {"K2", "V2"}};
+
+  @Test
+  public void testOneParam() {
+    String key = PARAM_ARRAY[0][0];
+    String value = PARAM_ARRAY[0][1];
+
+    Map<String, String> paramMap = new HashMap<>();
+    paramMap.put(key, value);
+
+    QueryParams qp = new QueryParams();
+    qp.add(key, value);
+    Assert.assertEquals(key + "=" + value, qp.serialize());
+  }
+
+  @Test
+  public void testMultipleParams() {
+    QueryParams qp = new QueryParams();
+    for (String[] entry : PARAM_ARRAY) {
+      qp.add(entry[0], entry[1]);
+    }
+    Map<String, String> paramMap = constructMap(qp.serialize());
+    Assert.assertEquals(PARAM_ARRAY.length, paramMap.size());
+
+    for (String[] entry : PARAM_ARRAY) {
+      Assert.assertTrue(paramMap.containsKey(entry[0]));
+      Assert.assertEquals(entry[1], paramMap.get(entry[0]));
+    }
+  }
+
+  private Map<String, String> constructMap(String input) {
+    String[] entries = input.split(SEPARATOR);
+    Map<String, String> paramMap = new HashMap<>();
+    for (String entry : entries) {
+      String[] keyValue = entry.split("=");
+      paramMap.put(keyValue[0], keyValue[1]);
+    }
+    return paramMap;
+  }
+
+}

+ 11 - 2
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/CleanUpAbfsTestContainer.java

@@ -21,9 +21,13 @@ import com.microsoft.azure.storage.CloudStorageAccount;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
 
-import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
+import org.junit.Assume;
 import org.junit.Test;
 
+import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
+
 import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.ABFS_TEST_CONTAINER_PREFIX;
 
 /**
@@ -31,7 +35,12 @@ import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.ABFS
  * In that case, dev can use this tool to list and delete all test containers.
  * By default, all test container used in E2E tests sharing same prefix: "abfs-testcontainer-"
  */
-public class CleanUpAbfsTestContainer {
+public class CleanUpAbfsTestContainer extends AbstractAbfsIntegrationTest{
+
+  public CleanUpAbfsTestContainer() {
+    Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
+  }
+
   @Test
   public void testEnumContainers() throws Throwable {
     int count = 0;

+ 125 - 3
hadoop-tools/hadoop-azure/src/test/resources/azure-bfs-test.xml

@@ -22,6 +22,27 @@
     <value>{YOURACCOUNT}</value>
   </property>
 
+  <property>
+    <name>fs.azure.account.auth.type.{YOURACCOUNT}.dfs.core.windows.net</name>
+    <value>{AUTH TYPE}</value>
+    <description>The auth type can be : SharedKey, OAuth, Custom. By default "SharedKey" is used.</description>
+  </property>
+
+   <property>
+    <name>fs.contract.test.fs.abfs</name>
+    <value>abfs://{CONTAINERNAME}@{ACCOUNTNAME}.dfs.core.windows.net/value>
+    <description>The name of the azure file system for testing.</description>
+  </property>
+
+  <property>
+    <name>fs.contract.test.fs.abfss</name>
+    <value>abfss://{CONTAINERNAME}@{ACCOUNTNAME}.dfs.core.windows.net/value>
+    <description>The name of the azure file system for testing.</description>
+  </property>
+  -->
+
+  <!--If auth type is "SharedKey", provide SharedKey credentials below -->
+  <!--
   <property>
     <name>fs.azure.account.key.{YOURACCOUNT}.dfs.core.windows.net</name>
     <value>{ACCOUNTKEY}</value>
@@ -31,14 +52,115 @@
     <name>fs.azure.test.account.key.{YOURACCOUNT}.dfs.core.windows.net</name>
     <value>{ACCOUNTKEY}</value>
   </property>
+  -->
 
+  <!--If auth type is "OAuth", set below properties, AAD client and tenant related properties can be obtained through Azure Portal-->
+  <!--
   <property>
-    <name>fs.contract.test.fs.abfs</name>
-    <value>abfs://{CONTAINERNAME}@{ACCOUNTNAME}.dfs.core.windows.net/value>
-    <description>The name of the azure file system for testing.</description>
+    <name>fs.azure.account.oauth.provider.type.{YOURACCOUNT}.dfs.core.windows.net</name>
+    <value>org.apache.hadoop.fs.azurebfs.oauth2.{Token Provider Class name}</value>
+    <description>The full name of token provider class name.</description>
+  </property>
+ -->
+
+  <!--If "ClientCredsTokenProvider" is set as key provider, set auth endpoint, client id and secret below-->
+  <!--
+   <property>
+    <name>fs.azure.account.oauth2.client.endpoint.{YOURACCOUNT}.dfs.core.windows.net</name>
+    <value>https://login.microsoftonline.com/{TENANTID}/oauth2/token</value>
+    <description>Token end point, this can be found through Azure portal</description>
+  </property>
+
+   <property>
+     <name>fs.azure.account.oauth2.client.id.{YOURACCOUNT}.dfs.core.windows.net</name>
+     <value>{client id}</value>
+     <description>AAD client id.</description>
+   </property>
+
+   <property>
+     <name>fs.azure.account.oauth2.client.secret.{YOURACCOUNT}.dfs.core.windows.net</name>
+     <value>{client secret}</value>
+   </property>
+ -->
+
+  <!--If "UserPasswordTokenProvider" is set as key provider, set auth endpoint, use name and password below-->
+  <!--
+   <property>
+    <name>fs.azure.account.oauth2.client.endpoint.{YOURACCOUNT}.dfs.core.windows.net</name>
+    <value>https://login.microsoftonline.com/{TENANTID}/oauth2/token</value>
+    <description>Token end point, this can be found through Azure portal</description>
   </property>
 
+   <property>
+     <name>fs.azure.account.oauth2.user.name.{YOURACCOUNT}.dfs.core.windows.net</name>
+     <value>{user name}</value>
+   </property>
+
+   <property>
+     <name>fs.azure.account.oauth2.user.password.{YOURACCOUNT}.dfs.core.windows.net</name>
+     <value>{user password}</value>
+   </property>
+ -->
+
+  <!--If "MsiTokenProvider" is set as key provider, set tenantGuid and client id below-->
+  <!--
+   <property>
+     <name>fs.azure.account.oauth2.msi.tenant.{YOURACCOUNT}.dfs.core.windows.net</name>
+     <value>{tenantGuid}</value>
+     <description>msi tenantGuid.</description>
+   </property>
+
+   <property>
+     <name>fs.azure.account.oauth2.client.id.{YOURACCOUNT}.dfs.core.windows.net</name>
+     <value>{client id}</value>
+     <description>AAD client id.</description>
+   </property>
+  -->
+
+  <!--If "RefreshTokenBasedTokenProvider" is set as key provider, set refresh token and client id below-->
+  <!--
+   <property>
+     <name>fs.azure.account.oauth2.refresh.token.{YOURACCOUNT}.dfs.core.windows.net</name>
+     <value>{refresh token}</value>
+     <description>refresh token.</description>
+   </property>
+
+   <property>
+     <name>fs.azure.account.oauth2.client.id.{YOURACCOUNT}.dfs.core.windows.net</name>
+     <value>{client id}</value>
+     <description>AAD client id.</description>
+   </property>
+  -->
+
+  <!--Below four configure properties are provided for Blob Data Contributor a
+  nd Blob Data Reader OAuth access test "ITestAzureBlobFileSystemOauth",
+  using"ClientCredsTokenProvider" as key
+  provider. These are for test only.-->
+
+  <!--
+  <property>
+    <name>fs.azure.account.oauth2.contributor.client.id</name>
+    <value>{ID of client which has Data Contributor role}</value>
+  </property>
+
+  <property>
+    <name>fs.azure.account.oauth2.contributor.client.secret</name>
+    <value>{secret of client which has Data Contributor role}</value>
+  </property>
+
+  <property>
+    <name>fs.azure.account.oauth2.reader.client.id</name>
+    <value>{ID of client which has Data Reader role}</value>
+  </property>
+
+  <property>
+    <name>fs.azure.account.oauth2.reader.client.secret</name>
+    <value>{Secret of client which has Data Reader role}</value>
+  </property>
   -->
+
+
+
   <!-- Save above configuration properties in a separate file named -->
   <!-- azure-bfs-auth-keys.xml in the same directory as this file. -->
   <!-- DO NOT ADD azure-bfs-auth-keys.xml TO REVISION CONTROL.  The keys to your -->