Bladeren bron

HDFS-10673. Optimize FSPermissionChecker's internal path usage. Contributed by Daryn Sharp.

(cherry picked from commit 438a9f047eb6af2a4b916a4f6ef6f68adeab8068)
Kihwal Lee 8 jaren geleden
bovenliggende
commit
09b06a6e9e

+ 2 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -1527,10 +1527,7 @@ public class FSDirectory implements Closeable {
   FSPermissionChecker getPermissionChecker(String fsOwner, String superGroup,
   FSPermissionChecker getPermissionChecker(String fsOwner, String superGroup,
       UserGroupInformation ugi) throws AccessControlException {
       UserGroupInformation ugi) throws AccessControlException {
     return new FSPermissionChecker(
     return new FSPermissionChecker(
-        fsOwner, superGroup, ugi,
-        attributeProvider == null ?
-            DefaultINodeAttributesProvider.DEFAULT_PROVIDER
-            : attributeProvider);
+        fsOwner, superGroup, ugi, attributeProvider);
   }
   }
 
 
   void checkOwner(FSPermissionChecker pc, INodesInPath iip)
   void checkOwner(FSPermissionChecker pc, INodesInPath iip)
@@ -1660,15 +1657,12 @@ public class FSDirectory implements Closeable {
 
 
   INodeAttributes getAttributes(String fullPath, byte[] path,
   INodeAttributes getAttributes(String fullPath, byte[] path,
       INode node, int snapshot) {
       INode node, int snapshot) {
-    INodeAttributes nodeAttrs;
+    INodeAttributes nodeAttrs = node.getSnapshotINode(snapshot);
     if (attributeProvider != null) {
     if (attributeProvider != null) {
-      nodeAttrs = node.getSnapshotINode(snapshot);
       fullPath = fullPath
       fullPath = fullPath
           + (fullPath.endsWith(Path.SEPARATOR) ? "" : Path.SEPARATOR)
           + (fullPath.endsWith(Path.SEPARATOR) ? "" : Path.SEPARATOR)
           + DFSUtil.bytes2String(path);
           + DFSUtil.bytes2String(path);
       nodeAttrs = attributeProvider.getAttributes(fullPath, nodeAttrs);
       nodeAttrs = attributeProvider.getAttributes(fullPath, nodeAttrs);
-    } else {
-      nodeAttrs = node.getSnapshotINode(snapshot);
     }
     }
     return nodeAttrs;
     return nodeAttrs;
   }
   }

+ 86 - 89
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -45,15 +45,23 @@ import org.apache.hadoop.security.UserGroupInformation;
 class FSPermissionChecker implements AccessControlEnforcer {
 class FSPermissionChecker implements AccessControlEnforcer {
   static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
   static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
 
 
+  private static String constructPath(INodeAttributes[] inodes, int end) {
+    byte[][] components = new byte[end+1][];
+    for (int i=0; i <= end; i++) {
+      components[i] = inodes[i].getLocalNameBytes();
+    }
+    return DFSUtil.byteArray2PathString(components);
+  }
+
   /** @return a string for throwing {@link AccessControlException} */
   /** @return a string for throwing {@link AccessControlException} */
   private String toAccessControlString(INodeAttributes inodeAttrib, String path,
   private String toAccessControlString(INodeAttributes inodeAttrib, String path,
-      FsAction access, FsPermission mode) {
-    return toAccessControlString(inodeAttrib, path, access, mode, false);
+      FsAction access) {
+    return toAccessControlString(inodeAttrib, path, access, false);
   }
   }
 
 
   /** @return a string for throwing {@link AccessControlException} */
   /** @return a string for throwing {@link AccessControlException} */
   private String toAccessControlString(INodeAttributes inodeAttrib,
   private String toAccessControlString(INodeAttributes inodeAttrib,
-      String path, FsAction access, FsPermission mode, boolean deniedFromAcl) {
+      String path, FsAction access, boolean deniedFromAcl) {
     StringBuilder sb = new StringBuilder("Permission denied: ")
     StringBuilder sb = new StringBuilder("Permission denied: ")
       .append("user=").append(getUser()).append(", ")
       .append("user=").append(getUser()).append(", ")
       .append("access=").append(access).append(", ")
       .append("access=").append(access).append(", ")
@@ -61,7 +69,7 @@ class FSPermissionChecker implements AccessControlEnforcer {
       .append(inodeAttrib.getUserName()).append(':')
       .append(inodeAttrib.getUserName()).append(':')
       .append(inodeAttrib.getGroupName()).append(':')
       .append(inodeAttrib.getGroupName()).append(':')
       .append(inodeAttrib.isDirectory() ? 'd' : '-')
       .append(inodeAttrib.isDirectory() ? 'd' : '-')
-      .append(mode);
+      .append(inodeAttrib.getFsPermission());
     if (deniedFromAcl) {
     if (deniedFromAcl) {
       sb.append("+");
       sb.append("+");
     }
     }
@@ -112,6 +120,11 @@ class FSPermissionChecker implements AccessControlEnforcer {
     return attributeProvider;
     return attributeProvider;
   }
   }
 
 
+  private AccessControlEnforcer getAccessControlEnforcer() {
+    return (attributeProvider != null)
+        ? attributeProvider.getExternalAccessControlEnforcer(this) : this;
+  }
+
   /**
   /**
    * Verify if the caller has the required permission. This will result into 
    * Verify if the caller has the required permission. This will result into 
    * an exception if the caller is not allowed to access the resource.
    * an exception if the caller is not allowed to access the resource.
@@ -174,47 +187,24 @@ class FSPermissionChecker implements AccessControlEnforcer {
     final int snapshotId = inodesInPath.getPathSnapshotId();
     final int snapshotId = inodesInPath.getPathSnapshotId();
     final INode[] inodes = inodesInPath.getINodesArray();
     final INode[] inodes = inodesInPath.getINodesArray();
     final INodeAttributes[] inodeAttrs = new INodeAttributes[inodes.length];
     final INodeAttributes[] inodeAttrs = new INodeAttributes[inodes.length];
-    final byte[][] pathByNameArr = new byte[inodes.length][];
+    final byte[][] components = inodesInPath.getPathComponents();
     for (int i = 0; i < inodes.length && inodes[i] != null; i++) {
     for (int i = 0; i < inodes.length && inodes[i] != null; i++) {
-      if (inodes[i] != null) {
-        pathByNameArr[i] = inodes[i].getLocalNameBytes();
-        inodeAttrs[i] = getINodeAttrs(pathByNameArr, i, inodes[i], snapshotId);
-      }
+      inodeAttrs[i] = getINodeAttrs(components, i, inodes[i], snapshotId);
     }
     }
 
 
     String path = inodesInPath.getPath();
     String path = inodesInPath.getPath();
     int ancestorIndex = inodes.length - 2;
     int ancestorIndex = inodes.length - 2;
 
 
-    AccessControlEnforcer enforcer =
-        getAttributesProvider().getExternalAccessControlEnforcer(this);
+    AccessControlEnforcer enforcer = getAccessControlEnforcer();
     enforcer.checkPermission(fsOwner, supergroup, callerUgi, inodeAttrs, inodes,
     enforcer.checkPermission(fsOwner, supergroup, callerUgi, inodeAttrs, inodes,
-        pathByNameArr, snapshotId, path, ancestorIndex, doCheckOwner,
+        components, snapshotId, path, ancestorIndex, doCheckOwner,
         ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
         ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
   }
   }
 
 
-  /**
-   * Check whether exception e is due to an ancestor inode's not being
-   * directory.
-   */
-  private void checkAncestorType(INode[] inodes, int checkedAncestorIndex,
-      AccessControlException e) throws AccessControlException {
-    for (int i = 0; i <= checkedAncestorIndex; i++) {
-      if (inodes[i] == null) {
-        break;
-      }
-      if (!inodes[i].isDirectory()) {
-        throw new AccessControlException(
-            e.getMessage() + " (Ancestor " + inodes[i].getFullPathName()
-                + " is not a directory).");
-      }
-    }
-    throw e;
-  }
-
   @Override
   @Override
   public void checkPermission(String fsOwner, String supergroup,
   public void checkPermission(String fsOwner, String supergroup,
       UserGroupInformation callerUgi, INodeAttributes[] inodeAttrs,
       UserGroupInformation callerUgi, INodeAttributes[] inodeAttrs,
-      INode[] inodes, byte[][] pathByNameArr, int snapshotId, String path,
+      INode[] inodes, byte[][] components, int snapshotId, String path,
       int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
       int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
       FsAction parentAccess, FsAction access, FsAction subAccess,
       FsAction parentAccess, FsAction access, FsAction subAccess,
       boolean ignoreEmptyDir)
       boolean ignoreEmptyDir)
@@ -222,29 +212,29 @@ class FSPermissionChecker implements AccessControlEnforcer {
     for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
     for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
         ancestorIndex--);
         ancestorIndex--);
 
 
-    checkTraverse(inodeAttrs, inodes, path, ancestorIndex);
+    checkTraverse(inodeAttrs, ancestorIndex);
 
 
     final INodeAttributes last = inodeAttrs[inodeAttrs.length - 1];
     final INodeAttributes last = inodeAttrs[inodeAttrs.length - 1];
     if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
     if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
         && inodeAttrs.length > 1 && last != null) {
         && inodeAttrs.length > 1 && last != null) {
-      checkStickyBit(inodeAttrs[inodeAttrs.length - 2], last, path);
+      checkStickyBit(inodeAttrs, inodeAttrs.length - 2);
     }
     }
     if (ancestorAccess != null && inodeAttrs.length > 1) {
     if (ancestorAccess != null && inodeAttrs.length > 1) {
-      check(inodeAttrs, path, ancestorIndex, ancestorAccess);
+      check(inodeAttrs, ancestorIndex, ancestorAccess);
     }
     }
     if (parentAccess != null && inodeAttrs.length > 1) {
     if (parentAccess != null && inodeAttrs.length > 1) {
-      check(inodeAttrs, path, inodeAttrs.length - 2, parentAccess);
+      check(inodeAttrs, inodeAttrs.length - 2, parentAccess);
     }
     }
     if (access != null) {
     if (access != null) {
-      check(last, path, access);
+      check(inodeAttrs, inodeAttrs.length - 1, access);
     }
     }
     if (subAccess != null) {
     if (subAccess != null) {
       INode rawLast = inodes[inodeAttrs.length - 1];
       INode rawLast = inodes[inodeAttrs.length - 1];
-      checkSubAccess(pathByNameArr, inodeAttrs.length - 1, rawLast,
+      checkSubAccess(components, inodeAttrs.length - 1, rawLast,
           snapshotId, subAccess, ignoreEmptyDir);
           snapshotId, subAccess, ignoreEmptyDir);
     }
     }
     if (doCheckOwner) {
     if (doCheckOwner) {
-      checkOwner(last);
+      checkOwner(inodeAttrs, inodeAttrs.length - 1);
     }
     }
   }
   }
 
 
@@ -262,32 +252,35 @@ class FSPermissionChecker implements AccessControlEnforcer {
   }
   }
 
 
   /** Guarded by {@link FSNamesystem#readLock()} */
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkOwner(INodeAttributes inode
-      ) throws AccessControlException {
-    if (getUser().equals(inode.getUserName())) {
+  private void checkOwner(INodeAttributes[] inodes, int i)
+      throws AccessControlException {
+    if (getUser().equals(inodes[i].getUserName())) {
       return;
       return;
     }
     }
     throw new AccessControlException(
     throw new AccessControlException(
-            "Permission denied. user="
-            + getUser() + " is not the owner of inode=" + inode);
+        "Permission denied. user=" + getUser() +
+        " is not the owner of inode=" + constructPath(inodes, i));
   }
   }
 
 
   /** Guarded by {@link FSNamesystem#readLock()} */
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkTraverse(INodeAttributes[] inodeAttrs, INode[] inodes,
-      String path, int last) throws AccessControlException {
-    int j = 0;
-    try {
-      for (; j <= last; j++) {
-        check(inodeAttrs[j], path, FsAction.EXECUTE);
+  private void checkTraverse(INodeAttributes[] inodeAttrs, int last)
+      throws AccessControlException {
+    for (int i=0; i <= last; i++) {
+      INodeAttributes inode = inodeAttrs[i];
+      if (!inode.isDirectory()) {
+        throw new AccessControlException(
+            constructPath(inodeAttrs, i) + " (is not a directory)");
+      }
+      if (!hasPermission(inode, FsAction.EXECUTE)) {
+        throw new AccessControlException(toAccessControlString(
+            inode, constructPath(inodeAttrs, i), FsAction.EXECUTE));
       }
       }
-    } catch (AccessControlException e) {
-      checkAncestorType(inodes, j, e);
     }
     }
   }
   }
 
 
   /** Guarded by {@link FSNamesystem#readLock()} */
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkSubAccess(byte[][] pathByNameArr, int pathIdx, INode inode,
-      int snapshotId, FsAction access, boolean ignoreEmptyDir)
+  private void checkSubAccess(byte[][] components, int pathIdx,
+      INode inode, int snapshotId, FsAction access, boolean ignoreEmptyDir)
       throws AccessControlException {
       throws AccessControlException {
     if (inode == null || !inode.isDirectory()) {
     if (inode == null || !inode.isDirectory()) {
       return;
       return;
@@ -299,8 +292,12 @@ class FSPermissionChecker implements AccessControlEnforcer {
       ReadOnlyList<INode> cList = d.getChildrenList(snapshotId);
       ReadOnlyList<INode> cList = d.getChildrenList(snapshotId);
       if (!(cList.isEmpty() && ignoreEmptyDir)) {
       if (!(cList.isEmpty() && ignoreEmptyDir)) {
         //TODO have to figure this out with inodeattribute provider
         //TODO have to figure this out with inodeattribute provider
-        check(getINodeAttrs(pathByNameArr, pathIdx, d, snapshotId),
-            inode.getFullPathName(), access);
+        INodeAttributes inodeAttr =
+            getINodeAttrs(components, pathIdx, d, snapshotId);
+        if (!hasPermission(inodeAttr, access)) {
+          throw new AccessControlException(
+              toAccessControlString(inodeAttr, d.getFullPathName(), access));
+        }
       }
       }
 
 
       for(INode child : cList) {
       for(INode child : cList) {
@@ -312,15 +309,21 @@ class FSPermissionChecker implements AccessControlEnforcer {
   }
   }
 
 
   /** Guarded by {@link FSNamesystem#readLock()} */
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void check(INodeAttributes[] inodes, String path, int i, FsAction access
-      ) throws AccessControlException {
-    check(i >= 0 ? inodes[i] : null, path, access);
+  private void check(INodeAttributes[] inodes, int i, FsAction access)
+      throws AccessControlException {
+    INodeAttributes inode = (i >= 0) ? inodes[i] : null;
+    if (inode != null && !hasPermission(inode, access)) {
+      throw new AccessControlException(
+          toAccessControlString(inode, constructPath(inodes, i), access));
+    }
   }
   }
 
 
-  private void check(INodeAttributes inode, String path, FsAction access
-      ) throws AccessControlException {
+  // return whether access is permitted.  note it neither requires a path or
+  // throws so the caller can build the path only if required for an exception.
+  // very beneficial for subaccess checks!
+  private boolean hasPermission(INodeAttributes inode, FsAction access) {
     if (inode == null) {
     if (inode == null) {
-      return;
+      return true;
     }
     }
     final FsPermission mode = inode.getFsPermission();
     final FsPermission mode = inode.getFsPermission();
     final AclFeature aclFeature = inode.getAclFeature();
     final AclFeature aclFeature = inode.getAclFeature();
@@ -328,21 +331,18 @@ class FSPermissionChecker implements AccessControlEnforcer {
       // It's possible that the inode has a default ACL but no access ACL.
       // It's possible that the inode has a default ACL but no access ACL.
       int firstEntry = aclFeature.getEntryAt(0);
       int firstEntry = aclFeature.getEntryAt(0);
       if (AclEntryStatusFormat.getScope(firstEntry) == AclEntryScope.ACCESS) {
       if (AclEntryStatusFormat.getScope(firstEntry) == AclEntryScope.ACCESS) {
-        checkAccessAcl(inode, path, access, mode, aclFeature);
-        return;
+        return hasAclPermission(inode, access, mode, aclFeature);
       }
       }
     }
     }
+    final FsAction checkAction;
     if (getUser().equals(inode.getUserName())) { //user class
     if (getUser().equals(inode.getUserName())) { //user class
-      if (mode.getUserAction().implies(access)) { return; }
-    }
-    else if (getGroups().contains(inode.getGroupName())) { //group class
-      if (mode.getGroupAction().implies(access)) { return; }
+      checkAction = mode.getUserAction();
+    } else if (getGroups().contains(inode.getGroupName())) { //group class
+      checkAction = mode.getGroupAction();
+    } else { //other class
+      checkAction = mode.getOtherAction();
     }
     }
-    else { //other class
-      if (mode.getOtherAction().implies(access)) { return; }
-    }
-    throw new AccessControlException(
-        toAccessControlString(inode, path, access, mode));
+    return checkAction.implies(access);
   }
   }
 
 
   /**
   /**
@@ -368,15 +368,14 @@ class FSPermissionChecker implements AccessControlEnforcer {
    * @param aclFeature AclFeature of inode
    * @param aclFeature AclFeature of inode
    * @throws AccessControlException if the ACL denies permission
    * @throws AccessControlException if the ACL denies permission
    */
    */
-  private void checkAccessAcl(INodeAttributes inode, String path,
-      FsAction access, FsPermission mode, AclFeature aclFeature)
-      throws AccessControlException {
+  private boolean hasAclPermission(INodeAttributes inode,
+      FsAction access, FsPermission mode, AclFeature aclFeature) {
     boolean foundMatch = false;
     boolean foundMatch = false;
 
 
     // Use owner entry from permission bits if user is owner.
     // Use owner entry from permission bits if user is owner.
     if (getUser().equals(inode.getUserName())) {
     if (getUser().equals(inode.getUserName())) {
       if (mode.getUserAction().implies(access)) {
       if (mode.getUserAction().implies(access)) {
-        return;
+        return true;
       }
       }
       foundMatch = true;
       foundMatch = true;
     }
     }
@@ -397,7 +396,7 @@ class FSPermissionChecker implements AccessControlEnforcer {
             FsAction masked = AclEntryStatusFormat.getPermission(entry).and(
             FsAction masked = AclEntryStatusFormat.getPermission(entry).and(
                 mode.getGroupAction());
                 mode.getGroupAction());
             if (masked.implies(access)) {
             if (masked.implies(access)) {
-              return;
+              return true;
             }
             }
             foundMatch = true;
             foundMatch = true;
             break;
             break;
@@ -412,7 +411,7 @@ class FSPermissionChecker implements AccessControlEnforcer {
             FsAction masked = AclEntryStatusFormat.getPermission(entry).and(
             FsAction masked = AclEntryStatusFormat.getPermission(entry).and(
                 mode.getGroupAction());
                 mode.getGroupAction());
             if (masked.implies(access)) {
             if (masked.implies(access)) {
-              return;
+              return true;
             }
             }
             foundMatch = true;
             foundMatch = true;
           }
           }
@@ -421,17 +420,13 @@ class FSPermissionChecker implements AccessControlEnforcer {
     }
     }
 
 
     // Use other entry if user was not denied by an earlier match.
     // Use other entry if user was not denied by an earlier match.
-    if (!foundMatch && mode.getOtherAction().implies(access)) {
-      return;
-    }
-
-    throw new AccessControlException(
-        toAccessControlString(inode, path, access, mode));
+    return !foundMatch && mode.getOtherAction().implies(access);
   }
   }
 
 
   /** Guarded by {@link FSNamesystem#readLock()} */
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkStickyBit(INodeAttributes parent, INodeAttributes inode,
-      String path) throws AccessControlException {
+  private void checkStickyBit(INodeAttributes[] inodes, int index)
+      throws AccessControlException {
+    INodeAttributes parent = inodes[index];
     if (!parent.getFsPermission().getStickyBit()) {
     if (!parent.getFsPermission().getStickyBit()) {
       return;
       return;
     }
     }
@@ -441,6 +436,7 @@ class FSPermissionChecker implements AccessControlEnforcer {
       return;
       return;
     }
     }
 
 
+    INodeAttributes inode = inodes[index + 1];
     // if this user is the file owner, return
     // if this user is the file owner, return
     if (inode.getUserName().equals(getUser())) {
     if (inode.getUserName().equals(getUser())) {
       return;
       return;
@@ -449,9 +445,10 @@ class FSPermissionChecker implements AccessControlEnforcer {
     throw new AccessControlException(String.format(
     throw new AccessControlException(String.format(
         "Permission denied by sticky bit: user=%s, path=\"%s\":%s:%s:%s%s, " +
         "Permission denied by sticky bit: user=%s, path=\"%s\":%s:%s:%s%s, " +
         "parent=\"%s\":%s:%s:%s%s", user,
         "parent=\"%s\":%s:%s:%s%s", user,
-        path, inode.getUserName(), inode.getGroupName(),
+        constructPath(inodes, index + 1),
+        inode.getUserName(), inode.getGroupName(),
         inode.isDirectory() ? "d" : "-", inode.getFsPermission().toString(),
         inode.isDirectory() ? "d" : "-", inode.getFsPermission().toString(),
-        path.substring(0, path.length() - inode.toString().length() - 1 ),
+        constructPath(inodes, index),
         parent.getUserName(), parent.getGroupName(),
         parent.getUserName(), parent.getGroupName(),
         parent.isDirectory() ? "d" : "-", parent.getFsPermission().toString()));
         parent.isDirectory() ? "d" : "-", parent.getFsPermission().toString()));
   }
   }

+ 13 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java

@@ -589,7 +589,19 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
     }
     }
     return DFSUtil.bytes2String(path);
     return DFSUtil.bytes2String(path);
   }
   }
-  
+
+  public byte[][] getPathComponents() {
+    int n = 0;
+    for (INode inode = this; inode != null; inode = inode.getParent()) {
+      n++;
+    }
+    byte[][] components = new byte[n][];
+    for (INode inode = this; inode != null; inode = inode.getParent()) {
+      components[--n] = inode.getLocalNameBytes();
+    }
+    return components;
+  }
+
   @Override
   @Override
   public String toString() {
   public String toString() {
     return getLocalName();
     return getLocalName();

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributeProvider.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 
 
@@ -121,6 +122,15 @@ public abstract class INodeAttributeProvider {
   public abstract INodeAttributes getAttributes(String[] pathElements,
   public abstract INodeAttributes getAttributes(String[] pathElements,
       INodeAttributes inode);
       INodeAttributes inode);
 
 
+  public INodeAttributes getAttributes(byte[][] components,
+      INodeAttributes inode) {
+    String[] elements = new String[components.length];
+    for (int i = 0; i < elements.length; i++) {
+      elements[i] = DFSUtil.bytes2String(components[i]);
+    }
+    return getAttributes(elements, inode);
+  }
+
   /**
   /**
    * Can be over-ridden by implementations to provide a custom Access Control
    * Can be over-ridden by implementations to provide a custom Access Control
    * Enforcer that can provide an alternate implementation of the
    * Enforcer that can provide an alternate implementation of the

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java

@@ -566,11 +566,12 @@ public class TestDFSPermission {
       fs.exists(nfpath);
       fs.exists(nfpath);
       fail("The exists call should have failed.");
       fail("The exists call should have failed.");
     } catch (AccessControlException e) {
     } catch (AccessControlException e) {
-      assertTrue("Permission denied messages must carry file path",
+      assertFalse("Permission denied messages must not carry full file path,"
+              + "since the user does not have permission on /p4: "
+              + e.getMessage(),
           e.getMessage().contains(fpath.getName()));
           e.getMessage().contains(fpath.getName()));
-      assertFalse("Permission denied messages should not specify existing_file"
-              + " is not a directory, since the user does not have permission"
-              + " on /p4",
+      assertFalse("Permission denied messages must not specify /p4"
+          + " is not a directory: " + e.getMessage(),
           e.getMessage().contains("is not a directory"));
           e.getMessage().contains("is not a directory"));
     }
     }
   }
   }