Browse Source

HDFS-6826. Plugin interface to enable delegation of HDFS authorization assertions. Contributed by Arun Suresh.

(cherry picked from commit 456cec127b23b9195784dd4b35b75a2b69ad2a4a)
Jitendra Pandey 10 years ago
parent
commit
d286673c60
15 changed files with 659 additions and 115 deletions
  1. 3 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. 1 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  3. 45 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DefaultINodeAttributesProvider.java
  4. 29 22
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
  5. 37 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  6. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
  7. 19 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  8. 138 84
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
  9. 135 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributeProvider.java
  10. 3 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java
  11. 4 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
  12. 5 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
  13. 6 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
  14. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java
  15. 229 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeAttributeProvider.java

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -41,6 +41,9 @@ Release 2.7.0 - UNRELEASED
 
 
     HDFS-7838. Expose truncate API for libhdfs. (yliu)
     HDFS-7838. Expose truncate API for libhdfs. (yliu)
 
 
+    HDFS-6826. Plugin interface to enable delegation of HDFS authorization 
+    assertions. (Arun Suresh via jitendra)
+
   IMPROVEMENTS
   IMPROVEMENTS
 
 
     HDFS-7752. Improve description for
     HDFS-7752. Improve description for

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -480,6 +480,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_DATANODE_IPC_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_IPC_DEFAULT_PORT;
   public static final String  DFS_DATANODE_IPC_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_IPC_DEFAULT_PORT;
   public static final String  DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY = "dfs.datanode.min.supported.namenode.version";
   public static final String  DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY = "dfs.datanode.min.supported.namenode.version";
   public static final String  DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT = "2.1.0-beta";
   public static final String  DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT = "2.1.0-beta";
+  public static final String  DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY = "dfs.namenode.inode.attributes.provider.class";
 
 
   public static final String  DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY = "dfs.block.access.token.enable";
   public static final String  DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY = "dfs.block.access.token.enable";
   public static final boolean DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT = false;
   public static final boolean DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT = false;

+ 45 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DefaultINodeAttributesProvider.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/**
+ * A default implementation of the INodeAttributesProvider
+ *
+ */
+public class DefaultINodeAttributesProvider extends INodeAttributeProvider {
+
+  public static INodeAttributeProvider DEFAULT_PROVIDER =
+      new DefaultINodeAttributesProvider();
+
+  @Override
+  public void start() {
+    // NO-OP
+  }
+
+  @Override
+  public void stop() {
+    // NO-OP
+  }
+
+  @Override
+  public INodeAttributes getAttributes(String[] pathElements,
+      INodeAttributes inode) {
+    return inode;
+  }
+
+}

+ 29 - 22
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java

