浏览代码

HDFS-15850. Superuser actions should be reported to external enforcers (#2784)

Vivek Ratnavel Subramanian 4 年之前
父节点
当前提交
c821008836

+ 7 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java

@@ -205,8 +205,13 @@ public class ContentSummaryComputationContext {
   void checkPermission(INodeDirectory inode, int snapshotId, FsAction access)
   void checkPermission(INodeDirectory inode, int snapshotId, FsAction access)
       throws AccessControlException {
       throws AccessControlException {
     if (dir != null && dir.isPermissionEnabled()
     if (dir != null && dir.isPermissionEnabled()
-        && pc != null && !pc.isSuperUser()) {
-      pc.checkPermission(inode, snapshotId, access);
+        && pc != null) {
+      if (pc.isSuperUser()) {
+        // call external enforcer for audit
+        pc.checkSuperuserPrivilege(inode.getFullPathName());
+      } else {
+        pc.checkPermission(inode, snapshotId, access);
+      }
     }
     }
   }
   }
 }
 }

+ 24 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java

@@ -83,14 +83,25 @@ public class FSDirAttrOp {
     try {
     try {
       iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       fsd.checkOwner(pc, iip);
       fsd.checkOwner(pc, iip);
-      if (!pc.isSuperUser()) {
-        if (username != null && !pc.getUser().equals(username)) {
-          throw new AccessControlException("User " + pc.getUser()
-              + " is not a super user (non-super user cannot change owner).");
-        }
-        if (group != null && !pc.isMemberOfGroup(group)) {
-          throw new AccessControlException(
-              "User " + pc.getUser() + " does not belong to " + group);
+      // At this point, the user must be either owner or super user.
+      // superuser: can change owner to a different user,
+      // change owner group to any group
+      // owner: can't change owner to a different user but can change owner
+      // group to different group that the user belongs to.
+      if ((username != null && !pc.getUser().equals(username)) ||
+          (group != null && !pc.isMemberOfGroup(group))) {
+        try {
+          // check if the user is superuser
+          pc.checkSuperuserPrivilege(iip.getPath());
+        } catch (AccessControlException e) {
+          if (username != null && !pc.getUser().equals(username)) {
+            throw new AccessControlException("User " + pc.getUser()
+                + " is not a super user (non-super user cannot change owner).");
+          }
+          if (group != null && !pc.isMemberOfGroup(group)) {
+            throw new AccessControlException(
+                "User " + pc.getUser() + " does not belong to " + group);
+          }
         }
         }
       }
       }
       changed = unprotectedSetOwner(fsd, iip, username, group);
       changed = unprotectedSetOwner(fsd, iip, username, group);
@@ -238,10 +249,12 @@ public class FSDirAttrOp {
     fsd.writeLock();
     fsd.writeLock();
     try {
     try {
       INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE);
+      // Here, the assumption is that the caller of this method has
+      // already checked for super user privilege
       if (fsd.isPermissionEnabled() && !pc.isSuperUser() && allowOwner) {
       if (fsd.isPermissionEnabled() && !pc.isSuperUser() && allowOwner) {
-        INodeDirectory parentDir= iip.getLastINode().getParent();
-        if (parentDir == null ||
-            !parentDir.getUserName().equals(pc.getUser())) {
+        try {
+          fsd.checkOwner(pc, iip.getParentINodesInPath());
+        } catch(AccessControlException ace) {
           throw new AccessControlException(
           throw new AccessControlException(
               "Access denied for user " + pc.getUser() +
               "Access denied for user " + pc.getUser() +
               ". Superuser or owner of parent folder privilege is required");
               ". Superuser or owner of parent folder privilege is required");

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java

@@ -105,6 +105,7 @@ class FSDirStatAndListingOp {
       // superuser to receive null instead.
       // superuser to receive null instead.
       try {
       try {
         iip = fsd.resolvePath(pc, srcArg, dirOp);
         iip = fsd.resolvePath(pc, srcArg, dirOp);
+        pc.checkSuperuserPrivilege(iip.getPath());
       } catch (AccessControlException ace) {
       } catch (AccessControlException ace) {
         return null;
         return null;
       }
       }
@@ -151,12 +152,14 @@ class FSDirStatAndListingOp {
     BlockManager bm = fsd.getBlockManager();
     BlockManager bm = fsd.getBlockManager();
     fsd.readLock();
     fsd.readLock();
     try {
     try {
-      final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
+      // Just get INodesInPath without access checks, since we check for path
+      // access later
+      final INodesInPath iip = fsd.resolvePath(null, src, DirOp.READ);
       src = iip.getPath();
       src = iip.getPath();
       final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
       final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
       if (fsd.isPermissionEnabled()) {
       if (fsd.isPermissionEnabled()) {
-        fsd.checkPathAccess(pc, iip, FsAction.READ);
         fsd.checkUnreadableBySuperuser(pc, iip);
         fsd.checkUnreadableBySuperuser(pc, iip);
+        fsd.checkPathAccess(pc, iip, FsAction.READ);
       }
       }
 
 
       final long fileSize = iip.isSnapshot()
       final long fileSize = iip.isSnapshot()

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java

@@ -437,7 +437,10 @@ public class FSDirXAttrOp {
       if (inode != null &&
       if (inode != null &&
           inode.isDirectory() &&
           inode.isDirectory() &&
           inode.getFsPermission().getStickyBit()) {
           inode.getFsPermission().getStickyBit()) {
-        if (!pc.isSuperUser()) {
+        if (pc.isSuperUser()) {
+          // call external enforcer for audit
+          pc.checkSuperuserPrivilege(iip.getPath());
+        } else {
           fsd.checkOwner(pc, iip);
           fsd.checkOwner(pc, iip);
         }
         }
       } else {
       } else {

+ 18 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -717,18 +717,18 @@ public class FSDirectory implements Closeable {
 
 
     byte[][] components = INode.getPathComponents(src);
     byte[][] components = INode.getPathComponents(src);
     boolean isRaw = isReservedRawName(components);
     boolean isRaw = isReservedRawName(components);
+    components = resolveComponents(components, this);
+    INodesInPath iip = INodesInPath.resolve(rootDir, components, isRaw);
     if (isPermissionEnabled && pc != null && isRaw) {
     if (isPermissionEnabled && pc != null && isRaw) {
       switch(dirOp) {
       switch(dirOp) {
-        case READ_LINK:
-        case READ:
-          break;
-        default:
-          pc.checkSuperuserPrivilege();
-          break;
+      case READ_LINK:
+      case READ:
+        break;
+      default:
+        pc.checkSuperuserPrivilege(iip.getPath());
+        break;
       }
       }
     }
     }
-    components = resolveComponents(components, this);
-    INodesInPath iip = INodesInPath.resolve(rootDir, components, isRaw);
     // verify all ancestors are dirs and traversable.  note that only
     // verify all ancestors are dirs and traversable.  note that only
     // methods that create new namespace items have the signature to throw
     // methods that create new namespace items have the signature to throw
     // PNDE
     // PNDE
@@ -1942,7 +1942,10 @@ public class FSDirectory implements Closeable {
       boolean doCheckOwner, FsAction ancestorAccess, FsAction parentAccess,
       boolean doCheckOwner, FsAction ancestorAccess, FsAction parentAccess,
       FsAction access, FsAction subAccess, boolean ignoreEmptyDir)
       FsAction access, FsAction subAccess, boolean ignoreEmptyDir)
       throws AccessControlException {
       throws AccessControlException {
-    if (!pc.isSuperUser()) {
+    if (pc.isSuperUser()) {
+      // call the external enforcer for audit
+      pc.checkSuperuserPrivilege(iip.getPath());
+    } else {
       readLock();
       readLock();
       try {
       try {
         pc.checkPermission(iip, doCheckOwner, ancestorAccess,
         pc.checkPermission(iip, doCheckOwner, ancestorAccess,
@@ -1958,9 +1961,12 @@ public class FSDirectory implements Closeable {
     if (pc.isSuperUser()) {
     if (pc.isSuperUser()) {
       if (FSDirXAttrOp.getXAttrByPrefixedName(this, iip,
       if (FSDirXAttrOp.getXAttrByPrefixedName(this, iip,
           SECURITY_XATTR_UNREADABLE_BY_SUPERUSER) != null) {
           SECURITY_XATTR_UNREADABLE_BY_SUPERUSER) != null) {
-        throw new AccessControlException(
-            "Access is denied for " + pc.getUser() + " since the superuser "
-            + "is not allowed to perform this operation.");
+        String errorMessage = "Access is denied for " + pc.getUser()
+            + " since the superuser is not allowed to perform this operation.";
+        pc.denyUserAccess(iip.getPath(), errorMessage);
+      } else {
+        // call the external enforcer for audit.
+        pc.checkSuperuserPrivilege(iip.getPath());
       }
       }
     }
     }
   }
   }

+ 41 - 33
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1954,7 +1954,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
       EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
     INode.checkAbsolutePath(path);
     INode.checkAbsolutePath(path);
     final String operationName = "listOpenFiles";
     final String operationName = "listOpenFiles";
-    checkSuperuserPrivilege();
+    checkSuperuserPrivilege(operationName, path);
     checkOperation(OperationCategory.READ);
     checkOperation(OperationCategory.READ);
     BatchedListEntries<OpenFileEntry> batchedListEntries;
     BatchedListEntries<OpenFileEntry> batchedListEntries;
     String normalizedPath = new Path(path).toString(); // normalize path.
     String normalizedPath = new Path(path).toString(); // normalize path.
@@ -2289,6 +2289,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "truncate";
     final String operationName = "truncate";
     requireEffectiveLayoutVersionForFeature(Feature.TRUNCATE);
     requireEffectiveLayoutVersionForFeature(Feature.TRUNCATE);
     FSDirTruncateOp.TruncateResult r = null;
     FSDirTruncateOp.TruncateResult r = null;
+    FileStatus status;
     try {
     try {
       NameNode.stateChangeLog.info(
       NameNode.stateChangeLog.info(
           "DIR* NameSystem.truncate: src={} newLength={}", src, newLength);
           "DIR* NameSystem.truncate: src={} newLength={}", src, newLength);
@@ -2307,7 +2308,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         r = FSDirTruncateOp.truncate(this, src, newLength, clientName,
         r = FSDirTruncateOp.truncate(this, src, newLength, clientName,
             clientMachine, mtime, toRemoveBlocks, pc);
             clientMachine, mtime, toRemoveBlocks, pc);
       } finally {
       } finally {
-        FileStatus status = r != null ? r.getFileStatus() : null;
+        status = r != null ? r.getFileStatus() : null;
         writeUnlock(operationName,
         writeUnlock(operationName,
             getLockReportInfoSupplier(src, null, status));
             getLockReportInfoSupplier(src, null, status));
       }
       }
@@ -2316,11 +2317,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         removeBlocks(toRemoveBlocks);
         removeBlocks(toRemoveBlocks);
         toRemoveBlocks.clear();
         toRemoveBlocks.clear();
       }
       }
-      logAuditEvent(true, operationName, src, null, r.getFileStatus());
+      logAuditEvent(true, operationName, src, null, status);
     } catch (AccessControlException e) {
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       logAuditEvent(false, operationName, src);
       throw e;
       throw e;
     }
     }
+    assert(r != null);
     return r.getResult();
     return r.getResult();
   }
   }
 
 
@@ -3582,7 +3584,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FSPermissionChecker.setOperationType(operationName);
     FSPermissionChecker.setOperationType(operationName);
     try {
     try {
       if(!allowOwnerSetQuota) {
       if(!allowOwnerSetQuota) {
-        checkSuperuserPrivilege(pc);
+        checkSuperuserPrivilege(operationName, src);
       }
       }
       writeLock();
       writeLock();
       try {
       try {
@@ -5222,18 +5224,22 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
     return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
   }
   }
 
 
+  /**
+   * This method is retained for backward compatibility.
+   * Please use {@link #checkSuperuserPrivilege(String)} instead.
+   *
+   * @throws AccessControlException if user is not a super user.
+   */
   void checkSuperuserPrivilege() throws AccessControlException {
   void checkSuperuserPrivilege() throws AccessControlException {
     if (isPermissionEnabled) {
     if (isPermissionEnabled) {
       FSPermissionChecker pc = getPermissionChecker();
       FSPermissionChecker pc = getPermissionChecker();
-      pc.checkSuperuserPrivilege();
+      pc.checkSuperuserPrivilege(null);
     }
     }
   }
   }
 
 
-  void checkSuperuserPrivilege(FSPermissionChecker pc)
-      throws AccessControlException {
-    if (isPermissionEnabled) {
-      pc.checkSuperuserPrivilege();
-    }
+  void checkSuperuserPrivilege(String operationName)
+      throws IOException {
+    checkSuperuserPrivilege(operationName, null);
   }
   }
 
 
   /**
   /**
@@ -6011,7 +6017,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
    */
   Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
   Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
   String[] cookieTab) throws IOException {
   String[] cookieTab) throws IOException {
-    checkSuperuserPrivilege();
+    final String operationName = "listCorruptFileBlocks";
+    checkSuperuserPrivilege(operationName, path);
     checkOperation(OperationCategory.READ);
     checkOperation(OperationCategory.READ);
 
 
     int count = 0;
     int count = 0;
@@ -6939,7 +6946,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void allowSnapshot(String path) throws IOException {
   void allowSnapshot(String path) throws IOException {
     checkOperation(OperationCategory.WRITE);
     checkOperation(OperationCategory.WRITE);
     final String operationName = "allowSnapshot";
     final String operationName = "allowSnapshot";
-    checkSuperuserPrivilege(operationName);
+    checkSuperuserPrivilege(operationName, path);
     writeLock();
     writeLock();
     try {
     try {
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
@@ -6956,7 +6963,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void disallowSnapshot(String path) throws IOException {
   void disallowSnapshot(String path) throws IOException {
     checkOperation(OperationCategory.WRITE);
     checkOperation(OperationCategory.WRITE);
     final String operationName = "disallowSnapshot";
     final String operationName = "disallowSnapshot";
-    checkSuperuserPrivilege(operationName);
+    checkSuperuserPrivilege(operationName, path);
     writeLock();
     writeLock();
     try {
     try {
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
@@ -7666,13 +7673,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "addCachePool";
     final String operationName = "addCachePool";
     checkOperation(OperationCategory.WRITE);
     checkOperation(OperationCategory.WRITE);
     String poolInfoStr = null;
     String poolInfoStr = null;
+    String poolName = req == null ? null : req.getPoolName();
     try {
     try {
-      checkSuperuserPrivilege();
+      checkSuperuserPrivilege(operationName, poolName);
       writeLock();
       writeLock();
       try {
       try {
         checkOperation(OperationCategory.WRITE);
         checkOperation(OperationCategory.WRITE);
         checkNameNodeSafeMode("Cannot add cache pool"
         checkNameNodeSafeMode("Cannot add cache pool"
-            + (req == null ? null : req.getPoolName()));
+            + poolName);
         CachePoolInfo info = FSNDNCacheOp.addCachePool(this, cacheManager, req,
         CachePoolInfo info = FSNDNCacheOp.addCachePool(this, cacheManager, req,
             logRetryCache);
             logRetryCache);
         poolInfoStr = info.toString();
         poolInfoStr = info.toString();
@@ -7694,7 +7702,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     String poolNameStr = "{poolName: " +
     String poolNameStr = "{poolName: " +
         (req == null ? null : req.getPoolName()) + "}";
         (req == null ? null : req.getPoolName()) + "}";
     try {
     try {
-      checkSuperuserPrivilege();
+      checkSuperuserPrivilege(operationName, poolNameStr);
       writeLock();
       writeLock();
       try {
       try {
         checkOperation(OperationCategory.WRITE);
         checkOperation(OperationCategory.WRITE);
@@ -7721,7 +7729,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.WRITE);
     checkOperation(OperationCategory.WRITE);
     String poolNameStr = "{poolName: " + cachePoolName + "}";
     String poolNameStr = "{poolName: " + cachePoolName + "}";
     try {
     try {
-      checkSuperuserPrivilege();
+      checkSuperuserPrivilege(operationName, poolNameStr);
       writeLock();
       writeLock();
       try {
       try {
         checkOperation(OperationCategory.WRITE);
         checkOperation(OperationCategory.WRITE);
@@ -7926,8 +7934,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       Metadata metadata = FSDirEncryptionZoneOp.ensureKeyIsInitialized(dir,
       Metadata metadata = FSDirEncryptionZoneOp.ensureKeyIsInitialized(dir,
           keyName, src);
           keyName, src);
       final FSPermissionChecker pc = getPermissionChecker();
       final FSPermissionChecker pc = getPermissionChecker();
-      FSPermissionChecker.setOperationType(operationName);
-      checkSuperuserPrivilege(pc);
+      checkSuperuserPrivilege(operationName, src);
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       writeLock();
       writeLock();
       try {
       try {
@@ -7988,9 +7995,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "listEncryptionZones";
     final String operationName = "listEncryptionZones";
     boolean success = false;
     boolean success = false;
     checkOperation(OperationCategory.READ);
     checkOperation(OperationCategory.READ);
-    final FSPermissionChecker pc = getPermissionChecker();
-    FSPermissionChecker.setOperationType(operationName);
-    checkSuperuserPrivilege(pc);
+    checkSuperuserPrivilege(operationName, dir.rootDir.getFullPathName());
     readLock();
     readLock();
     try {
     try {
       checkOperation(OperationCategory.READ);
       checkOperation(OperationCategory.READ);
@@ -8006,12 +8011,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
 
   void reencryptEncryptionZone(final String zone, final ReencryptAction action,
   void reencryptEncryptionZone(final String zone, final ReencryptAction action,
       final boolean logRetryCache) throws IOException {
       final boolean logRetryCache) throws IOException {
+    final String operationName = "reencryptEncryptionZone";
     boolean success = false;
     boolean success = false;
     try {
     try {
       Preconditions.checkNotNull(zone, "zone is null.");
       Preconditions.checkNotNull(zone, "zone is null.");
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       final FSPermissionChecker pc = dir.getPermissionChecker();
       final FSPermissionChecker pc = dir.getPermissionChecker();
-      checkSuperuserPrivilege(pc);
+      checkSuperuserPrivilege(operationName, zone);
       checkNameNodeSafeMode("NameNode in safemode, cannot " + action
       checkNameNodeSafeMode("NameNode in safemode, cannot " + action
           + " re-encryption on zone " + zone);
           + " re-encryption on zone " + zone);
       reencryptEncryptionZoneInt(pc, zone, action, logRetryCache);
       reencryptEncryptionZoneInt(pc, zone, action, logRetryCache);
@@ -8026,9 +8032,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "listReencryptionStatus";
     final String operationName = "listReencryptionStatus";
     boolean success = false;
     boolean success = false;
     checkOperation(OperationCategory.READ);
     checkOperation(OperationCategory.READ);
-    final FSPermissionChecker pc = getPermissionChecker();
-    FSPermissionChecker.setOperationType(operationName);
-    checkSuperuserPrivilege(pc);
+    checkSuperuserPrivilege(operationName, dir.rootDir.getFullPathName());
     readLock();
     readLock();
     try {
     try {
       checkOperation(OperationCategory.READ);
       checkOperation(OperationCategory.READ);
@@ -8871,15 +8875,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             Arrays.asList(enabledEcPolicies));
             Arrays.asList(enabledEcPolicies));
   }
   }
 
 
-  // This method logs operatoinName without super user privilege.
+  // This method logs operationName without super user privilege.
   // It should be called without holding FSN lock.
   // It should be called without holding FSN lock.
-  void checkSuperuserPrivilege(String operationName)
+  void checkSuperuserPrivilege(String operationName, String path)
       throws IOException {
       throws IOException {
-    try {
-      checkSuperuserPrivilege();
-    } catch (AccessControlException ace) {
-      logAuditEvent(false, operationName, null);
-      throw ace;
+    if (isPermissionEnabled) {
+      try {
+        FSPermissionChecker.setOperationType(operationName);
+        FSPermissionChecker pc = getPermissionChecker();
+        pc.checkSuperuserPrivilege(path);
+      } catch(AccessControlException ace){
+        logAuditEvent(false, operationName, path);
+        throw ace;
+      }
     }
     }
   }
   }
 
 

+ 68 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.server.namenode.INodeAttributeProvider.AccessControlEnforcer;
 import org.apache.hadoop.hdfs.server.namenode.INodeAttributeProvider.AccessControlEnforcer;
+import org.apache.hadoop.hdfs.server.namenode.INodeAttributeProvider.AuthorizationContext;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -144,18 +145,74 @@ public class FSPermissionChecker implements AccessControlEnforcer {
         ? attributeProvider.getExternalAccessControlEnforcer(this) : this;
         ? attributeProvider.getExternalAccessControlEnforcer(this) : this;
   }
   }
 
 
+  private AuthorizationContext getAuthorizationContextForSuperUser(
+      String path) {
+    String opType = operationType.get();
+
+    AuthorizationContext.Builder builder =
+        new INodeAttributeProvider.AuthorizationContext.Builder();
+    builder.fsOwner(fsOwner).
+        supergroup(supergroup).
+        callerUgi(callerUgi).
+        operationName(opType).
+        callerContext(CallerContext.getCurrent());
+
+    // Add path to the context builder only if it is not null.
+    if (path != null && !path.isEmpty()) {
+      builder.path(path);
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * This method is retained to maintain backward compatibility.
+   * Please use the new method {@link #checkSuperuserPrivilege(String)} to make
+   * sure that the external enforcers have the correct context to audit.
+   *
+   * @throws AccessControlException if the caller is not a super user.
+   */
+  public void checkSuperuserPrivilege() throws AccessControlException {
+    checkSuperuserPrivilege(null);
+  }
+
   /**
   /**
-   * Verify if the caller has the required permission. This will result into 
-   * an exception if the caller is not allowed to access the resource.
+   * Checks if the caller has super user privileges.
+   * Throws {@link AccessControlException} for non super users.
+   *
+   * @param path The resource path for which permission is being requested.
+   * @throws AccessControlException if the caller is not a super user.
+   */
+  public void checkSuperuserPrivilege(String path)
+      throws AccessControlException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SUPERUSER ACCESS CHECK: " + this
+          + ", operationName=" + FSPermissionChecker.operationType.get()
+          + ", path=" + path);
+    }
+    getAccessControlEnforcer().checkSuperUserPermissionWithContext(
+        getAuthorizationContextForSuperUser(path));
+  }
+
+  /**
+   * Calls the external enforcer to notify denial of access to the user with
+   * the given error message. Always throws an ACE with the given message.
+   *
+   * @param path The resource path for which permission is being requested.
+   * @param errorMessage message for the exception.
+   * @throws AccessControlException with the error message.
    */
    */
-  public void checkSuperuserPrivilege()
+  public void denyUserAccess(String path, String errorMessage)
       throws AccessControlException {
       throws AccessControlException {
-    if (!isSuperUser()) {
-      throw new AccessControlException("Access denied for user " 
-          + getUser() + ". Superuser privilege is required");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("DENY USER ACCESS: " + this
+          + ", operationName=" + FSPermissionChecker.operationType.get()
+          + ", path=" + path);
     }
     }
+    getAccessControlEnforcer().denyUserAccess(
+        getAuthorizationContextForSuperUser(path), errorMessage);
   }
   }
-  
+
   /**
   /**
    * Check whether current user have permissions to access the path.
    * Check whether current user have permissions to access the path.
    * Traverse is always checked.
    * Traverse is always checked.
@@ -705,6 +762,10 @@ public class FSPermissionChecker implements AccessControlEnforcer {
           UnresolvedPathException, ParentNotDirectoryException {
           UnresolvedPathException, ParentNotDirectoryException {
     try {
     try {
       if (pc == null || pc.isSuperUser()) {
       if (pc == null || pc.isSuperUser()) {
+        if (pc != null) {
+          // call the external enforcer for audit
+          pc.checkSuperuserPrivilege(iip.getPath());
+        }
         checkSimpleTraverse(iip);
         checkSimpleTraverse(iip);
       } else {
       } else {
         pc.checkPermission(iip, false, null, null, null, null, false);
         pc.checkPermission(iip, false, null, null, null, null, false);

+ 43 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributeProvider.java

@@ -362,7 +362,7 @@ public abstract class INodeAttributeProvider {
      * Checks permission on a file system object. Has to throw an Exception
      * Checks permission on a file system object. Has to throw an Exception
      * if the filesystem object is not accessible by the calling Ugi.
      * if the filesystem object is not accessible by the calling Ugi.
      * @param fsOwner Filesystem owner (The Namenode user)
      * @param fsOwner Filesystem owner (The Namenode user)
-     * @param supergroup super user geoup
+     * @param supergroup super user group
      * @param callerUgi UserGroupInformation of the caller
      * @param callerUgi UserGroupInformation of the caller
      * @param inodeAttrs Array of INode attributes for each path element in the
      * @param inodeAttrs Array of INode attributes for each path element in the
      *                   the path
      *                   the path
@@ -393,7 +393,7 @@ public abstract class INodeAttributeProvider {
 
 
     /**
     /**
      * Checks permission on a file system object. Has to throw an Exception
      * Checks permission on a file system object. Has to throw an Exception
-     * if the filesystem object is not accessessible by the calling Ugi.
+     * if the filesystem object is not accessible by the calling Ugi.
      * @param authzContext an {@link AuthorizationContext} object encapsulating
      * @param authzContext an {@link AuthorizationContext} object encapsulating
      *                     the various parameters required to authorize an
      *                     the various parameters required to authorize an
      *                     operation.
      *                     operation.
@@ -405,7 +405,48 @@ public abstract class INodeAttributeProvider {
           + "implement the checkPermissionWithContext(AuthorizationContext) "
           + "implement the checkPermissionWithContext(AuthorizationContext) "
           + "API.");
           + "API.");
     }
     }
+
+    /**
+     * Checks if the user is a superuser or belongs to superuser group.
+     * It throws an AccessControlException if user is not a superuser.
+     *
+     * @param authzContext an {@link AuthorizationContext} object encapsulating
+     *                     the various parameters required to authorize an
+     *                     operation.
+     * @throws AccessControlException - if user is not a super user or part
+     * of the super user group.
+     */
+    default void checkSuperUserPermissionWithContext(
+        AuthorizationContext authzContext)
+        throws AccessControlException {
+      UserGroupInformation callerUgi = authzContext.getCallerUgi();
+      boolean isSuperUser =
+          callerUgi.getShortUserName().equals(authzContext.getFsOwner()) ||
+          callerUgi.getGroupsSet().contains(authzContext.getSupergroup());
+      if (!isSuperUser) {
+        throw new AccessControlException("Access denied for user " +
+            callerUgi.getShortUserName() + ". Superuser privilege is " +
+            "required for operation " + authzContext.getOperationName());
+      }
+    }
+
+    /**
+     * This method must be called when denying access to users to
+     * notify the external enforcers.
+     * This will help the external enforcers to audit the requests
+     * by users that were denied access.
+     * @param authzContext an {@link AuthorizationContext} object encapsulating
+     *                     the various parameters required to authorize an
+     *                     operation.
+     * @throws AccessControlException
+     */
+    default void denyUserAccess(AuthorizationContext authzContext,
+                                String errorMessage)
+        throws AccessControlException {
+      throw new AccessControlException(errorMessage);
+    }
   }
   }
+
   /**
   /**
    * Initialize the provider. This method is called at NameNode startup
    * Initialize the provider. This method is called at NameNode startup
    * time.
    * time.

+ 12 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -1835,9 +1835,9 @@ public class NameNode extends ReconfigurableBase implements
     }
     }
   }
   }
 
 
-  synchronized void monitorHealth() 
-      throws HealthCheckFailedException, AccessControlException {
-    namesystem.checkSuperuserPrivilege();
+  synchronized void monitorHealth() throws IOException {
+    String operationName = "monitorHealth";
+    namesystem.checkSuperuserPrivilege(operationName);
     if (!haEnabled) {
     if (!haEnabled) {
       return; // no-op, if HA is not enabled
       return; // no-op, if HA is not enabled
     }
     }
@@ -1859,9 +1859,9 @@ public class NameNode extends ReconfigurableBase implements
     }
     }
   }
   }
   
   
-  synchronized void transitionToActive() 
-      throws ServiceFailedException, AccessControlException {
-    namesystem.checkSuperuserPrivilege();
+  synchronized void transitionToActive() throws IOException {
+    String operationName = "transitionToActive";
+    namesystem.checkSuperuserPrivilege(operationName);
     if (!haEnabled) {
     if (!haEnabled) {
       throw new ServiceFailedException("HA for namenode is not enabled");
       throw new ServiceFailedException("HA for namenode is not enabled");
     }
     }
@@ -1876,18 +1876,18 @@ public class NameNode extends ReconfigurableBase implements
     state.setState(haContext, ACTIVE_STATE);
     state.setState(haContext, ACTIVE_STATE);
   }
   }
 
 
-  synchronized void transitionToStandby()
-      throws ServiceFailedException, AccessControlException {
-    namesystem.checkSuperuserPrivilege();
+  synchronized void transitionToStandby() throws IOException {
+    String operationName = "transitionToStandby";
+    namesystem.checkSuperuserPrivilege(operationName);
     if (!haEnabled) {
     if (!haEnabled) {
       throw new ServiceFailedException("HA for namenode is not enabled");
       throw new ServiceFailedException("HA for namenode is not enabled");
     }
     }
     state.setState(haContext, STANDBY_STATE);
     state.setState(haContext, STANDBY_STATE);
   }
   }
 
 
-  synchronized void transitionToObserver()
-      throws ServiceFailedException, AccessControlException {
-    namesystem.checkSuperuserPrivilege();
+  synchronized void transitionToObserver() throws IOException {
+    String operationName = "transitionToObserver";
+    namesystem.checkSuperuserPrivilege(operationName);
     if (!haEnabled) {
     if (!haEnabled) {
       throw new ServiceFailedException("HA for namenode is not enabled");
       throw new ServiceFailedException("HA for namenode is not enabled");
     }
     }

+ 34 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -642,6 +642,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
       minBlockSize, long timeInterval)
       minBlockSize, long timeInterval)
       throws IOException {
       throws IOException {
+    String operationName = "getBlocks";
     if(size <= 0) {
     if(size <= 0) {
       throw new IllegalArgumentException(
       throw new IllegalArgumentException(
           "Unexpected not positive size: "+size);
           "Unexpected not positive size: "+size);
@@ -651,15 +652,16 @@ public class NameNodeRpcServer implements NamenodeProtocols {
           "Unexpected not positive size: "+size);
           "Unexpected not positive size: "+size);
     }
     }
     checkNNStartup();
     checkNNStartup();
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     namesystem.checkNameNodeSafeMode("Cannot execute getBlocks");
     namesystem.checkNameNodeSafeMode("Cannot execute getBlocks");
     return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval);
     return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval);
   }
   }
 
 
   @Override // NamenodeProtocol
   @Override // NamenodeProtocol
   public ExportedBlockKeys getBlockKeys() throws IOException {
   public ExportedBlockKeys getBlockKeys() throws IOException {
+    String operationName = "getBlockKeys";
     checkNNStartup();
     checkNNStartup();
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     return namesystem.getBlockManager().getBlockKeys();
     return namesystem.getBlockManager().getBlockKeys();
   }
   }
 
 
@@ -667,9 +669,10 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   public void errorReport(NamenodeRegistration registration,
   public void errorReport(NamenodeRegistration registration,
                           int errorCode, 
                           int errorCode, 
                           String msg) throws IOException {
                           String msg) throws IOException {
+    String operationName = "errorReport";
     checkNNStartup();
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.UNCHECKED);
     namesystem.checkOperation(OperationCategory.UNCHECKED);
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     verifyRequest(registration);
     verifyRequest(registration);
     LOG.info("Error report from " + registration + ": " + msg);
     LOG.info("Error report from " + registration + ": " + msg);
     if (errorCode == FATAL) {
     if (errorCode == FATAL) {
@@ -680,8 +683,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   @Override // NamenodeProtocol
   @Override // NamenodeProtocol
   public NamenodeRegistration registerSubordinateNamenode(
   public NamenodeRegistration registerSubordinateNamenode(
       NamenodeRegistration registration) throws IOException {
       NamenodeRegistration registration) throws IOException {
+    String operationName = "registerSubordinateNamenode";
     checkNNStartup();
     checkNNStartup();
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     verifyLayoutVersion(registration.getVersion());
     verifyLayoutVersion(registration.getVersion());
     NamenodeRegistration myRegistration = nn.setRegistration();
     NamenodeRegistration myRegistration = nn.setRegistration();
     namesystem.registerBackupNode(registration, myRegistration);
     namesystem.registerBackupNode(registration, myRegistration);
@@ -691,8 +695,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   @Override // NamenodeProtocol
   @Override // NamenodeProtocol
   public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
   public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
       throws IOException {
       throws IOException {
+    String operationName = "startCheckpoint";
     checkNNStartup();
     checkNNStartup();
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     verifyRequest(registration);
     verifyRequest(registration);
     if(!nn.isRole(NamenodeRole.NAMENODE))
     if(!nn.isRole(NamenodeRole.NAMENODE))
       throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
       throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
@@ -714,8 +719,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   @Override // NamenodeProtocol
   @Override // NamenodeProtocol
   public void endCheckpoint(NamenodeRegistration registration,
   public void endCheckpoint(NamenodeRegistration registration,
                             CheckpointSignature sig) throws IOException {
                             CheckpointSignature sig) throws IOException {
+    String operationName = "endCheckpoint";
     checkNNStartup();
     checkNNStartup();
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
       return; // Return previous response
@@ -1322,17 +1328,19 @@ public class NameNodeRpcServer implements NamenodeProtocols {
 
 
   @Override // NamenodeProtocol
   @Override // NamenodeProtocol
   public long getTransactionID() throws IOException {
   public long getTransactionID() throws IOException {
+    String operationName = "getTransactionID";
     checkNNStartup();
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.UNCHECKED);
     namesystem.checkOperation(OperationCategory.UNCHECKED);
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     return namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
     return namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
   }
   }
   
   
   @Override // NamenodeProtocol
   @Override // NamenodeProtocol
   public long getMostRecentCheckpointTxId() throws IOException {
   public long getMostRecentCheckpointTxId() throws IOException {
+    String operationName = "getMostRecentCheckpointTxId";
     checkNNStartup();
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.UNCHECKED);
     namesystem.checkOperation(OperationCategory.UNCHECKED);
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     return namesystem.getFSImage().getMostRecentCheckpointTxId();
     return namesystem.getFSImage().getMostRecentCheckpointTxId();
   }
   }
   
   
@@ -1345,23 +1353,26 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   @Override // NamenodeProtocol
   @Override // NamenodeProtocol
   public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
   public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
       throws IOException {
       throws IOException {
+    String operationName = "getEditLogManifest";
     checkNNStartup();
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.READ);
     namesystem.checkOperation(OperationCategory.READ);
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     return namesystem.getEditLog().getEditLogManifest(sinceTxId);
     return namesystem.getEditLog().getEditLogManifest(sinceTxId);
   }
   }
 
 
   @Override // NamenodeProtocol
   @Override // NamenodeProtocol
   public boolean isUpgradeFinalized() throws IOException {
   public boolean isUpgradeFinalized() throws IOException {
+    String operationName = "isUpgradeFinalized";
     checkNNStartup();
     checkNNStartup();
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     return namesystem.isUpgradeFinalized();
     return namesystem.isUpgradeFinalized();
   }
   }
 
 
   @Override // NamenodeProtocol
   @Override // NamenodeProtocol
   public boolean isRollingUpgrade() throws IOException {
   public boolean isRollingUpgrade() throws IOException {
+    String operationName = "isRollingUpgrade";
     checkNNStartup();
     checkNNStartup();
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     return namesystem.isRollingUpgrade();
     return namesystem.isRollingUpgrade();
   }
   }
     
     
@@ -2345,9 +2356,10 @@ public class NameNodeRpcServer implements NamenodeProtocols {
 
 
   @Override // ClientProtocol
   @Override // ClientProtocol
   public long getCurrentEditLogTxid() throws IOException {
   public long getCurrentEditLogTxid() throws IOException {
+    String operationName = "getCurrentEditLogTxid";
     checkNNStartup();
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.READ); // only active
     namesystem.checkOperation(OperationCategory.READ); // only active
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     // if it's not yet open for write, we may be in the process of transitioning
     // if it's not yet open for write, we may be in the process of transitioning
     // from standby to active and may not yet know what the latest committed
     // from standby to active and may not yet know what the latest committed
     // txid is
     // txid is
@@ -2374,9 +2386,10 @@ public class NameNodeRpcServer implements NamenodeProtocols {
 
 
   @Override // ClientProtocol
   @Override // ClientProtocol
   public EventBatchList getEditsFromTxid(long txid) throws IOException {
   public EventBatchList getEditsFromTxid(long txid) throws IOException {
+    String operationName = "getEditsFromTxid";
     checkNNStartup();
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.READ); // only active
     namesystem.checkOperation(OperationCategory.READ); // only active
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     int maxEventsPerRPC = nn.getConf().getInt(
     int maxEventsPerRPC = nn.getConf().getInt(
         DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY,
         DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY,
         DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT);
         DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT);
@@ -2521,8 +2534,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   @Override
   @Override
   public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
   public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
       ErasureCodingPolicy[] policies) throws IOException {
       ErasureCodingPolicy[] policies) throws IOException {
+    String operationName = "addErasureCodingPolicies";
     checkNNStartup();
     checkNNStartup();
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     final CacheEntryWithPayload cacheEntry =
     final CacheEntryWithPayload cacheEntry =
         RetryCache.waitForCompletion(retryCache, null);
         RetryCache.waitForCompletion(retryCache, null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -2544,8 +2558,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   @Override
   @Override
   public void removeErasureCodingPolicy(String ecPolicyName)
   public void removeErasureCodingPolicy(String ecPolicyName)
       throws IOException {
       throws IOException {
+    String operationName = "removeErasureCodingPolicy";
     checkNNStartup();
     checkNNStartup();
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
       return;
@@ -2562,8 +2577,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   @Override // ClientProtocol
   public void enableErasureCodingPolicy(String ecPolicyName)
   public void enableErasureCodingPolicy(String ecPolicyName)
       throws IOException {
       throws IOException {
+    String operationName = "enableErasureCodingPolicy";
     checkNNStartup();
     checkNNStartup();
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
       return;
@@ -2580,8 +2596,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   @Override // ClientProtocol
   public void disableErasureCodingPolicy(String ecPolicyName)
   public void disableErasureCodingPolicy(String ecPolicyName)
       throws IOException {
       throws IOException {
+    String operationName = "disableErasureCodingPolicy";
     checkNNStartup();
     checkNNStartup();
-    namesystem.checkSuperuserPrivilege();
+    namesystem.checkSuperuserPrivilege(operationName);
     final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
       return;

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -377,9 +377,10 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
    */
    */
   public void fsck() throws AccessControlException {
   public void fsck() throws AccessControlException {
     final long startTime = Time.monotonicNow();
     final long startTime = Time.monotonicNow();
+    String operationName = "fsck";
     try {
     try {
       if(blockIds != null) {
       if(blockIds != null) {
-        namenode.getNamesystem().checkSuperuserPrivilege();
+        namenode.getNamesystem().checkSuperuserPrivilege(operationName, path);
         StringBuilder sb = new StringBuilder();
         StringBuilder sb = new StringBuilder();
         sb.append("FSCK started by " +
         sb.append("FSCK started by " +
             UserGroupInformation.getCurrentUser() + " from " +
             UserGroupInformation.getCurrentUser() + " from " +

+ 12 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java

@@ -65,8 +65,14 @@ public class XAttrPermissionFilter {
       boolean isRawPath)
       boolean isRawPath)
       throws AccessControlException {
       throws AccessControlException {
     final boolean isSuperUser = pc.isSuperUser();
     final boolean isSuperUser = pc.isSuperUser();
+    final String xAttrString =
+        "XAttr [ns=" + xAttr.getNameSpace() + ", name=" + xAttr.getName() + "]";
     if (xAttr.getNameSpace() == XAttr.NameSpace.USER || 
     if (xAttr.getNameSpace() == XAttr.NameSpace.USER || 
         (xAttr.getNameSpace() == XAttr.NameSpace.TRUSTED && isSuperUser)) {
         (xAttr.getNameSpace() == XAttr.NameSpace.TRUSTED && isSuperUser)) {
+      if (isSuperUser) {
+        // call the external enforcer for audit.
+        pc.checkSuperuserPrivilege(xAttrString);
+      }
       return;
       return;
     }
     }
     if (xAttr.getNameSpace() == XAttr.NameSpace.RAW && isRawPath) {
     if (xAttr.getNameSpace() == XAttr.NameSpace.RAW && isRawPath) {
@@ -75,14 +81,16 @@ public class XAttrPermissionFilter {
     if (XAttrHelper.getPrefixedName(xAttr).
     if (XAttrHelper.getPrefixedName(xAttr).
         equals(SECURITY_XATTR_UNREADABLE_BY_SUPERUSER)) {
         equals(SECURITY_XATTR_UNREADABLE_BY_SUPERUSER)) {
       if (xAttr.getValue() != null) {
       if (xAttr.getValue() != null) {
-        throw new AccessControlException("Attempt to set a value for '" +
+        // Notify external enforcer for audit
+        String errorMessage = "Attempt to set a value for '" +
             SECURITY_XATTR_UNREADABLE_BY_SUPERUSER +
             SECURITY_XATTR_UNREADABLE_BY_SUPERUSER +
-            "'. Values are not allowed for this xattr.");
+            "'. Values are not allowed for this xattr.";
+        pc.denyUserAccess(xAttrString, errorMessage);
       }
       }
       return;
       return;
     }
     }
-    throw new AccessControlException("User doesn't have permission for xattr: "
-        + XAttrHelper.getPrefixedName(xAttr));
+    pc.denyUserAccess(xAttrString, "User doesn't have permission for xattr: "
+            + XAttrHelper.getPrefixedName(xAttr));
   }
   }
 
 
   static void checkPermissionForApi(FSPermissionChecker pc,
   static void checkPermissionForApi(FSPermissionChecker pc,