
HDFS-16259. Catch and re-throw sub-classes of AccessControlException thrown by any permission provider plugins (eg Ranger) (#3598)

Stephen O'Donnell, 3 years ago
Parent
Current commit
2f35cc36cd
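
Permission provider plugins such as Apache Ranger can throw their own
subclasses of AccessControlException from their enforcer hooks. The HDFS
client unwraps server-side exceptions by exact class name, so a
plugin-specific subclass never matches and stays wrapped in a
RemoteException instead of surfacing as an AccessControlException. Below is
a minimal sketch (not part of this patch) of the client-side unwrapping
behaviour that motivates the change; names other than the Hadoop classes
are illustrative assumptions.

import java.io.IOException;

import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.AccessControlException;

public class UnwrapSketch {
  // unwrapRemoteException matches the remote exception by its exact class
  // name: org.apache.hadoop.security.AccessControlException converts back
  // to a local ACE, but an enforcer-specific subclass (whose class name
  // differs) is returned still wrapped as a RemoteException.
  static IOException toClientException(RemoteException re) {
    return re.unwrapRemoteException(AccessControlException.class);
  }
}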

+ 35 - 25
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -273,31 +273,41 @@ public class FSPermissionChecker implements AccessControlEnforcer {
     AccessControlEnforcer enforcer = getAccessControlEnforcer();
 
     String opType = operationType.get();
-    if (this.authorizeWithContext && opType != null) {
-      INodeAttributeProvider.AuthorizationContext.Builder builder =
-          new INodeAttributeProvider.AuthorizationContext.Builder();
-      builder.fsOwner(fsOwner).
-          supergroup(supergroup).
-          callerUgi(callerUgi).
-          inodeAttrs(inodeAttrs).
-          inodes(inodes).
-          pathByNameArr(components).
-          snapshotId(snapshotId).
-          path(path).
-          ancestorIndex(ancestorIndex).
-          doCheckOwner(doCheckOwner).
-          ancestorAccess(ancestorAccess).
-          parentAccess(parentAccess).
-          access(access).
-          subAccess(subAccess).
-          ignoreEmptyDir(ignoreEmptyDir).
-          operationName(opType).
-          callerContext(CallerContext.getCurrent());
-      enforcer.checkPermissionWithContext(builder.build());
-    } else {
-      enforcer.checkPermission(fsOwner, supergroup, callerUgi, inodeAttrs,
-          inodes, components, snapshotId, path, ancestorIndex, doCheckOwner,
-          ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
+    try {
+      if (this.authorizeWithContext && opType != null) {
+        INodeAttributeProvider.AuthorizationContext.Builder builder =
+            new INodeAttributeProvider.AuthorizationContext.Builder();
+        builder.fsOwner(fsOwner).
+            supergroup(supergroup).
+            callerUgi(callerUgi).
+            inodeAttrs(inodeAttrs).
+            inodes(inodes).
+            pathByNameArr(components).
+            snapshotId(snapshotId).
+            path(path).
+            ancestorIndex(ancestorIndex).
+            doCheckOwner(doCheckOwner).
+            ancestorAccess(ancestorAccess).
+            parentAccess(parentAccess).
+            access(access).
+            subAccess(subAccess).
+            ignoreEmptyDir(ignoreEmptyDir).
+            operationName(opType).
+            callerContext(CallerContext.getCurrent());
+        enforcer.checkPermissionWithContext(builder.build());
+      } else {
+        enforcer.checkPermission(fsOwner, supergroup, callerUgi, inodeAttrs,
+            inodes, components, snapshotId, path, ancestorIndex, doCheckOwner,
+            ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
+      }
+    } catch (AccessControlException ace) {
+      Class<?> exceptionClass = ace.getClass();
+      if (exceptionClass.equals(AccessControlException.class)
+          || exceptionClass.equals(TraverseAccessControlException.class)) {
+        throw ace;
+      }
+      // Only form a new ACE for subclasses which come from external enforcers
+      throw new AccessControlException(ace);
     }
 
   }
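
For illustration, here is a hypothetical enforcer-side exception of the kind
the catch block above normalizes; the class name is an assumption, standing
in for a real plugin type such as Ranger's. Re-wrapping via
new AccessControlException(ace) keeps the original exception as the cause
while guaranteeing that the class crossing the RPC boundary is always the
plain ACE. Plain AccessControlException and TraverseAccessControlException
are re-thrown unchanged, since the latter is handled specially elsewhere in
the permission checker.

import org.apache.hadoop.security.AccessControlException;

// Hypothetical plugin exception type (illustrative only): when this
// escapes checkPermission, FSPermissionChecker now re-throws it as a
// plain AccessControlException with this instance as the cause.
public class ExamplePluginAccessException extends AccessControlException {
  public ExamplePluginAccessException(String msg) {
    super(msg);
  }
}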

+ 56 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeAttributeProvider.java

@@ -48,6 +48,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.Assert.fail;
+
 public class TestINodeAttributeProvider {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestINodeAttributeProvider.class);
@@ -57,6 +59,15 @@ public class TestINodeAttributeProvider {
   private static final short HDFS_PERMISSION = 0777;
   private static final short PROVIDER_PERMISSION = 0770;
   private static boolean runPermissionCheck = false;
+  private static boolean shouldThrowAccessException = false;
+
+  public static class MyAuthorizationProviderAccessException
+      extends AccessControlException {
+
+    public MyAuthorizationProviderAccessException() {
+      super();
+    }
+  }
 
   public static class MyAuthorizationProvider extends INodeAttributeProvider {
 
@@ -82,6 +93,9 @@ public class TestINodeAttributeProvider {
               ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
         }
         CALLED.add("checkPermission|" + ancestorAccess + "|" + parentAccess + "|" + access);
+        if (shouldThrowAccessException) {
+          throw new MyAuthorizationProviderAccessException();
+        }
       }
 
       @Override
@@ -96,6 +110,9 @@ public class TestINodeAttributeProvider {
         CALLED.add("checkPermission|" + authzContext.getAncestorAccess()
             + "|" + authzContext.getParentAccess() + "|" + authzContext
             .getAccess());
+        if (shouldThrowAccessException) {
+          throw new MyAuthorizationProviderAccessException();
+        }
       }
     }
 
@@ -238,6 +255,7 @@ public class TestINodeAttributeProvider {
       miniDFS = null;
     }
     runPermissionCheck = false;
+    shouldThrowAccessException = false;
     Assert.assertTrue(CALLED.contains("stop"));
   }
 
@@ -457,6 +475,44 @@ public class TestINodeAttributeProvider {
     });
   }
 
+  @Test
+  // HDFS-16259 - Ensure enforcer AccessControlException subclasses are
+  // caught and re-thrown as plain AccessControlExceptions.
+  public void testSubClassedAccessControlExceptions() throws Exception {
+    FileSystem fs = FileSystem.get(miniDFS.getConfiguration(0));
+    shouldThrowAccessException = true;
+    final Path userPath = new Path("/user");
+    final Path authz = new Path("/user/authz");
+    final Path authzChild = new Path("/user/authz/child2");
+
+    fs.mkdirs(userPath);
+    fs.setPermission(userPath, new FsPermission(HDFS_PERMISSION));
+    fs.mkdirs(authz);
+    fs.setPermission(authz, new FsPermission(HDFS_PERMISSION));
+    fs.mkdirs(authzChild);
+    fs.setPermission(authzChild, new FsPermission(HDFS_PERMISSION));
+    UserGroupInformation ugi = UserGroupInformation.createUserForTesting("u1",
+        new String[]{"g1"});
+    ugi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        FileSystem fs = FileSystem.get(miniDFS.getConfiguration(0));
+        try {
+          fs.access(authzChild, FsAction.ALL);
+          fail("Exception should be thrown");
+          // The DFS client receives a RemoteException wrapping an
+          // AccessControlException (ACE). If the wrapped exception is a
+          // subclass of ACE, the client cannot unwrap it correctly. The
+          // change in HDFS-16259 ensures a plain ACE is always thrown
+          // rather than a subclass, to avoid this issue.
+        } catch (AccessControlException ace) {
+          Assert.assertEquals(AccessControlException.class, ace.getClass());
+        }
+        return null;
+      }
+    });
+  }
+
 
   @Test
   // HDFS-15165 - ContentSummary calls should use the provider permissions(if