浏览代码

HDFS-5923. Do not persist the ACL bit in the FsPermission. Contributed by Haohui Mai.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4685@1567784 13f79535-47bb-0310-9956-ffa450edef68
Chris Nauroth 11 年之前
父节点
当前提交
fc14360b03
共有 13 个文件被更改,包括 292 次插入313 次删除
  1. 14 52
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java
  2. 4 3
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/AclCommands.java
  3. 19 40
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/permission/TestFsPermission.java
  4. 3 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4685.txt
  5. 61 78
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
  6. 0 10
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  7. 11 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
  8. 88 36
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
  9. 2 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
  10. 19 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/AclTestHelpers.java
  11. 35 36
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
  12. 6 20
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithAcl.java
  13. 30 30
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java

+ 14 - 52
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java

@@ -58,7 +58,6 @@ public class FsPermission implements Writable {
   private FsAction groupaction = null;
   private FsAction otheraction = null;
   private boolean stickyBit = false;
-  private boolean aclBit = false;
 
   private FsPermission() {}
 
@@ -73,20 +72,7 @@ public class FsPermission implements Writable {
   }
 
   public FsPermission(FsAction u, FsAction g, FsAction o, boolean sb) {
-    this(u, g, o, sb, false);
-  }
-
-  /**
-   * Construct by the given {@link FsAction} and special bits.
-   * @param u user action
-   * @param g group action
-   * @param o other action
-   * @param sb sticky bit
-   * @param ab ACL bit
-   */
-  public FsPermission(FsAction u, FsAction g, FsAction o, boolean sb,
-      boolean ab) {
-    set(u, g, o, sb, ab);
+    set(u, g, o, sb);
   }
 
   /**
@@ -106,7 +92,6 @@ public class FsPermission implements Writable {
     this.groupaction = other.groupaction;
     this.otheraction = other.otheraction;
     this.stickyBit = other.stickyBit;
-    this.aclBit = other.aclBit;
   }
   
   /**
@@ -127,18 +112,16 @@ public class FsPermission implements Writable {
   /** Return other {@link FsAction}. */
   public FsAction getOtherAction() {return otheraction;}
 
-  private void set(FsAction u, FsAction g, FsAction o, boolean sb, boolean ab) {
+  private void set(FsAction u, FsAction g, FsAction o, boolean sb) {
     useraction = u;
     groupaction = g;
     otheraction = o;
     stickyBit = sb;
-    aclBit = ab;
   }
 
   public void fromShort(short n) {
     FsAction[] v = FSACTION_VALUES;
-    set(v[(n >>> 6) & 7], v[(n >>> 3) & 7], v[n & 7], (((n >>> 9) & 1) == 1),
-      (((n >>> 10) & 1) == 1) );
+    set(v[(n >>> 6) & 7], v[(n >>> 3) & 7], v[n & 7], (((n >>> 9) & 1) == 1) );
   }
 
   @Override
@@ -164,8 +147,7 @@ public class FsPermission implements Writable {
    * Encode the object to a short.
    */
   public short toShort() {
-    int s =  (aclBit ? 1 << 10 : 0)       |
-             (stickyBit ? 1 << 9 : 0)     |
+    int s =  (stickyBit ? 1 << 9 : 0)     |
              (useraction.ordinal() << 6)  |
              (groupaction.ordinal() << 3) |
              otheraction.ordinal();
@@ -180,8 +162,7 @@ public class FsPermission implements Writable {
       return this.useraction == that.useraction
           && this.groupaction == that.groupaction
           && this.otheraction == that.otheraction
-          && this.stickyBit == that.stickyBit
-          && this.aclBit == that.aclBit;
+          && this.stickyBit == that.stickyBit;
     }
     return false;
   }
@@ -191,19 +172,15 @@ public class FsPermission implements Writable {
 
   @Override
   public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append(useraction.SYMBOL);
-    sb.append(groupaction.SYMBOL);
-    sb.append(otheraction.SYMBOL);
-    if (stickyBit) {
-      sb.replace(sb.length() - 1, sb.length(),
+    String str = useraction.SYMBOL + groupaction.SYMBOL + otheraction.SYMBOL;
+    if(stickyBit) {
+      StringBuilder str2 = new StringBuilder(str);
+      str2.replace(str2.length() - 1, str2.length(),
            otheraction.implies(FsAction.EXECUTE) ? "t" : "T");
-    }
-    if (aclBit) {
-      sb.append('+');
+      str = str2.toString();
     }
 
-    return sb.toString();
+    return str;
   }
 
   /**
@@ -293,15 +270,6 @@ public class FsPermission implements Writable {
     return stickyBit;
   }
 
-  /**
-   * Returns true if there is also an ACL (access control list).
-   *
-   * @return boolean true if there is also an ACL (access control list).
-   */
-  public boolean getAclBit() {
-    return aclBit;
-  }
-
   /** Set the user file creation mask (umask) */
   public static void setUMask(Configuration conf, FsPermission umask) {
     conf.set(UMASK_LABEL, String.format("%1$03o", umask.toShort()));
@@ -351,14 +319,13 @@ public class FsPermission implements Writable {
     if (unixSymbolicPermission == null) {
       return null;
     }
-    else if (unixSymbolicPermission.length() != 10 &&
-        unixSymbolicPermission.length() != 11) {
-      throw new IllegalArgumentException("invalid length(unixSymbolicPermission="
+    else if (unixSymbolicPermission.length() != 10) {
+      throw new IllegalArgumentException("length != 10(unixSymbolicPermission="
           + unixSymbolicPermission + ")");
     }
 
     int n = 0;
-    for(int i = 1; i < 10; i++) {
+    for(int i = 1; i < unixSymbolicPermission.length(); i++) {
       n = n << 1;
       char c = unixSymbolicPermission.charAt(i);
       n += (c == '-' || c == 'T' || c == 'S') ? 0: 1;
@@ -369,11 +336,6 @@ public class FsPermission implements Writable {
         unixSymbolicPermission.charAt(9) == 'T')
       n += 01000;
 
-    // Add ACL bit value if set
-    if (unixSymbolicPermission.length() == 11 &&
-         unixSymbolicPermission.charAt(10) == '+')
-      n += (1 << 10);
-
     return new FsPermission((short)n);
   }
   

+ 4 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/AclCommands.java

@@ -92,11 +92,12 @@ class AclCommands extends FsCommand {
       }
 
       FsPermission perm = item.stat.getPermission();
-      if (perm.getAclBit()) {
-        printExtendedAcl(perm, entries);
-      } else {
+      if (entries.isEmpty()) {
         printMinimalAcl(perm);
+      } else {
+        printExtendedAcl(perm, entries);
       }
+
       out.println();
     }
 

+ 19 - 40
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/permission/TestFsPermission.java

@@ -54,7 +54,7 @@ public class TestFsPermission extends TestCase {
    * the expected values back out for all combinations
    */
   public void testConvertingPermissions() {
-    for(short s = 0; s <= 03777; s++) {
+    for(short s = 0; s <= 01777; s++) {
       assertEquals(s, new FsPermission(s).toShort());
     }
 
@@ -70,15 +70,6 @@ public class TestFsPermission extends TestCase {
             FsPermission f2 = new FsPermission(f);
             assertEquals(s, f2.toShort());
 
-            // Cover constructor with sticky bit and ACL bit.
-            for(boolean ab : new boolean [] { false, true }) {
-              short s2 = (short)(ab ? s | (1 << 10) : s);
-              FsPermission f3 = new FsPermission(u, g, o, sb, ab);
-              assertEquals(s2, f3.toShort());
-              FsPermission f4 = new FsPermission(f3);
-              assertEquals(s2, f4.toShort());
-            }
-
             s++;
           }
         }
@@ -87,34 +78,27 @@ public class TestFsPermission extends TestCase {
   }
 
   public void testSpecialBitsToString() {
-    for(boolean ab : new boolean [] { false, true }) {
-      for(boolean sb : new boolean [] { false, true }) {
-        for(FsAction u : FsAction.values()) {
-          for(FsAction g : FsAction.values()) {
-            for(FsAction o : FsAction.values()) {
-              FsPermission f = new FsPermission(u, g, o, sb, ab);
-              String fString = f.toString();
+    for (boolean sb : new boolean[] { false, true }) {
+      for (FsAction u : FsAction.values()) {
+        for (FsAction g : FsAction.values()) {
+          for (FsAction o : FsAction.values()) {
+            FsPermission f = new FsPermission(u, g, o, sb);
+            String fString = f.toString();
 
-              // Check that sticky bit is represented correctly.
-              if(f.getStickyBit() && f.getOtherAction().implies(EXECUTE))
-                assertEquals('t', fString.charAt(8));
-              else if(f.getStickyBit() && !f.getOtherAction().implies(EXECUTE))
-                assertEquals('T', fString.charAt(8));
-              else if(!f.getStickyBit()  && f.getOtherAction().implies(EXECUTE))
-                assertEquals('x', fString.charAt(8));
-              else
-                assertEquals('-', fString.charAt(8));
+            // Check that sticky bit is represented correctly.
+            if (f.getStickyBit() && f.getOtherAction().implies(EXECUTE))
+              assertEquals('t', fString.charAt(8));
+            else if (f.getStickyBit() && !f.getOtherAction().implies(EXECUTE))
+              assertEquals('T', fString.charAt(8));
+            else if (!f.getStickyBit() && f.getOtherAction().implies(EXECUTE))
+              assertEquals('x', fString.charAt(8));
+            else
+              assertEquals('-', fString.charAt(8));
 
-              // Check that ACL bit is represented correctly.
-              if (f.getAclBit()) {
-                assertEquals(10, fString.length());
-                assertEquals('+', fString.charAt(9));
-              } else {
-                assertEquals(9, fString.length());
-              }
-            }
+            assertEquals(9, fString.length());
           }
         }
+
       }
     }
   }
@@ -122,7 +106,7 @@ public class TestFsPermission extends TestCase {
   public void testFsPermission() {
     String symbolic = "-rwxrwxrwx";
 
-    for(int i = 0; i < (1 << 11); i++) {
+    for(int i = 0; i < (1 << 10); i++) {
       StringBuilder b = new StringBuilder("----------");
       String binary = String.format("%11s", Integer.toBinaryString(i));
       String permBinary = binary.substring(2, binary.length());
@@ -141,11 +125,6 @@ public class TestFsPermission extends TestCase {
         b.setCharAt(9, replacement);
       }
 
-      // Check for ACL bit.
-      if (binary.charAt(0) == '1') {
-        b.append('+');
-      }
-
       assertEquals(i, FsPermission.valueOf(b.toString()).toShort());
     }
   }

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4685.txt

@@ -75,6 +75,9 @@ HDFS-4685 (Unreleased)
     HDFS-5925. ACL configuration flag must only reject ACL API calls, not ACLs
     present in fsimage or edits. (cnauroth)
 
+    HDFS-5923. Do not persist the ACL bit in the FsPermission.
+    (Haohui Mai via cnauroth)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 61 - 78
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -67,12 +68,8 @@ final class AclStorage {
    */
   public static void copyINodeDefaultAcl(INode child) {
     INodeDirectory parent = child.getParent();
-    if (!parent.getFsPermission().getAclBit()) {
-      return;
-    }
-
-    // The default ACL is applicable to new child files and directories only.
-    if (!child.isFile() && !child.isDirectory()) {
+    AclFeature parentAclFeature = parent.getAclFeature();
+    if (parentAclFeature == null || !(child.isFile() || child.isDirectory())) {
       return;
     }
 
@@ -153,12 +150,8 @@ final class AclStorage {
    * @return List<AclEntry> containing extended inode ACL entries
    */
   public static List<AclEntry> readINodeAcl(INode inode, int snapshotId) {
-    FsPermission perm = inode.getFsPermission(snapshotId);
-    if (perm.getAclBit()) {
-      return inode.getAclFeature(snapshotId).getEntries();
-    } else {
-      return Collections.emptyList();
-    }
+    AclFeature f = inode.getAclFeature(snapshotId);
+    return f == null ? ImmutableList.<AclEntry> of() : f.getEntries();
   }
 
   /**
@@ -176,57 +169,51 @@ final class AclStorage {
    * @return List<AclEntry> containing all logical inode ACL entries
    */
   public static List<AclEntry> readINodeLogicalAcl(INode inode) {
-    final List<AclEntry> existingAcl;
     FsPermission perm = inode.getFsPermission();
-    if (perm.getAclBit()) {
-      // Split ACL entries stored in the feature into access vs. default.
-      List<AclEntry> featureEntries = inode.getAclFeature().getEntries();
-      ScopedAclEntries scoped = new ScopedAclEntries(featureEntries);
-      List<AclEntry> accessEntries = scoped.getAccessEntries();
-      List<AclEntry> defaultEntries = scoped.getDefaultEntries();
+    AclFeature f = inode.getAclFeature();
+    if (f == null) {
+      return getMinimalAcl(perm);
+    }
 
-      // Pre-allocate list size for the explicit entries stored in the feature
-      // plus the 3 implicit entries (owner, group and other) from the permission
-      // bits.
-      existingAcl = Lists.newArrayListWithCapacity(featureEntries.size() + 3);
-
-      if (!accessEntries.isEmpty()) {
-        // Add owner entry implied from user permission bits.
-        existingAcl.add(new AclEntry.Builder()
-          .setScope(AclEntryScope.ACCESS)
-          .setType(AclEntryType.USER)
-          .setPermission(perm.getUserAction())
+    final List<AclEntry> existingAcl;
+    // Split ACL entries stored in the feature into access vs. default.
+    List<AclEntry> featureEntries = f.getEntries();
+    ScopedAclEntries scoped = new ScopedAclEntries(featureEntries);
+    List<AclEntry> accessEntries = scoped.getAccessEntries();
+    List<AclEntry> defaultEntries = scoped.getDefaultEntries();
+
+    // Pre-allocate list size for the explicit entries stored in the feature
+    // plus the 3 implicit entries (owner, group and other) from the permission
+    // bits.
+    existingAcl = Lists.newArrayListWithCapacity(featureEntries.size() + 3);
+
+    if (!accessEntries.isEmpty()) {
+      // Add owner entry implied from user permission bits.
+      existingAcl.add(new AclEntry.Builder().setScope(AclEntryScope.ACCESS)
+          .setType(AclEntryType.USER).setPermission(perm.getUserAction())
           .build());
 
-        // Next add all named user and group entries taken from the feature.
-        existingAcl.addAll(accessEntries);
+      // Next add all named user and group entries taken from the feature.
+      existingAcl.addAll(accessEntries);
 
-        // Add mask entry implied from group permission bits.
-        existingAcl.add(new AclEntry.Builder()
-          .setScope(AclEntryScope.ACCESS)
-          .setType(AclEntryType.MASK)
-          .setPermission(perm.getGroupAction())
+      // Add mask entry implied from group permission bits.
+      existingAcl.add(new AclEntry.Builder().setScope(AclEntryScope.ACCESS)
+          .setType(AclEntryType.MASK).setPermission(perm.getGroupAction())
           .build());
 
-        // Add other entry implied from other permission bits.
-        existingAcl.add(new AclEntry.Builder()
-          .setScope(AclEntryScope.ACCESS)
-          .setType(AclEntryType.OTHER)
-          .setPermission(perm.getOtherAction())
+      // Add other entry implied from other permission bits.
+      existingAcl.add(new AclEntry.Builder().setScope(AclEntryScope.ACCESS)
+          .setType(AclEntryType.OTHER).setPermission(perm.getOtherAction())
           .build());
-      } else {
-        // It's possible that there is a default ACL but no access ACL.  In this
-        // case, add the minimal access ACL implied by the permission bits.
-        existingAcl.addAll(getMinimalAcl(perm));
-      }
-
-      // Add all default entries after the access entries.
-      existingAcl.addAll(defaultEntries);
     } else {
-      // If the inode doesn't have an extended ACL, then return a minimal ACL.
-      existingAcl = getMinimalAcl(perm);
+      // It's possible that there is a default ACL but no access ACL. In this
+      // case, add the minimal access ACL implied by the permission bits.
+      existingAcl.addAll(getMinimalAcl(perm));
     }
 
+    // Add all default entries after the access entries.
+    existingAcl.addAll(defaultEntries);
+
     // The above adds entries in the correct order, so no need to sort here.
     return existingAcl;
   }
@@ -240,32 +227,28 @@ final class AclStorage {
    */
   public static void removeINodeAcl(INode inode, int snapshotId)
       throws QuotaExceededException {
+    AclFeature f = inode.getAclFeature();
+    if (f == null) {
+      return;
+    }
+
     FsPermission perm = inode.getFsPermission();
-    if (perm.getAclBit()) {
-      List<AclEntry> featureEntries = inode.getAclFeature().getEntries();
-      final FsAction groupPerm;
-      if (featureEntries.get(0).getScope() == AclEntryScope.ACCESS) {
-        // Restore group permissions from the feature's entry to permission
-        // bits, overwriting the mask, which is not part of a minimal ACL.
-        AclEntry groupEntryKey = new AclEntry.Builder()
-          .setScope(AclEntryScope.ACCESS)
-          .setType(AclEntryType.GROUP)
-          .build();
-        int groupEntryIndex = Collections.binarySearch(featureEntries,
+    List<AclEntry> featureEntries = f.getEntries();
+    if (featureEntries.get(0).getScope() == AclEntryScope.ACCESS) {
+      // Restore group permissions from the feature's entry to permission
+      // bits, overwriting the mask, which is not part of a minimal ACL.
+      AclEntry groupEntryKey = new AclEntry.Builder()
+          .setScope(AclEntryScope.ACCESS).setType(AclEntryType.GROUP).build();
+      int groupEntryIndex = Collections.binarySearch(featureEntries,
           groupEntryKey, AclTransformation.ACL_ENTRY_COMPARATOR);
-        assert groupEntryIndex >= 0;
-        groupPerm = featureEntries.get(groupEntryIndex).getPermission();
-      } else {
-        groupPerm = perm.getGroupAction();
-      }
-
-      // Remove the feature and turn off the ACL bit.
-      inode.removeAclFeature(snapshotId);
-      FsPermission newPerm = new FsPermission(perm.getUserAction(),
-        groupPerm, perm.getOtherAction(),
-        perm.getStickyBit(), false);
+      assert groupEntryIndex >= 0;
+      FsAction groupPerm = featureEntries.get(groupEntryIndex).getPermission();
+      FsPermission newPerm = new FsPermission(perm.getUserAction(), groupPerm,
+          perm.getOtherAction(), perm.getStickyBit());
       inode.setPermission(newPerm, snapshotId);
     }
+
+    inode.removeAclFeature(snapshotId);
   }
 
   /**
@@ -297,7 +280,7 @@ final class AclStorage {
       }
 
       // Attach entries to the feature.
-      if (perm.getAclBit()) {
+      if (inode.getAclFeature() != null) {
         inode.removeAclFeature(snapshotId);
       }
       inode.addAclFeature(createAclFeature(accessEntries, defaultEntries),
@@ -305,7 +288,7 @@ final class AclStorage {
       newPerm = createFsPermissionForExtendedAcl(accessEntries, perm);
     } else {
       // This is a minimal ACL.  Remove the ACL feature if it previously had one.
-      if (perm.getAclBit()) {
+      if (inode.getAclFeature() != null) {
         inode.removeAclFeature(snapshotId);
       }
       newPerm = createFsPermissionForMinimalAcl(newAcl, perm);
@@ -363,7 +346,7 @@ final class AclStorage {
     return new FsPermission(accessEntries.get(0).getPermission(),
       accessEntries.get(accessEntries.size() - 2).getPermission(),
       accessEntries.get(accessEntries.size() - 1).getPermission(),
-      existingPerm.getStickyBit(), true);
+      existingPerm.getStickyBit());
   }
 
   /**
@@ -381,7 +364,7 @@ final class AclStorage {
     return new FsPermission(accessEntries.get(0).getPermission(),
       accessEntries.get(1).getPermission(),
       accessEntries.get(2).getPermission(),
-      existingPerm.getStickyBit(), false);
+      existingPerm.getStickyBit());
   }
 
   /**

+ 0 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -1177,16 +1177,6 @@ public class FSDirectory implements Closeable {
       throw new FileNotFoundException("File does not exist: " + src);
     }
     int snapshotId = inodesInPath.getLatestSnapshotId();
-    FsPermission oldPerm = inode.getPermissionStatus(snapshotId).getPermission();
-    // This method cannot toggle the ACL bit.
-    if (oldPerm.getAclBit() != permissions.getAclBit()) {
-      permissions = new FsPermission(
-        permissions.getUserAction(),
-        permissions.getGroupAction(),
-        permissions.getOtherAction(),
-        permissions.getStickyBit(),
-        oldPerm.getAclBit());
-    }
     inode.setPermission(permissions, snapshotId);
   }
 

+ 11 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -696,10 +696,13 @@ public class FSEditLog implements LogsPurgeable {
       .setBlockSize(newNode.getPreferredBlockSize())
       .setBlocks(newNode.getBlocks())
       .setPermissionStatus(permissions)
-      .setAclEntries(permissions.getPermission().getAclBit() ?
-        AclStorage.readINodeLogicalAcl(newNode) : null)
       .setClientName(newNode.getFileUnderConstructionFeature().getClientName())
       .setClientMachine(newNode.getFileUnderConstructionFeature().getClientMachine());
+
+    AclFeature f = newNode.getAclFeature();
+    if (f != null) {
+      op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
+    }
     logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
@@ -749,9 +752,12 @@ public class FSEditLog implements LogsPurgeable {
       .setInodeId(newNode.getId())
       .setPath(path)
       .setTimestamp(newNode.getModificationTime())
-      .setPermissionStatus(permissions)
-      .setAclEntries(permissions.getPermission().getAclBit() ?
-        AclStorage.readINodeLogicalAcl(newNode) : null);
+      .setPermissionStatus(permissions);
+
+    AclFeature f = newNode.getAclFeature();
+    if (f != null) {
+      op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
+    }
     logEdit(op);
   }
   

+ 88 - 36
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java

@@ -297,7 +297,76 @@ public abstract class FSEditLogOp {
     XMLUtils.addSaxString(contentHandler, "RPC_CALLID", 
         Integer.valueOf(callId).toString());
   }
-  
+
+  private static final class AclEditLogUtil {
+    private static final int ACL_EDITLOG_ENTRY_HAS_NAME_OFFSET = 6;
+    private static final int ACL_EDITLOG_ENTRY_TYPE_OFFSET = 3;
+    private static final int ACL_EDITLOG_ENTRY_SCOPE_OFFSET = 5;
+    private static final int ACL_EDITLOG_PERM_MASK = 7;
+    private static final int ACL_EDITLOG_ENTRY_TYPE_MASK = 3;
+    private static final int ACL_EDITLOG_ENTRY_SCOPE_MASK = 1;
+
+    private static final FsAction[] FSACTION_VALUES = FsAction.values();
+    private static final AclEntryScope[] ACL_ENTRY_SCOPE_VALUES = AclEntryScope
+        .values();
+    private static final AclEntryType[] ACL_ENTRY_TYPE_VALUES = AclEntryType
+        .values();
+
+    private static List<AclEntry> read(DataInputStream in, int logVersion)
+        throws IOException {
+      if (!LayoutVersion.supports(Feature.EXTENDED_ACL, logVersion)) {
+        return null;
+      }
+
+      int size = in.readInt();
+      if (size == 0) {
+        return null;
+      }
+
+      List<AclEntry> aclEntries = Lists.newArrayListWithCapacity(size);
+      for (int i = 0; i < size; ++i) {
+        int v = in.read();
+        int p = v & ACL_EDITLOG_PERM_MASK;
+        int t = (v >> ACL_EDITLOG_ENTRY_TYPE_OFFSET)
+            & ACL_EDITLOG_ENTRY_TYPE_MASK;
+        int s = (v >> ACL_EDITLOG_ENTRY_SCOPE_OFFSET)
+            & ACL_EDITLOG_ENTRY_SCOPE_MASK;
+        boolean hasName = ((v >> ACL_EDITLOG_ENTRY_HAS_NAME_OFFSET) & 1) == 1;
+        String name = hasName ? FSImageSerialization.readString(in) : null;
+        aclEntries.add(new AclEntry.Builder().setName(name)
+            .setPermission(FSACTION_VALUES[p])
+            .setScope(ACL_ENTRY_SCOPE_VALUES[s])
+            .setType(ACL_ENTRY_TYPE_VALUES[t]).build());
+      }
+
+      return aclEntries;
+    }
+
+    private static void write(List<AclEntry> aclEntries, DataOutputStream out)
+        throws IOException {
+      if (aclEntries == null) {
+        out.writeInt(0);
+        return;
+      }
+
+      out.writeInt(aclEntries.size());
+      for (AclEntry e : aclEntries) {
+        boolean hasName = e.getName() != null;
+        int v = (e.getScope().ordinal() << ACL_EDITLOG_ENTRY_SCOPE_OFFSET)
+            | (e.getType().ordinal() << ACL_EDITLOG_ENTRY_TYPE_OFFSET)
+            | e.getPermission().ordinal();
+
+        if (hasName) {
+          v |= 1 << ACL_EDITLOG_ENTRY_HAS_NAME_OFFSET;
+        }
+        out.write(v);
+        if (hasName) {
+          FSImageSerialization.writeString(e.getName(), out);
+        }
+      }
+    }
+  }
+
   @SuppressWarnings("unchecked")
   static abstract class AddCloseOp extends FSEditLogOp implements BlockListUpdatingOp {
     int length;
@@ -399,11 +468,12 @@ public abstract class FSEditLogOp {
       permissions.write(out);
 
       if (this.opCode == OP_ADD) {
-        if (permissions.getPermission().getAclBit()) {
+        boolean hasAcl = aclEntries != null;
+        out.writeBoolean(hasAcl);
+        if (hasAcl) {
           AclFeatureProto.newBuilder()
-            .addAllEntries(PBHelper.convertAclEntryProto(aclEntries))
-            .build()
-            .writeDelimitedTo(out);
+              .addAllEntries(PBHelper.convertAclEntryProto(aclEntries)).build()
+              .writeDelimitedTo(out);
         }
 
         FSImageSerialization.writeString(clientName,out);
@@ -464,13 +534,7 @@ public abstract class FSEditLogOp {
 
       // clientname, clientMachine and block locations of last block.
       if (this.opCode == OP_ADD) {
-        if (permissions.getPermission().getAclBit()) {
-          aclEntries = PBHelper.convertAclEntry(
-            AclFeatureProto.parseDelimitedFrom((DataInputStream)in)
-            .getEntriesList());
-        } else {
-          aclEntries = null;
-        }
+        aclEntries = AclEditLogUtil.read(in, logVersion);
 
         this.clientName = FSImageSerialization.readString(in);
         this.clientMachine = FSImageSerialization.readString(in);
@@ -562,7 +626,7 @@ public abstract class FSEditLogOp {
       }
       FSEditLogOp.permissionStatusToXml(contentHandler, permissions);
       if (this.opCode == OP_ADD) {
-        if (permissions.getPermission().getAclBit()) {
+        if (aclEntries != null) {
           appendAclEntriesToXml(contentHandler, aclEntries);
         }
         appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
@@ -590,11 +654,7 @@ public abstract class FSEditLogOp {
         this.blocks = new Block[0];
       }
       this.permissions = permissionStatusFromXml(st);
-      if (permissions.getPermission().getAclBit()) {
-        aclEntries = readAclEntriesFromXml(st);
-      } else {
-        aclEntries = null;
-      }
+      aclEntries = readAclEntriesFromXml(st);
       readRpcIdsFromXml(st);
     }
   }
@@ -1304,11 +1364,13 @@ public abstract class FSEditLogOp {
       FSImageSerialization.writeLong(timestamp, out); // mtime
       FSImageSerialization.writeLong(timestamp, out); // atime, unused at this
       permissions.write(out);
-      if (permissions.getPermission().getAclBit()) {
+
+      boolean hasAcl = aclEntries != null;
+      out.writeBoolean(hasAcl);
+      if (hasAcl) {
         AclFeatureProto.newBuilder()
-          .addAllEntries(PBHelper.convertAclEntryProto(aclEntries))
-          .build()
-          .writeDelimitedTo(out);
+            .addAllEntries(PBHelper.convertAclEntryProto(aclEntries)).build()
+            .writeDelimitedTo(out);
       }
     }
     
@@ -1347,13 +1409,7 @@ public abstract class FSEditLogOp {
       }
 
       this.permissions = PermissionStatus.read(in);
-      if (permissions.getPermission().getAclBit()) {
-        aclEntries = PBHelper.convertAclEntry(
-          AclFeatureProto.parseDelimitedFrom((DataInputStream)in)
-          .getEntriesList());
-      } else {
-        aclEntries = null;
-      }
+      aclEntries = AclEditLogUtil.read(in, logVersion);
     }
 
     @Override
@@ -1389,7 +1445,7 @@ public abstract class FSEditLogOp {
       XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
           Long.valueOf(timestamp).toString());
       FSEditLogOp.permissionStatusToXml(contentHandler, permissions);
-      if (permissions.getPermission().getAclBit()) {
+      if (aclEntries != null) {
         appendAclEntriesToXml(contentHandler, aclEntries);
       }
     }
@@ -1400,11 +1456,7 @@ public abstract class FSEditLogOp {
       this.path = st.getValue("PATH");
       this.timestamp = Long.valueOf(st.getValue("TIMESTAMP"));
       this.permissions = permissionStatusFromXml(st);
-      if (permissions.getPermission().getAclBit()) {
-        aclEntries = readAclEntriesFromXml(st);
-      } else {
-        aclEntries = null;
-      }
+      aclEntries = readAclEntriesFromXml(st);
     }
   }
 
@@ -3895,7 +3947,7 @@ public abstract class FSEditLogOp {
   private static List<AclEntry> readAclEntriesFromXml(Stanza st) {
     List<AclEntry> aclEntries = Lists.newArrayList();
     if (!st.hasChildren("ENTRY"))
-      return aclEntries;
+      return null;
 
     List<Stanza> stanzas = st.getChildren("ENTRY");
     for (Stanza s : stanzas) {

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -239,9 +239,8 @@ class FSPermissionChecker {
       return;
     }
     FsPermission mode = inode.getFsPermission(snapshotId);
-    if (mode.getAclBit()) {
-      AclFeature aclFeature = inode.getAclFeature(snapshotId);
-      assert aclFeature != null;
+    AclFeature aclFeature = inode.getAclFeature(snapshotId);
+    if (aclFeature != null) {
       List<AclEntry> featureEntries = aclFeature.getEntries();
       // It's possible that the inode has a default ACL but no access ACL.
       if (featureEntries.get(0).getScope() == AclEntryScope.ACCESS) {

+ 19 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/AclTestHelpers.java

@@ -17,6 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.AclEntryType;
@@ -93,4 +99,17 @@ public final class AclTestHelpers {
       .setType(type)
       .build();
   }
+
+  /**
+   * Asserts the value of the FsPermission bits on the inode of a specific path.
+   *
+   * @param fs FileSystem to use for check
+   * @param pathToCheck Path inode to check
+   * @param perm short expected permission bits
+   * @throws IOException thrown if there is an I/O error
+   */
+  public static void assertPermission(FileSystem fs, Path pathToCheck,
+      short perm) throws IOException {
+    assertEquals(perm, fs.getFileStatus(pathToCheck).getPermission().toShort());
+  }
 }

+ 35 - 36
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java

@@ -91,7 +91,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, MASK, READ_EXECUTE),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)02750);
+    assertPermission((short)0750);
     assertAclFeature(true);
   }
 
@@ -113,7 +113,7 @@ public abstract class FSAclBaseTest {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "foo", READ_EXECUTE),
       aclEntry(ACCESS, GROUP, READ_EXECUTE) }, returned);
-    assertPermission((short)02750);
+    assertPermission((short)0750);
     assertAclFeature(true);
   }
 
@@ -134,7 +134,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, MASK, READ_EXECUTE),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)02750);
+    assertPermission((short)0750);
     assertAclFeature(true);
   }
 
@@ -150,7 +150,7 @@ public abstract class FSAclBaseTest {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "foo", READ_WRITE),
       aclEntry(ACCESS, GROUP, READ) }, returned);
-    assertPermission((short)02660);
+    assertPermission((short)0660);
     assertAclFeature(true);
   }
 
@@ -168,7 +168,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, USER, ALL),
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)02750);
+    assertPermission((short)0750);
     assertAclFeature(true);
   }
 
@@ -185,7 +185,7 @@ public abstract class FSAclBaseTest {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "foo", ALL),
       aclEntry(ACCESS, GROUP, READ) }, returned);
-    assertPermission((short)02600);
+    assertPermission((short)0600);
     assertAclFeature(true);
   }
 
@@ -213,7 +213,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, MASK, READ_EXECUTE),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)03750);
+    assertPermission((short)01750);
     assertAclFeature(true);
   }
 
@@ -259,7 +259,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, MASK, READ_EXECUTE),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)02750);
+    assertPermission((short)0750);
     assertAclFeature(true);
   }
 
