瀏覽代碼

HADOOP-14581. Restrict setOwner to list of user when security is enabled in wasb.
Contributed by Varada Hemeswari

(cherry picked from commit 1e69e5260351effc8077d1bdc397cec57cf1ff1b)

Steve Loughran 7 年之前
父節點
當前提交
7d272ea124

+ 37 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java

@@ -38,6 +38,8 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.regex.Pattern;
+import java.util.Arrays;
+import java.util.List;
 
 
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonParser;
@@ -687,6 +689,14 @@ public class NativeAzureFileSystem extends FileSystem {
    */
    */
   static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup";
   static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup";
 
 
+  /**
+   * Configuration property used to specify list of users that can perform
+   * chown operation when authorization is enabled in WASB.
+   */
+  public static final String AZURE_CHOWN_USERLIST_PROPERTY_NAME = "fs.azure.chown.allowed.userlist";
+
+  static final String AZURE_CHOWN_USERLIST_PROPERTY_DEFAULT_VALUE = "*";
+
   static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME =
   static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME =
       "fs.azure.block.location.impersonatedhost";
       "fs.azure.block.location.impersonatedhost";
   private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT =
   private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT =
@@ -2930,6 +2940,33 @@ public class NativeAzureFileSystem extends FileSystem {
       throw new FileNotFoundException("File doesn't exist: " + p);
       throw new FileNotFoundException("File doesn't exist: " + p);
     }
     }
 
 
+    /* If authorization is enabled, check if the user has privileges
+    *  to change the ownership of file/folder
+    */
+    if (this.azureAuthorization && username != null) {
+      String[] listOfUsers = getConf().getTrimmedStrings(AZURE_CHOWN_USERLIST_PROPERTY_NAME,
+        AZURE_CHOWN_USERLIST_PROPERTY_DEFAULT_VALUE);
+      boolean shouldSkipUserCheck = listOfUsers.length == 1 && listOfUsers[0].equals("*");
+      // skip the check if the chown allowed users config value is set as '*'
+      if (!shouldSkipUserCheck) {
+        UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser();
+        UserGroupInformation actualUgi = currentUgi.getRealUser();
+
+        if (actualUgi == null) {
+          actualUgi = currentUgi;
+        }
+
+        List<String> userList = Arrays.asList(listOfUsers);
+        if (userList.contains("*")) {
+          throw new IllegalArgumentException("User list must contain "
+          + "either '*' or list of user names, but not both.");
+        } else if (!userList.contains(actualUgi.getShortUserName())) {
+          throw new WasbAuthorizationException(String.format("user '%s' does not have the privilege to change the ownership of files/folders.",
+            actualUgi.getShortUserName()));
+        }
+      }
+    }
+
     PermissionStatus newPermissionStatus = new PermissionStatus(
     PermissionStatus newPermissionStatus = new PermissionStatus(
         username == null ?
         username == null ?
             metadata.getPermissionStatus().getUserName() : username,
             metadata.getPermissionStatus().getUserName() : username,

+ 12 - 0
hadoop-tools/hadoop-azure/src/site/markdown/index.md

@@ -412,6 +412,18 @@ The service is expected to return a response in JSON format for GETDELEGATIONTOK
     }
     }
 }
 }
 ```
 ```
+### chown behaviour when authorization is enabled in WASB
+
+When authorization is enabled, only the users listed in the following configuration
+are allowed to change the owning user of files/folders in WASB. The configuration
+value takes a comma seperated list of user names who are allowed to perform chown.
+
+```xml
+<property>
+  <name>fs.azure.chown.allowed.userlist</name>
+  <value>user1,user2</value>
+</property>
+```
 
 
 ## Testing the hadoop-azure Module
 ## Testing the hadoop-azure Module
 
 

+ 172 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java

@@ -18,11 +18,16 @@
 
 
 package org.apache.hadoop.fs.azure;
 package org.apache.hadoop.fs.azure;
 
 
+import java.security.PrivilegedExceptionAction;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
 import org.junit.Assume;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Rule;
@@ -32,6 +37,7 @@ import org.junit.rules.ExpectedException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
 
 
 import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_SECURE_MODE;
 import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_SECURE_MODE;
