소스 검색

HADOOP-14899. Restrict Access to setPermission operation when authorization is enabled in WASB
Contributed by Kannapiran Srinivasan.

Steve Loughran 7 년 전
부모
커밋
34040e8d73

+ 125 - 21
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java

@@ -80,6 +80,8 @@ import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -698,10 +700,29 @@ public class NativeAzureFileSystem extends FileSystem {
    * Configuration property used to specify list of users that can perform
    * chown operation when authorization is enabled in WASB.
    */
-  public static final String AZURE_CHOWN_USERLIST_PROPERTY_NAME = "fs.azure.chown.allowed.userlist";
+  public static final String AZURE_CHOWN_USERLIST_PROPERTY_NAME =
+      "fs.azure.chown.allowed.userlist";
 
   static final String AZURE_CHOWN_USERLIST_PROPERTY_DEFAULT_VALUE = "*";
 
+  /**
+   * Configuration property used to specify list of daemon users that can
+   * perform chmod operation when authorization is enabled in WASB.
+   */
+  public static final String AZURE_DAEMON_USERLIST_PROPERTY_NAME =
+      "fs.azure.daemon.userlist";
+
+  static final String AZURE_DAEMON_USERLIST_PROPERTY_DEFAULT_VALUE = "*";
+
+  /**
+   * Configuration property used to specify list of users that can perform
+   * chmod operation when authorization is enabled in WASB.
+   */
+  public static final String AZURE_CHMOD_USERLIST_PROPERTY_NAME =
+          "fs.azure.chmod.allowed.userlist";
+
+  static final String AZURE_CHMOD_USERLIST_PROPERTY_DEFAULT_VALUE = "*";
+
   static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME =
       "fs.azure.block.location.impersonatedhost";
   private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT =
@@ -1197,7 +1218,9 @@ public class NativeAzureFileSystem extends FileSystem {
   private DelegationTokenAuthenticatedURL authURL;
   private DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
   private String credServiceUrl;
-
+  private List<String> chownAllowedUsers;
+  private List<String> chmodAllowedUsers;
+  private List<String> daemonUsers;
   /**
    * Configuration key to enable authorization support in WASB.
    */
@@ -1374,6 +1397,19 @@ public class NativeAzureFileSystem extends FileSystem {
       this.authorizer =
           new RemoteWasbAuthorizerImpl();
       authorizer.init(conf);
+
+      this.chmodAllowedUsers =
+          Arrays.asList(conf.getTrimmedStrings(
+              AZURE_CHMOD_USERLIST_PROPERTY_NAME,
+                  AZURE_CHMOD_USERLIST_PROPERTY_DEFAULT_VALUE));
+      this.chownAllowedUsers =
+          Arrays.asList(conf.getTrimmedStrings(
+              AZURE_CHOWN_USERLIST_PROPERTY_NAME,
+                  AZURE_CHOWN_USERLIST_PROPERTY_DEFAULT_VALUE));
+      this.daemonUsers =
+          Arrays.asList(conf.getTrimmedStrings(
+              AZURE_DAEMON_USERLIST_PROPERTY_NAME,
+                  AZURE_DAEMON_USERLIST_PROPERTY_DEFAULT_VALUE));
     }
 
     if (UserGroupInformation.isSecurityEnabled() && kerberosSupportEnabled) {
@@ -3464,6 +3500,27 @@ public class NativeAzureFileSystem extends FileSystem {
     if (metadata == null) {
       throw new FileNotFoundException("File doesn't exist: " + p);
     }
+
+    // If authorization is enabled, check if the user is
+    // part of chmod allowed list or a daemon user or owner of the file/folder
+    if (azureAuthorization) {
+      UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser();
+
+      // Check if the user is part of chown allowed list or a daemon user.
+      if (!isAllowedUser(currentUgi.getShortUserName(), chmodAllowedUsers)
+          && !isAllowedUser(currentUgi.getShortUserName(), daemonUsers)) {
+
+        //Check if the user is the owner of the file.
+        String owner = metadata.getPermissionStatus().getUserName();
+        if (!currentUgi.getShortUserName().equals(owner)) {
+          throw new WasbAuthorizationException(
+              String.format("user '%s' does not have the privilege to "
+                  + "change the permission of files/folders.",
+                      currentUgi.getShortUserName()));
+        }
+      }
+    }
+
     permission = applyUMask(permission,
         metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory
             : UMaskApplyMode.ChangeExistingFile);
@@ -3509,26 +3566,14 @@ public class NativeAzureFileSystem extends FileSystem {
     *  to change the ownership of file/folder
     */
     if (this.azureAuthorization && username != null) {
-      String[] listOfUsers = getConf().getTrimmedStrings(AZURE_CHOWN_USERLIST_PROPERTY_NAME,
-        AZURE_CHOWN_USERLIST_PROPERTY_DEFAULT_VALUE);
-      boolean shouldSkipUserCheck = listOfUsers.length == 1 && listOfUsers[0].equals("*");
-      // skip the check if the chown allowed users config value is set as '*'
-      if (!shouldSkipUserCheck) {
-        UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser();
-        UserGroupInformation actualUgi = currentUgi.getRealUser();
+      UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser();
 
-        if (actualUgi == null) {
-          actualUgi = currentUgi;
-        }
-
-        List<String> userList = Arrays.asList(listOfUsers);
-        if (userList.contains("*")) {
-          throw new IllegalArgumentException("User list must contain "
-          + "either '*' or list of user names, but not both.");
-        } else if (!userList.contains(actualUgi.getShortUserName())) {
-          throw new WasbAuthorizationException(String.format("user '%s' does not have the privilege to change the ownership of files/folders.",
-            actualUgi.getShortUserName()));
-        }
+      if (!isAllowedUser(currentUgi.getShortUserName(),
+          chownAllowedUsers)) {
+          throw new WasbAuthorizationException(
+            String.format("user '%s' does not have the privilege to change "
+                + "the ownership of files/folders.",
+                    currentUgi.getShortUserName()));
       }
     }
 
@@ -3546,6 +3591,38 @@ public class NativeAzureFileSystem extends FileSystem {
     }
   }
 
+  /**
+   * Is the user allowed?
+   * <ol>
+   *   <li>No user: false</li>
+   *   <li>Empty list: false</li>
+   *   <li>List == ["*"]: true</li>
+   *   <li>Otherwise: is the user in the list?</li>
+   * </ol>
+   * @param username user to check; may be null
+   * @param userList list of users; may be null or empty
+   * @return
+   * @throws IllegalArgumentException if the userList is invalid.
+   */
+  private boolean isAllowedUser(String username, List<String> userList) {
+
+    if (null == userList || userList.isEmpty()) {
+      return false;
+    }
+
+    boolean shouldSkipUserCheck = userList.size() == 1
+        && userList.get(0).equals("*");
+
+    // skip the check if the allowed users config value is set as '*'
+    if (!shouldSkipUserCheck) {
+      Preconditions.checkArgument(!userList.contains("*"),
+        "User list must contain either '*' or a list of user names,"
+            + " but not both.");
+      return userList.contains(username);
+    }
+    return true;
+  }
+
   @Override
   public synchronized void close() throws IOException {
     if (isClosed) {
@@ -3797,4 +3874,31 @@ public class NativeAzureFileSystem extends FileSystem {
       }
     return owner;
   }
+
+  /**
+   * Helper method to update the chownAllowedUsers in tests.
+   * @param chownAllowedUsers list of chown allowed users
+   */
+  @VisibleForTesting
+  void updateChownAllowedUsers(List<String> chownAllowedUsers) {
+    this.chownAllowedUsers = chownAllowedUsers;
+  }
+
+  /**
+   * Helper method to update the chmodAllowedUsers in tests.
+   * @param chmodAllowedUsers list of chmod allowed users
+   */
+  @VisibleForTesting
+  void updateChmodAllowedUsers(List<String> chmodAllowedUsers) {
+    this.chmodAllowedUsers = chmodAllowedUsers;
+  }
+
+  /**
+   * Helper method to update the daemonUsers in tests.
+   * @param daemonUsers list of daemon users
+   */
+  @VisibleForTesting
+  void updateDaemonUsers(List<String> daemonUsers) {
+    this.daemonUsers = daemonUsers;
+  }
 }

+ 17 - 1
hadoop-tools/hadoop-azure/src/site/markdown/index.md

@@ -460,7 +460,7 @@ The service is expected to return a response in JSON format for GETDELEGATIONTOK
 
 When authorization is enabled, only the users listed in the following configuration
 are allowed to change the owning user of files/folders in WASB. The configuration
-value takes a comma seperated list of user names who are allowed to perform chown.
+value takes a comma separated list of user names who are allowed to perform chown.
 
 ```xml
 <property>
@@ -468,6 +468,22 @@ value takes a comma seperated list of user names who are allowed to perform chow
   <value>user1,user2</value>
 </property>
 ```
+### chmod behaviour when authorization is enabled in WASB
+
+When authorization is enabled, only the owner and the users listed in the
+following configurations are allowed to change the permissions of files/folders in WASB.
+The configuration value takes a comma separated list of user names who are allowed to perform chmod.
+
+```xml
+<property>
+  <name>fs.azure.daemon.userlist</name>
+  <value>user1,user2</value>
+</property>
+<property>
+  <name>fs.azure.chmod.allowed.userlist</name>
+  <value>userA,userB</value>
+</property>
+```
 
 Caching of both SAS keys and Authorization responses can be enabled using the following setting:
 The cache settings are applicable only when fs.azure.authorization is enabled.

+ 251 - 25
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java

@@ -21,10 +21,14 @@ package org.apache.hadoop.fs.azure;
 import java.io.FileNotFoundException;
 import java.security.PrivilegedExceptionAction;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -50,6 +54,8 @@ import static org.junit.Assert.*;
 public class TestNativeAzureFileSystemAuthorization
   extends AbstractWasbTestBase {
 
+  private static final short FULL_PERMISSION_WITH_STICKYBIT = 1777;
+
   @VisibleForTesting
   protected MockWasbAuthorizerImpl authorizer;
 
@@ -67,6 +73,8 @@ public class TestNativeAzureFileSystemAuthorization
     conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URLS, "http://localhost/");
     conf.set(NativeAzureFileSystem.AZURE_CHOWN_USERLIST_PROPERTY_NAME, "user1 , user2");
     conf.set(KEY_AUTH_SERVICE_CACHING_ENABLE, "false");
+    conf.set(NativeAzureFileSystem.AZURE_CHMOD_USERLIST_PROPERTY_NAME, "user1 , user2");
+    conf.set(NativeAzureFileSystem.AZURE_DAEMON_USERLIST_PROPERTY_NAME, "hive , hcs , yarn");
     return conf;
   }
 
@@ -928,7 +936,7 @@ public class TestNativeAzureFileSystemAuthorization
 
       // create child with owner as dummyUser
       UserGroupInformation dummyUser = UserGroupInformation.createUserForTesting(
-          "dummyUser", new String[] {"dummygroup"});
+          "user1", new String[] {"dummygroup"});
       dummyUser.doAs(new PrivilegedExceptionAction<Void>() {
         @Override
         public Void run() throws Exception {
@@ -1229,16 +1237,13 @@ public class TestNativeAzureFileSystemAuthorization
    */
   @Test
   public void testSetOwnerThrowsForUnauthorisedUsers() throws Throwable {
-
-    expectedEx.expect(WasbAuthorizationException.class);
-
     final Path testPath = new Path("/testSetOwnerNegative");
 
     authorizer.addAuthRuleForOwner("/", WRITE, true);
-    authorizer.addAuthRuleForOwner(testPath.toString(), READ, true);
+    authorizer.addAuthRuleForOwner("/", READ, true);
     fs.updateWasbAuthorizer(authorizer);
 
-    String owner = null;
+    final String owner;
     UserGroupInformation unauthorisedUser = UserGroupInformation.createUserForTesting(
           "unauthoriseduser", new String[] {"group1"});
     try {
@@ -1249,13 +1254,17 @@ public class TestNativeAzureFileSystemAuthorization
       unauthorisedUser.doAs(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
+        try {
           fs.setOwner(testPath, "newowner", null);
-          return null;
+          fail("Failing test because setOwner call was expected to throw");
+        } catch (WasbAuthorizationException wex) {
+          // check that the owner is not modified
+          assertOwnerEquals(testPath, owner);
+        }
+        return null;
         }
       });
     } finally {
-      // check that the owner is not modified
-      assertEquals(owner, fs.getFileStatus(testPath).getOwner());
       fs.delete(testPath, false);
     }
   }
@@ -1291,8 +1300,8 @@ public class TestNativeAzureFileSystemAuthorization
       @Override
       public Void run() throws Exception {
           fs.setOwner(testPath, newOwner, newGroup);
-          assertEquals(newOwner, fs.getFileStatus(testPath).getOwner());
-          assertEquals(newGroup, fs.getFileStatus(testPath).getGroup());
+        assertOwnerEquals(testPath, newOwner);
+        assertEquals(newGroup, fs.getFileStatus(testPath).getGroup());
           return null;
         }
       });
@@ -1308,12 +1317,10 @@ public class TestNativeAzureFileSystemAuthorization
    * */
   @Test
   public void testSetOwnerSucceedsForAnyUserWhenWildCardIsSpecified() throws Throwable {
-
-    Configuration conf = fs.getConf();
-    conf.set(NativeAzureFileSystem.AZURE_CHOWN_USERLIST_PROPERTY_NAME, "*");
-    fs.setConf(conf);
+    fs.updateChownAllowedUsers(Collections.singletonList("*"));
     final Path testPath = new Path("/testSetOwnerPositiveWildcard");
 
+    Configuration conf = fs.getConf();
     authorizer.init(conf);
     authorizer.addAuthRuleForOwner("/", WRITE, true);
     authorizer.addAuthRuleForOwner(testPath.getParent().toString(), READ, true);
@@ -1337,7 +1344,7 @@ public class TestNativeAzureFileSystemAuthorization
       @Override
       public Void run() throws Exception {
           fs.setOwner(testPath, newOwner, newGroup);
-          assertEquals(newOwner, fs.getFileStatus(testPath).getOwner());
+          assertOwnerEquals(testPath, newOwner);
           assertEquals(newGroup, fs.getFileStatus(testPath).getGroup());
           return null;
         }
@@ -1353,20 +1360,16 @@ public class TestNativeAzureFileSystemAuthorization
    */
   @Test
   public void testSetOwnerFailsForIllegalSetup() throws Throwable {
+    fs.updateChownAllowedUsers(Arrays.asList("user1", "*"));
 
-    expectedEx.expect(IllegalArgumentException.class);
-
-    Configuration conf = fs.getConf();
-    conf.set(NativeAzureFileSystem.AZURE_CHOWN_USERLIST_PROPERTY_NAME, "user1, *");
-    fs.setConf(conf);
     final Path testPath = new Path("/testSetOwnerFailsForIllegalSetup");
 
+    Configuration conf = fs.getConf();
     authorizer.init(conf);
     authorizer.addAuthRuleForOwner("/", WRITE, true);
     authorizer.addAuthRuleForOwner(testPath.getParent().toString(), READ, true);
     fs.updateWasbAuthorizer(authorizer);
 
-    String owner = null;
     UserGroupInformation user = UserGroupInformation.createUserForTesting(
           "anyuser", new String[]{"group1"});
     try {
@@ -1374,18 +1377,22 @@ public class TestNativeAzureFileSystemAuthorization
       fs.mkdirs(testPath);
       ContractTestUtils.assertPathExists(fs, "test path does not exist", testPath);
 
-      owner = fs.getFileStatus(testPath).getOwner();
+      final String owner = fs.getFileStatus(testPath).getOwner();
 
       user.doAs(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
+        try {
           fs.setOwner(testPath, "newowner", null);
+          fail("Failing test because setOwner call was expected to throw");
+        } catch (IllegalArgumentException iex) {
+          // check that the owner is not modified
+          assertOwnerEquals(testPath, owner);
+        }
           return null;
         }
       });
     } finally {
-      // check that the owner is not modified
-      assertEquals(owner, fs.getFileStatus(testPath).getOwner());
       fs.delete(testPath, false);
     }
   }
@@ -1436,4 +1443,223 @@ public class TestNativeAzureFileSystemAuthorization
       fs.delete(testPath, true);
     }
   }
