Procházet zdrojové kódy

HADOOP-13930. Azure: Add Authorization support to WASB. Contributed by Sivaguru Sankaridurg and Dushyanth

Mingliang Liu před 8 roky
rodič
revize
b26870c58e
19 změnil soubory, kde provedl 1373 přidání a 64 odebrání
  1. 10 0
      hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
  2. 2 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
  3. 2 2
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
  4. 153 2
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
  5. 127 56
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteSASKeyGeneratorImpl.java
  6. 247 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteWasbAuthorizerImpl.java
  7. 2 4
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
  8. 40 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationException.java
  9. 44 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationOperations.java
  10. 47 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizerInterface.java
  11. 54 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/Constants.java
  12. 48 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/WasbDelegationTokenIdentifier.java
  13. 124 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/WasbTokenRenewer.java
  14. 28 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/package.html
  15. 16 0
      hadoop-tools/hadoop-azure/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
  16. 16 0
      hadoop-tools/hadoop-azure/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
  17. 34 0
      hadoop-tools/hadoop-azure/src/site/markdown/index.md
  18. 102 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
  19. 277 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java

+ 10 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -1300,6 +1300,16 @@
     to specify the time (such as 2s, 2m, 1h, etc.).
   </description>
 </property>
+<property>
+  <name>fs.azure.authorization</name>
+  <value>false</value>
+  <description>
+    Config flag to enable authorization support in WASB. Setting it to "true" enables
+    authorization support to WASB. Currently WASB authorization requires a remote service
+    to provide authorization that needs to be specified via fs.azure.authorization.remote.service.url
+    configuration
+  </description>
+</property>
 
 
 <property>

+ 2 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java

@@ -177,6 +177,8 @@ public class TestCommonConfigurationFields extends TestConfigurationFieldsBase {
     xmlPropsToSkipCompare.add("io.compression.codec.bzip2.library");
     // - org.apache.hadoop.io.SequenceFile
     xmlPropsToSkipCompare.add("io.seqfile.local.dir");
+    // - org.apache.hadoop.fs.azure.NativeAzureFileSystem
+    xmlPropsToSkipCompare.add("fs.azure.authorization");
 
 
   }

+ 2 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java

@@ -303,7 +303,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   private boolean useSecureMode = false;
   private boolean useLocalSasKeyMode = false;
 
-  private String delegationToken;
+
   /**
    * A test hook interface that can modify the operation context we use for
    * Azure Storage operations, e.g. to inject errors.
@@ -478,7 +478,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         this.storageInteractionLayer = new StorageInterfaceImpl();
       } else {
         this.storageInteractionLayer = new SecureStorageInterfaceImpl(
-            useLocalSasKeyMode, conf, delegationToken);
+            useLocalSasKeyMode, conf);
       }
     }
 

+ 153 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java

@@ -25,9 +25,12 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
 import java.nio.charset.Charset;
+import java.security.PrivilegedExceptionAction;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
@@ -57,10 +60,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
 import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
+import org.apache.hadoop.fs.azure.security.Constants;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
+import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
 import org.codehaus.jackson.JsonNode;
@@ -1096,7 +1104,39 @@ public class NativeAzureFileSystem extends FileSystem {
   // A counter to create unique (within-process) names for my metrics sources.
   private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
   private boolean appendSupportEnabled = false;
-  
+  private DelegationTokenAuthenticatedURL authURL;
+  private DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
+  private String credServiceUrl;
+
+  /**
+   * Configuration key to enable authorization support in WASB.
+   */
+  public static final String KEY_AZURE_AUTHORIZATION =
+      "fs.azure.authorization";
+
+  /**
+   * Default value for the authorization support in WASB.
+   */
+  private static final boolean DEFAULT_AZURE_AUTHORIZATION = false;
+
+  /**
+   * Flag controlling authorization support in WASB.
+   */
+  private boolean azureAuthorization = false;
+
+  /**
+   * Flag controlling Kerberos support in WASB.
+   */
+  private boolean kerberosSupportEnabled = false;
+
+  /**
+   * Authorizer to use when authorization support is enabled in
+   * WASB.
+   */
+  private WasbAuthorizerInterface authorizer = null;
+
+  private String delegationToken = null;
+
   public NativeAzureFileSystem() {
     // set store in initialize()
   }
@@ -1227,6 +1267,31 @@ public class NativeAzureFileSystem extends FileSystem {
     // Initialize thread counts from user configuration
     deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, DEFAULT_AZURE_DELETE_THREADS);
     renameThreadCount = conf.getInt(AZURE_RENAME_THREADS, DEFAULT_AZURE_RENAME_THREADS);
