Browse Source

HDFS-14743. Enhance INodeAttributeProvider/ AccessControlEnforcer Interface in HDFS to support Authorization of mkdir, rm, rmdir, copy, move etc... (#1829)

Reviewed-by: Xiaoyu Yao <xyao@apache.org>
Wei-Chiu Chuang 5 years ago
parent
commit
4b95c242ec

+ 59 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1982,6 +1982,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -2012,6 +2013,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -2039,6 +2041,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.READ);
     GetBlockLocationsResult res = null;
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     final INode inode;
     try {
       readLock();
@@ -2149,6 +2152,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "concat";
     FileStatus stat = null;
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     checkOperation(OperationCategory.WRITE);
     try {
       writeLock();
@@ -2178,6 +2182,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -2220,6 +2225,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       }
       checkOperation(OperationCategory.WRITE);
       final FSPermissionChecker pc = getPermissionChecker();
+      FSPermissionChecker.setOperationType(operationName);
       writeLock();
       BlocksMapUpdateInfo toRemoveBlocks = new BlocksMapUpdateInfo();
       try {
@@ -2255,6 +2261,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -2293,6 +2300,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     boolean success = false;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -2326,6 +2334,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -2408,6 +2417,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -2435,6 +2445,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   BlockStoragePolicy getStoragePolicy(String src) throws IOException {
     checkOperation(OperationCategory.READ);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(null);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -2462,6 +2473,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   long getPreferredBlockSize(String src) throws IOException {
     checkOperation(OperationCategory.READ);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(null);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -2575,6 +2587,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(null);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2670,6 +2683,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     boolean skipSync = false;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(null);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2810,6 +2824,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       LastBlockWithStatus lbs = null;
       checkOperation(OperationCategory.WRITE);
       final FSPermissionChecker pc = getPermissionChecker();
+      FSPermissionChecker.setOperationType(operationName);
       writeLock();
       try {
         checkOperation(OperationCategory.WRITE);
@@ -2866,6 +2881,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FSDirWriteFileOp.ValidateAddBlockResult r;
     checkOperation(OperationCategory.READ);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -2916,6 +2932,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final BlockType blockType;
     checkOperation(OperationCategory.READ);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(null);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -2964,6 +2981,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         "BLOCK* NameSystem.abandonBlock: {} of file {}", b, src);
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(null);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -3029,6 +3047,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     boolean success = false;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(null);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -3108,6 +3127,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FSDirRenameOp.RenameResult ret = null;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -3136,6 +3156,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FSDirRenameOp.RenameResult res = null;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -3175,6 +3196,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     BlocksMapUpdateInfo toRemovedBlocks = null;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     boolean ret = false;
     try {
       writeLock();
@@ -3283,6 +3305,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.READ);
     HdfsFileStatus stat = null;
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       readLock();
       try {
@@ -3307,6 +3330,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "isFileClosed";
     checkOperation(OperationCategory.READ);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     boolean success = false;
     try {
       readLock();
@@ -3335,6 +3359,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -3373,6 +3398,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "contentSummary";
     ContentSummary cs;
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       readLock();
       try {
@@ -3408,6 +3434,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "quotaUsage";
     QuotaUsage quotaUsage;
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       readLock();
       try {
@@ -3439,6 +3466,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.WRITE);
     final String operationName = getQuotaCommand(nsQuota, ssQuota);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       checkSuperuserPrivilege(pc);
       writeLock();
@@ -3471,6 +3499,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(null);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -3973,6 +4002,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "listStatus";
     DirectoryListing dl = null;
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       readLock();
       try {
@@ -6793,6 +6823,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "createSnapshot";
     String snapshotPath = null;
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -6829,6 +6860,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     String oldSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotOldName);
     String newSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotNewName);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -6862,6 +6894,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     SnapshottableDirectoryStatus[] status = null;
     checkOperation(OperationCategory.READ);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       readLock();
       try {
@@ -6905,6 +6938,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     String toSnapshotRoot = (toSnapshot == null || toSnapshot.isEmpty()) ?
         path : Snapshot.getSnapshotPath(path, toSnapshot);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     long actualTime = Time.monotonicNow();
     try {
       readLock();
@@ -6978,6 +7012,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         (toSnapshot == null || toSnapshot.isEmpty()) ? path :
             Snapshot.getSnapshotPath(path, toSnapshot);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       readLock();
       try {
@@ -7012,6 +7047,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     String rootPath = null;
     BlocksMapUpdateInfo blocksToBeDeleted = null;
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     checkOperation(OperationCategory.WRITE);
     try {
       writeLock();
@@ -7307,6 +7343,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       cacheManager.waitForRescanIfNeeded();
     }
     checkOperation(OperationCategory.WRITE);
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -7334,6 +7371,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     if (!flags.contains(CacheFlag.FORCE)) {
       cacheManager.waitForRescanIfNeeded();
     }
+    FSPermissionChecker.setOperationType(operationName);
     checkOperation(OperationCategory.WRITE);
     try {
       writeLock();
@@ -7359,6 +7397,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "removeCacheDirective";
     String idStr = "{id: " + Long.toString(id) + "}";
     checkOperation(OperationCategory.WRITE);
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -7381,6 +7420,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       long startId, CacheDirectiveInfo filter) throws IOException {
     final String operationName = "listCacheDirectives";
     checkOperation(OperationCategory.READ);
+    FSPermissionChecker.setOperationType(operationName);
     BatchedListEntries<CacheDirectiveEntry> results;
     cacheManager.waitForRescanIfNeeded();
     try {
@@ -7482,6 +7522,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "listCachePools";
     BatchedListEntries<CachePoolEntry> results;
     checkOperation(OperationCategory.READ);
+    FSPermissionChecker.setOperationType(operationName);
     cacheManager.waitForRescanIfNeeded();
     try {
       readLock();
@@ -7505,6 +7546,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -7528,6 +7570,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.WRITE);
     FileStatus auditStat = null;
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -7550,6 +7593,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -7572,6 +7616,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -7594,6 +7639,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -7616,6 +7662,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.READ);
     final AclStatus ret;
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       readLock();
       try {
@@ -7652,6 +7699,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       Metadata metadata = FSDirEncryptionZoneOp.ensureKeyIsInitialized(dir,
           keyName, src);
       final FSPermissionChecker pc = getPermissionChecker();
+      FSPermissionChecker.setOperationType(operationName);
       checkSuperuserPrivilege(pc);
       checkOperation(OperationCategory.WRITE);
       writeLock();
@@ -7685,6 +7733,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FileStatus resultingStat = null;
     EncryptionZone encryptionZone;
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     checkOperation(OperationCategory.READ);
     try {
       readLock();
@@ -7711,6 +7760,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     boolean success = false;
     checkOperation(OperationCategory.READ);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     checkSuperuserPrivilege(pc);
     readLock();
     try {
@@ -7748,6 +7798,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     boolean success = false;
     checkOperation(OperationCategory.READ);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     checkSuperuserPrivilege(pc);
     readLock();
     try {
@@ -7834,6 +7885,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkErasureCodingSupported(operationName);
     FileStatus resultingStat = null;
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -8002,6 +8054,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkErasureCodingSupported(operationName);
     FileStatus resultingStat = null;
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -8067,6 +8120,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.READ);
     checkErasureCodingSupported(operationName);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -8129,6 +8183,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -8153,6 +8208,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.READ);
     List<XAttr> fsXattrs;
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       readLock();
       try {
@@ -8174,6 +8230,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.READ);
     List<XAttr> fsXattrs;
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       readLock();
       try {
@@ -8196,6 +8253,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       writeLock();
       try {
@@ -8241,6 +8299,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "checkAccess";
     checkOperation(OperationCategory.READ);
     final FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker.setOperationType(operationName);
     try {
       readLock();
       try {

+ 124 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Stack;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.ipc.CallerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FSExceptionMessages;
@@ -84,11 +85,15 @@ public class FSPermissionChecker implements AccessControlEnforcer {
   private final Collection<String> groups;
   private final boolean isSuper;
   private final INodeAttributeProvider attributeProvider;
+  private final boolean authorizeWithContext;
+
+  private static ThreadLocal<String> operationType = new ThreadLocal<>();
 
 
   protected FSPermissionChecker(String fsOwner, String supergroup,
       UserGroupInformation callerUgi,
       INodeAttributeProvider attributeProvider) {
+    boolean useNewAuthorizationWithContextAPI;
     this.fsOwner = fsOwner;
     this.supergroup = supergroup;
     this.callerUgi = callerUgi;
@@ -96,6 +101,41 @@ public class FSPermissionChecker implements AccessControlEnforcer {
     user = callerUgi.getShortUserName();
     isSuper = user.equals(fsOwner) || groups.contains(supergroup);
     this.attributeProvider = attributeProvider;
+
+    // If the AccessControlEnforcer supports context enrichment, call
+    // the new API. Otherwise choose the old API.
+    Class[] cArg = new Class[1];
+    cArg[0] = INodeAttributeProvider.AuthorizationContext.class;
+
+    AccessControlEnforcer ace;
+    if (attributeProvider == null) {
+      // If attribute provider is null, use FSPermissionChecker default
+      // implementation to authorize, which supports authorization with context.
+      useNewAuthorizationWithContextAPI = true;
+      LOG.info("Default authorization provider supports the new authorization" +
+          " provider API");
+    } else {
+      ace = attributeProvider.getExternalAccessControlEnforcer(this);
+      // if the runtime external authorization provider doesn't support
+      // checkPermissionWithContext(), fall back to the old API
+      // checkPermission().
+      try {
+        Class<?> clazz = ace.getClass();
+        clazz.getDeclaredMethod("checkPermissionWithContext", cArg);
+        useNewAuthorizationWithContextAPI = true;
+        LOG.info("Use the new authorization provider API");
+      } catch (NoSuchMethodException e) {
+        useNewAuthorizationWithContextAPI = false;
+        LOG.info("Fallback to the old authorization provider API because " +
+            "the expected method is not found.");
+      }
+    }
+
+    authorizeWithContext = useNewAuthorizationWithContextAPI;
+  }
+
+  public static void setOperationType(String opType) {
+    operationType.set(opType);
   }
 
   public boolean isMemberOfGroup(String group) {
@@ -190,9 +230,35 @@ public class FSPermissionChecker implements AccessControlEnforcer {
     int ancestorIndex = inodes.length - 2;
 
     AccessControlEnforcer enforcer = getAccessControlEnforcer();
-    enforcer.checkPermission(fsOwner, supergroup, callerUgi, inodeAttrs, inodes,
-        components, snapshotId, path, ancestorIndex, doCheckOwner,
-        ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
+
+    String opType = operationType.get();
+    if (this.authorizeWithContext && opType != null) {
+      INodeAttributeProvider.AuthorizationContext.Builder builder =
+          new INodeAttributeProvider.AuthorizationContext.Builder();
+      builder.fsOwner(fsOwner).
+          supergroup(supergroup).
+          callerUgi(callerUgi).
+          inodeAttrs(inodeAttrs).
+          inodes(inodes).
+          pathByNameArr(components).
+          snapshotId(snapshotId).
+          path(path).
+          ancestorIndex(ancestorIndex).
+          doCheckOwner(doCheckOwner).
+          ancestorAccess(ancestorAccess).
+          parentAccess(parentAccess).
+          access(access).
+          subAccess(subAccess).
+          ignoreEmptyDir(ignoreEmptyDir).
+          operationName(opType).
+          callerContext(CallerContext.getCurrent());
+      enforcer.checkPermissionWithContext(builder.build());
+    } else {
+      enforcer.checkPermission(fsOwner, supergroup, callerUgi, inodeAttrs,
+          inodes, components, snapshotId, path, ancestorIndex, doCheckOwner,
+          ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
+    }
+
   }
 
   /**
@@ -212,17 +278,45 @@ public class FSPermissionChecker implements AccessControlEnforcer {
     try {
       INodeAttributes[] iNodeAttr = {nodeAttributes};
       AccessControlEnforcer enforcer = getAccessControlEnforcer();
-      enforcer.checkPermission(
-          fsOwner, supergroup, callerUgi,
-          iNodeAttr, // single inode attr in the array
-          new INode[]{inode}, // single inode in the array
-          pathComponents, snapshotId,
-          null, -1, // this will skip checkTraverse() because
-          // not checking ancestor here
-          false, null, null,
-          access, // the target access to be checked against the inode
-          null, // passing null sub access avoids checking children
-          false);
+      String opType = operationType.get();
+      if (this.authorizeWithContext && opType != null) {
+        INodeAttributeProvider.AuthorizationContext.Builder builder =
+            new INodeAttributeProvider.AuthorizationContext.Builder();
+        builder.fsOwner(fsOwner)
+            .supergroup(supergroup)
+            .callerUgi(callerUgi)
+            .inodeAttrs(iNodeAttr) // single inode attr in the array
+            .inodes(new INode[] { inode }) // single inode attr in the array
+            .pathByNameArr(pathComponents)
+            .snapshotId(snapshotId)
+            .path(null)
+            .ancestorIndex(-1)     // this will skip checkTraverse()
+                                   // because not checking ancestor here
+            .doCheckOwner(false)
+            .ancestorAccess(null)
+            .parentAccess(null)
+            .access(access)        // the target access to be checked against
+                                   // the inode
+            .subAccess(null)       // passing null sub access avoids checking
+                                   // children
+            .ignoreEmptyDir(false)
+            .operationName(opType)
+            .callerContext(CallerContext.getCurrent());
+
+        enforcer.checkPermissionWithContext(builder.build());
+      } else {
+        enforcer.checkPermission(
+            fsOwner, supergroup, callerUgi,
+            iNodeAttr, // single inode attr in the array
+            new INode[]{inode}, // single inode in the array
+            pathComponents, snapshotId,
+            null, -1, // this will skip checkTraverse() because
+            // not checking ancestor here
+            false, null, null,
+            access, // the target access to be checked against the inode
+            null, // passing null sub access avoids checking children
+            false);
+      }
     } catch (AccessControlException ace) {
       throw new AccessControlException(
           toAccessControlString(nodeAttributes, inode.getFullPathName(),
@@ -273,6 +367,22 @@ public class FSPermissionChecker implements AccessControlEnforcer {
     }
   }
 
+  @Override
+  public void checkPermissionWithContext(
+      INodeAttributeProvider.AuthorizationContext authzContext)
+      throws AccessControlException {
+    // The default authorization provider does not use the additional context
+    // parameters including operationName and callerContext.
+    this.checkPermission(authzContext.getFsOwner(),
+        authzContext.getSupergroup(), authzContext.getCallerUgi(),
+        authzContext.getInodeAttrs(), authzContext.getInodes(),
+        authzContext.getPathByNameArr(), authzContext.getSnapshotId(),
+        authzContext.getPath(), authzContext.getAncestorIndex(),
+        authzContext.isDoCheckOwner(), authzContext.getAncestorAccess(),
+        authzContext.getParentAccess(), authzContext.getAccess(),
+        authzContext.getSubAccess(), authzContext.isIgnoreEmptyDir());
+  }
+
   private INodeAttributes getINodeAttrs(byte[][] pathByNameArr, int pathIdx,
       INode inode, int snapshotId) {
     INodeAttributes inodeAttrs = inode.getSnapshotINode(snapshotId);

+ 334 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributeProvider.java

@@ -17,19 +17,340 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import java.util.Arrays;
+
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
 public abstract class INodeAttributeProvider {
 
+  public static class AuthorizationContext {
+    private String fsOwner;
+    private String supergroup;
+    private UserGroupInformation callerUgi;
+    private INodeAttributes[] inodeAttrs;
+    private INode[] inodes;
+    private byte[][] pathByNameArr;
+    private int snapshotId;
+    private String path;
+    private int ancestorIndex;
+    private boolean doCheckOwner;
+    private FsAction ancestorAccess;
+    private FsAction parentAccess;
+    private FsAction access;
+    private FsAction subAccess;
+    private boolean ignoreEmptyDir;
+    private String operationName;
+    private CallerContext callerContext;
+
+    public String getFsOwner() {
+      return fsOwner;
+    }
+
+    public void setFsOwner(String fsOwner) {
+      this.fsOwner = fsOwner;
+    }
+
+    public String getSupergroup() {
+      return supergroup;
+    }
+
+    public void setSupergroup(String supergroup) {
+      this.supergroup = supergroup;
+    }
+
+    public UserGroupInformation getCallerUgi() {
+      return callerUgi;
+    }
+
+    public void setCallerUgi(UserGroupInformation callerUgi) {
+      this.callerUgi = callerUgi;
+    }
+
+    public INodeAttributes[] getInodeAttrs() {
+      return inodeAttrs;
+    }
+
+    public void setInodeAttrs(INodeAttributes[] inodeAttrs) {
+      this.inodeAttrs = inodeAttrs;
+    }
+
+    public INode[] getInodes() {
+      return inodes;
+    }
+
+    public void setInodes(INode[] inodes) {
+      this.inodes = inodes;
+    }
+
+    public byte[][] getPathByNameArr() {
+      return pathByNameArr;
+    }
+
+    public void setPathByNameArr(byte[][] pathByNameArr) {
+      this.pathByNameArr = pathByNameArr;
+    }
+
+    public int getSnapshotId() {
+      return snapshotId;
+    }
+
+    public void setSnapshotId(int snapshotId) {
+      this.snapshotId = snapshotId;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public void setPath(String path) {
+      this.path = path;
+    }
+
+    public int getAncestorIndex() {
+      return ancestorIndex;
+    }
+
+    public void setAncestorIndex(int ancestorIndex) {
+      this.ancestorIndex = ancestorIndex;
+    }
+
+    public boolean isDoCheckOwner() {
+      return doCheckOwner;
+    }
+
+    public void setDoCheckOwner(boolean doCheckOwner) {
+      this.doCheckOwner = doCheckOwner;
+    }
+
+    public FsAction getAncestorAccess() {
+      return ancestorAccess;
+    }
+
+    public void setAncestorAccess(FsAction ancestorAccess) {
+      this.ancestorAccess = ancestorAccess;
+    }
+
+    public FsAction getParentAccess() {
+      return parentAccess;
+    }
+
+    public void setParentAccess(FsAction parentAccess) {
+      this.parentAccess = parentAccess;
+    }
+
+    public FsAction getAccess() {
+      return access;
+    }
+
+    public void setAccess(FsAction access) {
+      this.access = access;
+    }
+
+    public FsAction getSubAccess() {
+      return subAccess;
+    }
+
+    public void setSubAccess(FsAction subAccess) {
+      this.subAccess = subAccess;
+    }
+
+    public boolean isIgnoreEmptyDir() {
+      return ignoreEmptyDir;
+    }
+
+    public void setIgnoreEmptyDir(boolean ignoreEmptyDir) {
+      this.ignoreEmptyDir = ignoreEmptyDir;
+    }
+
+    public String getOperationName() {
+      return operationName;
+    }
+
+    public void setOperationName(String operationName) {
+      this.operationName = operationName;
+    }
+
+    public CallerContext getCallerContext() {
+      return callerContext;
+    }
+
+    public void setCallerContext(CallerContext callerContext) {
+      this.callerContext = callerContext;
+    }
+
+    public static class Builder {
+      private String fsOwner;
+      private String supergroup;
+      private UserGroupInformation callerUgi;
+      private INodeAttributes[] inodeAttrs;
+      private INode[] inodes;
+      private byte[][] pathByNameArr;
+      private int snapshotId;
+      private String path;
+      private int ancestorIndex;
+      private boolean doCheckOwner;
+      private FsAction ancestorAccess;
+      private FsAction parentAccess;
+      private FsAction access;
+      private FsAction subAccess;
+      private boolean ignoreEmptyDir;
+      private String operationName;
+      private CallerContext callerContext;
+
+      public AuthorizationContext build() {
+        return new AuthorizationContext(this);
+      }
+
+      public Builder fsOwner(String val) {
+        this.fsOwner = val;
+        return this;
+      }
+
+      public Builder supergroup(String val) {
+        this.supergroup = val;
+        return this;
+      }
+
+      public Builder callerUgi(UserGroupInformation val) {
+        this.callerUgi = val;
+        return this;
+      }
+
+      public Builder inodeAttrs(INodeAttributes[] val) {
+        this.inodeAttrs = val;
+        return this;
+      }
+
+      public Builder inodes(INode[] val) {
+        this.inodes = val;
+        return this;
+      }
+
+      public Builder pathByNameArr(byte[][] val) {
+        this.pathByNameArr = val;
+        return this;
+      }
+
+      public Builder snapshotId(int val) {
+        this.snapshotId = val;
+        return this;
+      }
+
+      public Builder path(String val) {
+        this.path = val;
+        return this;
+      }
+
+      public Builder ancestorIndex(int val) {
+        this.ancestorIndex = val;
+        return this;
+      }
+
+      public Builder doCheckOwner(boolean val) {
+        this.doCheckOwner = val;
+        return this;
+      }
+
+      public Builder ancestorAccess(FsAction val) {
+        this.ancestorAccess = val;
+        return this;
+      }
+
+      public Builder parentAccess(FsAction val) {
+        this.parentAccess = val;
+        return this;
+      }
+
+      public Builder access(FsAction val) {
+        this.access = val;
+        return this;
+      }
+
+      public Builder subAccess(FsAction val) {
+        this.subAccess = val;
+        return this;
+      }
+
+      public Builder ignoreEmptyDir(boolean val) {
+        this.ignoreEmptyDir = val;
+        return this;
+      }
+
+      public Builder operationName(String val) {
+        this.operationName = val;
+        return this;
+      }
+
+      public Builder callerContext(CallerContext val) {
+        this.callerContext = val;
+        return this;
+      }
+    }
+
+    public AuthorizationContext(Builder builder) {
+      this.setFsOwner(builder.fsOwner);
+      this.setSupergroup(builder.supergroup);
+      this.setCallerUgi(builder.callerUgi);
+      this.setInodeAttrs(builder.inodeAttrs);
+      this.setInodes(builder.inodes);
+      this.setPathByNameArr(builder.pathByNameArr);
+      this.setSnapshotId(builder.snapshotId);
+      this.setPath(builder.path);
+      this.setAncestorIndex(builder.ancestorIndex);
+      this.setDoCheckOwner(builder.doCheckOwner);
+      this.setAncestorAccess(builder.ancestorAccess);
+      this.setParentAccess(builder.parentAccess);
+      this.setAccess(builder.access);
+      this.setSubAccess(builder.subAccess);
+      this.setIgnoreEmptyDir(builder.ignoreEmptyDir);
+      this.setOperationName(builder.operationName);
+      this.setCallerContext(builder.callerContext);
+    }
+
+    @VisibleForTesting
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      } else if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+      AuthorizationContext other = (AuthorizationContext)obj;
+      return getFsOwner().equals(other.getFsOwner()) &&
+          getSupergroup().equals(other.getSupergroup()) &&
+          getCallerUgi().equals(other.getCallerUgi()) &&
+          Arrays.deepEquals(getInodeAttrs(), other.getInodeAttrs()) &&
+          Arrays.deepEquals(getInodes(), other.getInodes()) &&
+          Arrays.deepEquals(getPathByNameArr(), other.getPathByNameArr()) &&
+          getSnapshotId() == other.getSnapshotId() &&
+          getPath().equals(other.getPath()) &&
+          getAncestorIndex() == other.getAncestorIndex() &&
+          isDoCheckOwner() == other.isDoCheckOwner() &&
+          getAncestorAccess() == other.getAncestorAccess() &&
+          getParentAccess() == other.getParentAccess() &&
+          getAccess() == other.getAccess() &&
+          getSubAccess() == other.getSubAccess() &&
+          isIgnoreEmptyDir() == other.isIgnoreEmptyDir();
+    }
+
+    @Override
+    public int hashCode() {
+      assert false : "hashCode not designed";
+      return 42; // any arbitrary constant will do
+    }
+  }
+
   /**
    * The AccessControlEnforcer allows implementations to override the
    * default File System permission checking logic enforced on a file system
@@ -39,7 +360,7 @@ public abstract class INodeAttributeProvider {
 
     /**
      * Checks permission on a file system object. Has to throw an Exception
-     * if the filesystem object is not accessessible by the calling Ugi.
+     * if the filesystem object is not accessible by the calling Ugi.
      * @param fsOwner Filesystem owner (The Namenode user)
      * @param supergroup super user geoup
      * @param callerUgi UserGroupInformation of the caller
@@ -58,6 +379,8 @@ public abstract class INodeAttributeProvider {
      *                  the path and all the sub-directories. If path is not a
      *                  directory, there should ideally be no effect.
      * @param ignoreEmptyDir Ignore permission checking for empty directory?
+     * @deprecated use{@link #checkPermissionWithContext(AuthorizationContext)}}
+     * instead
      * @throws AccessControlException
      */
     public abstract void checkPermission(String fsOwner, String supergroup,
@@ -68,6 +391,16 @@ public abstract class INodeAttributeProvider {
         boolean ignoreEmptyDir)
             throws AccessControlException;
 
+    /**
+     * Checks permission on a file system object. Has to throw an Exception
+     * if the filesystem object is not accessessible by the calling Ugi.
+     * @param authzContext an {@link AuthorizationContext} object encapsulating
+     *                     the various parameters required to authorize an
+     *                     operation.
+     * @throws AccessControlException
+     */
+    void checkPermissionWithContext(AuthorizationContext authzContext)
+        throws AccessControlException;
   }
   /**
    * Initialize the provider. This method is called at NameNode startup

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -565,6 +565,8 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
     long fileLen = file.getLen();
     LocatedBlocks blocks = null;
     final FSNamesystem fsn = namenode.getNamesystem();
+    final String operationName = "fsckGetBlockLocations";
+    FSPermissionChecker.setOperationType(operationName);
     fsn.readLock();
     try {
       blocks = FSDirStatAndListingOp.getBlockLocations(
@@ -574,7 +576,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
     } catch (FileNotFoundException fnfe) {
       blocks = null;
     } finally {
-      fsn.readUnlock("fsckGetBlockLocations");
+      fsn.readUnlock(operationName);
     }
     return blocks;
   }

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java

@@ -83,6 +83,9 @@ public class NameNodeAdapter {
       IOException {
     final FSPermissionChecker pc =
         namenode.getNamesystem().getPermissionChecker();
+    // consistent with FSNamesystem#getFileInfo()
+    final String operationName = needBlockToken ? "open" : "getfileinfo";
+    FSPermissionChecker.setOperationType(operationName);
     namenode.getNamesystem().readLock();
     try {
       return FSDirStatAndListingOp.getFileInfo(namenode.getNamesystem()

+ 167 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuthorizationContext.java

@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestAuthorizationContext {
+
+  private String fsOwner = "hdfs";
+  private String superGroup = "hdfs";
+  private UserGroupInformation ugi = UserGroupInformation.
+      createUserForTesting(fsOwner, new String[] {superGroup});
+
+  private INodeAttributes[] emptyINodeAttributes = new INodeAttributes[] {};
+  private INodesInPath iip = mock(INodesInPath.class);
+  private int snapshotId = 0;
+  private INode[] inodes = new INode[] {};
+  private byte[][] components = new byte[][] {};
+  private String path = "";
+  private int ancestorIndex = inodes.length - 2;
+
+  @Before
+  public void setUp() throws IOException {
+    when(iip.getPathSnapshotId()).thenReturn(snapshotId);
+    when(iip.getINodesArray()).thenReturn(inodes);
+    when(iip.getPathComponents()).thenReturn(components);
+    when(iip.getPath()).thenReturn(path);
+  }
+
+  @Test
+  public void testBuilder() {
+    String opType = "test";
+    CallerContext.setCurrent(new CallerContext.Builder(
+        "TestAuthorizationContext").build());
+
+    INodeAttributeProvider.AuthorizationContext.Builder builder =
+        new INodeAttributeProvider.AuthorizationContext.Builder();
+    builder.fsOwner(fsOwner).
+        supergroup(superGroup).
+        callerUgi(ugi).
+        inodeAttrs(emptyINodeAttributes).
+        inodes(inodes).
+        pathByNameArr(components).
+        snapshotId(snapshotId).
+        path(path).
+        ancestorIndex(ancestorIndex).
+        doCheckOwner(true).
+        ancestorAccess(null).
+        parentAccess(null).
+        access(null).
+        subAccess(null).
+        ignoreEmptyDir(true).
+        operationName(opType).
+        callerContext(CallerContext.getCurrent());
+
+    INodeAttributeProvider.AuthorizationContext authzContext = builder.build();
+    assertEquals(authzContext.getFsOwner(), fsOwner);
+    assertEquals(authzContext.getSupergroup(), superGroup);
+    assertEquals(authzContext.getCallerUgi(), ugi);
+    assertEquals(authzContext.getInodeAttrs(), emptyINodeAttributes);
+    assertEquals(authzContext.getInodes(), inodes);
+    assertEquals(authzContext.getPathByNameArr(), components);
+    assertEquals(authzContext.getSnapshotId(), snapshotId);
+    assertEquals(authzContext.getPath(), path);
+    assertEquals(authzContext.getAncestorIndex(), ancestorIndex);
+    assertEquals(authzContext.getOperationName(), opType);
+    assertEquals(authzContext.getCallerContext(), CallerContext.getCurrent());
+  }
+
+  @Test
+  public void testLegacyAPI() throws IOException {
+    INodeAttributeProvider.AccessControlEnforcer
+        mockEnforcer = mock(INodeAttributeProvider.AccessControlEnforcer.class);
+    INodeAttributeProvider mockINodeAttributeProvider =
+        mock(INodeAttributeProvider.class);
+    when(mockINodeAttributeProvider.getExternalAccessControlEnforcer(any())).
+        thenReturn(mockEnforcer);
+
+    FSPermissionChecker checker = new FSPermissionChecker(
+        fsOwner, superGroup, ugi, mockINodeAttributeProvider);
+
+    // set operation type to null to force using the legacy API.
+    FSPermissionChecker.setOperationType(null);
+
+    when(iip.getPathSnapshotId()).thenReturn(snapshotId);
+    when(iip.getINodesArray()).thenReturn(inodes);
+    when(iip.getPathComponents()).thenReturn(components);
+    when(iip.getPath()).thenReturn(path);
+
+    checker.checkPermission(iip, true, null, null, null, null, true);
+
+    verify(mockEnforcer).checkPermission(fsOwner, superGroup, ugi,
+        emptyINodeAttributes, inodes, components, snapshotId, path,
+        ancestorIndex, true, null, null, null, null, true);
+  }
+
+  @Test
+  public void testCheckPermissionWithContextAPI() throws IOException {
+    INodeAttributeProvider.AccessControlEnforcer
+        mockEnforcer = mock(INodeAttributeProvider.AccessControlEnforcer.class);
+    INodeAttributeProvider mockINodeAttributeProvider =
+        mock(INodeAttributeProvider.class);
+    when(mockINodeAttributeProvider.getExternalAccessControlEnforcer(any())).
+        thenReturn(mockEnforcer);
+
+    FSPermissionChecker checker = new FSPermissionChecker(
+        fsOwner, superGroup, ugi, mockINodeAttributeProvider);
+
+    // force it to use the new, checkPermissionWithContext API.
+    String operationName = "abc";
+    FSPermissionChecker.setOperationType(operationName);
+
+    checker.checkPermission(iip, true,
+        null, null, null,
+        null, true);
+
+    INodeAttributeProvider.AuthorizationContext.Builder builder =
+        new INodeAttributeProvider.AuthorizationContext.Builder();
+    builder.fsOwner(fsOwner).
+        supergroup(superGroup).
+        callerUgi(ugi).
+        inodeAttrs(emptyINodeAttributes).
+        inodes(inodes).
+        pathByNameArr(components).
+        snapshotId(snapshotId).
+        path(path).
+        ancestorIndex(ancestorIndex).
+        doCheckOwner(true).
+        ancestorAccess(null).
+        parentAccess(null).
+        access(null).
+        subAccess(null).
+        ignoreEmptyDir(true).
+        operationName(operationName).
+        callerContext(CallerContext.getCurrent());
+    INodeAttributeProvider.AuthorizationContext context = builder.build();
+    // the AuthorizationContext.equals() method is override to always return
+    // true as long as it is compared with another AuthorizationContext object.
+    verify(mockEnforcer).checkPermissionWithContext(context);
+  }
+}

+ 13 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeAttributeProvider.java

@@ -81,6 +81,19 @@ public class TestINodeAttributeProvider {
         }
         CALLED.add("checkPermission|" + ancestorAccess + "|" + parentAccess + "|" + access);
       }
+
+      @Override
+      public void checkPermissionWithContext(
+          AuthorizationContext authzContext) throws AccessControlException {
+        if (authzContext.getAncestorIndex() > 1
+            && authzContext.getInodes()[1].getLocalName().equals("user")
+            && authzContext.getInodes()[2].getLocalName().equals("acl")) {
+          this.ace.checkPermissionWithContext(authzContext);
+        }
+        CALLED.add("checkPermission|" + authzContext.getAncestorAccess()
+            + "|" + authzContext.getParentAccess() + "|" + authzContext
+            .getAccess());
+      }
     }
 
     @Override