+
+  /**
+   * Negative test for setPermission when Authorization is enabled.
+   */
+  @Test
+  public void testSetPermissionThrowsForUnauthorisedUsers() throws Throwable {
+    //setPermission is called by a user who is not a daemon user
+    //and not chmodAllowedUsers and not owner of the file/folder.
+    //This test validates a authorization exception during setPermission call
+    testSetPermission("/testSetPermissionNegative", null, null, "unauthorizeduser",
+            true, false);
+  }
+
+  /**
+   * Positive test for setPermission when Authorization is enabled.
+   */
+  @Test
+  public void testSetPermissionForAuthorisedUsers() throws Throwable {
+    //user1 is already part of chmodAllowedUsers.
+    //This test validates the allowed user can do setPermission
+    testSetPermission("/testSetPermissionPositive", null, null, "user1",
+        false, false);
+  }
+
+  /**
+   * Positive test for setPermission as owner when Authorization is enabled.
+   */
+  @Test
+  public void testSetPermissionForOwner() throws Throwable {
+    //setPermission is called by the owner and expect a success
+    //during setPermission call
+    testSetPermission("/testSetPermissionPositiveOwner",
+            null, null, null, false, false);
+  }
+
+  /**
+   * Test setPermission when wildcard is specified in allowed user list.
+   */
+  @Test
+  public void testSetPermissionWhenWildCardInAllowedUserList() throws Throwable {
+    //Allow all to setPermission and expect a success
+    //during setPermission call
+    List<String> chmodAllowedUsers = Collections.singletonList("*");
+
+    testSetPermission("/testSetPermissionWhenWildCardInAllowedUserList",
+            chmodAllowedUsers, null, "testuser", false, false);
+  }
+
+  /**
+   * Test setPermission when invalid configuration value for allowed user list
+   * i.e. wildcard character and a username.
+   */
+  @Test
+  public void testSetPermissionForInvalidAllowedUserList() throws Throwable {
+    //Setting up an invalid chmodAllowedUsers and expects a failure
+    //during setPermission call
+    List<String> chmodAllowedUsers = Arrays.asList("*", "testuser");
+
+    testSetPermission("/testSetPermissionForInvalidAllowedUserList",
+        chmodAllowedUsers, null, "testuser", true, true);
+  }
+
+  /**
+   * Test setPermission for a daemon user.
+   */
+  @Test
+  public void testSetPermissionForDaemonUser() throws Throwable {
+    //hive user is already setup as daemon user.
+    //This test validates the daemon user can do setPermission
+    testSetPermission("/testSetPermissionForDaemonUser", null,
+       null, "hive", false, false);
+  }
+
+  /**
+   * Test setPermission when invalid configuration value for daemon user list
+   * i.e. wildcard character and a daemon username.
+   */
+  @Test
+  public void testSetPermissionForInvalidDaemonUserList() throws Throwable {
+
+    List<String> daemonUsers = Arrays.asList("*", "hive");
+
+    testSetPermission("/testSetPermissionForInvalidDaemonUserList", null,
+        daemonUsers, "testuser", true, true);
+
+  }
+
+  /**
+   * Helper method to test setPermission scenarios. This method handles both positive
+   * and negative scenarios of setPermission tests
+   */
+  private void testSetPermission(String path,
+      List<String> chmodAllowedUsers,
+      List<String> daemonUsers,
+      String user,
+      boolean isSetPermissionFailureCase,
+      boolean isInvalidSetup) throws Throwable {
+
+    final FsPermission filePermission;
+
+    final Path testPath = new Path(path);
+    final FsPermission newPermission = new FsPermission(FULL_PERMISSION_WITH_STICKYBIT);
+    authorizer.addAuthRule("/", WRITE, getCurrentUserShortName(), true);
+    authorizer.addAuthRule("/", READ, getCurrentUserShortName(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    if (chmodAllowedUsers != null && !chmodAllowedUsers.isEmpty()) {
+      fs.updateChmodAllowedUsers(chmodAllowedUsers);
+    }
+
+    if (daemonUsers != null && !daemonUsers.isEmpty()) {
+      fs.updateDaemonUsers(daemonUsers);
+    }
+
+    UserGroupInformation testUser = (user != null) ? UserGroupInformation.createUserForTesting(
+            user, new String[] {"testgrp"}) : null;
+    try {
+      fs.mkdirs(testPath);
+      ContractTestUtils.assertPathExists(fs, "test path does not exist",
+          testPath);
+      filePermission = fs.getFileStatus(testPath).getPermission();
+
+      if (isSetPermissionFailureCase) {
+        executeSetPermissionFailure(testUser, testPath, filePermission,
+            newPermission, isInvalidSetup);
+      } else {
+        executeSetPermissionSuccess(testUser, testPath, filePermission,
+            newPermission);
+      }
+
+    } finally {
+      fs.delete(testPath, false);
+    }
+  }
+
+  /**
+   * This method expects a failure while invoking setPermission call
+   * and validates whether the failure is as expected
+   *
+   */
+  private void executeSetPermissionFailure(UserGroupInformation testUser,
+      final Path testPath,
+      final FsPermission oldPermission,
+      final FsPermission newPermission,
+      final boolean isInvalidSetup)
+          throws Throwable {
+    testUser.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        try {
+          //READ access required for getFileStatus
+          authorizer.addAuthRule("/", READ, getCurrentUserShortName(), true);
+          fs.setPermission(testPath, newPermission);
+          fail("Failing test because setPermission was expected to throw");
+
+        } catch (IllegalArgumentException iex) {
+          if (!isInvalidSetup) {
+            //fail if IllegalArgumentException is not expected
+            fail("Failing test because IllegalArgumentException"
+                + " is not expected to throw");
+          }
+          // check that the file permission is not modified.
+          assertPermissionEquals(testPath, oldPermission);
+        } catch (WasbAuthorizationException wex) {
+          if (isInvalidSetup) {
+            //fail if WasbAuthorizationException is not expected
+            fail("Failing test because WasbAuthorizationException"
+                + " is not expected to throw");
+          }
+          // check that the file permission is not modified.
+          assertPermissionEquals(testPath, oldPermission);
+        }
+        return null;
+      }
+    });
+  }
+
+  /**
+   * This method expects a success while invoking setPermission call
+   * and validates whether the new permissions are set
+   *
+   */
+  private void executeSetPermissionSuccess(UserGroupInformation testUser,
+      final Path testPath,
+      final FsPermission oldPermission,
+      final FsPermission newPermission)
+          throws Throwable {
+    //If user is given, then use doAs
+    if (testUser != null) {
+      testUser.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          fs.setPermission(testPath, newPermission);
+          return null;
+        }
+      });
+    } else {
+      //If user is not given, then run in current user context
+      fs.setPermission(testPath, newPermission);
+    }
+
+    // check that the file permission is modified
+    assertPermissionEquals(testPath, newPermission);
+    // check old permission is not equals to new permission
+    assertNotEquals(newPermission, oldPermission);
+  }
+
+  private void assertPermissionEquals(Path path, FsPermission newPermission)
+      throws IOException {
+    FileStatus status = fs.getFileStatus(path);
+    assertEquals("Wrong permissions in " + status,
+        newPermission, status.getPermission());
+  }
+
+  private void assertOwnerEquals(Path path, String owner) throws IOException {
+    FileStatus status = fs.getFileStatus(path);
+    assertEquals("Wrong owner in " + status, owner, status.getOwner());
+  }
+
 }