Forráskód Böngészése

HDFS-5914. Incorporate ACLs with the changes from HDFS-5698. Contributed by Haohui Mai.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4685@1566991 13f79535-47bb-0310-9956-ffa450edef68
Chris Nauroth 11 éve
szülő
commit
d03acc7560
14 módosított fájl, 184 hozzáadás és 153 törlés
  1. 3 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4685.txt
  2. 4 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
  3. 0 9
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclConfigFlag.java
  4. 5 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
  5. 12 57
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
  6. 59 16
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
  7. 53 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
  8. 0 21
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
  9. 30 12
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
  10. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/acl.proto
  11. 15 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
  12. 0 30
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAclConfigFlag.java
  13. 1 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
  14. 1 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4685.txt

@@ -67,6 +67,9 @@ HDFS-4685 (Unreleased)
     HDFS-5899. Add configuration flag to disable/enable support for ACLs.
     (cnauroth)
 
+    HDFS-5914. Incorporate ACLs with the changes from HDFS-5698.
+    (Haohui Mai via cnauroth)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java

@@ -113,8 +113,11 @@ public class LayoutVersion {
         + " Use distinct StorageUuid per storage directory."),
     ADD_LAYOUT_FLAGS(-50, "Add support for layout flags."),
     CACHING(-51, "Support for cache pools and path-based caching"),
+    // Hadoop 2.4.0
     PROTOBUF_FORMAT(-52, "Use protobuf to serialize FSImage"),
-    EXTENDED_ACL(-53, "Extended ACL");
+    EXTENDED_ACL(-53, "Extended ACL"),
+    RESERVED_REL2_4_0(-54, -51, "Reserved for release 2.4.0", true,
+        PROTOBUF_FORMAT, EXTENDED_ACL);
 
     final int lv;
     final int ancestorLV;

+ 0 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclConfigFlag.java

@@ -59,15 +59,6 @@ final class AclConfigFlag {
     check("Cannot load edit log containing an ACL.");
   }
 