@@ -282,7 +282,7 @@ public abstract class FSAclBaseTest {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bar", READ_WRITE),
       aclEntry(ACCESS, GROUP, READ_WRITE) }, returned);
-    assertPermission((short)02760);
+    assertPermission((short)0760);
     assertAclFeature(true);
   }
 
@@ -307,7 +307,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, MASK, READ_EXECUTE),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)02750);
+    assertPermission((short)0750);
     assertAclFeature(true);
   }
 
@@ -355,7 +355,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, USER, ALL),
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)02750);
+    assertPermission((short)0750);
     assertAclFeature(true);
   }
 
@@ -381,7 +381,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, MASK, READ_EXECUTE),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)03750);
+    assertPermission((short)01750);
     assertAclFeature(true);
   }
 
@@ -409,7 +409,7 @@ public abstract class FSAclBaseTest {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "foo", ALL),
       aclEntry(ACCESS, GROUP, READ_EXECUTE) }, returned);
-    assertPermission((short)02770);
+    assertPermission((short)0770);
     assertAclFeature(true);
   }
 
@@ -429,7 +429,7 @@ public abstract class FSAclBaseTest {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "foo", ALL),
       aclEntry(ACCESS, GROUP, READ_EXECUTE) }, returned);
-    assertPermission((short)02770);
+    assertPermission((short)0770);
     assertAclFeature(true);
   }
 
