|
@@ -25,9 +25,12 @@ import java.util.Map;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
|
|
|
+import org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations;
|
|
|
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
|
|
|
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
|
|
|
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation;
|
|
@@ -69,6 +72,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*
|
|
|
@InterfaceAudience.Private
|
|
|
@InterfaceStability.Evolving
|
|
|
public class AbfsConfiguration{
|
|
|
+
|
|
|
private final Configuration rawConfig;
|
|
|
private final String accountName;
|
|
|
private final boolean isSecure;
|
|
@@ -486,13 +490,25 @@ public class AbfsConfiguration{
|
|
|
String password = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD);
|
|
|
tokenProvider = new UserPasswordTokenProvider(authEndpoint, username, password);
|
|
|
} else if (tokenProviderClass == MsiTokenProvider.class) {
|
|
|
+ String authEndpoint = getTrimmedPasswordString(
|
|
|
+ FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT,
|
|
|
+ AuthConfigurations.DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT);
|
|
|
String tenantGuid = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT);
|
|
|
String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID);
|
|
|
- tokenProvider = new MsiTokenProvider(tenantGuid, clientId);
|
|
|
+ String authority = getTrimmedPasswordString(
|
|
|
+ FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY,
|
|
|
+ AuthConfigurations.DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY);
|
|
|
+ authority = appendSlashIfNeeded(authority);
|
|
|
+ tokenProvider = new MsiTokenProvider(authEndpoint, tenantGuid,
|
|
|
+ clientId, authority);
|
|
|
} else if (tokenProviderClass == RefreshTokenBasedTokenProvider.class) {
|
|
|
+ String authEndpoint = getTrimmedPasswordString(
|
|
|
+ FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN_ENDPOINT,
|
|
|
+ AuthConfigurations.DEFAULT_FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN_ENDPOINT);
|
|
|
String refreshToken = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN);
|
|
|
String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID);
|
|
|
- tokenProvider = new RefreshTokenBasedTokenProvider(clientId, refreshToken);
|
|
|
+ tokenProvider = new RefreshTokenBasedTokenProvider(authEndpoint,
|
|
|
+ clientId, refreshToken);
|
|
|
} else {
|
|
|
throw new IllegalArgumentException("Failed to initialize " + tokenProviderClass);
|
|
|
}
|
|
@@ -649,4 +665,19 @@ public class AbfsConfiguration{
|
|
|
this.disableOutputStreamFlush = disableOutputStreamFlush;
|
|
|
}
|
|
|
|
|
|
+ private String getTrimmedPasswordString(String key, String defaultValue) throws IOException {
|
|
|
+ String value = getPasswordString(key);
|
|
|
+ if (StringUtils.isBlank(value)) {
|
|
|
+ value = defaultValue;
|
|
|
+ }
|
|
|
+ return value.trim();
|
|
|
+ }
|
|
|
+
|
|
|
+ private String appendSlashIfNeeded(String authority) {
|
|
|
+ if (!authority.endsWith(AbfsHttpConstants.FORWARD_SLASH)) {
|
|
|
+ authority = authority + AbfsHttpConstants.FORWARD_SLASH;
|
|
|
+ }
|
|
|
+ return authority;
|
|
|
+ }
|
|
|
+
|
|
|
}
|