|
@@ -21,13 +21,25 @@ package org.apache.hadoop.fs.azure;
|
|
|
import java.io.IOException;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
+import java.net.UnknownHostException;
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
|
|
|
|
+import org.apache.commons.lang.StringUtils;
|
|
|
+import org.apache.commons.lang.Validate;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.azure.security.Constants;
|
|
|
+import org.apache.hadoop.fs.azure.security.SecurityUtils;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
|
|
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
|
|
+import org.apache.hadoop.security.authentication.client.Authenticator;
|
|
|
+import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
|
|
|
import org.apache.http.client.methods.HttpGet;
|
|
|
import org.apache.http.client.utils.URIBuilder;
|
|
|
import org.codehaus.jackson.JsonParseException;
|
|
|
import org.codehaus.jackson.map.JsonMappingException;
|
|
|
import org.codehaus.jackson.map.ObjectMapper;
|
|
|
+
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -42,12 +54,6 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
|
|
|
|
|
|
public static final Logger LOG =
|
|
|
LoggerFactory.getLogger(AzureNativeFileSystemStore.class);
|
|
|
- /**
|
|
|
- * Configuration parameter name expected in the Configuration
|
|
|
- * object to provide the url of the remote service {@value}
|
|
|
- */
|
|
|
- private static final String KEY_CRED_SERVICE_URL =
|
|
|
- "fs.azure.cred.service.url";
|
|
|
|
|
|
/**
|
|
|
* Container SAS Key generation OP name. {@value}
|
|
@@ -81,7 +87,7 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
|
|
|
* Query parameter name for user info {@value}
|
|
|
*/
|
|
|
private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME =
|
|
|
- "delegation_token";
|
|
|
+ "delegation";
|
|
|
|
|
|
/**
|
|
|
* Query parameter name for the relative path inside the storage
|
|
@@ -93,41 +99,50 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
|
|
|
private String delegationToken = "";
|
|
|
private String credServiceUrl = "";
|
|
|
private WasbRemoteCallHelper remoteCallHelper = null;
|
|
|
+ private boolean isSecurityEnabled;
|
|
|
+ private boolean isKerberosSupportEnabled;
|
|
|
|
|
|
public RemoteSASKeyGeneratorImpl(Configuration conf) {
|
|
|
super(conf);
|
|
|
}
|
|
|
|
|
|
- public boolean initialize(Configuration conf, String delegationToken) {
|
|
|
+ public void initialize(Configuration conf) throws IOException {
|
|
|
|
|
|
LOG.debug("Initializing RemoteSASKeyGeneratorImpl instance");
|
|
|
- credServiceUrl = conf.get(KEY_CRED_SERVICE_URL);
|
|
|
-
|
|
|
- if (delegationToken == null || delegationToken.isEmpty()) {
|
|
|
- LOG.error("Delegation Token not provided for initialization"
|
|
|
- + " of RemoteSASKeyGenerator");
|
|
|
- return false;
|
|
|
+ try {
|
|
|
+ delegationToken = SecurityUtils.getDelegationTokenFromCredentials();
|
|
|
+ } catch (IOException e) {
|
|
|
+ final String msg = "Error in fetching the WASB delegation token";
|
|
|
+ LOG.error(msg, e);
|
|
|
+ throw new IOException(msg, e);
|
|
|
}
|
|
|
|
|
|
- this.delegationToken = delegationToken;
|
|
|
+ try {
|
|
|
+ credServiceUrl = SecurityUtils.getCredServiceUrls(conf);
|
|
|
+ } catch (UnknownHostException e) {
|
|
|
+ final String msg = "Invalid CredService Url, configure it correctly";
|
|
|
+ LOG.error(msg, e);
|
|
|
+ throw new IOException(msg, e);
|
|
|
+ }
|
|
|
|
|
|
if (credServiceUrl == null || credServiceUrl.isEmpty()) {
|
|
|
- LOG.error("CredService Url not found in configuration to initialize"
|
|
|
- + " RemoteSASKeyGenerator");
|
|
|
- return false;
|
|
|
+ final String msg = "CredService Url not found in configuration to "
|
|
|
+ + "initialize RemoteSASKeyGenerator";
|
|
|
+ LOG.error(msg);
|
|
|
+ throw new IOException(msg);
|
|
|
}
|
|
|
|
|
|
remoteCallHelper = new WasbRemoteCallHelper();
|
|
|
- LOG.debug("Initialization of RemoteSASKeyGenerator instance successfull");
|
|
|
- return true;
|
|
|
+ this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
|
|
|
+ this.isKerberosSupportEnabled = conf.getBoolean(
|
|
|
+ Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
|
|
|
+ LOG.debug("Initialization of RemoteSASKeyGenerator instance successful");
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public URI getContainerSASUri(String storageAccount, String container)
|
|
|
throws SASKeyGenerationException {
|
|
|
-
|
|
|
try {
|
|
|
-
|
|
|
LOG.debug("Generating Container SAS Key for Container {} "
|
|
|
+ "inside Storage Account {} ", container, storageAccount);
|
|
|
URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
|
|
@@ -138,38 +153,39 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
|
|
|
container);
|
|
|
uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
|
|
|
+ getSasKeyExpiryPeriod());
|
|
|
- uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
|
|
|
- this.delegationToken);
|
|
|
-
|
|
|
- RemoteSASKeyGenerationResponse sasKeyResponse =
|
|
|
- makeRemoteRequest(uriBuilder.build());
|
|
|
-
|
|
|
- if (sasKeyResponse == null) {
|
|
|
- throw new SASKeyGenerationException("RemoteSASKeyGenerationResponse"
|
|
|
- + " object null from remote call");
|
|
|
- } else if (sasKeyResponse.getResponseCode()
|
|
|
- == REMOTE_CALL_SUCCESS_CODE) {
|
|
|
- return new URI(sasKeyResponse.getSasKey());
|
|
|
+ if (isSecurityEnabled && StringUtils.isNotEmpty(delegationToken)) {
|
|
|
+ uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
|
|
|
+ this.delegationToken);
|
|
|
+ }
|
|
|
+
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
+ UserGroupInformation connectUgi = ugi.getRealUser();
|
|
|
+ if (connectUgi == null) {
|
|
|
+ connectUgi = ugi;
|
|
|
} else {
|
|
|
- throw new SASKeyGenerationException("Remote Service encountered error"
|
|
|
- + " in SAS Key generation : "
|
|
|
- + sasKeyResponse.getResponseMessage());
|
|
|
+ uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
|
|
|
}
|
|
|
+
|
|
|
+ if (isSecurityEnabled && !connectUgi.hasKerberosCredentials()) {
|
|
|
+ connectUgi = UserGroupInformation.getLoginUser();
|
|
|
+ }
|
|
|
+ return getSASKey(uriBuilder.build(), connectUgi);
|
|
|
} catch (URISyntaxException uriSyntaxEx) {
|
|
|
throw new SASKeyGenerationException("Encountered URISyntaxException "
|
|
|
+ "while building the HttpGetRequest to remote cred service",
|
|
|
uriSyntaxEx);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new SASKeyGenerationException("Encountered IOException"
|
|
|
+ + " while building the HttpGetRequest to remote service", e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public URI getRelativeBlobSASUri(String storageAccount, String container,
|
|
|
String relativePath) throws SASKeyGenerationException {
|
|
|
-
|
|
|
try {
|
|
|
-
|
|
|
LOG.debug("Generating RelativePath SAS Key for relativePath {} inside"
|
|
|
- + " Container {} inside Storage Account {} ",
|
|
|
+ + " Container {} inside Storage Account {} ",
|
|
|
relativePath, container, storageAccount);
|
|
|
URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
|
|
|
uriBuilder.setPath("/" + BLOB_SAS_OP);
|
|
@@ -181,41 +197,98 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
|
|
|
relativePath);
|
|
|
uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
|
|
|
+ getSasKeyExpiryPeriod());
|
|
|
- uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
|
|
|
- this.delegationToken);
|
|
|
-
|
|
|
- RemoteSASKeyGenerationResponse sasKeyResponse =
|
|
|
- makeRemoteRequest(uriBuilder.build());
|
|
|
-
|
|
|
- if (sasKeyResponse == null) {
|
|
|
- throw new SASKeyGenerationException("RemoteSASKeyGenerationResponse"
|
|
|
- + " object null from remote call");
|
|
|
- } else if (sasKeyResponse.getResponseCode()
|
|
|
- == REMOTE_CALL_SUCCESS_CODE) {
|
|
|
- return new URI(sasKeyResponse.getSasKey());
|
|
|
+
|
|
|
+ if (isSecurityEnabled && StringUtils.isNotEmpty(
|
|
|
+ delegationToken)) {
|
|
|
+ uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
|
|
|
+ this.delegationToken);
|
|
|
+ }
|
|
|
+
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
+ UserGroupInformation connectUgi = ugi.getRealUser();
|
|
|
+ if (connectUgi == null) {
|
|
|
+ connectUgi = ugi;
|
|
|
} else {
|
|
|
- throw new SASKeyGenerationException("Remote Service encountered error"
|
|
|
- + " in SAS Key generation : "
|
|
|
- + sasKeyResponse.getResponseMessage());
|
|
|
+ uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
|
|
|
}
|
|
|
+
|
|
|
+ if (isSecurityEnabled && !connectUgi.hasKerberosCredentials()) {
|
|
|
+ connectUgi = UserGroupInformation.getLoginUser();
|
|
|
+ }
|
|
|
+ return getSASKey(uriBuilder.build(), connectUgi);
|
|
|
} catch (URISyntaxException uriSyntaxEx) {
|
|
|
throw new SASKeyGenerationException("Encountered URISyntaxException"
|
|
|
+ " while building the HttpGetRequest to " + " remote service",
|
|
|
uriSyntaxEx);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new SASKeyGenerationException("Encountered IOException"
|
|
|
+ + " while building the HttpGetRequest to remote service", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private URI getSASKey(final URI uri, UserGroupInformation connectUgi)
|
|
|
+ throws URISyntaxException, SASKeyGenerationException {
|
|
|
+ final RemoteSASKeyGenerationResponse sasKeyResponse;
|
|
|
+ try {
|
|
|
+ connectUgi.checkTGTAndReloginFromKeytab();
|
|
|
+ sasKeyResponse = connectUgi.doAs(
|
|
|
+ new PrivilegedExceptionAction<RemoteSASKeyGenerationResponse>() {
|
|
|
+ @Override
|
|
|
+ public RemoteSASKeyGenerationResponse run() throws Exception {
|
|
|
+ AuthenticatedURL.Token token = null;
|
|
|
+ if (isKerberosSupportEnabled && UserGroupInformation
|
|
|
+ .isSecurityEnabled() && (delegationToken == null
|
|
|
+ || delegationToken.isEmpty())) {
|
|
|
+ token = new AuthenticatedURL.Token();
|
|
|
+ final Authenticator kerberosAuthenticator =
|
|
|
+ new KerberosDelegationTokenAuthenticator();
|
|
|
+ try {
|
|
|
+ kerberosAuthenticator.authenticate(uri.toURL(), token);
|
|
|
+ Validate.isTrue(token.isSet(),
|
|
|
+ "Authenticated Token is NOT present. "
|
|
|
+ + "The request cannot proceed.");
|
|
|
+ } catch (AuthenticationException e) {
|
|
|
+ throw new IOException(
|
|
|
+ "Authentication failed in check authorization", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return makeRemoteRequest(uri,
|
|
|
+ (token != null ? token.toString() : null));
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (InterruptedException | IOException e) {
|
|
|
+ final String msg = "Error fetching SAS Key from Remote Service: " + uri;
|
|
|
+ LOG.error(msg, e);
|
|
|
+ if (e instanceof InterruptedException) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
+ throw new SASKeyGenerationException(msg, e);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (sasKeyResponse.getResponseCode() == REMOTE_CALL_SUCCESS_CODE) {
|
|
|
+ return new URI(sasKeyResponse.getSasKey());
|
|
|
+ } else {
|
|
|
+ throw new SASKeyGenerationException(
|
|
|
+ "Remote Service encountered error in SAS Key generation : "
|
|
|
+ + sasKeyResponse.getResponseMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Helper method to make a remote request.
|
|
|
* @param uri - Uri to use for the remote request
|
|
|
+ * @param token - hadoop.auth token for the remote request
|
|
|
* @return RemoteSASKeyGenerationResponse
|
|
|
*/
|
|
|
- private RemoteSASKeyGenerationResponse makeRemoteRequest(URI uri)
|
|
|
- throws SASKeyGenerationException {
|
|
|
+ private RemoteSASKeyGenerationResponse makeRemoteRequest(URI uri,
|
|
|
+ String token) throws SASKeyGenerationException {
|
|
|
|
|
|
try {
|
|
|
- String responseBody =
|
|
|
- remoteCallHelper.makeRemoteGetRequest(new HttpGet(uri));
|
|
|
+ HttpGet httpGet = new HttpGet(uri);
|
|
|
+ if (token != null) {
|
|
|
+ httpGet.setHeader("Cookie", AuthenticatedURL.AUTH_COOKIE + "=" + token);
|
|
|
+ }
|
|
|
+ String responseBody = remoteCallHelper.makeRemoteGetRequest(httpGet);
|
|
|
|
|
|
ObjectMapper objectMapper = new ObjectMapper();
|
|
|
return objectMapper.readValue(responseBody,
|