@@ -474,7 +474,7 @@ public abstract class FSAclBaseTest {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "foo", ALL),
       aclEntry(ACCESS, GROUP, READ_EXECUTE) }, returned);
-    assertPermission((short)03770);
+    assertPermission((short)01770);
     assertAclFeature(true);
   }
 
@@ -575,7 +575,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, MASK, ALL),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)02770);
+    assertPermission((short)0770);
     assertAclFeature(true);
   }
 
@@ -594,7 +594,7 @@ public abstract class FSAclBaseTest {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "foo", READ),
       aclEntry(ACCESS, GROUP, READ) }, returned);
-    assertPermission((short)02640);
+    assertPermission((short)0640);
     assertAclFeature(true);
   }
 
@@ -612,7 +612,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, MASK, ALL),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)02750);
+    assertPermission((short)0750);
     assertAclFeature(true);
   }
 
@@ -652,7 +652,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, USER, ALL),
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)02750);
+    assertPermission((short)0750);
     assertAclFeature(true);
   }
 
@@ -672,7 +672,7 @@ public abstract class FSAclBaseTest {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "foo", READ),
       aclEntry(ACCESS, GROUP, READ) }, returned);
-    assertPermission((short)02670);
+    assertPermission((short)0670);
     assertAclFeature(true);
   }
 