+
+    this.azureAuthorization = conf.getBoolean(KEY_AZURE_AUTHORIZATION,
+        DEFAULT_AZURE_AUTHORIZATION);
+    this.kerberosSupportEnabled = conf.getBoolean(
+        Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
+
+    if (this.azureAuthorization) {
+
+      this.authorizer =
+          new RemoteWasbAuthorizerImpl();
+      authorizer.init(conf);
+    }
+    if (UserGroupInformation.isSecurityEnabled() && kerberosSupportEnabled) {
+      DelegationTokenAuthenticator authenticator = new KerberosDelegationTokenAuthenticator();
+      authURL = new DelegationTokenAuthenticatedURL(authenticator);
+      credServiceUrl = conf.get(Constants.KEY_CRED_SERVICE_URL, String
+          .format("http://%s:%s",
+              InetAddress.getLocalHost().getCanonicalHostName(),
+              Constants.DEFAULT_CRED_SERVICE_PORT));
+    }
+  }
+
+  @VisibleForTesting
+  public void updateWasbAuthorizer(WasbAuthorizerInterface authorizer) {
+    this.authorizer = authorizer;
   }
 
   private NativeFileSystemStore createDefaultStore(Configuration conf) {
@@ -1340,6 +1405,15 @@ public class NativeAzureFileSystem extends FileSystem {
     return store;
   }
 
+  private void performAuthCheck(String path, String accessType,
+      String operation) throws WasbAuthorizationException, IOException {
+
+    if (azureAuthorization && !this.authorizer.authorize(path, accessType)) {
+      throw new WasbAuthorizationException(operation
+          + " operation for Path : " + path + " not allowed");
+    }
+  }
+
   /**
    * Gets the metrics source for this file system.
    * This is mainly here for unit testing purposes.
@@ -1362,6 +1436,10 @@ public class NativeAzureFileSystem extends FileSystem {
     LOG.debug("Opening file: {} for append", f);
 
     Path absolutePath = makeAbsolute(f);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.WRITE.toString(), "append");
+
     String key = pathToKey(absolutePath);
     FileMetadata meta = null;
     try {
@@ -1562,6 +1640,10 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     Path absolutePath = makeAbsolute(f);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.WRITE.toString(), "create");
+
     String key = pathToKey(absolutePath);
 
     FileMetadata existingMetadata = store.retrieveMetadata(key);
@@ -1684,6 +1766,10 @@ public class NativeAzureFileSystem extends FileSystem {
     LOG.debug("Deleting file: {}", f.toString());
 
     Path absolutePath = makeAbsolute(f);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), "delete");
+
     String key = pathToKey(absolutePath);
 
     // Capture the metadata for the path.
@@ -1954,6 +2040,10 @@ public class NativeAzureFileSystem extends FileSystem {
 
     // Capture the absolute path and the path to key.
     Path absolutePath = makeAbsolute(f);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), "getFileStatus");
+
     String key = pathToKey(absolutePath);
     if (key.length() == 0) { // root always exists
       return newDirectory(null, absolutePath);
@@ -2052,6 +2142,10 @@ public class NativeAzureFileSystem extends FileSystem {
     LOG.debug("Listing status for {}", f.toString());
 
     Path absolutePath = makeAbsolute(f);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), "list");
+
     String key = pathToKey(absolutePath);
     Set<FileStatus> status = new TreeSet<FileStatus>();
     FileMetadata meta = null;
@@ -2274,6 +2368,10 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     Path absolutePath = makeAbsolute(f);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), "mkdirs");
+
     PermissionStatus permissionStatus = null;
     if(noUmask) {
       // ensure owner still has wx permissions at the minimum
@@ -2327,6 +2425,10 @@ public class NativeAzureFileSystem extends FileSystem {
     LOG.debug("Opening file: {}", f.toString());
 
     Path absolutePath = makeAbsolute(f);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.READ.toString(), "read");
+
     String key = pathToKey(absolutePath);
     FileMetadata meta = null;
     try {
@@ -2383,7 +2485,12 @@ public class NativeAzureFileSystem extends FileSystem {
           + " through WASB that has colons in the name");
     }
 
-    String srcKey = pathToKey(makeAbsolute(src));
+    Path absolutePath = makeAbsolute(src);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), "rename");
+
+    String srcKey = pathToKey(absolutePath);
 
     if (srcKey.length() == 0) {
       // Cannot rename root of file system
@@ -2685,6 +2792,10 @@ public class NativeAzureFileSystem extends FileSystem {
   @Override
   public void setPermission(Path p, FsPermission permission) throws FileNotFoundException, IOException {
     Path absolutePath = makeAbsolute(p);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), "setPermission");
+
     String key = pathToKey(absolutePath);
     FileMetadata metadata = null;
     try {
@@ -2723,6 +2834,10 @@ public class NativeAzureFileSystem extends FileSystem {
   public void setOwner(Path p, String username, String groupname)
       throws IOException {
     Path absolutePath = makeAbsolute(p);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), "setOwner");
+
     String key = pathToKey(absolutePath);
     FileMetadata metadata = null;
 
@@ -2785,6 +2900,42 @@ public class NativeAzureFileSystem extends FileSystem {
     isClosed = true;
   }
 
+  @Override
+  public Token<?> getDelegationToken(final String renewer) throws IOException {
+    if(kerberosSupportEnabled) {
+      try {
+        final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+        UserGroupInformation connectUgi = ugi.getRealUser();
+        final UserGroupInformation proxyUser = connectUgi;
+        if (connectUgi == null) {
+          connectUgi = ugi;
+        }
+        if(!connectUgi.hasKerberosCredentials()){
+          connectUgi = UserGroupInformation.getLoginUser();
+        }
+        connectUgi.checkTGTAndReloginFromKeytab();
+        return connectUgi.doAs(new PrivilegedExceptionAction<Token<?>>() {
+          @Override
+          public Token<?> run() throws Exception {
+            return authURL.getDelegationToken(new URL(credServiceUrl
+                    + Constants.DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT),
+                authToken, renewer, (proxyUser != null)? ugi.getShortUserName(): null);
+          }
+        });
+      } catch (Exception ex) {
+        LOG.error("Error in fetching the delegation token from remote service",
+            ex);
+        if (ex instanceof IOException) {
+          throw (IOException) ex;
+        } else {
+          throw new IOException(ex);
+        }
+      }
+    } else {
+      return super.getDelegationToken(renewer);
+    }
+  }
+
   /**
    * A handler that defines what to do with blobs whose upload was
    * interrupted.

+ 127 - 56
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteSASKeyGeneratorImpl.java

@@ -19,10 +19,23 @@
 package org.apache.hadoop.fs.azure;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Iterator;
 
+import org.apache.commons.lang.Validate;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azure.security.Constants;
+import org.apache.hadoop.fs.azure.security.WasbDelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.Authenticator;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.utils.URIBuilder;
 import org.codehaus.jackson.JsonParseException;
@@ -42,12 +55,6 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(AzureNativeFileSystemStore.class);
-  /**
-   * Configuration parameter name expected in the Configuration
-   * object to provide the url of the remote service {@value}
-   */
-  private static final String KEY_CRED_SERVICE_URL =
-      "fs.azure.cred.service.url";
 
   /**
    * Container SAS Key generation OP name. {@value}
@@ -81,7 +88,7 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
    * Query parameter name for user info {@value}
    */
   private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME =
-      "delegation_token";
+      "delegation";
 
   /**
    * Query parameter name for the relative path inside the storage
@@ -93,24 +100,40 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
   private String delegationToken = "";
   private String credServiceUrl = "";
   private WasbRemoteCallHelper remoteCallHelper = null;
+  private boolean isSecurityEnabled;
+  private boolean isKerberosSupportEnabled;
 
   public RemoteSASKeyGeneratorImpl(Configuration conf) {
     super(conf);
   }
 
-  public boolean initialize(Configuration conf, String delegationToken) {
+  public boolean initialize(Configuration conf) {
 
     LOG.debug("Initializing RemoteSASKeyGeneratorImpl instance");
-    credServiceUrl = conf.get(KEY_CRED_SERVICE_URL);
+    Iterator<Token<? extends TokenIdentifier>> tokenIterator = null;
+    try {
+      tokenIterator = UserGroupInformation.getCurrentUser().getCredentials()
+          .getAllTokens().iterator();
+      while (tokenIterator.hasNext()) {
+        Token<? extends TokenIdentifier> iteratedToken = tokenIterator.next();
+        if (iteratedToken.getKind().equals(WasbDelegationTokenIdentifier.TOKEN_KIND)) {
+          delegationToken = iteratedToken.encodeToUrlString();
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Error in fetching the WASB delegation token");
+    }
 
-    if (delegationToken == null || delegationToken.isEmpty()) {
-      LOG.error("Delegation Token not provided for initialization"
-          + " of RemoteSASKeyGenerator");
+    try {
+      credServiceUrl = conf.get(Constants.KEY_CRED_SERVICE_URL, String
+          .format("http://%s:%s",
+              InetAddress.getLocalHost().getCanonicalHostName(),
+              Constants.DEFAULT_CRED_SERVICE_PORT));
+    } catch (UnknownHostException e) {
+      LOG.error("Invalid CredService Url, configure it correctly.");
       return false;
     }
 
-    this.delegationToken = delegationToken;
-
     if (credServiceUrl == null || credServiceUrl.isEmpty()) {
       LOG.error("CredService Url not found in configuration to initialize"
           + " RemoteSASKeyGenerator");
@@ -118,16 +141,17 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
     }
 
     remoteCallHelper = new WasbRemoteCallHelper();
-    LOG.debug("Initialization of RemoteSASKeyGenerator instance successfull");
+    this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
+    this.isKerberosSupportEnabled = conf.getBoolean(
+        Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
+    LOG.debug("Initialization of RemoteSASKeyGenerator instance successful");
     return true;
   }
 
   @Override
   public URI getContainerSASUri(String storageAccount, String container)
       throws SASKeyGenerationException {
-
     try {
-
       LOG.debug("Generating Container SAS Key for Container {} "
           + "inside Storage Account {} ", container, storageAccount);
       URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
@@ -138,84 +162,131 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
           container);
       uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
           + getSasKeyExpiryPeriod());
-      uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
-          this.delegationToken);
-
-      RemoteSASKeyGenerationResponse sasKeyResponse =
-          makeRemoteRequest(uriBuilder.build());
-
-      if (sasKeyResponse == null) {
-        throw new SASKeyGenerationException("RemoteSASKeyGenerationResponse"
-            + " object null from remote call");
-      } else if (sasKeyResponse.getResponseCode()
-          == REMOTE_CALL_SUCCESS_CODE) {
-        return new URI(sasKeyResponse.getSasKey());
+      if (isSecurityEnabled && (delegationToken != null && !delegationToken
+          .isEmpty())) {
+        uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
+            this.delegationToken);
+      }
+
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      UserGroupInformation connectUgi = ugi.getRealUser();
+      if (connectUgi == null) {
+        connectUgi = ugi;
       } else {
-        throw new SASKeyGenerationException("Remote Service encountered error"
-            + " in SAS Key generation : "
-            + sasKeyResponse.getResponseMessage());
+        uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
+      }
+
+      if(isSecurityEnabled && !connectUgi.hasKerberosCredentials()){
+        connectUgi = UserGroupInformation.getLoginUser();
       }
+      return getSASKey(uriBuilder.build(), connectUgi);
     } catch (URISyntaxException uriSyntaxEx) {
       throw new SASKeyGenerationException("Encountered URISyntaxException "
           + "while building the HttpGetRequest to remote cred service",
           uriSyntaxEx);
+    } catch (IOException e) {
+      throw new SASKeyGenerationException("Encountered IOException"
+          + " while building the HttpGetRequest to remote service", e);
     }
   }
 
   @Override
   public URI getRelativeBlobSASUri(String storageAccount, String container,
       String relativePath) throws SASKeyGenerationException {
-
     try {
-
       LOG.debug("Generating RelativePath SAS Key for relativePath {} inside"
           + " Container {} inside Storage Account {} ",
           relativePath, container, storageAccount);
       URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
       uriBuilder.setPath("/" + BLOB_SAS_OP);
-      uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME,
-          storageAccount);
-      uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME,
-          container);
+      uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME, storageAccount);
+      uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME, container);
       uriBuilder.addParameter(RELATIVE_PATH_QUERY_PARAM_NAME,
           relativePath);
       uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
           + getSasKeyExpiryPeriod());
-      uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
-          this.delegationToken);
-
-      RemoteSASKeyGenerationResponse sasKeyResponse =
-          makeRemoteRequest(uriBuilder.build());
-
-      if (sasKeyResponse == null) {
-        throw new SASKeyGenerationException("RemoteSASKeyGenerationResponse"
-            + " object null from remote call");
-      } else if (sasKeyResponse.getResponseCode()
-          == REMOTE_CALL_SUCCESS_CODE) {
-        return new URI(sasKeyResponse.getSasKey());
-      } else {
-        throw new SASKeyGenerationException("Remote Service encountered error"
-            + " in SAS Key generation : "
-            + sasKeyResponse.getResponseMessage());
+
+      if (isSecurityEnabled && (delegationToken != null && !delegationToken
+          .isEmpty())) {
+        uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
+            this.delegationToken);
+      }
+
+        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+        UserGroupInformation connectUgi = ugi.getRealUser();
+        if (connectUgi == null) {
+          connectUgi = ugi;
+        } else{
+          uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
+        }
+
+      if(isSecurityEnabled && !connectUgi.hasKerberosCredentials()){
+        connectUgi = UserGroupInformation.getLoginUser();
       }
+      return getSASKey(uriBuilder.build(), connectUgi);
     } catch (URISyntaxException uriSyntaxEx) {
       throw new SASKeyGenerationException("Encountered URISyntaxException"
           + " while building the HttpGetRequest to " + " remote service",
           uriSyntaxEx);
+    } catch (IOException e) {
+      throw new SASKeyGenerationException("Encountered IOException"
+      + " while building the HttpGetRequest to remote service", e);
+    }
+  }
+
+  private URI getSASKey(final URI uri, UserGroupInformation connectUgi)
+      throws URISyntaxException, SASKeyGenerationException {
+    RemoteSASKeyGenerationResponse sasKeyResponse = null;
+    try {
+      connectUgi.checkTGTAndReloginFromKeytab();
+      sasKeyResponse = connectUgi.doAs(new PrivilegedExceptionAction<RemoteSASKeyGenerationResponse>() {
+            @Override
+            public RemoteSASKeyGenerationResponse run() throws Exception {
+              AuthenticatedURL.Token token = null;
+              if (isKerberosSupportEnabled && UserGroupInformation.isSecurityEnabled() && (
+                  delegationToken == null || delegationToken.isEmpty())) {
+                token = new AuthenticatedURL.Token();
+                final Authenticator kerberosAuthenticator = new KerberosDelegationTokenAuthenticator();
+                kerberosAuthenticator.authenticate(uri.toURL(), token);
+                Validate.isTrue(token.isSet(),
+                    "Authenticated Token is NOT present. The request cannot proceed.");
+              }
+              return makeRemoteRequest(uri, (token != null ? token.toString() : null));
+            }
+          });
+    } catch (InterruptedException e) {
+      LOG.error("Error fetching the SAS Key from Remote Service", e);
+    } catch (IOException e) {
+      LOG.error("Error fetching the SAS Key from Remote Service", e);
+    }
+
+    if (sasKeyResponse == null) {
+      throw new SASKeyGenerationException(
+          "RemoteSASKeyGenerationResponse" + " object null from remote call");
+    } else if (sasKeyResponse.getResponseCode() == REMOTE_CALL_SUCCESS_CODE) {
+      return new URI(sasKeyResponse.getSasKey());
+    } else {
+      throw new SASKeyGenerationException("Remote Service encountered error"
+          + " in SAS Key generation : " + sasKeyResponse.getResponseMessage());
     }
   }
 
   /**
    * Helper method to make a remote request.
    * @param uri - Uri to use for the remote request
+   * @param token - hadoop.auth token for the remote request
    * @return RemoteSASKeyGenerationResponse
    */
-  private RemoteSASKeyGenerationResponse makeRemoteRequest(URI uri)
+  private RemoteSASKeyGenerationResponse makeRemoteRequest(URI uri, String token)
       throws SASKeyGenerationException {
 
     try {
+      HttpGet httpGet = new HttpGet(uri);
+      if(token != null){
+        httpGet.setHeader("Cookie", AuthenticatedURL.AUTH_COOKIE + "=" + token);
+      }
       String responseBody =
-          remoteCallHelper.makeRemoteGetRequest(new HttpGet(uri));
+          remoteCallHelper.makeRemoteGetRequest(httpGet);
 
       ObjectMapper objectMapper = new ObjectMapper();
       return objectMapper.readValue(responseBody,

+ 247 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteWasbAuthorizerImpl.java

@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Iterator;
+
+import org.apache.commons.lang.Validate;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.fs.azure.security.Constants;
+import org.apache.hadoop.fs.azure.security.WasbDelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.Authenticator;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.azure.WasbRemoteCallHelper.REMOTE_CALL_SUCCESS_CODE;
+
+/**
+ * Class implementing WasbAuthorizerInterface using a remote
+ * service that implements the authorization operation. This
+ * class expects the url of the remote service to be passed
+ * via config.
+ */
+public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(RemoteWasbAuthorizerImpl.class);
+  private String remoteAuthorizerServiceUrl = "";
+
+  /**
+   * Configuration parameter name expected in the Configuration object to
+   * provide the url of the remote service. {@value}
+   */
+  public static final String KEY_REMOTE_AUTH_SERVICE_URL =
+      "fs.azure.authorization.remote.service.url";
+
+  /**
+   * Authorization operation OP name in the remote service {@value}
+   */
+  private static final String CHECK_AUTHORIZATION_OP =
+      "CHECK_AUTHORIZATION";
+
+  /**
+   * Query parameter specifying the access operation type. {@value}
+   */
+  private static final String ACCESS_OPERATION_QUERY_PARAM_NAME =
+      "operation_type";
+
+  /**
+   * Query parameter specifying the wasb absolute path. {@value}
+   */
+  private static final String WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME =
+      "wasb_absolute_path";
+
+  /**
+   * Query parameter name for user info {@value}
+   */
+  private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME =
+      "delegation";
+
+  private WasbRemoteCallHelper remoteCallHelper = null;
+  private String delegationToken;
+  private boolean isSecurityEnabled;
+  private boolean isKerberosSupportEnabled;
+
+  @Override
+  public void init(Configuration conf)
+      throws WasbAuthorizationException, IOException {
+    LOG.debug("Initializing RemoteWasbAuthorizerImpl instance");
+    delegationToken = UserGroupInformation.getCurrentUser().getCredentials().getToken(WasbDelegationTokenIdentifier.TOKEN_KIND).encodeToUrlString();
+
+    remoteAuthorizerServiceUrl = conf.get(KEY_REMOTE_AUTH_SERVICE_URL, String
+        .format("http://%s:%s",
+            InetAddress.getLocalHost().getCanonicalHostName(),
+            Constants.DEFAULT_CRED_SERVICE_PORT));
+
+    if (remoteAuthorizerServiceUrl == null
+        || remoteAuthorizerServiceUrl.isEmpty()) {
+      throw new WasbAuthorizationException(
+          "fs.azure.authorization.remote.service.url config not set"
+              + " in configuration.");
+    }
+
+    this.remoteCallHelper = new WasbRemoteCallHelper();
+    this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
+    this.isKerberosSupportEnabled = conf.getBoolean(
+        Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
+  }
+
+  @Override
+  public boolean authorize(String wasbAbsolutePath, String accessType)
+      throws WasbAuthorizationException, IOException {
+    try {
+      final URIBuilder uriBuilder = new URIBuilder(remoteAuthorizerServiceUrl);
+      uriBuilder.setPath("/" + CHECK_AUTHORIZATION_OP);
+      uriBuilder.addParameter(WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME,
+          wasbAbsolutePath);
+      uriBuilder.addParameter(ACCESS_OPERATION_QUERY_PARAM_NAME,
+          accessType);
+      if (isSecurityEnabled && (delegationToken != null && !delegationToken
+          .isEmpty())) {
+        uriBuilder
+            .addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME, delegationToken);
+      }
+      String responseBody = null;
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      UserGroupInformation connectUgi = ugi.getRealUser();
+      if (connectUgi == null) {
+        connectUgi = ugi;
+      } else{
+        uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
+      }
+      if(isSecurityEnabled && !connectUgi.hasKerberosCredentials()){
+        connectUgi = UserGroupInformation.getLoginUser();
+      }
+      connectUgi.checkTGTAndReloginFromKeytab();
+      try {
+        responseBody = connectUgi.doAs(new PrivilegedExceptionAction<String>(){
+          @Override
+          public String run() throws Exception {
+            AuthenticatedURL.Token token = null;
+            HttpGet httpGet = new HttpGet(uriBuilder.build());
+            if (isKerberosSupportEnabled && UserGroupInformation.isSecurityEnabled() && (
+                delegationToken == null || delegationToken.isEmpty())) {
+              token = new AuthenticatedURL.Token();
+              final Authenticator kerberosAuthenticator = new KerberosDelegationTokenAuthenticator();
+              kerberosAuthenticator
+                  .authenticate(uriBuilder.build().toURL(), token);
+              Validate.isTrue(token.isSet(),
+                  "Authenticated Token is NOT present. The request cannot proceed.");
+              if(token != null){
+                httpGet.setHeader("Cookie", AuthenticatedURL.AUTH_COOKIE + "=" + token);
+              }
+            }
+            return remoteCallHelper.makeRemoteGetRequest(httpGet);
+          }});
+      } catch (InterruptedException e) {
+        LOG.error("Error in check authorization", e);
+      }
+
+      ObjectMapper objectMapper = new ObjectMapper();
+      RemoteAuthorizerResponse authorizerResponse =
+          objectMapper.readValue(responseBody, RemoteAuthorizerResponse.class);
+
+      if (authorizerResponse == null) {
+        throw new WasbAuthorizationException(
+            "RemoteAuthorizerResponse object null from remote call");
+      } else if (authorizerResponse.getResponseCode()
+          == REMOTE_CALL_SUCCESS_CODE) {
+        return authorizerResponse.getAuthorizationResult();
+      } else {
+        throw new WasbAuthorizationException("Remote authorization"
+            + " serivce encountered an error "
+            + authorizerResponse.getResponseMessage());
+      }
+    } catch (URISyntaxException | WasbRemoteCallException
+        | JsonParseException | JsonMappingException ex) {
+      throw new WasbAuthorizationException(ex);
+    }
+  }
+}
+
+/**
+ * POJO representing the response expected from a remote
+ * authorization service.
+ * The remote service is expected to return the authorization
+ * response in the following JSON format
+ * {
+ *    "responseCode" : 0 or non-zero <int>,
+ *    "responseMessage" : relavant message of failure <String>
+ *    "authorizationResult" : authorization result <boolean>
+ *                            true - if auhorization allowed
+ *                            false - otherwise.
+ *
+ * }
+ */
+class RemoteAuthorizerResponse {
+
+  private int responseCode;
+  private boolean authorizationResult;
+  private String responseMessage;
+
+  public RemoteAuthorizerResponse(){
+  }
+
+  public RemoteAuthorizerResponse(int responseCode,
+      boolean authorizationResult, String message) {
+    this.responseCode = responseCode;
+    this.authorizationResult = authorizationResult;
+    this.responseMessage = message;
+  }
+
+  public int getResponseCode() {
+    return responseCode;
+  }
+
+  public void setResponseCode(int responseCode) {
+    this.responseCode = responseCode;
+  }
+
+  public boolean getAuthorizationResult() {
+    return authorizationResult;
+  }
+
+  public void setAuthorizationResult(boolean authorizationResult) {
+    this.authorizationResult = authorizationResult;
+  }
+
+  public String getResponseMessage() {
+    return responseMessage;
+  }
+
+  public void setResponseMessage(String message) {
+    this.responseMessage = message;
+  }
+}

+ 2 - 4
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java

@@ -69,19 +69,17 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
   public static final String SAS_ERROR_CODE = "SAS Error";
   private SASKeyGeneratorInterface sasKeyGenerator;
   private String storageAccount;
-  private String delegationToken;
 
   public SecureStorageInterfaceImpl(boolean useLocalSASKeyMode,
-      Configuration conf, String delegationToken)
+      Configuration conf)
           throws SecureModeException {
 
-    this.delegationToken = delegationToken;
     if (useLocalSASKeyMode) {
       this.sasKeyGenerator = new LocalSASKeyGeneratorImpl(conf);
     } else {
       RemoteSASKeyGeneratorImpl remoteSasKeyGenerator =
           new RemoteSASKeyGeneratorImpl(conf);
-      if (!remoteSasKeyGenerator.initialize(conf, this.delegationToken)) {
+      if (!remoteSasKeyGenerator.initialize(conf)) {
         throw new SecureModeException("Remote SAS Key mode could"
             + " not be initialized");
       }

+ 40 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationException.java

@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+/**
+ *  Exception that gets thrown during the authorization failures
+ *  in WASB.
+ */
+public class WasbAuthorizationException extends AzureException {
+
+  private static final long serialVersionUID = 1L;
+
+  public WasbAuthorizationException(String message) {
+    super(message);
+  }
+
+  public WasbAuthorizationException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public WasbAuthorizationException(Throwable t) {
+    super(t);
+  }
+}

+ 44 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationOperations.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+/**
+ * Different authorization operations supported
+ * in WASB.
+ */
+
+public enum WasbAuthorizationOperations {
+
+  READ, WRITE, EXECUTE;
+
+  @Override
+  public String toString() {
+    switch(this) {
+      case READ:
+        return "read";
+      case WRITE:
+        return "write";
+      case EXECUTE:
+        return "execute";
+      default:
+        throw new IllegalArgumentException(
+            "Invalid Auhtorization Operation");
+    }
+  }
+}

+ 47 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizerInterface.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ *  Interface to implement authorization support in WASB.
+ *  API's of this interface will be implemented in the
+ *  StorageInterface Layer before making calls to Azure
+ *  Storage.
+ */
+public interface WasbAuthorizerInterface {
+  /**
+   * Initializer method
+   * @param conf - Configuration object
+   */
+  public void init(Configuration conf)
+      throws WasbAuthorizationException, IOException;
+
+  /**
+   * Authorizer API to authorize access in WASB.
+   * @param wasbAbsolutePath : Absolute WASB Path used for access.
+   * @param accessType : Type of access
+   * @return : true - If access allowed false - If access is not allowed.
+   */
+  public boolean authorize(String wasbAbsolutePath, String accessType)
+      throws WasbAuthorizationException, IOException;
+}

+ 54 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/Constants.java

@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.security;
+
+/**
+ * Constants for used with WASB security implementation.
+ */
+public final class Constants {
+
+  private Constants() {
+  }
+
+  /**
+   * Configuration parameter name expected in the Configuration
+   * object to provide the url of the remote service {@value}
+   */
+  public static final String KEY_CRED_SERVICE_URL = "fs.azure.cred.service.url";
+  /**
+   * Default port of the remote service used as delegation token manager and Azure storage SAS key generator.
+   */
+  public static final int DEFAULT_CRED_SERVICE_PORT = 50911;
+
+  /**
+   * Default remote delegation token manager endpoint.
+   */
+  public static final String DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT = "/tokenmanager/v1";
+
+  /**
+   * The configuration property to enable Kerberos support.
+   */
+
+  public static final String AZURE_KERBEROS_SUPPORT_PROPERTY_NAME = "fs.azure.enable.kerberos.support";
+
+  /**
+   * Parameter to be used for impersonation.
+   */
+  public static final String DOAS_PARAM="doas";
+}

+ 48 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/WasbDelegationTokenIdentifier.java

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.security;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
+
+/**
+ * Delegation token Identifier for WASB delegation tokens.
+ */
+public class WasbDelegationTokenIdentifier extends DelegationTokenIdentifier {
+  public static final Text TOKEN_KIND = new Text("WASB delegation");
+
+  public WasbDelegationTokenIdentifier(){
+    super(TOKEN_KIND);
+  }
+
+  public WasbDelegationTokenIdentifier(Text kind) {
+    super(kind);
+  }
+
+  public WasbDelegationTokenIdentifier(Text kind, Text owner, Text renewer,
+      Text realUser) {
+    super(kind, owner, renewer, realUser);
+  }
+
+  @Override
+  public Text getKind() {
+    return TOKEN_KIND;
+  }
+
+}

+ 124 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/WasbTokenRenewer.java

@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.security;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
+import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Token Renewer for renewing WASB delegation tokens with remote service.
+ */
+public class WasbTokenRenewer extends TokenRenewer {
+  public static final Logger LOG = LoggerFactory
+      .getLogger(WasbTokenRenewer.class);
+
+  @Override
+  public boolean handleKind(Text kind) {
+    return WasbDelegationTokenIdentifier.TOKEN_KIND.equals(kind);
+  }
+
+  @Override
+  public boolean isManaged(Token<?> token) throws IOException {
+    return true;
+  }
+
+  @Override
+  public long renew(final Token<?> token, Configuration conf)
+      throws IOException, InterruptedException {
+    LOG.debug("Renewing the delegation token");
+    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    UserGroupInformation connectUgi = ugi.getRealUser();
+    final UserGroupInformation proxyUser = connectUgi;
+    if (connectUgi == null) {
+      connectUgi = ugi;
+    }
+    if(!connectUgi.hasKerberosCredentials()){
+      connectUgi = UserGroupInformation.getLoginUser();
+    }
+    connectUgi.checkTGTAndReloginFromKeytab();
+    final DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
+    authToken
+        .setDelegationToken((Token<AbstractDelegationTokenIdentifier>) token);
+    final String credServiceUrl = conf.get(Constants.KEY_CRED_SERVICE_URL,
+        String.format("http://%s:%s",
+            InetAddress.getLocalHost().getCanonicalHostName(),
+            Constants.DEFAULT_CRED_SERVICE_PORT));
+    DelegationTokenAuthenticator authenticator = new KerberosDelegationTokenAuthenticator();
+    final DelegationTokenAuthenticatedURL authURL = new DelegationTokenAuthenticatedURL(
+        authenticator);
+
+    return connectUgi.doAs(new PrivilegedExceptionAction<Long>() {
+      @Override
+      public Long run() throws Exception {
+        return authURL.renewDelegationToken(new URL(credServiceUrl
+                + Constants.DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT),
+            authToken, (proxyUser != null) ? ugi.getShortUserName() : null);
+      }
+    });
+  }
+
+  @Override
+  public void cancel(final Token<?> token, Configuration conf)
+      throws IOException, InterruptedException {
+    LOG.debug("Cancelling the delegation token");
+    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    UserGroupInformation connectUgi = ugi.getRealUser();
+    final UserGroupInformation proxyUser = connectUgi;
+    if (connectUgi == null) {
+      connectUgi = ugi;
+    }
+    if(!connectUgi.hasKerberosCredentials()){
+      connectUgi = UserGroupInformation.getLoginUser();
+    }
+    connectUgi.checkTGTAndReloginFromKeytab();
+    final DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
+    authToken
+        .setDelegationToken((Token<AbstractDelegationTokenIdentifier>) token);
+    final String credServiceUrl = conf.get(Constants.KEY_CRED_SERVICE_URL,
+        String.format("http://%s:%s",
+            InetAddress.getLocalHost().getCanonicalHostName(),
+            Constants.DEFAULT_CRED_SERVICE_PORT));
+    DelegationTokenAuthenticator authenticator = new KerberosDelegationTokenAuthenticator();
+    final DelegationTokenAuthenticatedURL authURL = new DelegationTokenAuthenticatedURL(
+        authenticator);
+    connectUgi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        authURL.cancelDelegationToken(new URL(credServiceUrl
+                + Constants.DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT),
+            authToken, (proxyUser != null) ? ugi.getShortUserName() : null);
+        return null;
+      }
+    });
+  }
+}

+ 28 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/package.html

@@ -0,0 +1,28 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+
+<p>
+    Infrastructure for WASB client Security to work with Kerberos and Delegation
+    tokens.
+</p>
+
+</body>
+</html>

+ 16 - 0
hadoop-tools/hadoop-azure/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier

@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.fs.azure.security.WasbDelegationTokenIdentifier

+ 16 - 0
hadoop-tools/hadoop-azure/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer

@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.fs.azure.security.WasbTokenRenewer

+ 34 - 0
hadoop-tools/hadoop-azure/src/site/markdown/index.md

@@ -330,6 +330,40 @@ The service is expected to return a response in JSON format:
   "sasKey" : Requested SAS Key <String>
 }
 ```
+
+## <a name="WASB Authorization" />Authorization Support in WASB.
+
+Authorization support can be enabled in WASB using the following configuration:
+
+```
+    <property>
+      <name>fs.azure.authorization</name>
+      <value>true</value>
+    </property>
+```
+  The current implementation of authorization relies on the presence of an external service that can enforce
+  the authorization. The service is expected to be running on a URL provided by the following config.
+
+```
+    <property>
+      <name>fs.azure.authorization.remote.service.url</name>
+      <value>{URL}</value>
+    </property>
+```
+
+  The remote service is expected to provide support for the following REST call: ```{URL}/CHECK_AUTHORIZATION```
+  An example request:
+  ```{URL}/CHECK_AUTHORIZATION?wasb_absolute_path=<absolute_path>&operation_type=<operation type>&delegation_token=<delegation token>```
+
+  The service is expected to return a response in JSON format:
+  ```
+  {
+    "responseCode" : 0 or non-zero <int>,
+    "responseMessage" : relavant message on failure <String>,
+    "authorizationResult" : true/false <boolean>
+  }
+  ```
+
 ## <a name="Testing_the_hadoop-azure_Module" />Testing the hadoop-azure Module
 
 The hadoop-azure module includes a full suite of unit tests.  Most of the tests

+ 102 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java

@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A mock wasb authorizer implementation.
+ */
+
+public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface {
+
+  private Map<AuthorizationComponent, Boolean> authRules;
+
+  @Override
+  public void init(Configuration conf) {
+    authRules = new HashMap<AuthorizationComponent, Boolean>();
+  }
+
+  public void addAuthRule(String wasbAbsolutePath,
+      String accessType, boolean access) {
+    AuthorizationComponent component =
+        new AuthorizationComponent(wasbAbsolutePath, accessType);
+    this.authRules.put(component, access);
+  }
+
+  @Override
+  public boolean authorize(String wasbAbsolutePath, String accessType)
+      throws WasbAuthorizationException {
+
+    AuthorizationComponent component =
+        new AuthorizationComponent(wasbAbsolutePath, accessType);
+
+    if (authRules.containsKey(component)) {
+      return authRules.get(component);
+    } else {
+      return false;
+    }
+  }
+}
+
+class AuthorizationComponent {
+
+  private String wasbAbsolutePath;
+  private String accessType;
+
+  public AuthorizationComponent(String wasbAbsolutePath,
+      String accessType) {
+    this.wasbAbsolutePath = wasbAbsolutePath;
+    this.accessType = accessType;
+  }
+
+  @Override
+  public int hashCode() {
+    return this.wasbAbsolutePath.hashCode() ^ this.accessType.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+
+    if (obj == this) {
+      return true;
+    }
+
+    if (obj == null
+        || !(obj instanceof AuthorizationComponent)) {
+      return false;
+    }
+
+    return ((AuthorizationComponent)obj).
+              getWasbAbsolutePath().equals(this.wasbAbsolutePath)
+            && ((AuthorizationComponent)obj).
+              getAccessType().equals(this.accessType);
+  }
+
+  public String getWasbAbsolutePath() {
+    return this.wasbAbsolutePath;
+  }
+
+  public String getAccessType() {
+    return accessType;
+  }
+}

+ 277 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java

@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import com.sun.tools.javac.util.Assert;
+
+/**
+ * Test class to hold all WASB authorization tests.
+ */
+public class TestNativeAzureFileSystemAuthorization
+  extends AbstractWasbTestBase {
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, "true");
+    conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URL, "test_url");
+    return AzureBlobStorageTestAccount.create(conf);
+  }
+
+  /**
+   * Positive test to verify Create and delete access check
+   * @throws Throwable
+   */
+  @Test
+  public void testCreateAccessCheckPositive() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    String testFile = "test.dat";
+    Path testPath = new Path(fs.getWorkingDirectory(), testFile);
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule(testPath.toString(),
+        WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+    authorizer.addAuthRule(fs.getWorkingDirectory().toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), true);
+
+    fs.create(testPath);
+    Assert.check(fs.exists(testPath));
+    fs.delete(testPath, false);
+  }
+
+  /**
+   * Negative test to verify Create access check
+   * @throws Throwable
+   */
+
+  @Test(expected=WasbAuthorizationException.class)
+  public void testCreateAccessCheckNegative() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    String testFile = "test.dat";
+    Path testPath = new Path(fs.getWorkingDirectory(), testFile);
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule(testPath.toString(),
+        WasbAuthorizationOperations.WRITE.toString(), false);
+    fs.updateWasbAuthorizer(authorizer);
+
+    fs.create(new Path(testFile));
+  }
+
+  /**
+   * Positive test to verify Create and delete access check
+   * @throws Throwable
+   */
+  @Test
+  public void testListAccessCheckPositive() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    String testFolder = "\\";
+    Path testPath = new Path(fs.getWorkingDirectory(), testFolder);
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule(testPath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    fs.listStatus(testPath);
+  }
+
+  /**
+   * Negative test to verify Create access check
+   * @throws Throwable
+   */
+
+  @Test(expected=WasbAuthorizationException.class)
+  public void testListAccessCheckNegative() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    String testFolder = "\\";
+    Path testPath = new Path(fs.getWorkingDirectory(), testFolder);
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule(testPath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), false);
+    fs.updateWasbAuthorizer(authorizer);
+
+    fs.listStatus(testPath);
+  }
+
+  /**
+   * Positive test to verify rename access check.
+   * @throws Throwable
+   */
+  @Test
+  public void testRenameAccessCheckPositive() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    String testFile = "test.dat";
+    Path testPath = new Path(fs.getWorkingDirectory(), testFile);
+    String renameFile = "test2.dat";
+    Path renamePath = new Path(fs.getWorkingDirectory(), renameFile);
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule(testPath.toString(),
+        WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule(renamePath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule(fs.getWorkingDirectory().toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+    fs.create(testPath);
+
+    Assert.check(fs.exists(testPath));
+    fs.rename(testPath, renamePath);
+    Assert.check(fs.exists(renamePath));
+    fs.delete(renamePath, false);
+  }
+
+  /**
+   * Negative test to verify rename access check.
+   * @throws Throwable
+   */
+  @Test(expected=WasbAuthorizationException.class)
+  public void testRenameAccessCheckNegative() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+    String testFile = "test.dat";
+    Path testPath = new Path(fs.getWorkingDirectory(), testFile);
+    Path renamePath = new Path("test2.dat");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule(testPath.toString(),
+        WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), false);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.create(testPath);
+
+      Assert.check(fs.exists(testPath));
+      fs.rename(testPath, renamePath);
+      Assert.check(fs.exists(renamePath));
+      fs.delete(renamePath, false);
+    } catch (WasbAuthorizationException ex) {
+      throw ex;
+    } finally {
+      authorizer = new MockWasbAuthorizerImpl();
+      authorizer.init(null);
+      authorizer.addAuthRule(testPath.toString(),
+          WasbAuthorizationOperations.EXECUTE.toString(), false);
+      fs.updateWasbAuthorizer(authorizer);
+      Assert.check(fs.exists(testPath));
+      fs.delete(testPath, false);
+    }
+  }
+
+  /**
+   * Positive test for read access check.
+   * @throws Throwable
+   */
+  @Test
+  public void testReadAccessCheckPositive() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+    String testFile = "test.dat";
+    Path testPath = new Path(fs.getWorkingDirectory(), testFile);
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule(testPath.toString(),
+        WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(),
+        WasbAuthorizationOperations.READ.toString(), true);
+    authorizer.addAuthRule(fs.getWorkingDirectory().toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+    fs.create(testPath);
+    Assert.check(fs.exists(testPath));
+    FSDataInputStream inputStream = fs.open(testPath);
+    inputStream.close();
+    fs.delete(testPath, false);
+  }
+
+  /**
+   * Negative test to verify read access check.
+   * @throws Throwable
+   */
+  @Test(expected=WasbAuthorizationException.class)
+  public void testReadAccessCheckNegative() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+    String testFile = "test.dat";
+    Path testPath = new Path(fs.getWorkingDirectory(), testFile);
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule(testPath.toString(),
+        WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(),
+        WasbAuthorizationOperations.READ.toString(), false);
+    fs.updateWasbAuthorizer(authorizer);
+
+    fs.create(new Path(testFile));
+    Assert.check(fs.exists(testPath));
+    FSDataInputStream inputStream = null;
+    try {
+      inputStream = fs.open(new Path(testFile));
+    } catch (WasbAuthorizationException ex) {
+      throw ex;
+    } finally {
+      fs.delete(new Path(testFile), false);
+      if (inputStream != null) {
+        inputStream.close();
+      }
+    }
+  }
+}