-  /**
-   * Checks the flag on behalf of fsimage loading.
-   *
-   * @throws AclException if ACLs are disabled
-   */
-  public void checkForFsImage() throws AclException {
-    check("Cannot load fsimage containing an ACL.");
-  }
-
   /**
    * Common check method.
    *

+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java

@@ -93,7 +93,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEditLogProto;
-import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclFsImageProto;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclFeatureProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.util.XMLUtils;
@@ -400,7 +400,7 @@ public abstract class FSEditLogOp {
 
       if (this.opCode == OP_ADD) {
         if (permissions.getPermission().getAclBit()) {
-          AclFsImageProto.newBuilder()
+          AclFeatureProto.newBuilder()
             .addAllEntries(PBHelper.convertAclEntryProto(aclEntries))
             .build()
             .writeDelimitedTo(out);
@@ -466,7 +466,7 @@ public abstract class FSEditLogOp {
       if (this.opCode == OP_ADD) {
         if (permissions.getPermission().getAclBit()) {
           aclEntries = PBHelper.convertAclEntry(
-            AclFsImageProto.parseDelimitedFrom((DataInputStream)in)
+            AclFeatureProto.parseDelimitedFrom((DataInputStream)in)
             .getEntriesList());
         } else {
           aclEntries = null;
@@ -1305,7 +1305,7 @@ public abstract class FSEditLogOp {
       FSImageSerialization.writeLong(timestamp, out); // atime, unused at this
       permissions.write(out);
       if (permissions.getPermission().getAclBit()) {
-        AclFsImageProto.newBuilder()
+        AclFeatureProto.newBuilder()
           .addAllEntries(PBHelper.convertAclEntryProto(aclEntries))
           .build()
           .writeDelimitedTo(out);
@@ -1349,7 +1349,7 @@ public abstract class FSEditLogOp {
       this.permissions = PermissionStatus.read(in);
       if (permissions.getPermission().getAclBit()) {
         aclEntries = PBHelper.convertAclEntry(
-          AclFsImageProto.parseDelimitedFrom((DataInputStream)in)
+          AclFeatureProto.parseDelimitedFrom((DataInputStream)in)
           .getEntriesList());
       } else {
         aclEntries = null;

+ 12 - 57
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -51,9 +51,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutFlags;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclFsImageProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.protocol.LayoutFlags;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -114,7 +111,6 @@ import com.google.common.annotations.VisibleForTesting;
  *   } when {@link Feature#FSIMAGE_NAME_OPTIMIZATION} is not supported
  *   replicationFactor: short, modificationTime: long,
  *   accessTime: long, preferredBlockSize: long,
- *
  *   numberOfBlocks: int (-1 for INodeDirectory, -2 for INodeSymLink),
  *   { 
  *     nsQuota: long, dsQuota: long, 
@@ -122,11 +118,7 @@ import com.google.common.annotations.VisibleForTesting;
  *       isINodeSnapshottable: byte,
  *       isINodeWithSnapshot: byte (if isINodeSnapshottable is false)
  *     } (when {@link Feature#SNAPSHOT} is supported), 
- *     fsPermission: short, PermissionStatus,
- *     AclEntries {
- *       size: int,
- *       protobuf encoding of {@link AclFsImageProto}
- *     }(when {@link Feature#EXTENDED_ACL} is supported),
+ *     fsPermission: short, PermissionStatus
  *   } for INodeDirectory
  *   or 
  *   {
@@ -141,12 +133,9 @@ import com.google.common.annotations.VisibleForTesting;
  *       {clientName: short + byte[], clientMachine: short + byte[]} (when 
  *       isINodeFileUnderConstructionSnapshot is true),
  *     } (when {@link Feature#SNAPSHOT} is supported and writing snapshotINode), 
- *     fsPermission: short, PermissionStatus,
- *     AclEntries {
- *       size: int,
- *       protobuf encoding of {@link AclFsImageProto}
- *     }(when {@link Feature#EXTENDED_ACL} is supported),
- *   } for INodeFile,
+ *     fsPermission: short, PermissionStatus
+ *   } for INodeFile
+ * }
  * 
  * INodeDirectoryInfo {
  *   fullPath of the directory: short + byte[],
@@ -783,18 +772,9 @@ public class FSImageFormat {
       if (underConstruction) {
         file.toUnderConstruction(clientName, clientMachine, null);
       }
-
-      if (permissions.getPermission().getAclBit()) {
-        AclFeature aclFeature = loadAclFeature(in, imgVersion);
-        if (aclFeature != null) {
-          file.addAclFeature(aclFeature);
-        }
-      }
-
-      return fileDiffs == null ? file : new INodeFile(file, fileDiffs);
-
-    } else if (numBlocks == -1) {
-      //directory
+        return fileDiffs == null ? file : new INodeFile(file, fileDiffs);
+      } else if (numBlocks == -1) {
+        //directory
       
       //read quotas
       final long nsQuota = in.readLong();
@@ -824,14 +804,6 @@ public class FSImageFormat {
       if (nsQuota >= 0 || dsQuota >= 0) {
         dir.addDirectoryWithQuotaFeature(nsQuota, dsQuota);
       }
-
-      if (permissions.getPermission().getAclBit()) {
-        AclFeature aclFeature = loadAclFeature(in, imgVersion);
-        if (aclFeature != null) {
-          dir.addAclFeature(aclFeature);
-        }
-      }
-
       if (withSnapshot) {
         dir.addSnapshotFeature(null);
       }
@@ -872,19 +844,6 @@ public class FSImageFormat {
     throw new IOException("Unknown inode type: numBlocks=" + numBlocks);
   }
 
-    private AclFeature loadAclFeature(DataInput in, final int imgVersion)
-        throws IOException {
-      namesystem.getAclConfigFlag().checkForFsImage();
-      AclFeature aclFeature = null;
-      if (LayoutVersion.supports(Feature.EXTENDED_ACL, imgVersion)) {
-        AclFsImageProto p = AclFsImageProto
-            .parseDelimitedFrom((DataInputStream) in);
-        aclFeature = new AclFeature(PBHelper.convertAclEntry(
-            p.getEntriesList()));
-      }
-      return aclFeature;
-    }
-
     /** Load {@link INodeFileAttributes}. */
     public INodeFileAttributes loadINodeFileAttributes(DataInput in)
         throws IOException {
@@ -902,10 +861,9 @@ public class FSImageFormat {
       final short replication = namesystem.getBlockManager().adjustReplication(
           in.readShort());
       final long preferredBlockSize = in.readLong();
-      AclFeature aclFeature = permissions.getPermission().getAclBit() ?
-          loadAclFeature(in, layoutVersion) : null;
-      return new INodeFileAttributes.SnapshotCopy(name, permissions, aclFeature,
-          modificationTime, accessTime, replication, preferredBlockSize);
+
+      return new INodeFileAttributes.SnapshotCopy(name, permissions, null, modificationTime,
+          accessTime, replication, preferredBlockSize);
     }
 
     public INodeDirectoryAttributes loadINodeDirectoryAttributes(DataInput in)
@@ -923,14 +881,11 @@ public class FSImageFormat {
       //read quotas
       final long nsQuota = in.readLong();
       final long dsQuota = in.readLong();
-      AclFeature aclFeature = permissions.getPermission().getAclBit() ?
-          loadAclFeature(in, layoutVersion) : null;
   
       return nsQuota == -1L && dsQuota == -1L?
-          new INodeDirectoryAttributes.SnapshotCopy(name, permissions,
-            aclFeature, modificationTime)
+          new INodeDirectoryAttributes.SnapshotCopy(name, permissions, null, modificationTime)
         : new INodeDirectoryAttributes.CopyWithQuota(name, permissions,
-            aclFeature, modificationTime, nsQuota, dsQuota);
+            null, modificationTime, nsQuota, dsQuota);
     }
   
     private void loadFilesUnderConstruction(DataInput in,

+ 59 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java

@@ -30,14 +30,17 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SaverContext;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FilesUnderConstructionSection.FileUnderConstructionEntry;
@@ -50,6 +53,7 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.protobuf.ByteString;
 
 @InterfaceAudience.Private
@@ -69,6 +73,11 @@ public final class FSImageFormatPBINode {
           new FsPermission(perm));
     }
 