@@ -696,7 +696,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, MASK, ALL),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)03770);
+    assertPermission((short)01770);
     assertAclFeature(true);
   }
 
@@ -741,7 +741,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, MASK, ALL),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)02700);
+    assertPermission((short)0700);
     assertAclFeature(true);
   }
 
@@ -761,7 +761,7 @@ public abstract class FSAclBaseTest {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "foo", READ),
       aclEntry(ACCESS, GROUP, READ) }, returned);
-    assertPermission((short)02600);
+    assertPermission((short)0600);
     assertAclFeature(true);
   }
 
@@ -783,7 +783,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, MASK, ALL),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)02700);
+    assertPermission((short)0700);
     assertAclFeature(true);
   }
 
@@ -800,7 +800,7 @@ public abstract class FSAclBaseTest {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "foo", ALL),
       aclEntry(ACCESS, GROUP, READ_EXECUTE) }, returned);
-    assertPermission(filePath, (short)02640);
+    assertPermission(filePath, (short)0640);
     assertAclFeature(filePath, true);
   }
 
@@ -854,7 +854,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, MASK, ALL),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission(dirPath, (short)02750);
+    assertPermission(dirPath, (short)0750);
     assertAclFeature(dirPath, true);
   }
 
@@ -889,7 +889,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, USER, ALL),
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission(dirPath, (short)02750);
+    assertPermission(dirPath, (short)0750);
     assertAclFeature(dirPath, true);
   }
 