+import static org.junit.Assert.assertEquals;
 
 
 /**
 /**
  * Test class to hold all WASB authorization tests.
  * Test class to hold all WASB authorization tests.
@@ -47,6 +53,7 @@ public class TestNativeAzureFileSystemAuthorization
     Configuration conf = new Configuration();
     Configuration conf = new Configuration();
     conf.set(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, "true");
     conf.set(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, "true");
     conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URLS, "http://localhost/");
     conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URLS, "http://localhost/");
+    conf.set(NativeAzureFileSystem.AZURE_CHOWN_USERLIST_PROPERTY_NAME, "user1 , user2");
     return AzureBlobStorageTestAccount.create(conf);
     return AzureBlobStorageTestAccount.create(conf);
   }
   }
 
 
@@ -673,4 +680,168 @@ public class TestNativeAzureFileSystemAuthorization
     Path testPathWithTripleSlash = new Path("wasb:///" + testPath);
     Path testPathWithTripleSlash = new Path("wasb:///" + testPath);
     fs.listStatus(testPathWithTripleSlash);
     fs.listStatus(testPathWithTripleSlash);
   }
   }
-}
+
+  /**
+   * Negative test for setOwner when Authorization is enabled
+   */
+  @Test
+  public void testSetOwnerThrowsForUnauthorisedUsers() throws Throwable {
+    expectedEx.expect(WasbAuthorizationException.class);
+
+    Path testPath = new Path("/testSetOwnerNegative");
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl(fs);
+    authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    String owner = null;
+    UserGroupInformation unauthorisedUser = UserGroupInformation.createUserForTesting(
+          "unauthoriseduser", new String[] {"group1"});
+    try {
+      fs.mkdirs(testPath);
+      ContractTestUtils.assertPathExists(fs, "test path does not exist", testPath);
+      owner = fs.getFileStatus(testPath).getOwner();
+
+      unauthorisedUser.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+          fs.setOwner(testPath, "newowner", null);
+          return null;
+        }
+      });
+    } finally {
+      // check that the owner is not modified
+      assertEquals(owner, fs.getFileStatus(testPath).getOwner());
+      fs.delete(testPath, false);
+    }
+  }
+
+  /**
+   * Test for setOwner when Authorization is enabled and
+   * the user is specified in chown allowed user list
+   * */
+  @Test
+  public void testSetOwnerSucceedsForAuthorisedUsers() throws Throwable {
+    Path testPath = new Path("/testsetownerpositive");
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl(fs);
+    authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    String newOwner = "newowner";
+    String newGroup = "newgroup";
+
+    UserGroupInformation authorisedUser = UserGroupInformation.createUserForTesting(
+          "user2", new String[]{"group1"});
+    try {
+
+      fs.mkdirs(testPath);
+      ContractTestUtils.assertPathExists(fs, "test path does not exist", testPath);
+
+      String owner = fs.getFileStatus(testPath).getOwner();
+      Assume.assumeTrue("changing owner requires original and new owner to be different",
+        !StringUtils.equalsIgnoreCase(owner, newOwner));
+
+      authorisedUser.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+          fs.setOwner(testPath, newOwner, newGroup);
+          assertEquals(newOwner, fs.getFileStatus(testPath).getOwner());
+          assertEquals(newGroup, fs.getFileStatus(testPath).getGroup());
+          return null;
+        }
+      });
+
+    } finally {
+      fs.delete(testPath, false);
+    }
+  }
+
+  /**
+   * Test for setOwner when Authorization is enabled and
+   * the userlist is specified as '*'
+   * */
+  @Test
+  public void testSetOwnerSucceedsForAnyUserWhenWildCardIsSpecified() throws Throwable {
+    Configuration conf = fs.getConf();
+    conf.set(NativeAzureFileSystem.AZURE_CHOWN_USERLIST_PROPERTY_NAME, "*");
+    Path testPath = new Path("/testsetownerpositivewildcard");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl(fs);
+    authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    String newOwner = "newowner";
+    String newGroup = "newgroup";
+
+    UserGroupInformation user = UserGroupInformation.createUserForTesting(
+          "anyuser", new String[]{"group1"});
+    try {
+
+      fs.mkdirs(testPath);
+      ContractTestUtils.assertPathExists(fs, "test path does not exist", testPath);
+
+      String owner = fs.getFileStatus(testPath).getOwner();
+      Assume.assumeTrue("changing owner requires original and new owner to be different",
+        !StringUtils.equalsIgnoreCase(owner, newOwner));
+
+      user.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+          fs.setOwner(testPath, newOwner, newGroup);
+          assertEquals(newOwner, fs.getFileStatus(testPath).getOwner());
+          assertEquals(newGroup, fs.getFileStatus(testPath).getGroup());
+          return null;
+        }
+      });
+
+    } finally {
+      fs.delete(testPath, false);
+    }
+  }
+
+  /** Test for setOwner  throws for illegal setup of chown
+   * allowed testSetOwnerSucceedsForAuthorisedUsers
+   */
+  @Test
+  public void testSetOwnerFailsForIllegalSetup() throws Throwable {
+    expectedEx.expect(IllegalArgumentException.class);
+
+    Configuration conf = fs.getConf();
+    conf.set(NativeAzureFileSystem.AZURE_CHOWN_USERLIST_PROPERTY_NAME, "user1, *");
+    Path testPath = new Path("/testSetOwnerFailsForIllegalSetup");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl(fs);
+    authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    String owner = null;
+    UserGroupInformation user = UserGroupInformation.createUserForTesting(
+          "anyuser", new String[]{"group1"});
+    try {
+
+      fs.mkdirs(testPath);
+      ContractTestUtils.assertPathExists(fs, "test path does not exist", testPath);
+
+      owner = fs.getFileStatus(testPath).getOwner();
+
+      user.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+          fs.setOwner(testPath, "newowner", null);
+          return null;
+        }
+      });
+    } finally {
+      // check that the owner is not modified
+      assertEquals(owner, fs.getFileStatus(testPath).getOwner());
+      fs.delete(testPath, false);
+    }
+  }
+}