+    public static ImmutableList<AclEntry> loadAclEntries(int id,
+        final ImmutableList<AclEntry>[] aclTable) {
+      return aclTable[id];
+    }
+
     public static INodeReference loadINodeReference(
         INodeSection.INodeReference r, FSDirectory dir) throws IOException {
       long referredId = r.getReferredId();
@@ -89,12 +98,12 @@ public final class FSImageFormatPBINode {
     }
 
     public static INodeDirectory loadINodeDirectory(INodeSection.INode n,
-        final String[] stringTable) {
+        LoaderContext state) {
       assert n.getType() == INodeSection.INode.Type.DIRECTORY;
       INodeSection.INodeDirectory d = n.getDirectory();
 
       final PermissionStatus permissions = loadPermission(d.getPermission(),
-          stringTable);
+          state.getStringTable());
       final INodeDirectory dir = new INodeDirectory(n.getId(), n.getName()
           .toByteArray(), permissions, d.getModificationTime());
 
@@ -102,6 +111,11 @@ public final class FSImageFormatPBINode {
       if (nsQuota >= 0 || dsQuota >= 0) {
         dir.addDirectoryWithQuotaFeature(nsQuota, dsQuota);
       }
+
+      if (d.hasAclId()) {
+        dir.addAclFeature(new AclFeature(loadAclEntries(d.getAclId(),
+            state.getExtendedAclTable())));
+      }
       return dir;
     }
 
@@ -208,7 +222,7 @@ public final class FSImageFormatPBINode {
       case FILE:
         return loadINodeFile(n);
       case DIRECTORY:
-        return loadINodeDirectory(n, parent.getLoaderContext().getStringTable());
+        return loadINodeDirectory(n, parent.getLoaderContext());
       case SYMLINK:
         return loadINodeSymlink(n);
       default:
@@ -222,6 +236,7 @@ public final class FSImageFormatPBINode {
       INodeSection.INodeFile f = n.getFile();
       List<BlockProto> bp = f.getBlocksList();
       short replication = (short) f.getReplication();
+      LoaderContext state = parent.getLoaderContext();
 
       BlockInfo[] blocks = new BlockInfo[bp.size()];
       for (int i = 0, e = bp.size(); i < e; ++i) {
@@ -233,6 +248,12 @@ public final class FSImageFormatPBINode {
       final INodeFile file = new INodeFile(n.getId(),
           n.getName().toByteArray(), permissions, f.getModificationTime(),
           f.getAccessTime(), blocks, replication, f.getPreferredBlockSize());
+
+      if (f.hasAclId()) {
+        file.addAclFeature(new AclFeature(loadAclEntries(f.getAclId(),
+            state.getExtendedAclTable())));
+      }
+
       // under-construction information
       if (f.hasFileUC()) {
         INodeSection.FileUnderConstructionFeature uc = f.getFileUC();
@@ -254,13 +275,15 @@ public final class FSImageFormatPBINode {
       INodeSection.INodeSymlink s = n.getSymlink();
       final PermissionStatus permissions = loadPermission(s.getPermission(),
           parent.getLoaderContext().getStringTable());
-      return new INodeSymlink(n.getId(), n.getName().toByteArray(), permissions,
-          0, 0, s.getTarget().toStringUtf8());
+
+      INodeSymlink sym = new INodeSymlink(n.getId(), n.getName().toByteArray(),
+          permissions, 0, 0, s.getTarget().toStringUtf8());
+
+      return sym;
     }
 
     private void loadRootINode(INodeSection.INode p) {
-      INodeDirectory root = loadINodeDirectory(p, parent.getLoaderContext()
-          .getStringTable());
+      INodeDirectory root = loadINodeDirectory(p, parent.getLoaderContext());
       final Quota.Counts q = root.getQuotaCounts();
       final long nsQuota = q.get(Quota.NAMESPACE);
       final long dsQuota = q.get(Quota.DISKSPACE);
@@ -282,27 +305,45 @@ public final class FSImageFormatPBINode {
           | n.getFsPermissionShort();
     }
 
+    /**
+     * Get a unique id for the AclEntry list. Notice that the code does not
+     * deduplicate the list of aclentry yet.
+     */
+    private static int buildAclEntries(AclFeature f,
+        final SaverContext.DeduplicationMap<ImmutableList<AclEntryProto>> map) {
+      return map.getId(ImmutableList.copyOf(PBHelper.convertAclEntryProto(f
+          .getEntries())));
+    }
+
     public static INodeSection.INodeFile.Builder buildINodeFile(
-        INodeFileAttributes file,
-        final SaverContext.DeduplicationMap<String> stringMap) {
+        INodeFileAttributes file, final SaverContext state) {
       INodeSection.INodeFile.Builder b = INodeSection.INodeFile.newBuilder()
           .setAccessTime(file.getAccessTime())
           .setModificationTime(file.getModificationTime())
-          .setPermission(buildPermissionStatus(file, stringMap))
+          .setPermission(buildPermissionStatus(file, state.getStringMap()))
           .setPreferredBlockSize(file.getPreferredBlockSize())
           .setReplication(file.getFileReplication());
+
+      AclFeature f = file.getAclFeature();
+      if (f != null) {
+        b.setAclId(buildAclEntries(f, state.getExtendedAclMap()));
+      }
       return b;
     }
 
     public static INodeSection.INodeDirectory.Builder buildINodeDirectory(
-        INodeDirectoryAttributes dir,
-        final SaverContext.DeduplicationMap<String> stringMap) {
+        INodeDirectoryAttributes dir, final SaverContext state) {
       Quota.Counts quota = dir.getQuotaCounts();
       INodeSection.INodeDirectory.Builder b = INodeSection.INodeDirectory
           .newBuilder().setModificationTime(dir.getModificationTime())
           .setNsQuota(quota.get(Quota.NAMESPACE))
           .setDsQuota(quota.get(Quota.DISKSPACE))
-          .setPermission(buildPermissionStatus(dir, stringMap));
+          .setPermission(buildPermissionStatus(dir, state.getStringMap()));
+
+      AclFeature f = dir.getAclFeature();
+      if (f != null) {
+        b.setAclId(buildAclEntries(f, state.getExtendedAclMap()));
+      }
       return b;
     }
 
@@ -419,7 +460,7 @@ public final class FSImageFormatPBINode {
 
     private void save(OutputStream out, INodeDirectory n) throws IOException {
       INodeSection.INodeDirectory.Builder b = buildINodeDirectory(n,
-          parent.getSaverContext().getStringMap());
+          parent.getSaverContext());
       INodeSection.INode r = buildINodeCommon(n)
           .setType(INodeSection.INode.Type.DIRECTORY).setDirectory(b).build();
       r.writeDelimitedTo(out);
@@ -427,7 +468,7 @@ public final class FSImageFormatPBINode {
 
     private void save(OutputStream out, INodeFile n) throws IOException {
       INodeSection.INodeFile.Builder b = buildINodeFile(n,
-          parent.getSaverContext().getStringMap());
+          parent.getSaverContext());
 
       for (Block block : n.getBlocks()) {
         b.addBlocks(PBHelper.convert(block));
@@ -448,10 +489,12 @@ public final class FSImageFormatPBINode {
     }
 
     private void save(OutputStream out, INodeSymlink n) throws IOException {
+      SaverContext state = parent.getSaverContext();
       INodeSection.INodeSymlink.Builder b = INodeSection.INodeSymlink
           .newBuilder()
-          .setPermission(buildPermissionStatus(n, parent.getSaverContext().getStringMap()))
+          .setPermission(buildPermissionStatus(n, state.getStringMap()))
           .setTarget(ByteString.copyFrom(n.getSymlink()));
+
       INodeSection.INode r = buildINodeCommon(n)
           .setType(INodeSection.INode.Type.SYMLINK).setSymlink(b).build();
       r.writeDelimitedTo(out);

+ 53 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java

@@ -42,11 +42,16 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclFeatureProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.ExtendedAclSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
@@ -61,6 +66,7 @@ import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressorStream;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.LimitInputStream;
@@ -75,10 +81,15 @@ public final class FSImageFormatProtobuf {
 
   public static final class LoaderContext {
     private String[] stringTable;
+    private ImmutableList<AclEntry>[] extendedAclTable;
 
     public String[] getStringTable() {
       return stringTable;
     }
+
+    public ImmutableList<AclEntry>[] getExtendedAclTable() {
+      return extendedAclTable;
+    }
   }
 
   public static final class SaverContext {
@@ -111,11 +122,19 @@ public final class FSImageFormatProtobuf {
         return map.entrySet();
       }
     }
-    private final DeduplicationMap<String> stringMap = DeduplicationMap.newMap();
+
+    private final DeduplicationMap<String> stringMap = DeduplicationMap
+        .newMap();
+    private final DeduplicationMap<ImmutableList<AclEntryProto>> extendedAclMap = DeduplicationMap
+        .newMap();
 
     public DeduplicationMap<String> getStringMap() {
       return stringMap;
     }
+
+    public DeduplicationMap<ImmutableList<AclEntryProto>> getExtendedAclMap() {
+      return extendedAclMap;
+    }
   }
 
   public static final class Loader implements FSImageFormat.AbstractLoader {
@@ -220,6 +239,9 @@ public final class FSImageFormatProtobuf {
         case STRING_TABLE:
           loadStringTableSection(in);
           break;
+        case EXTENDED_ACL:
+          loadExtendedAclSection(in);
+          break;
         case INODE: {
           currentStep = new Step(StepType.INODES);
           prog.beginStep(Phase.LOADING_FSIMAGE, currentStep);
@@ -279,6 +301,18 @@ public final class FSImageFormatProtobuf {
       }
     }
 
+    @SuppressWarnings("unchecked")
+    private void loadExtendedAclSection(InputStream in) throws IOException {
+      ExtendedAclSection s = ExtendedAclSection.parseDelimitedFrom(in);
+      ctx.extendedAclTable = new ImmutableList[s.getNumEntry() + 1];
+      for (int i = 0; i < s.getNumEntry(); ++i) {
+        ExtendedAclSection.Entry e = ExtendedAclSection.Entry
+            .parseDelimitedFrom(in);
+        ctx.extendedAclTable[e.getId()] = ImmutableList.copyOf(PBHelper
+            .convertAclEntry(e.getAcl().getEntriesList()));
+      }
+    }
+
     private void loadSecretManagerSection(InputStream in) throws IOException {
       SecretManagerSection s = SecretManagerSection.parseDelimitedFrom(in);
       int numKeys = s.getNumKeys(), numTokens = s.getNumTokens();
@@ -447,6 +481,7 @@ public final class FSImageFormatProtobuf {
       saveCacheManagerSection(b);
       prog.endStep(Phase.SAVING_CHECKPOINT, step);
 
+      saveExtendedAclSection(b);
       saveStringTableSection(b);
 
       // We use the underlyingOutputStream to write the header. Therefore flush
@@ -525,6 +560,22 @@ public final class FSImageFormatProtobuf {
       }
       commitSection(summary, SectionName.STRING_TABLE);
     }
+
+    private void saveExtendedAclSection(FileSummary.Builder summary)
+        throws IOException {
+      OutputStream out = sectionOutputStream;
+      ExtendedAclSection.Builder b = ExtendedAclSection.newBuilder()
+          .setNumEntry(saverContext.extendedAclMap.size());
+      b.build().writeDelimitedTo(out);
+      for (Entry<ImmutableList<AclEntryProto>, Integer> e : saverContext.extendedAclMap
+          .entrySet()) {
+        ExtendedAclSection.Entry.Builder eb = ExtendedAclSection.Entry
+            .newBuilder().setId(e.getValue())
+            .setAcl(AclFeatureProto.newBuilder().addAllEntries(e.getKey()));
+        eb.build().writeDelimitedTo(out);
+      }
+      commitSection(summary, SectionName.EXTENDED_ACL);
+    }
   }
 
   /**
@@ -534,6 +585,7 @@ public final class FSImageFormatProtobuf {
   public enum SectionName {
     NS_INFO("NS_INFO"),
     STRING_TABLE("STRING_TABLE"),
+    EXTENDED_ACL("EXTENDED_ACL"),
     INODE("INODE"),
     SNAPSHOT("SNAPSHOT"),
     INODE_DIR("INODE_DIR"),

+ 0 - 21
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java

@@ -21,7 +21,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -35,8 +34,6 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclFsImageProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@@ -207,7 +204,6 @@ public class FSImageSerialization {
     }
 
     writePermissionStatus(file, out);
-    writeAclFeature(file, out);
   }
 
   /** Serialize an {@link INodeFileAttributes}. */
@@ -220,7 +216,6 @@ public class FSImageSerialization {
 
     out.writeShort(file.getFileReplication());
     out.writeLong(file.getPreferredBlockSize());
-    writeAclFeature(file, out);
   }
 
   private static void writeQuota(Quota.Counts quota, DataOutput out)
@@ -254,7 +249,6 @@ public class FSImageSerialization {
     }
     
     writePermissionStatus(node, out);
-    writeAclFeature(node, out);
   }
 
   /**
@@ -268,7 +262,6 @@ public class FSImageSerialization {
     writePermissionStatus(a, out);
     out.writeLong(a.getModificationTime());
     writeQuota(a.getQuotaCounts(), out);
-    writeAclFeature(a, out);
   }
 
   /**
@@ -290,20 +283,6 @@ public class FSImageSerialization {
     writePermissionStatus(node, out);
   }
 
-  private static void writeAclFeature(INodeAttributes node, DataOutput out)
-      throws IOException {
-    if (node.getFsPermission().getAclBit()) {
-      AclFsImageProto.Builder b = AclFsImageProto.newBuilder();
-      OutputStream os = (OutputStream) out;
-
-      AclFeature feature = node.getAclFeature();
-      if (feature != null)
-        b.addAllEntries(PBHelper.convertAclEntryProto(feature.getEntries()));
-
-      b.build().writeDelimitedTo(os);
-    }
-  }
-
   /** Serialize a {@link INodeReference} node */
   private static void writeINodeReference(INodeReference ref, DataOutput out,
       boolean writeUnderConstruction, ReferenceMap referenceMap

+ 30 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java

@@ -38,8 +38,11 @@ import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.server.namenode.AclFeature;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
@@ -115,7 +118,7 @@ public class FSImageFormatPBSnapshot {
         SnapshotSection.Snapshot pbs = SnapshotSection.Snapshot
             .parseDelimitedFrom(in);
         INodeDirectory root = loadINodeDirectory(pbs.getRoot(),
-            parent.getLoaderContext().getStringTable());
+            parent.getLoaderContext());
         int sid = pbs.getSnapshotId();
         INodeDirectorySnapshottable parent = (INodeDirectorySnapshottable) fsDir
             .getInode(root.getId()).asDirectory();
@@ -155,6 +158,7 @@ public class FSImageFormatPBSnapshot {
     private void loadFileDiffList(InputStream in, INodeFile file, int size)
         throws IOException {
       final FileDiffList diffs = new FileDiffList();
+      final LoaderContext state = parent.getLoaderContext();
       for (int i = 0; i < size; i++) {
         SnapshotDiffSection.FileDiff pbf = SnapshotDiffSection.FileDiff
             .parseDelimitedFrom(in);
@@ -162,10 +166,16 @@ public class FSImageFormatPBSnapshot {
         if (pbf.hasSnapshotCopy()) {
           INodeSection.INodeFile fileInPb = pbf.getSnapshotCopy();
           PermissionStatus permission = loadPermission(
-              fileInPb.getPermission(), parent.getLoaderContext()
-                  .getStringTable());
+              fileInPb.getPermission(), state.getStringTable());
+
+          AclFeature acl = null;
+          if (fileInPb.hasAclId()) {
+            acl = new AclFeature(FSImageFormatPBINode.Loader.loadAclEntries(
+                fileInPb.getAclId(), state.getExtendedAclTable()));
+          }
+
           copy = new INodeFileAttributes.SnapshotCopy(pbf.getName()
-              .toByteArray(), permission, null, fileInPb.getModificationTime(),
+              .toByteArray(), permission, acl, fileInPb.getModificationTime(),
               fileInPb.getAccessTime(), (short) fileInPb.getReplication(),
               fileInPb.getPreferredBlockSize());
         }
@@ -236,6 +246,8 @@ public class FSImageFormatPBSnapshot {
         dir.addSnapshotFeature(null);
       }
       DirectoryDiffList diffs = dir.getDiffs();
+      final LoaderContext state = parent.getLoaderContext();
+
       for (int i = 0; i < size; i++) {
         // load a directory diff
         SnapshotDiffSection.DirectoryDiff diffInPb = SnapshotDiffSection.
@@ -247,19 +259,25 @@ public class FSImageFormatPBSnapshot {
         INodeDirectoryAttributes copy = null;
         if (useRoot) {
           copy = snapshot.getRoot();
-        }else if (diffInPb.hasSnapshotCopy()) {
+        } else if (diffInPb.hasSnapshotCopy()) {
           INodeSection.INodeDirectory dirCopyInPb = diffInPb.getSnapshotCopy();
           final byte[] name = diffInPb.getName().toByteArray();
           PermissionStatus permission = loadPermission(
-              dirCopyInPb.getPermission(), parent.getLoaderContext()
-                  .getStringTable());
+              dirCopyInPb.getPermission(), state.getStringTable());
+          AclFeature acl = null;
+          if (dirCopyInPb.hasAclId()) {
+            acl = new AclFeature(FSImageFormatPBINode.Loader.loadAclEntries(
+                dirCopyInPb.getAclId(), state.getExtendedAclTable()));
+          }
+
           long modTime = dirCopyInPb.getModificationTime();
           boolean noQuota = dirCopyInPb.getNsQuota() == -1
               && dirCopyInPb.getDsQuota() == -1;
+
           copy = noQuota ? new INodeDirectoryAttributes.SnapshotCopy(name,
-              permission, null, modTime)
+              permission, acl, modTime)
               : new INodeDirectoryAttributes.CopyWithQuota(name, permission,
-                  null, modTime, dirCopyInPb.getNsQuota(),
+                  acl, modTime, dirCopyInPb.getNsQuota(),
                   dirCopyInPb.getDsQuota());
         }
         // load created list
@@ -314,7 +332,7 @@ public class FSImageFormatPBSnapshot {
           SnapshotSection.Snapshot.Builder sb = SnapshotSection.Snapshot
               .newBuilder().setSnapshotId(s.getId());
           INodeSection.INodeDirectory.Builder db = buildINodeDirectory(sroot,
-              parent.getSaverContext().getStringMap());
+              parent.getSaverContext());
           INodeSection.INode r = INodeSection.INode.newBuilder()
               .setId(sroot.getId())
               .setType(INodeSection.INode.Type.DIRECTORY)
@@ -372,7 +390,7 @@ public class FSImageFormatPBSnapshot {
           INodeFileAttributes copy = diff.snapshotINode;
           if (copy != null) {
             fb.setName(ByteString.copyFrom(copy.getLocalNameBytes()))
-                .setSnapshotCopy(buildINodeFile(copy, parent.getSaverContext().getStringMap()));
+                .setSnapshotCopy(buildINodeFile(copy, parent.getSaverContext()));
           }
           fb.build().writeDelimitedTo(out);
         }
@@ -413,7 +431,7 @@ public class FSImageFormatPBSnapshot {
           if (!diff.isSnapshotRoot() && copy != null) {
             db.setName(ByteString.copyFrom(copy.getLocalNameBytes()))
                 .setSnapshotCopy(
-                    buildINodeDirectory(copy, parent.getSaverContext().getStringMap()));
+                    buildINodeDirectory(copy, parent.getSaverContext()));
           }
           // process created list and deleted list
           List<INode> created = diff.getChildrenDiff()

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/acl.proto

@@ -60,7 +60,7 @@ message AclStatusProto {
   repeated AclEntryProto entries = 4;
 }
 
-message AclFsImageProto {
+message AclFeatureProto {
   repeated AclEntryProto entries = 1;
 }
 

+ 15 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto

@@ -22,6 +22,7 @@ option java_outer_classname = "FsImageProto";
 package hadoop.hdfs.fsimage;
 
 import "hdfs.proto";
+import "acl.proto";
 
 /**
  * This file defines the on-disk layout of the file system image. The
@@ -96,6 +97,7 @@ message INodeSection {
     optional fixed64 permission = 5;
     repeated BlockProto blocks = 6;
     optional FileUnderConstructionFeature fileUC = 7;
+    optional uint32 aclId = 8;
   }
 
   message INodeDirectory {
@@ -105,6 +107,7 @@ message INodeSection {
     // diskspace quota
     optional uint64 dsQuota = 3;
     optional fixed64 permission = 4;
+    optional uint32 aclId = 5;
   }
 
   message INodeSymlink {
@@ -278,3 +281,15 @@ message CacheManagerSection {
   // repeated CacheDirectiveInfoProto directives
 }
 
+/**
+ * This section maps string to id
+ * NAME: EXTENDED_ACL
+ */
+message ExtendedAclSection {
+  message Entry {
+    required uint32 id = 1;
+    optional AclFeatureProto acl = 2;
+  }
+  optional uint32 numEntry = 1;
+  // repeated Entry entries
+}

+ 0 - 30
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAclConfigFlag.java

@@ -141,36 +141,6 @@ public class TestAclConfigFlag {
     restart(true, false);
   }
 
-  @Test
-  public void testFsImage() throws Exception {
-    // With ACLs enabled, set an ACL.
-    initCluster(true, true);
-    fs.mkdirs(PATH);
-    fs.setAcl(PATH, Lists.newArrayList(
-      aclEntry(DEFAULT, USER, "foo", READ_WRITE)));
-
-    // Save a new checkpoint and restart with ACLs still enabled.
-    restart(true, true);
-
-    // Attempt restart with ACLs disabled.
-    try {
-      restart(false, false);
-      fail("expected IOException");
-    } catch (IOException e) {
-      // Unfortunately, we can't assert on the message containing the
-      // configuration key here.  That message is logged, but a more generic
-      // fsimage loading exception propagates up to this layer.
-      GenericTestUtils.assertExceptionContains(
-        "Failed to load an FSImage file", e);
-    }
-
-    // Recover by restarting with ACLs enabled, deleting the ACL, saving a new
-    // checkpoint, and then restarting with ACLs disabled.
-    restart(false, true);
-    fs.removeAcl(PATH);
-    restart(true, false);
-  }
-
   /**
    * We expect an AclException, and we want the exception text to state the
    * configuration key that controls ACL support.

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java

@@ -91,6 +91,7 @@ public class TestNamenodeRetryCache {
     conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BlockSize);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
     cluster = new MiniDFSCluster.Builder(conf).build();
     cluster.waitActive();
     namesystem = cluster.getNamesystem();

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java

@@ -125,6 +125,7 @@ public class TestRetryCacheWithHA {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BlockSize);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, ResponseSize);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, ResponseSize);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
     cluster = new MiniDFSCluster.Builder(conf)
         .nnTopology(MiniDFSNNTopology.simpleHATopology())
         .numDataNodes(DataNodes).build();