@@ -913,7 +913,7 @@ public abstract class FSAclBaseTest {
     AclStatus s = fs.getAclStatus(dirPath);
     AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission(dirPath, (short)02750);
+    assertPermission(dirPath, (short)0750);
     assertAclFeature(dirPath, true);
     expected = new AclEntry[] {
       aclEntry(ACCESS, USER, "foo", ALL),
@@ -921,7 +921,7 @@ public abstract class FSAclBaseTest {
     s = fs.getAclStatus(filePath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission(filePath, (short)02640);
+    assertPermission(filePath, (short)0640);
     assertAclFeature(filePath, true);
   }
 
@@ -945,12 +945,12 @@ public abstract class FSAclBaseTest {
     AclStatus s = fs.getAclStatus(dirPath);
     AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission(dirPath, (short)02750);
+    assertPermission(dirPath, (short)0750);
     assertAclFeature(dirPath, true);
     s = fs.getAclStatus(subdirPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission(subdirPath, (short)02750);
+    assertPermission(subdirPath, (short)0750);
     assertAclFeature(subdirPath, true);
   }
 
@@ -977,7 +977,7 @@ public abstract class FSAclBaseTest {
     AclStatus s = fs.getAclStatus(dirPath);
     AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission(dirPath, (short)02750);
+    assertPermission(dirPath, (short)0750);
     assertAclFeature(dirPath, true);
     expected = new AclEntry[] { };
     s = fs.getAclStatus(linkPath);
@@ -1010,7 +1010,7 @@ public abstract class FSAclBaseTest {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "foo", ALL),
       aclEntry(ACCESS, GROUP, READ_EXECUTE) }, returned);
-    assertPermission(filePath, (short)02740);
+    assertPermission(filePath, (short)0740);
     assertAclFeature(filePath, true);
   }
 
@@ -1032,7 +1032,7 @@ public abstract class FSAclBaseTest {
       aclEntry(DEFAULT, GROUP, READ_EXECUTE),
       aclEntry(DEFAULT, MASK, ALL),
       aclEntry(DEFAULT, OTHER, READ_EXECUTE) }, returned);
-    assertPermission(dirPath, (short)02740);
+    assertPermission(dirPath, (short)0740);
     assertAclFeature(dirPath, true);
   }
 
