浏览代码

HDFS-13668. FSPermissionChecker may throws AIOOE when check inode permission. Contributed by He Xiaoqiao.

(cherry picked from commit 975d60685eaf9961bdbd3547600b3e38bb088835)
drankye 6 年之前
父节点
当前提交
b34c650a41

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -409,7 +409,7 @@ public class FSPermissionChecker implements AccessControlEnforcer {
     }
     final FsPermission mode = inode.getFsPermission();
     final AclFeature aclFeature = inode.getAclFeature();
-    if (aclFeature != null) {
+    if (aclFeature != null && aclFeature.getEntriesSize() > 0) {
       // It's possible that the inode has a default ACL but no access ACL.
       int firstEntry = aclFeature.getEntryAt(0);
       if (AclEntryStatusFormat.getScope(firstEntry) == AclEntryScope.ACCESS) {

+ 40 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeAttributeProvider.java

@@ -57,6 +57,11 @@ public class TestINodeAttributeProvider {
   public static class MyAuthorizationProvider extends INodeAttributeProvider {
 
     public static class MyAccessControlEnforcer implements AccessControlEnforcer {
+      AccessControlEnforcer ace;
+
+      public MyAccessControlEnforcer(AccessControlEnforcer defaultEnforcer) {
+        this.ace = defaultEnforcer;
+      }
 
       @Override
       public void checkPermission(String fsOwner, String supergroup,
@@ -65,6 +70,13 @@ public class TestINodeAttributeProvider {
           int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
           FsAction parentAccess, FsAction access, FsAction subAccess,
           boolean ignoreEmptyDir) throws AccessControlException {
+        if (ancestorIndex > 1
+            && inodes[1].getLocalName().equals("user")
+            && inodes[2].getLocalName().equals("acl")) {
+          this.ace.checkPermission(fsOwner, supergroup, ugi, inodeAttrs, inodes,
+              pathByNameArr, snapshotId, path, ancestorIndex, doCheckOwner,
+              ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
+        }
         CALLED.add("checkPermission|" + ancestorAccess + "|" + parentAccess + "|" + access);
       }
     }
@@ -84,6 +96,7 @@ public class TestINodeAttributeProvider {
         final INodeAttributes inode) {
       CALLED.add("getAttributes");
       final boolean useDefault = useDefault(pathElements);
+      final boolean useNullAcl = useNullAclFeature(pathElements);
       return new INodeAttributes() {
         @Override
         public boolean isDirectory() {
@@ -126,7 +139,10 @@ public class TestINodeAttributeProvider {
         @Override
         public AclFeature getAclFeature() {
           AclFeature f;
-          if (useDefault) {
+          if (useNullAcl) {
+            int[] entries = new int[0];
+            f = new AclFeature(entries);
+          } else if (useDefault) {
             f = inode.getAclFeature();
           } else {
             AclEntry acl = new AclEntry.Builder().setType(AclEntryType.GROUP).
@@ -167,8 +183,8 @@ public class TestINodeAttributeProvider {
 
     @Override
     public AccessControlEnforcer getExternalAccessControlEnforcer(
-        AccessControlEnforcer deafultEnforcer) {
-      return new MyAccessControlEnforcer();
+        AccessControlEnforcer defaultEnforcer) {
+      return new MyAccessControlEnforcer(defaultEnforcer);
     }
 
     private boolean useDefault(String[] pathElements) {
@@ -176,6 +192,11 @@ public class TestINodeAttributeProvider {
           !(pathElements[0].equals("user") && pathElements[1].equals("authz"));
     }
 
+    private boolean useNullAclFeature(String[] pathElements) {
+      return (pathElements.length > 2)
+          && pathElements[1].equals("user")
+          && pathElements[2].equals("acl");
+    }
   }
 
   @Before
@@ -368,4 +389,20 @@ public class TestINodeAttributeProvider {
       });
     }
   }
+
+  @Test
+  public void testAclFeature() throws Exception {
+    UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
+            "testuser", new String[]{"testgroup"});
+    ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+      FileSystem fs = miniDFS.getFileSystem();
+      Path aclDir = new Path("/user/acl");
+      fs.mkdirs(aclDir);
+      Path aclChildDir = new Path(aclDir, "subdir");
+      fs.mkdirs(aclChildDir);
+      AclStatus aclStatus = fs.getAclStatus(aclDir);
+      Assert.assertEquals(0, aclStatus.getEntries().size());
+      return null;
+    });
+  }
 }