@@ -181,7 +181,7 @@ class FSDirStatAndListingOp {
 
 
       if (!targetNode.isDirectory()) {
       if (!targetNode.isDirectory()) {
         return new DirectoryListing(
         return new DirectoryListing(
-            new HdfsFileStatus[]{createFileStatus(fsd,
+            new HdfsFileStatus[]{createFileStatus(fsd, src,
                 HdfsFileStatus.EMPTY_NAME, targetNode, needLocation,
                 HdfsFileStatus.EMPTY_NAME, targetNode, needLocation,
                 parentStoragePolicy, snapshot, isRawPath, iip)}, 0);
                 parentStoragePolicy, snapshot, isRawPath, iip)}, 0);
       }
       }
@@ -200,7 +200,7 @@ class FSDirStatAndListingOp {
         byte curPolicy = isSuperUser && !cur.isSymlink()?
         byte curPolicy = isSuperUser && !cur.isSymlink()?
             cur.getLocalStoragePolicyID():
             cur.getLocalStoragePolicyID():
             BlockStoragePolicySuite.ID_UNSPECIFIED;
             BlockStoragePolicySuite.ID_UNSPECIFIED;
-        listing[i] = createFileStatus(fsd, cur.getLocalNameBytes(), cur,
+        listing[i] = createFileStatus(fsd, src, cur.getLocalNameBytes(), cur,
             needLocation, getStoragePolicyID(curPolicy,
             needLocation, getStoragePolicyID(curPolicy,
                 parentStoragePolicy), snapshot, isRawPath, iip);
                 parentStoragePolicy), snapshot, isRawPath, iip);
         listingCnt++;
         listingCnt++;
@@ -253,7 +253,7 @@ class FSDirStatAndListingOp {
     final HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
     final HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
     for (int i = 0; i < numOfListing; i++) {
     for (int i = 0; i < numOfListing; i++) {
       Snapshot.Root sRoot = snapshots.get(i + skipSize).getRoot();
       Snapshot.Root sRoot = snapshots.get(i + skipSize).getRoot();
-      listing[i] = createFileStatus(fsd, sRoot.getLocalNameBytes(), sRoot,
+      listing[i] = createFileStatus(fsd, src, sRoot.getLocalNameBytes(), sRoot,
           BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
           BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
           false, INodesInPath.fromINode(sRoot));
           false, INodesInPath.fromINode(sRoot));
     }
     }
@@ -270,7 +270,7 @@ class FSDirStatAndListingOp {
    *         or null if file not found
    *         or null if file not found
    */
    */
   static HdfsFileStatus getFileInfo(
   static HdfsFileStatus getFileInfo(
-      FSDirectory fsd, INodesInPath src, boolean isRawPath,
+      FSDirectory fsd, String path, INodesInPath src, boolean isRawPath,
       boolean includeStoragePolicy)
       boolean includeStoragePolicy)
       throws IOException {
       throws IOException {
     fsd.readLock();
     fsd.readLock();
@@ -279,7 +279,7 @@ class FSDirStatAndListingOp {
       byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ?
       byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ?
           i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
           i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
       return i == null ? null : createFileStatus(
       return i == null ? null : createFileStatus(
-          fsd, HdfsFileStatus.EMPTY_NAME, i, policyId,
+          fsd, path, HdfsFileStatus.EMPTY_NAME, i, policyId,
           src.getPathSnapshotId(), isRawPath, src);
           src.getPathSnapshotId(), isRawPath, src);
     } finally {
     } finally {
       fsd.readUnlock();
       fsd.readUnlock();
@@ -303,7 +303,7 @@ class FSDirStatAndListingOp {
     fsd.readLock();
     fsd.readLock();
     try {
     try {
       final INodesInPath iip = fsd.getINodesInPath(srcs, resolveLink);
       final INodesInPath iip = fsd.getINodesInPath(srcs, resolveLink);
-      return getFileInfo(fsd, iip, isRawPath, includeStoragePolicy);
+      return getFileInfo(fsd, src, iip, isRawPath, includeStoragePolicy);
     } finally {
     } finally {
       fsd.readUnlock();
       fsd.readUnlock();
     }
     }
@@ -340,14 +340,15 @@ class FSDirStatAndListingOp {
    * @throws java.io.IOException if any error occurs
    * @throws java.io.IOException if any error occurs
    */
    */
   static HdfsFileStatus createFileStatus(
   static HdfsFileStatus createFileStatus(
-      FSDirectory fsd, byte[] path, INode node, boolean needLocation,
-      byte storagePolicy, int snapshot, boolean isRawPath, INodesInPath iip)
+      FSDirectory fsd, String fullPath, byte[] path, INode node,
+      boolean needLocation, byte storagePolicy, int snapshot, boolean isRawPath,
+      INodesInPath iip)
       throws IOException {
       throws IOException {
     if (needLocation) {
     if (needLocation) {
-      return createLocatedFileStatus(fsd, path, node, storagePolicy,
+      return createLocatedFileStatus(fsd, fullPath, path, node, storagePolicy,
           snapshot, isRawPath, iip);
           snapshot, isRawPath, iip);
     } else {
     } else {
-      return createFileStatus(fsd, path, node, storagePolicy, snapshot,
+      return createFileStatus(fsd, fullPath, path, node, storagePolicy, snapshot,
           isRawPath, iip);
           isRawPath, iip);
     }
     }
   }
   }
@@ -356,8 +357,9 @@ class FSDirStatAndListingOp {
    * Create FileStatus by file INode
    * Create FileStatus by file INode
    */
    */
   static HdfsFileStatus createFileStatus(
   static HdfsFileStatus createFileStatus(
-      FSDirectory fsd, byte[] path, INode node, byte storagePolicy,
-      int snapshot, boolean isRawPath, INodesInPath iip) throws IOException {
+      FSDirectory fsd, String fullPath, byte[] path, INode node,
+      byte storagePolicy, int snapshot, boolean isRawPath,
+      INodesInPath iip) throws IOException {
      long size = 0;     // length is zero for directories
      long size = 0;     // length is zero for directories
      short replication = 0;
      short replication = 0;
      long blocksize = 0;
      long blocksize = 0;
@@ -380,6 +382,8 @@ class FSDirStatAndListingOp {
      int childrenNum = node.isDirectory() ?
      int childrenNum = node.isDirectory() ?
          node.asDirectory().getChildrenNum(snapshot) : 0;
          node.asDirectory().getChildrenNum(snapshot) : 0;
 
 
+     INodeAttributes nodeAttrs =
+         fsd.getAttributes(fullPath, path, node, snapshot);
      return new HdfsFileStatus(
      return new HdfsFileStatus(
         size,
         size,
         node.isDirectory(),
         node.isDirectory(),
@@ -387,9 +391,9 @@ class FSDirStatAndListingOp {
         blocksize,
         blocksize,
         node.getModificationTime(snapshot),
         node.getModificationTime(snapshot),
         node.getAccessTime(snapshot),
         node.getAccessTime(snapshot),
-        getPermissionForFileStatus(node, snapshot, isEncrypted),
-        node.getUserName(snapshot),
-        node.getGroupName(snapshot),
+        getPermissionForFileStatus(nodeAttrs, isEncrypted),
+        nodeAttrs.getUserName(),
+        nodeAttrs.getGroupName(),
         node.isSymlink() ? node.asSymlink().getSymlink() : null,
         node.isSymlink() ? node.asSymlink().getSymlink() : null,
         path,
         path,
         node.getId(),
         node.getId(),
@@ -402,8 +406,9 @@ class FSDirStatAndListingOp {
    * Create FileStatus with location info by file INode
    * Create FileStatus with location info by file INode
    */
    */
   private static HdfsLocatedFileStatus createLocatedFileStatus(
   private static HdfsLocatedFileStatus createLocatedFileStatus(
-      FSDirectory fsd, byte[] path, INode node, byte storagePolicy,
-      int snapshot, boolean isRawPath, INodesInPath iip) throws IOException {
+      FSDirectory fsd, String fullPath, byte[] path, INode node,
+      byte storagePolicy, int snapshot, boolean isRawPath,
+      INodesInPath iip) throws IOException {
     assert fsd.hasReadLock();
     assert fsd.hasReadLock();
     long size = 0; // length is zero for directories
     long size = 0; // length is zero for directories
     short replication = 0;
     short replication = 0;
@@ -437,12 +442,14 @@ class FSDirStatAndListingOp {
     int childrenNum = node.isDirectory() ?
     int childrenNum = node.isDirectory() ?
         node.asDirectory().getChildrenNum(snapshot) : 0;
         node.asDirectory().getChildrenNum(snapshot) : 0;
 
 
+    INodeAttributes nodeAttrs =
+        fsd.getAttributes(fullPath, path, node, snapshot);
     HdfsLocatedFileStatus status =
     HdfsLocatedFileStatus status =
         new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
         new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
           blocksize, node.getModificationTime(snapshot),
           blocksize, node.getModificationTime(snapshot),
           node.getAccessTime(snapshot),
           node.getAccessTime(snapshot),
-          getPermissionForFileStatus(node, snapshot, isEncrypted),
-          node.getUserName(snapshot), node.getGroupName(snapshot),
+          getPermissionForFileStatus(nodeAttrs, isEncrypted),
+          nodeAttrs.getUserName(), nodeAttrs.getGroupName(),
           node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
           node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
           node.getId(), loc, childrenNum, feInfo, storagePolicy);
           node.getId(), loc, childrenNum, feInfo, storagePolicy);
     // Set caching information for the located blocks.
     // Set caching information for the located blocks.
@@ -467,9 +474,9 @@ class FSDirStatAndListingOp {
    * and encrypted bit on if it represents an encrypted file/dir.
    * and encrypted bit on if it represents an encrypted file/dir.
    */
    */
   private static FsPermission getPermissionForFileStatus(
   private static FsPermission getPermissionForFileStatus(
-      INode node, int snapshot, boolean isEncrypted) {
-    FsPermission perm = node.getFsPermission(snapshot);
-    boolean hasAcl = node.getAclFeature(snapshot) != null;
+      INodeAttributes node, boolean isEncrypted) {
+    FsPermission perm = node.getFsPermission();
+    boolean hasAcl = node.getAclFeature() != null;
     if (hasAcl || isEncrypted) {
     if (hasAcl || isEncrypted) {
       perm = new FsPermissionExtension(perm, hasAcl, isEncrypted);
       perm = new FsPermissionExtension(perm, hasAcl, isEncrypted);
     }
     }

+ 37 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -167,6 +167,12 @@ public class FSDirectory implements Closeable {
 
 
   private final FSEditLog editLog;
   private final FSEditLog editLog;
 
 
+  private INodeAttributeProvider attributeProvider;
+
+  public void setINodeAttributeProvider(INodeAttributeProvider provider) {
+    attributeProvider = provider;
+  }
+
   // utility methods to acquire and release read lock and write lock
   // utility methods to acquire and release read lock and write lock
   void readLock() {
   void readLock() {
     this.dirLock.readLock().lock();
     this.dirLock.readLock().lock();
@@ -1623,13 +1629,23 @@ public class FSDirectory implements Closeable {
   FSPermissionChecker getPermissionChecker()
   FSPermissionChecker getPermissionChecker()
     throws AccessControlException {
     throws AccessControlException {
     try {
     try {
-      return new FSPermissionChecker(fsOwnerShortUserName, supergroup,
+      return getPermissionChecker(fsOwnerShortUserName, supergroup,
           NameNode.getRemoteUser());
           NameNode.getRemoteUser());
-    } catch (IOException ioe) {
-      throw new AccessControlException(ioe);
+    } catch (IOException e) {
+      throw new AccessControlException(e);
     }
     }
   }
   }
 
 
+  @VisibleForTesting
+  FSPermissionChecker getPermissionChecker(String fsOwner, String superGroup,
+      UserGroupInformation ugi) throws AccessControlException {
+    return new FSPermissionChecker(
+        fsOwner, superGroup, ugi,
+        attributeProvider == null ?
+            DefaultINodeAttributesProvider.DEFAULT_PROVIDER
+            : attributeProvider);
+  }
+
   void checkOwner(FSPermissionChecker pc, INodesInPath iip)
   void checkOwner(FSPermissionChecker pc, INodesInPath iip)
       throws AccessControlException {
       throws AccessControlException {
     checkPermission(pc, iip, true, null, null, null, null);
     checkPermission(pc, iip, true, null, null, null, null);
@@ -1690,7 +1706,8 @@ public class FSDirectory implements Closeable {
   HdfsFileStatus getAuditFileInfo(INodesInPath iip)
   HdfsFileStatus getAuditFileInfo(INodesInPath iip)
       throws IOException {
       throws IOException {
     return (namesystem.isAuditEnabled() && namesystem.isExternalInvocation())
     return (namesystem.isAuditEnabled() && namesystem.isExternalInvocation())
-        ? FSDirStatAndListingOp.getFileInfo(this, iip, false, false) : null;
+        ? FSDirStatAndListingOp.getFileInfo(this, iip.getPath(), iip, false,
+            false) : null;
   }
   }
 
 
   /**
   /**
@@ -1736,4 +1753,20 @@ public class FSDirectory implements Closeable {
   void resetLastInodeIdWithoutChecking(long newValue) {
   void resetLastInodeIdWithoutChecking(long newValue) {
     inodeId.setCurrentValue(newValue);
     inodeId.setCurrentValue(newValue);
   }
   }
+
+  INodeAttributes getAttributes(String fullPath, byte[] path,
+      INode node, int snapshot) {
+    INodeAttributes nodeAttrs = node;
+    if (attributeProvider != null) {
+      nodeAttrs = node.getSnapshotINode(snapshot);
+      fullPath = fullPath + (fullPath.endsWith(Path.SEPARATOR) ? ""
+                                                               : Path.SEPARATOR)
+          + DFSUtil.bytes2String(path);
+      nodeAttrs = attributeProvider.getAttributes(fullPath, nodeAttrs);
+    } else {
+      nodeAttrs = node.getSnapshotINode(snapshot);
+    }
+    return nodeAttrs;
+  }
+
 }
 }

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -380,7 +380,7 @@ public class FSEditLogLoader {
         // add the op into retry cache if necessary
         // add the op into retry cache if necessary
         if (toAddRetryCache) {
         if (toAddRetryCache) {
           HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
           HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
-              fsNamesys.dir, HdfsFileStatus.EMPTY_NAME, newFile,
+              fsNamesys.dir, path, HdfsFileStatus.EMPTY_NAME, newFile,
               BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
               BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
               false, iip);
               false, iip);
           fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
           fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
@@ -399,7 +399,7 @@ public class FSEditLogLoader {
           // add the op into retry cache if necessary
           // add the op into retry cache if necessary
           if (toAddRetryCache) {
           if (toAddRetryCache) {
             HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
             HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
-                fsNamesys.dir,
+                fsNamesys.dir, path,
                 HdfsFileStatus.EMPTY_NAME, newFile,
                 HdfsFileStatus.EMPTY_NAME, newFile,
                 BlockStoragePolicySuite.ID_UNSPECIFIED,
                 BlockStoragePolicySuite.ID_UNSPECIFIED,
                 Snapshot.CURRENT_STATE_ID, false, iip);
                 Snapshot.CURRENT_STATE_ID, false, iip);
@@ -473,7 +473,7 @@ public class FSEditLogLoader {
         // add the op into retry cache if necessary
         // add the op into retry cache if necessary
         if (toAddRetryCache) {
         if (toAddRetryCache) {
           HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
           HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
-              fsNamesys.dir, HdfsFileStatus.EMPTY_NAME, file,
+              fsNamesys.dir, path, HdfsFileStatus.EMPTY_NAME, file,
               BlockStoragePolicySuite.ID_UNSPECIFIED,
               BlockStoragePolicySuite.ID_UNSPECIFIED,
               Snapshot.CURRENT_STATE_ID, false, iip);
               Snapshot.CURRENT_STATE_ID, false, iip);
           fsNamesys.addCacheEntryWithPayload(appendOp.rpcClientId,
           fsNamesys.addCacheEntryWithPayload(appendOp.rpcClientId,

+ 19 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -62,6 +62,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CAC
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@@ -281,6 +282,7 @@ import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.ChunkedArrayList;
 import org.apache.hadoop.util.ChunkedArrayList;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.log4j.Appender;
 import org.apache.log4j.Appender;
@@ -525,6 +527,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   private final TopConf topConf;
   private final TopConf topConf;
   private TopMetrics topMetrics;
   private TopMetrics topMetrics;
 
 
+  private INodeAttributeProvider inodeAttributeProvider;
+
   /**
   /**
    * Notify that loading of this FSDirectory is complete, and
    * Notify that loading of this FSDirectory is complete, and
    * it is imageLoaded for use
    * it is imageLoaded for use
@@ -832,6 +836,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
         auditLoggers.get(0) instanceof DefaultAuditLogger;
         auditLoggers.get(0) instanceof DefaultAuditLogger;
       this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
       this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
+      Class<? extends INodeAttributeProvider> klass = conf.getClass(
+          DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY,
+          null, INodeAttributeProvider.class);
+      if (klass != null) {
+        inodeAttributeProvider = ReflectionUtils.newInstance(klass, conf);
+        LOG.info("Using INode attribute provider: " + klass.getName());
+      }
     } catch(IOException e) {
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
       close();
@@ -1059,6 +1070,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     
     
     registerMXBean();
     registerMXBean();
     DefaultMetricsSystem.instance().register(this);
     DefaultMetricsSystem.instance().register(this);
+    if (inodeAttributeProvider != null) {
+      inodeAttributeProvider.start();
+      dir.setINodeAttributeProvider(inodeAttributeProvider);
+    }
     snapshotManager.registerMXBean();
     snapshotManager.registerMXBean();
   }
   }
   
   
@@ -1067,6 +1082,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
    */
   void stopCommonServices() {
   void stopCommonServices() {
     writeLock();
     writeLock();
+    if (inodeAttributeProvider != null) {
+      dir.setINodeAttributeProvider(null);
+      inodeAttributeProvider.stop();
+    }
     try {
     try {
       if (blockManager != null) blockManager.close();
       if (blockManager != null) blockManager.close();
     } finally {
     } finally {

+ 138 - 84
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.Set;
 import java.util.Stack;
 import java.util.Stack;
 
 
@@ -30,6 +29,8 @@ import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.AclEntryType;
 import org.apache.hadoop.fs.permission.AclEntryType;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.server.namenode.INodeAttributeProvider.AccessControlEnforcer;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -41,25 +42,25 @@ import org.apache.hadoop.security.UserGroupInformation;
  * 
  * 
  * Some of the helper methods are gaurded by {@link FSNamesystem#readLock()}.
  * Some of the helper methods are gaurded by {@link FSNamesystem#readLock()}.
  */
  */
-class FSPermissionChecker {
+class FSPermissionChecker implements AccessControlEnforcer {
   static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
   static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
 
 
   /** @return a string for throwing {@link AccessControlException} */
   /** @return a string for throwing {@link AccessControlException} */
-  private String toAccessControlString(INode inode, int snapshotId,
+  private String toAccessControlString(INodeAttributes inodeAttrib, String path,
       FsAction access, FsPermission mode) {
       FsAction access, FsPermission mode) {
-    return toAccessControlString(inode, snapshotId, access, mode, false);
+    return toAccessControlString(inodeAttrib, path, access, mode, false);
   }
   }
 
 
   /** @return a string for throwing {@link AccessControlException} */
   /** @return a string for throwing {@link AccessControlException} */
-  private String toAccessControlString(INode inode, int snapshotId, FsAction access,
-      FsPermission mode, boolean deniedFromAcl) {
+  private String toAccessControlString(INodeAttributes inodeAttrib,
+      String path, FsAction access, FsPermission mode, boolean deniedFromAcl) {
     StringBuilder sb = new StringBuilder("Permission denied: ")
     StringBuilder sb = new StringBuilder("Permission denied: ")
-      .append("user=").append(user).append(", ")
+      .append("user=").append(getUser()).append(", ")
       .append("access=").append(access).append(", ")
       .append("access=").append(access).append(", ")
-      .append("inode=\"").append(inode.getFullPathName()).append("\":")
-      .append(inode.getUserName(snapshotId)).append(':')
-      .append(inode.getGroupName(snapshotId)).append(':')
-      .append(inode.isDirectory() ? 'd' : '-')
+      .append("inode=\"").append(path).append("\":")
+      .append(inodeAttrib.getUserName()).append(':')
+      .append(inodeAttrib.getGroupName()).append(':')
+      .append(inodeAttrib.isDirectory() ? 'd' : '-')
       .append(mode);
       .append(mode);
     if (deniedFromAcl) {
     if (deniedFromAcl) {
       sb.append("+");
       sb.append("+");
@@ -67,42 +68,59 @@ class FSPermissionChecker {
     return sb.toString();
     return sb.toString();
   }
   }
 
 
+  private final String fsOwner;
+  private final String supergroup;
+  private final UserGroupInformation callerUgi;
+
   private final String user;
   private final String user;
-  /** A set with group namess. Not synchronized since it is unmodifiable */
   private final Set<String> groups;
   private final Set<String> groups;
   private final boolean isSuper;
   private final boolean isSuper;
+  private final INodeAttributeProvider attributeProvider;
+
 
 
   FSPermissionChecker(String fsOwner, String supergroup,
   FSPermissionChecker(String fsOwner, String supergroup,
-      UserGroupInformation callerUgi) {
-    HashSet<String> s = new HashSet<String>(Arrays.asList(callerUgi.getGroupNames()));
+      UserGroupInformation callerUgi,
+      INodeAttributeProvider attributeProvider) {
+    this.fsOwner = fsOwner;
+    this.supergroup = supergroup;
+    this.callerUgi = callerUgi;
+    HashSet<String> s =
+        new HashSet<String>(Arrays.asList(callerUgi.getGroupNames()));
     groups = Collections.unmodifiableSet(s);
     groups = Collections.unmodifiableSet(s);
     user = callerUgi.getShortUserName();
     user = callerUgi.getShortUserName();
     isSuper = user.equals(fsOwner) || groups.contains(supergroup);
     isSuper = user.equals(fsOwner) || groups.contains(supergroup);
+    this.attributeProvider = attributeProvider;
   }
   }
 
 
-  /**
-   * Check if the callers group contains the required values.
-   * @param group group to check
-   */
-  public boolean containsGroup(String group) {return groups.contains(group);}
+  public boolean containsGroup(String group) {
+    return groups.contains(group);
+  }
 
 
   public String getUser() {
   public String getUser() {
     return user;
     return user;
   }
   }
-  
+
+  public Set<String> getGroups() {
+    return groups;
+  }
+
   public boolean isSuperUser() {
   public boolean isSuperUser() {
     return isSuper;
     return isSuper;
   }
   }
-  
+
+  public INodeAttributeProvider getAttributesProvider() {
+    return attributeProvider;
+  }
+
   /**
   /**
    * Verify if the caller has the required permission. This will result into 
    * Verify if the caller has the required permission. This will result into 
    * an exception if the caller is not allowed to access the resource.
    * an exception if the caller is not allowed to access the resource.
    */
    */
   public void checkSuperuserPrivilege()
   public void checkSuperuserPrivilege()
       throws AccessControlException {
       throws AccessControlException {
-    if (!isSuper) {
+    if (!isSuperUser()) {
       throw new AccessControlException("Access denied for user " 
       throw new AccessControlException("Access denied for user " 
-          + user + ". Superuser privilege is required");
+          + getUser() + ". Superuser privilege is required");
     }
     }
   }
   }
   
   
@@ -154,64 +172,98 @@ class FSPermissionChecker {
     // check if (parentAccess != null) && file exists, then check sb
     // check if (parentAccess != null) && file exists, then check sb
     // If resolveLink, the check is performed on the link target.
     // If resolveLink, the check is performed on the link target.
     final int snapshotId = inodesInPath.getPathSnapshotId();
     final int snapshotId = inodesInPath.getPathSnapshotId();
-    final int length = inodesInPath.length();
-    final INode last = length > 0 ? inodesInPath.getLastINode() : null;
-    final INode parent = length > 1 ? inodesInPath.getINode(-2) : null;
+    final INode[] inodes = inodesInPath.getINodesArray();
+    final INodeAttributes[] inodeAttrs = new INodeAttributes[inodes.length];
+    final byte[][] pathByNameArr = new byte[inodes.length][];
+    for (int i = 0; i < inodes.length && inodes[i] != null; i++) {
+      if (inodes[i] != null) {
+        pathByNameArr[i] = inodes[i].getLocalNameBytes();
+        inodeAttrs[i] = getINodeAttrs(pathByNameArr, i, inodes[i], snapshotId);
+      }
+    }
+
+    String path = inodesInPath.getPath();
+    int ancestorIndex = inodes.length - 2;
+
+    AccessControlEnforcer enforcer =
+        getAttributesProvider().getExternalAccessControlEnforcer(this);
+    enforcer.checkPermission(fsOwner, supergroup, callerUgi, inodeAttrs, inodes,
+        pathByNameArr, snapshotId, path, ancestorIndex, doCheckOwner,
+        ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
+  }
 
 
-    checkTraverse(inodesInPath, snapshotId);
+  @Override
+  public void checkPermission(String fsOwner, String supergroup,
+      UserGroupInformation callerUgi, INodeAttributes[] inodeAttrs,
+      INode[] inodes, byte[][] pathByNameArr, int snapshotId, String path,
+      int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
+      FsAction parentAccess, FsAction access, FsAction subAccess,
+      boolean ignoreEmptyDir)
+      throws AccessControlException {
+    for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
+        ancestorIndex--);
+    checkTraverse(inodeAttrs, path, ancestorIndex);
 
 
+    final INodeAttributes last = inodeAttrs[inodeAttrs.length - 1];
     if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
     if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
-        && length > 1 && last != null) {
-      checkStickyBit(parent, last, snapshotId);
+        && inodeAttrs.length > 1 && last != null) {
+      checkStickyBit(inodeAttrs[inodeAttrs.length - 2], last);
     }
     }
-    if (ancestorAccess != null && length > 1) {
-      List<INode> inodes = inodesInPath.getReadOnlyINodes();
-      INode ancestor = null;
-      for (int i = inodes.size() - 2; i >= 0 && (ancestor = inodes.get(i)) ==
-          null; i--);
-      check(ancestor, snapshotId, ancestorAccess);
+    if (ancestorAccess != null && inodeAttrs.length > 1) {
+      check(inodeAttrs, path, ancestorIndex, ancestorAccess);
     }
     }
-    if (parentAccess != null && length > 1 && parent != null) {
-      check(parent, snapshotId, parentAccess);
+    if (parentAccess != null && inodeAttrs.length > 1) {
+      check(inodeAttrs, path, inodeAttrs.length - 2, parentAccess);
     }
     }
     if (access != null) {
     if (access != null) {
-      check(last, snapshotId, access);
+      check(last, path, access);
     }
     }
     if (subAccess != null) {
     if (subAccess != null) {
-      checkSubAccess(last, snapshotId, subAccess, ignoreEmptyDir);
+      INode rawLast = inodes[inodeAttrs.length - 1];
+      checkSubAccess(pathByNameArr, inodeAttrs.length - 1, rawLast,
+          snapshotId, subAccess, ignoreEmptyDir);
     }
     }
     if (doCheckOwner) {
     if (doCheckOwner) {
-      checkOwner(last, snapshotId);
+      checkOwner(last);
     }
     }
   }
   }
 
 
+  private INodeAttributes getINodeAttrs(byte[][] pathByNameArr, int pathIdx,
+      INode inode, int snapshotId) {
+    INodeAttributes inodeAttrs = inode.getSnapshotINode(snapshotId);
+    if (getAttributesProvider() != null) {
+      String[] elements = new String[pathIdx + 1];
+      for (int i = 0; i < elements.length; i++) {
+        elements[i] = DFSUtil.bytes2String(pathByNameArr[i]);
+      }
+      inodeAttrs = getAttributesProvider().getAttributes(elements, inodeAttrs);
+    }
+    return inodeAttrs;
+  }
+
   /** Guarded by {@link FSNamesystem#readLock()} */
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkOwner(INode inode, int snapshotId
+  private void checkOwner(INodeAttributes inode
       ) throws AccessControlException {
       ) throws AccessControlException {
-    if (inode != null && user.equals(inode.getUserName(snapshotId))) {
+    if (getUser().equals(inode.getUserName())) {
       return;
       return;
     }
     }
     throw new AccessControlException(
     throw new AccessControlException(
             "Permission denied. user="
             "Permission denied. user="
-            + user + " is not the owner of inode=" + inode);
+            + getUser() + " is not the owner of inode=" + inode);
   }
   }
 
 
   /** Guarded by {@link FSNamesystem#readLock()} */
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkTraverse(INodesInPath iip, int snapshotId)
-      throws AccessControlException {
-    List<INode> inodes = iip.getReadOnlyINodes();
-    for (int i = 0; i < inodes.size() - 1; i++) {
-      INode inode = inodes.get(i);
-      if (inode == null) {
-        break;
-      }
-      check(inode, snapshotId, FsAction.EXECUTE);
+  private void checkTraverse(INodeAttributes[] inodes, String path, int last
+      ) throws AccessControlException {
+    for(int j = 0; j <= last; j++) {
+      check(inodes[j], path, FsAction.EXECUTE);
     }
     }
   }
   }
 
 
   /** Guarded by {@link FSNamesystem#readLock()} */
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkSubAccess(INode inode, int snapshotId, FsAction access,
-      boolean ignoreEmptyDir) throws AccessControlException {
+  private void checkSubAccess(byte[][] pathByNameArr, int pathIdx, INode inode,
+      int snapshotId, FsAction access, boolean ignoreEmptyDir)
+      throws AccessControlException {
     if (inode == null || !inode.isDirectory()) {
     if (inode == null || !inode.isDirectory()) {
       return;
       return;
     }
     }
@@ -221,7 +273,9 @@ class FSPermissionChecker {
       INodeDirectory d = directories.pop();
       INodeDirectory d = directories.pop();
       ReadOnlyList<INode> cList = d.getChildrenList(snapshotId);
       ReadOnlyList<INode> cList = d.getChildrenList(snapshotId);
       if (!(cList.isEmpty() && ignoreEmptyDir)) {
       if (!(cList.isEmpty() && ignoreEmptyDir)) {
-        check(d, snapshotId, access);
+        //TODO have to figure this out with inodeattribute provider
+        check(getINodeAttrs(pathByNameArr, pathIdx, d, snapshotId),
+            inode.getFullPathName(), access);
       }
       }
 
 
       for(INode child : cList) {
       for(INode child : cList) {
@@ -233,37 +287,37 @@ class FSPermissionChecker {
   }
   }
 
 
   /** Guarded by {@link FSNamesystem#readLock()} */
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void check(INode inode, int snapshotId, FsAction access)
-      throws AccessControlException {
+  private void check(INodeAttributes[] inodes, String path, int i, FsAction access
+      ) throws AccessControlException {
+    check(i >= 0 ? inodes[i] : null, path, access);
+  }
+
+  private void check(INodeAttributes inode, String path, FsAction access
+      ) throws AccessControlException {
     if (inode == null) {
     if (inode == null) {
       return;
       return;
     }
     }
-    FsPermission mode = inode.getFsPermission(snapshotId);
-    AclFeature aclFeature = inode.getAclFeature(snapshotId);
+    final FsPermission mode = inode.getFsPermission();
+    final AclFeature aclFeature = inode.getAclFeature();
     if (aclFeature != null) {
     if (aclFeature != null) {
       // It's possible that the inode has a default ACL but no access ACL.
       // It's possible that the inode has a default ACL but no access ACL.
       int firstEntry = aclFeature.getEntryAt(0);
       int firstEntry = aclFeature.getEntryAt(0);
       if (AclEntryStatusFormat.getScope(firstEntry) == AclEntryScope.ACCESS) {
       if (AclEntryStatusFormat.getScope(firstEntry) == AclEntryScope.ACCESS) {
-        checkAccessAcl(inode, snapshotId, access, mode, aclFeature);
+        checkAccessAcl(inode, path, access, mode, aclFeature);
         return;
         return;
       }
       }
     }
     }
-    checkFsPermission(inode, snapshotId, access, mode);
-  }
-
-  private void checkFsPermission(INode inode, int snapshotId, FsAction access,
-      FsPermission mode) throws AccessControlException {
-    if (user.equals(inode.getUserName(snapshotId))) { //user class
+    if (getUser().equals(inode.getUserName())) { //user class
       if (mode.getUserAction().implies(access)) { return; }
       if (mode.getUserAction().implies(access)) { return; }
     }
     }
-    else if (groups.contains(inode.getGroupName(snapshotId))) { //group class
+    else if (getGroups().contains(inode.getGroupName())) { //group class
       if (mode.getGroupAction().implies(access)) { return; }
       if (mode.getGroupAction().implies(access)) { return; }
     }
     }
     else { //other class
     else { //other class
       if (mode.getOtherAction().implies(access)) { return; }
       if (mode.getOtherAction().implies(access)) { return; }
     }
     }
     throw new AccessControlException(
     throw new AccessControlException(
-      toAccessControlString(inode, snapshotId, access, mode));
+        toAccessControlString(inode, path, access, mode));
   }
   }
 
 
   /**
   /**
@@ -282,20 +336,20 @@ class FSPermissionChecker {
    * - The other entry must not have a name.
    * - The other entry must not have a name.
    * - Default entries may be present, but they are ignored during enforcement.
    * - Default entries may be present, but they are ignored during enforcement.
    *
    *
-   * @param inode INode accessed inode
+   * @param inode INodeAttributes accessed inode
    * @param snapshotId int snapshot ID
    * @param snapshotId int snapshot ID
    * @param access FsAction requested permission
    * @param access FsAction requested permission
    * @param mode FsPermission mode from inode
    * @param mode FsPermission mode from inode
    * @param aclFeature AclFeature of inode
    * @param aclFeature AclFeature of inode
    * @throws AccessControlException if the ACL denies permission
    * @throws AccessControlException if the ACL denies permission
    */
    */
-  private void checkAccessAcl(INode inode, int snapshotId, FsAction access,
-      FsPermission mode, AclFeature aclFeature)
+  private void checkAccessAcl(INodeAttributes inode, String path,
+      FsAction access, FsPermission mode, AclFeature aclFeature)
       throws AccessControlException {
       throws AccessControlException {
     boolean foundMatch = false;
     boolean foundMatch = false;
 
 
     // Use owner entry from permission bits if user is owner.
     // Use owner entry from permission bits if user is owner.
-    if (user.equals(inode.getUserName(snapshotId))) {
+    if (getUser().equals(inode.getUserName())) {
       if (mode.getUserAction().implies(access)) {
       if (mode.getUserAction().implies(access)) {
         return;
         return;
       }
       }
@@ -314,7 +368,7 @@ class FSPermissionChecker {
         if (type == AclEntryType.USER) {
         if (type == AclEntryType.USER) {
           // Use named user entry with mask from permission bits applied if user
           // Use named user entry with mask from permission bits applied if user
           // matches name.
           // matches name.
-          if (user.equals(name)) {
+          if (getUser().equals(name)) {
             FsAction masked = AclEntryStatusFormat.getPermission(entry).and(
             FsAction masked = AclEntryStatusFormat.getPermission(entry).and(
                 mode.getGroupAction());
                 mode.getGroupAction());
             if (masked.implies(access)) {
             if (masked.implies(access)) {
@@ -328,8 +382,8 @@ class FSPermissionChecker {
           // applied if user is a member and entry grants access.  If user is a
           // applied if user is a member and entry grants access.  If user is a
           // member of multiple groups that have entries that grant access, then
           // member of multiple groups that have entries that grant access, then
           // it doesn't matter which is chosen, so exit early after first match.
           // it doesn't matter which is chosen, so exit early after first match.
-          String group = name == null ? inode.getGroupName(snapshotId) : name;
-          if (groups.contains(group)) {
+          String group = name == null ? inode.getGroupName() : name;
+          if (getGroups().contains(group)) {
             FsAction masked = AclEntryStatusFormat.getPermission(entry).and(
             FsAction masked = AclEntryStatusFormat.getPermission(entry).and(
                 mode.getGroupAction());
                 mode.getGroupAction());
             if (masked.implies(access)) {
             if (masked.implies(access)) {
@@ -347,28 +401,28 @@ class FSPermissionChecker {
     }
     }
 
 
     throw new AccessControlException(
     throw new AccessControlException(
-      toAccessControlString(inode, snapshotId, access, mode, true));
+        toAccessControlString(inode, path, access, mode));
   }
   }
 
 
   /** Guarded by {@link FSNamesystem#readLock()} */
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkStickyBit(INode parent, INode inode, int snapshotId
+  private void checkStickyBit(INodeAttributes parent, INodeAttributes inode
       ) throws AccessControlException {
       ) throws AccessControlException {
-    if(!parent.getFsPermission(snapshotId).getStickyBit()) {
+    if (!parent.getFsPermission().getStickyBit()) {
       return;
       return;
     }
     }
 
 
     // If this user is the directory owner, return
     // If this user is the directory owner, return
-    if(parent.getUserName(snapshotId).equals(user)) {
+    if (parent.getUserName().equals(getUser())) {
       return;
       return;
     }
     }
 
 
     // if this user is the file owner, return
     // if this user is the file owner, return
-    if(inode.getUserName(snapshotId).equals(user)) {
+    if (inode.getUserName().equals(getUser())) {
       return;
       return;
     }
     }
 
 
     throw new AccessControlException("Permission denied by sticky bit setting:" +
     throw new AccessControlException("Permission denied by sticky bit setting:" +
-      " user=" + user + ", inode=" + inode);
+      " user=" + getUser() + ", inode=" + inode);
   }
   }
 
 
   /**
   /**
@@ -384,11 +438,11 @@ class FSPermissionChecker {
     if (isSuperUser()) {
     if (isSuperUser()) {
       return;
       return;
     }
     }
-    if (user.equals(pool.getOwnerName())
+    if (getUser().equals(pool.getOwnerName())
         && mode.getUserAction().implies(access)) {
         && mode.getUserAction().implies(access)) {
       return;
       return;
     }
     }
-    if (groups.contains(pool.getGroupName())
+    if (getGroups().contains(pool.getGroupName())
         && mode.getGroupAction().implies(access)) {
         && mode.getGroupAction().implies(access)) {
       return;
       return;
     }
     }
@@ -396,7 +450,7 @@ class FSPermissionChecker {
       return;
       return;
     }
     }
     throw new AccessControlException("Permission denied while accessing pool "
     throw new AccessControlException("Permission denied while accessing pool "
-        + pool.getPoolName() + ": user " + user + " does not have "
+        + pool.getPoolName() + ": user " + getUser() + " does not have "
         + access.toString() + " permissions.");
         + access.toString() + " permissions.");
   }
   }
 }
 }

+ 135 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributeProvider.java

@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class INodeAttributeProvider {
+
+  /**
+   * The AccessControlEnforcer allows implementations to override the
+   * default File System permission checking logic enforced on a file system
+   * object
+   */
+  public interface AccessControlEnforcer {
+
+    /**
+     * Checks permission on a file system object. Has to throw an Exception
+     * if the filesystem object is not accessessible by the calling Ugi.
+     * @param fsOwner Filesystem owner (The Namenode user)
+     * @param supergroup super user geoup
+     * @param callerUgi UserGroupInformation of the caller
+     * @param inodeAttrs Array of INode attributes for each path element in the
+     *                   the path
+     * @param inodes Array of INodes for each path element in the path
+     * @param pathByNameArr Array of byte arrays of the LocalName
+     * @param snapshotId the snapshotId of the requested path
+     * @param path Path String
+     * @param ancestorIndex Index of ancestor
+     * @param doCheckOwner perform ownership check
+     * @param ancestorAccess The access required by the ancestor of the path.
+     * @param parentAccess The access required by the parent of the path.
+     * @param access The access required by the path.
+     * @param subAccess If path is a directory, It is the access required of
+     *                  the path and all the sub-directories. If path is not a
+     *                  directory, there should ideally be no effect.
+     * @param ignoreEmptyDir Ignore permission checking for empty directory?
+     * @throws AccessControlException
+     */
+    public abstract void checkPermission(String fsOwner, String supergroup,
+        UserGroupInformation callerUgi, INodeAttributes[] inodeAttrs,
+        INode[] inodes, byte[][] pathByNameArr, int snapshotId, String path,
+        int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
+        FsAction parentAccess, FsAction access, FsAction subAccess,
+        boolean ignoreEmptyDir)
+            throws AccessControlException;
+
+  }
+  /**
+   * Initialize the provider. This method is called at NameNode startup
+   * time.
+   */
+  public abstract void start();
+
+  /**
+   * Shutdown the provider. This method is called at NameNode shutdown time.
+   */
+  public abstract void stop();
+
+  @VisibleForTesting
+  String[] getPathElements(String path) {
+    path = path.trim();
+    if (path.charAt(0) != Path.SEPARATOR_CHAR) {
+      throw new IllegalArgumentException("It must be an absolute path: " +
+          path);
+    }
+    int numOfElements = StringUtils.countMatches(path, Path.SEPARATOR);
+    if (path.length() > 1 && path.endsWith(Path.SEPARATOR)) {
+      numOfElements--;
+    }
+    String[] pathElements = new String[numOfElements];
+    int elementIdx = 0;
+    int idx = 0;
+    int found = path.indexOf(Path.SEPARATOR_CHAR, idx);
+    while (found > -1) {
+      if (found > idx) {
+        pathElements[elementIdx++] = path.substring(idx, found);
+      }
+      idx = found + 1;
+      found = path.indexOf(Path.SEPARATOR_CHAR, idx);
+    }
+    if (idx < path.length()) {
+      pathElements[elementIdx] = path.substring(idx);
+    }
+    return pathElements;
+  }
+
+  public INodeAttributes getAttributes(String fullPath, INodeAttributes inode) {
+    return getAttributes(getPathElements(fullPath), inode);
+  }
+
+  public abstract INodeAttributes getAttributes(String[] pathElements,
+      INodeAttributes inode);
+
+  /**
+   * Can be over-ridden by implementations to provide a custom Access Control
+   * Enforcer that can provide an alternate implementation of the
+   * default permission checking logic.
+   * @param defaultEnforcer The Default AccessControlEnforcer
+   * @return The AccessControlEnforcer to use
+   */
+  public AccessControlEnforcer getExternalAccessControlEnforcer(
+      AccessControlEnforcer defaultEnforcer) {
+    return defaultEnforcer;
+  }
+}

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java

@@ -28,6 +28,9 @@ import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
  */
  */
 @InterfaceAudience.Private
 @InterfaceAudience.Private
 public interface INodeAttributes {
 public interface INodeAttributes {
+
+  public boolean isDirectory();
+
   /**
   /**
    * @return null if the local name is null;
    * @return null if the local name is null;
    *         otherwise, return the local name byte array.
    *         otherwise, return the local name byte array.

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java

@@ -52,6 +52,10 @@ public interface INodeDirectoryAttributes extends INodeAttributes {
           storageSpace(-1).typeSpaces(-1).build();
           storageSpace(-1).typeSpaces(-1).build();
     }
     }
 
 
+    public boolean isDirectory() {
+      return true;
+    }
+
     @Override
     @Override
     public boolean metadataEquals(INodeDirectoryAttributes other) {
     public boolean metadataEquals(INodeDirectoryAttributes other) {
       return other != null
       return other != null

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java

@@ -58,6 +58,11 @@ public interface INodeFileAttributes extends INodeAttributes {
       this.header = file.getHeaderLong();
       this.header = file.getHeaderLong();
     }
     }
 
 
+    @Override
+    public boolean isDirectory() {
+      return false;
+    }
+
     @Override
     @Override
     public short getFileReplication() {
     public short getFileReplication() {
       return HeaderFormat.getReplication(header);
       return HeaderFormat.getReplication(header);

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java

@@ -376,6 +376,12 @@ public class INodesInPath {
     return Collections.unmodifiableList(Arrays.asList(inodes));
     return Collections.unmodifiableList(Arrays.asList(inodes));
   }
   }
 
 
+  public INode[] getINodesArray() {
+    INode[] retArr = new INode[inodes.length];
+    System.arraycopy(inodes, 0, retArr, 0, inodes.length);
+    return retArr;
+  }
+
   /**
   /**
    * @param length number of ancestral INodes in the returned INodesInPath
    * @param length number of ancestral INodes in the returned INodesInPath
    *               instance
    *               instance

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java

@@ -403,7 +403,7 @@ public class TestFSPermissionChecker {
   private void assertPermissionGranted(UserGroupInformation user, String path,
   private void assertPermissionGranted(UserGroupInformation user, String path,
       FsAction access) throws IOException {
       FsAction access) throws IOException {
     INodesInPath iip = dir.getINodesInPath(path, true);
     INodesInPath iip = dir.getINodesInPath(path, true);
-    new FSPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(iip,
+    dir.getPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(iip,
       false, null, null, access, null, false);
       false, null, null, access, null, false);
   }
   }
 
 
@@ -411,7 +411,7 @@ public class TestFSPermissionChecker {
       FsAction access) throws IOException {
       FsAction access) throws IOException {
     try {
     try {
       INodesInPath iip = dir.getINodesInPath(path, true);
       INodesInPath iip = dir.getINodesInPath(path, true);
-      new FSPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(iip,
+      dir.getPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(iip,
         false, null, null, access, null, false);
         false, null, null, access, null, false);
       fail("expected AccessControlException for user + " + user + ", path = " +
       fail("expected AccessControlException for user + " + user + ", path = " +
         path + ", access = " + access);
         path + ", access = " + access);

+ 229 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeAttributeProvider.java

@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.INodeAttributeProvider.AccessControlEnforcer;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestINodeAttributeProvider {
+  private MiniDFSCluster miniDFS;
+  private static final Set<String> CALLED = new HashSet<String>();
+
+  public static class MyAuthorizationProvider extends INodeAttributeProvider {
+
+    public static class MyAccessControlEnforcer implements AccessControlEnforcer {
+
+      @Override
+      public void checkPermission(String fsOwner, String supergroup,
+          UserGroupInformation ugi, INodeAttributes[] inodeAttrs,
+          INode[] inodes, byte[][] pathByNameArr, int snapshotId, String path,
+          int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
+          FsAction parentAccess, FsAction access, FsAction subAccess,
+          boolean ignoreEmptyDir) throws AccessControlException {
+        CALLED.add("checkPermission|" + ancestorAccess + "|" + parentAccess + "|" + access);
+      }
+    }
+
+    @Override
+    public void start() {
+      CALLED.add("start");
+    }
+
+    @Override
+    public void stop() {
+      CALLED.add("stop");
+    }
+
+    @Override
+    public INodeAttributes getAttributes(String[] pathElements,
+        final INodeAttributes inode) {
+      CALLED.add("getAttributes");
+      final boolean useDefault = useDefault(pathElements);
+      return new INodeAttributes() {
+        @Override
+        public boolean isDirectory() {
+          return inode.isDirectory();
+        }
+
+        @Override
+        public byte[] getLocalNameBytes() {
+          return inode.getLocalNameBytes();
+        }
+
+        @Override
+        public String getUserName() {
+          return (useDefault) ? inode.getUserName() : "foo";
+        }
+
+        @Override
+        public String getGroupName() {
+          return (useDefault) ? inode.getGroupName() : "bar";
+        }
+
+        @Override
+        public FsPermission getFsPermission() {
+          return (useDefault) ? inode.getFsPermission()
+                              : new FsPermission(getFsPermissionShort());
+        }
+
+        @Override
+        public short getFsPermissionShort() {
+          return (useDefault) ? inode.getFsPermissionShort()
+                              : (short) getPermissionLong();
+        }
+
+        @Override
+        public long getPermissionLong() {
+          return (useDefault) ? inode.getPermissionLong() : 0770;
+        }
+
+        @Override
+        public AclFeature getAclFeature() {
+          AclFeature f;
+          if (useDefault) {
+            f = inode.getAclFeature();
+          } else {
+            AclEntry acl = new AclEntry.Builder().setType(AclEntryType.GROUP).
+                setPermission(FsAction.ALL).setName("xxx").build();
+            f = new AclFeature(AclEntryStatusFormat.toInt(
+                Lists.newArrayList(acl)));
+          }
+          return f;
+        }
+
+        @Override
+        public XAttrFeature getXAttrFeature() {
+          return (useDefault) ? inode.getXAttrFeature() : null;
+        }
+
+        @Override
+        public long getModificationTime() {
+          return (useDefault) ? inode.getModificationTime() : 0;
+        }
+
+        @Override
+        public long getAccessTime() {
+          return (useDefault) ? inode.getAccessTime() : 0;
+        }
+      };
+
+    }
+
+    @Override
+    public AccessControlEnforcer getExternalAccessControlEnforcer(
+        AccessControlEnforcer deafultEnforcer) {
+      return new MyAccessControlEnforcer();
+    }
+
+    private boolean useDefault(String[] pathElements) {
+      return (pathElements.length < 2) ||
+          !(pathElements[0].equals("user") && pathElements[1].equals("authz"));
+    }
+
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    CALLED.clear();
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY,
+        MyAuthorizationProvider.class.getName());
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+    miniDFS = new MiniDFSCluster.Builder(conf).build();
+  }
+
+  @After
+  public void cleanUp() throws IOException {
+    CALLED.clear();
+    if (miniDFS != null) {
+      miniDFS.shutdown();
+    }
+    Assert.assertTrue(CALLED.contains("stop"));
+  }
+
+  @Test
+  public void testDelegationToProvider() throws Exception {
+    Assert.assertTrue(CALLED.contains("start"));
+    FileSystem fs = FileSystem.get(miniDFS.getConfiguration(0));
+    fs.mkdirs(new Path("/tmp"));
+    fs.setPermission(new Path("/tmp"), new FsPermission((short) 0777));
+    UserGroupInformation ugi = UserGroupInformation.createUserForTesting("u1",
+        new String[]{"g1"});
+    ugi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        FileSystem fs = FileSystem.get(miniDFS.getConfiguration(0));
+        CALLED.clear();
+        fs.mkdirs(new Path("/tmp/foo"));
+        Assert.assertTrue(CALLED.contains("getAttributes"));
+        Assert.assertTrue(CALLED.contains("checkPermission|null|null|null"));
+        Assert.assertTrue(CALLED.contains("checkPermission|WRITE|null|null"));
+        CALLED.clear();
+        fs.listStatus(new Path("/tmp/foo"));
+        Assert.assertTrue(CALLED.contains("getAttributes"));
+        Assert.assertTrue(
+            CALLED.contains("checkPermission|null|null|READ_EXECUTE"));
+        CALLED.clear();
+        fs.getAclStatus(new Path("/tmp/foo"));
+        Assert.assertTrue(CALLED.contains("getAttributes"));
+        Assert.assertTrue(CALLED.contains("checkPermission|null|null|null"));
+        return null;
+      }
+    });
+  }
+
+  @Test
+  public void testCustomProvider() throws Exception {
+    FileSystem fs = FileSystem.get(miniDFS.getConfiguration(0));
+    fs.mkdirs(new Path("/user/xxx"));
+    FileStatus status = fs.getFileStatus(new Path("/user/xxx"));
+    Assert.assertEquals(System.getProperty("user.name"), status.getOwner());
+    Assert.assertEquals("supergroup", status.getGroup());
+    Assert.assertEquals(new FsPermission((short)0755), status.getPermission());
+    fs.mkdirs(new Path("/user/authz"));
+    status = fs.getFileStatus(new Path("/user/authz"));
+    Assert.assertEquals("foo", status.getOwner());
+    Assert.assertEquals("bar", status.getGroup());
+    Assert.assertEquals(new FsPermission((short) 0770), status.getPermission());
+  }
+
+}