@@ -1088,7 +1088,6 @@ public abstract class FSAclBaseTest {
    */
   private static void assertPermission(Path pathToCheck, short perm)
       throws IOException {
-    assertEquals(FsPermission.createImmutable(perm),
-      fs.getFileStatus(pathToCheck).getPermission());
+    AclTestHelpers.assertPermission(fs, pathToCheck, perm);
   }
 }

+ 6 - 20
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithAcl.java

@@ -142,7 +142,7 @@ public class TestFSImageWithAcl {
     AclEntry[] subdirReturned = fs.getAclStatus(subdirPath).getEntries()
       .toArray(new AclEntry[0]);
     Assert.assertArrayEquals(subdirExpected, subdirReturned);
-    assertPermission(fs, subdirPath, (short)02755);
+    assertPermission(fs, subdirPath, (short)0755);
 
     restart(fs, persistNamespace);
 
@@ -152,7 +152,7 @@ public class TestFSImageWithAcl {
     subdirReturned = fs.getAclStatus(subdirPath).getEntries()
       .toArray(new AclEntry[0]);
     Assert.assertArrayEquals(subdirExpected, subdirReturned);
-    assertPermission(fs, subdirPath, (short)02755);
+    assertPermission(fs, subdirPath, (short)0755);
 
     aclSpec = Lists.newArrayList(aclEntry(DEFAULT, USER, "foo", READ_WRITE));
     fs.modifyAclEntries(dirPath, aclSpec);
@@ -163,7 +163,7 @@ public class TestFSImageWithAcl {
     subdirReturned = fs.getAclStatus(subdirPath).getEntries()
       .toArray(new AclEntry[0]);
     Assert.assertArrayEquals(subdirExpected, subdirReturned);
-    assertPermission(fs, subdirPath, (short)02755);
+    assertPermission(fs, subdirPath, (short)0755);
 
     restart(fs, persistNamespace);
 
@@ -173,7 +173,7 @@ public class TestFSImageWithAcl {
     subdirReturned = fs.getAclStatus(subdirPath).getEntries()
       .toArray(new AclEntry[0]);
     Assert.assertArrayEquals(subdirExpected, subdirReturned);
-    assertPermission(fs, subdirPath, (short)02755);
+    assertPermission(fs, subdirPath, (short)0755);
 
     fs.removeAcl(dirPath);
 
@@ -183,7 +183,7 @@ public class TestFSImageWithAcl {
     subdirReturned = fs.getAclStatus(subdirPath).getEntries()
       .toArray(new AclEntry[0]);
     Assert.assertArrayEquals(subdirExpected, subdirReturned);
-    assertPermission(fs, subdirPath, (short)02755);
+    assertPermission(fs, subdirPath, (short)0755);
 
     restart(fs, persistNamespace);
 
@@ -193,7 +193,7 @@ public class TestFSImageWithAcl {
     subdirReturned = fs.getAclStatus(subdirPath).getEntries()
       .toArray(new AclEntry[0]);
     Assert.assertArrayEquals(subdirExpected, subdirReturned);
-    assertPermission(fs, subdirPath, (short)02755);
+    assertPermission(fs, subdirPath, (short)0755);
   }
 
   @Test
@@ -206,20 +206,6 @@ public class TestFSImageWithAcl {
     doTestDefaultAclNewChildren(false);
   }
 
-  /**
-   * Asserts the value of the FsPermission bits on the inode of a specific path.
-   *
-   * @param fs DistributedFileSystem to use for check
-   * @param pathToCheck Path inode to check
-   * @param perm short expected permission bits
-   * @throws IOException thrown if there is an I/O error
-   */
-  private static void assertPermission(DistributedFileSystem fs,
-      Path pathToCheck, short perm) throws IOException {
-    Assert.assertEquals(FsPermission.createImmutable(perm),
-      fs.getFileStatus(pathToCheck).getPermission());
-  }
-
   /**
    * Restart the NameNode, optionally saving a new checkpoint.
    *

+ 30 - 30
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.IOUtils;
@@ -118,14 +119,14 @@ public class TestAclWithSnapshot {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)02750, path);
+    assertPermission((short)0750, path);
 
     s = hdfs.getAclStatus(snapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)02750, snapshotPath);
+    assertPermission((short)0750, snapshotPath);
 
     assertDirPermissionGranted(fsAsBruce, BRUCE, snapshotPath);
     assertDirPermissionDenied(fsAsDiana, DIANA, snapshotPath);
@@ -152,14 +153,14 @@ public class TestAclWithSnapshot {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "diana", READ_EXECUTE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)02550, path);
+    assertPermission((short)0550, path);
 
     s = hdfs.getAclStatus(snapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)02750, snapshotPath);
+    assertPermission((short)0750, snapshotPath);
 
     assertDirPermissionDenied(fsAsBruce, BRUCE, path);
     assertDirPermissionGranted(fsAsDiana, DIANA, path);
@@ -201,24 +202,24 @@ public class TestAclWithSnapshot {
     AclStatus s = hdfs.getAclStatus(filePath);
     AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)02550, filePath);
+    assertPermission((short)0550, filePath);
 
     s = hdfs.getAclStatus(subdirPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)02550, subdirPath);
+    assertPermission((short)0550, subdirPath);
 
     s = hdfs.getAclStatus(fileSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)02550, fileSnapshotPath);
+    assertPermission((short)0550, fileSnapshotPath);
     assertFilePermissionGranted(fsAsBruce, BRUCE, fileSnapshotPath);
     assertFilePermissionDenied(fsAsDiana, DIANA, fileSnapshotPath);
 
     s = hdfs.getAclStatus(subdirSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)02550, subdirSnapshotPath);
+    assertPermission((short)0550, subdirSnapshotPath);
     assertDirPermissionGranted(fsAsBruce, BRUCE, subdirSnapshotPath);
     assertDirPermissionDenied(fsAsDiana, DIANA, subdirSnapshotPath);
 
@@ -250,14 +251,14 @@ public class TestAclWithSnapshot {
     AclStatus s = hdfs.getAclStatus(filePath);
     AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)02570, filePath);
+    assertPermission((short)0570, filePath);
     assertFilePermissionDenied(fsAsBruce, BRUCE, filePath);
     assertFilePermissionGranted(fsAsDiana, DIANA, filePath);
 
     s = hdfs.getAclStatus(subdirPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)02570, subdirPath);
+    assertPermission((short)0570, subdirPath);
     assertDirPermissionDenied(fsAsBruce, BRUCE, subdirPath);
     assertDirPermissionGranted(fsAsDiana, DIANA, subdirPath);
 
@@ -267,14 +268,14 @@ public class TestAclWithSnapshot {
     s = hdfs.getAclStatus(fileSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)02550, fileSnapshotPath);
+    assertPermission((short)0550, fileSnapshotPath);
     assertFilePermissionGranted(fsAsBruce, BRUCE, fileSnapshotPath);
     assertFilePermissionDenied(fsAsDiana, DIANA, fileSnapshotPath);
 
     s = hdfs.getAclStatus(subdirSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)02550, subdirSnapshotPath);
+    assertPermission((short)0550, subdirSnapshotPath);
     assertDirPermissionGranted(fsAsBruce, BRUCE, subdirSnapshotPath);
     assertDirPermissionDenied(fsAsDiana, DIANA, subdirSnapshotPath);
   }
@@ -301,14 +302,14 @@ public class TestAclWithSnapshot {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)02750, path);
+    assertPermission((short)0750, path);
 
     s = hdfs.getAclStatus(snapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)02750, snapshotPath);
+    assertPermission((short)0750, snapshotPath);
 
     assertDirPermissionGranted(fsAsBruce, BRUCE, snapshotPath);
     assertDirPermissionDenied(fsAsDiana, DIANA, snapshotPath);
@@ -335,7 +336,7 @@ public class TestAclWithSnapshot {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)02750, snapshotPath);
+    assertPermission((short)0750, snapshotPath);
 
     assertDirPermissionDenied(fsAsBruce, BRUCE, path);
     assertDirPermissionDenied(fsAsDiana, DIANA, path);
@@ -377,24 +378,24 @@ public class TestAclWithSnapshot {
     AclStatus s = hdfs.getAclStatus(filePath);
     AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)02550, filePath);
+    assertPermission((short)0550, filePath);
 
     s = hdfs.getAclStatus(subdirPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)02550, subdirPath);
+    assertPermission((short)0550, subdirPath);
 
     s = hdfs.getAclStatus(fileSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)02550, fileSnapshotPath);
+    assertPermission((short)0550, fileSnapshotPath);
     assertFilePermissionGranted(fsAsBruce, BRUCE, fileSnapshotPath);
     assertFilePermissionDenied(fsAsDiana, DIANA, fileSnapshotPath);
 
     s = hdfs.getAclStatus(subdirSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)02550, subdirSnapshotPath);
+    assertPermission((short)0550, subdirSnapshotPath);
     assertDirPermissionGranted(fsAsBruce, BRUCE, subdirSnapshotPath);
     assertDirPermissionDenied(fsAsDiana, DIANA, subdirSnapshotPath);
 
@@ -436,14 +437,14 @@ public class TestAclWithSnapshot {
     s = hdfs.getAclStatus(fileSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)02550, fileSnapshotPath);
+    assertPermission((short)0550, fileSnapshotPath);
     assertFilePermissionGranted(fsAsBruce, BRUCE, fileSnapshotPath);
     assertFilePermissionDenied(fsAsDiana, DIANA, fileSnapshotPath);
 
     s = hdfs.getAclStatus(subdirSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)02550, subdirSnapshotPath);
+    assertPermission((short)0550, subdirSnapshotPath);
     assertDirPermissionGranted(fsAsBruce, BRUCE, subdirSnapshotPath);
     assertDirPermissionDenied(fsAsDiana, DIANA, subdirSnapshotPath);
   }
@@ -469,7 +470,7 @@ public class TestAclWithSnapshot {
     AclStatus s = hdfs.getAclStatus(path);
     AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)02770, path);
+    assertPermission((short)0770, path);
     assertDirPermissionGranted(fsAsBruce, BRUCE, path);
     assertDirPermissionGranted(fsAsDiana, DIANA, path);
   }
@@ -513,7 +514,7 @@ public class TestAclWithSnapshot {
       aclEntry(DEFAULT, GROUP, NONE),
       aclEntry(DEFAULT, MASK, READ_EXECUTE),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)02700, path);
+    assertPermission((short)0700, path);
 
     s = hdfs.getAclStatus(snapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
@@ -523,7 +524,7 @@ public class TestAclWithSnapshot {
       aclEntry(DEFAULT, GROUP, NONE),
       aclEntry(DEFAULT, MASK, READ_EXECUTE),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)02700, snapshotPath);
+    assertPermission((short)0700, snapshotPath);
 
     assertDirPermissionDenied(fsAsBruce, BRUCE, snapshotPath);
   }
@@ -595,14 +596,14 @@ public class TestAclWithSnapshot {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_WRITE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)02660, filePath);
+    assertPermission((short)0660, filePath);
 
     s = hdfs.getAclStatus(fileSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_WRITE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)02660, filePath);
+    assertPermission((short)0660, filePath);
 
     aclSpec = Lists.newArrayList(
       aclEntry(ACCESS, USER, "bruce", READ));
@@ -631,14 +632,14 @@ public class TestAclWithSnapshot {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_WRITE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)02660, filePath);
+    assertPermission((short)0660, filePath);
 
     s = hdfs.getAclStatus(fileSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_WRITE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)02660, filePath);
+    assertPermission((short)0660, filePath);
 
     aclSpec = Lists.newArrayList(
       aclEntry(ACCESS, USER, "bruce", READ));
@@ -740,8 +741,7 @@ public class TestAclWithSnapshot {
    */
   private static void assertPermission(short perm, Path pathToCheck)
       throws Exception {
-    assertEquals(FsPermission.createImmutable(perm),
-      hdfs.getFileStatus(pathToCheck).getPermission());
+    AclTestHelpers.assertPermission(hdfs, pathToCheck, perm);
   }
 
   /**