
Fix potential FSImage corruption. Contributed by Ekanth Sethuramalingam, Arpit Agarwal, and Andrew Purtell.

Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
Andrew Purtell committed 6 years ago
commit 109d44604c
18 changed files with 731 additions and 286 deletions
  1. +5 -0     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  2. +55 -53   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclEntryStatusFormat.java
  3. +2 -1     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  4. +25 -89   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
  5. +12 -14   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
  6. +1 -1     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
  7. +28 -7    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java
  8. +173 -45  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SerialNumberManager.java
  9. +107 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SerialNumberMap.java
  10. +70 -8   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrFeature.java
  11. +193 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrFormat.java
  12. +35 -43  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java
  13. +10 -10  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FSImageLoader.java
  14. +2 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
  15. +3 -9    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageXmlWriter.java
  16. +8 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
  17. +1 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
  18. +1 -5    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -621,6 +621,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
                                            "dfs.image.transfer.bandwidthPerSec";
   public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0;  //no throttling
 
+  // String table in the fsimage utilizes an expanded bit range.
+  public static final String DFS_IMAGE_EXPANDED_STRING_TABLES_KEY =
+      "dfs.image.string-tables.expanded";
+  public static final boolean DFS_IMAGE_EXPANDED_STRING_TABLES_DEFAULT = true;
+
   // Image transfer timeout
   public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout";
   public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000;

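The new key controls whether the fsimage string table is split into per-manager (user/group/xattr) ranges. A hedged fragment showing how it would be toggled (Configuration, HdfsConfiguration, and setBoolean are standard Hadoop APIs; the surrounding setup is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    // Sketch: opt back into the legacy single string table.
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_EXPANDED_STRING_TABLES_KEY, false);
    // FSDirectory passes this conf to SerialNumberManager.initialize(conf);
    // see the FSDirectory hunk below.
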
+ 55 - 53
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclEntryStatusFormat.java

@@ -32,24 +32,23 @@ import com.google.common.collect.ImmutableList;
  * Class to pack an AclEntry into an integer. <br>
  * An ACL entry is represented by a 32-bit integer in Big Endian format. <br>
  * The bits can be divided in four segments: <br>
- * [0:1) || [1:3) || [3:6) || [6:7) || [7:32) <br>
- * <br>
- * [0:1) -- the scope of the entry (AclEntryScope) <br>
- * [1:3) -- the type of the entry (AclEntryType) <br>
- * [3:6) -- the permission of the entry (FsAction) <br>
- * [6:7) -- A flag to indicate whether Named entry or not <br>
- * [7:8) -- Reserved <br>
- * [8:32) -- the name of the entry, which is an ID that points to a <br>
- * string in the StringTableSection. <br>
+ *
+ * Note:  this format is used both in-memory and on-disk.  Changes will be
+ * incompatible.
+ *
  */
-public enum AclEntryStatusFormat {
+public enum AclEntryStatusFormat implements LongBitFormat.Enum {
 
-  SCOPE(null, 1),
-  TYPE(SCOPE.BITS, 2),
-  PERMISSION(TYPE.BITS, 3),
-  NAMED_ENTRY_CHECK(PERMISSION.BITS, 1),
-  RESERVED(NAMED_ENTRY_CHECK.BITS, 1),
-  NAME(RESERVED.BITS, 24);
+  PERMISSION(null, 3),
+  TYPE(PERMISSION.BITS, 2),
+  SCOPE(TYPE.BITS, 1),
+  NAME(SCOPE.BITS, 24);
+
+  private static final FsAction[] FSACTION_VALUES = FsAction.values();
+  private static final AclEntryScope[] ACL_ENTRY_SCOPE_VALUES =
+      AclEntryScope.values();
+  private static final AclEntryType[] ACL_ENTRY_TYPE_VALUES =
+      AclEntryType.values();
 
   private final LongBitFormat BITS;
 
@@ -59,30 +58,29 @@ public enum AclEntryStatusFormat {
 
   static AclEntryScope getScope(int aclEntry) {
     int ordinal = (int) SCOPE.BITS.retrieve(aclEntry);
-    return AclEntryScope.values()[ordinal];
+    return ACL_ENTRY_SCOPE_VALUES[ordinal];
   }
 
   static AclEntryType getType(int aclEntry) {
     int ordinal = (int) TYPE.BITS.retrieve(aclEntry);
-    return AclEntryType.values()[ordinal];
+    return ACL_ENTRY_TYPE_VALUES[ordinal];
   }
 
   static FsAction getPermission(int aclEntry) {
     int ordinal = (int) PERMISSION.BITS.retrieve(aclEntry);
-    return FsAction.values()[ordinal];
+    return FSACTION_VALUES[ordinal];
   }
 
   static String getName(int aclEntry) {
-    int nameExists = (int) NAMED_ENTRY_CHECK.BITS.retrieve(aclEntry);
-    if (nameExists == 0) {
-      return null;
-    }
-    int id = (int) NAME.BITS.retrieve(aclEntry);
-    AclEntryType type = getType(aclEntry);
-    if (type == AclEntryType.USER) {
-      return SerialNumberManager.INSTANCE.getUser(id);
-    } else if (type == AclEntryType.GROUP) {
-      return SerialNumberManager.INSTANCE.getGroup(id);
+    return getName(aclEntry, null);
+  }
+
+  static String getName(int aclEntry,
+      SerialNumberManager.StringTable stringTable) {
+    SerialNumberManager snm = getSerialNumberManager(getType(aclEntry));
+    if (snm != null) {
+      int nid = (int)NAME.BITS.retrieve(aclEntry);
+      return snm.getString(nid, stringTable);
     }
     return null;
   }
@@ -94,29 +92,26 @@ public enum AclEntryStatusFormat {
     aclEntryInt = TYPE.BITS.combine(aclEntry.getType().ordinal(), aclEntryInt);
     aclEntryInt = PERMISSION.BITS.combine(aclEntry.getPermission().ordinal(),
         aclEntryInt);
-    if (aclEntry.getName() != null) {
-      aclEntryInt = NAMED_ENTRY_CHECK.BITS.combine(1, aclEntryInt);
-      if (aclEntry.getType() == AclEntryType.USER) {
-        int userId = SerialNumberManager.INSTANCE.getUserSerialNumber(aclEntry
-            .getName());
-        aclEntryInt = NAME.BITS.combine(userId, aclEntryInt);
-      } else if (aclEntry.getType() == AclEntryType.GROUP) {
-        int groupId = SerialNumberManager.INSTANCE
-            .getGroupSerialNumber(aclEntry.getName());
-        aclEntryInt = NAME.BITS.combine(groupId, aclEntryInt);
-      }
+    SerialNumberManager snm = getSerialNumberManager(aclEntry.getType());
+    if (snm != null) {
+      int nid = snm.getSerialNumber(aclEntry.getName());
+      aclEntryInt = NAME.BITS.combine(nid, aclEntryInt);
     }
     return (int) aclEntryInt;
   }
 
   static AclEntry toAclEntry(int aclEntry) {
-    AclEntry.Builder builder = new AclEntry.Builder();
-    builder.setScope(getScope(aclEntry)).setType(getType(aclEntry))
-        .setPermission(getPermission(aclEntry));
-    if (getName(aclEntry) != null) {
-      builder.setName(getName(aclEntry));
-    }
-    return builder.build();
+    return toAclEntry(aclEntry, null);
+  }
+
+  static AclEntry toAclEntry(int aclEntry,
+      SerialNumberManager.StringTable stringTable) {
+    return new AclEntry.Builder()
+        .setScope(getScope(aclEntry))
+        .setType(getType(aclEntry))
+        .setPermission(getPermission(aclEntry))
+        .setName(getName(aclEntry, stringTable))
+        .build();
   }
 
   public static int[] toInt(List<AclEntry> aclEntries) {
@@ -127,12 +122,19 @@ public enum AclEntryStatusFormat {
     return entries;
   }
 
-  public static ImmutableList<AclEntry> toAclEntries(int[] entries) {
-    ImmutableList.Builder<AclEntry> b = new ImmutableList.Builder<AclEntry>();
-    for (int entry : entries) {
-      AclEntry aclEntry = toAclEntry(entry);
-      b.add(aclEntry);
+  private static SerialNumberManager getSerialNumberManager(AclEntryType type) {
+    switch (type) {
+    case USER:
+      return SerialNumberManager.USER;
+    case GROUP:
+      return SerialNumberManager.GROUP;
+    default:
+      return null;
     }
-    return b.build();
+  }
+
+  @Override
+  public int getLength() {
+    return BITS.getLength();
   }
 }

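A worked example of the reordered layout: LongBitFormat assigns the first declared field the lowest bits, so PERMISSION(3), TYPE(2), SCOPE(1), NAME(24) packs the permission into bits [0,3), the type into [3,5), the scope into [5,6), and the name serial id into [6,30). The old NAMED_ENTRY_CHECK flag is no longer needed because SerialNumberMap reserves serial 0 for a null name. Hand-packing an equivalent entry (the serial id 42 is hypothetical):

    // access ACL entry "user:alice:r-x", assuming "alice" was assigned serial 42
    int entry = FsAction.READ_EXECUTE.ordinal()       // permission -> bits 0-2
        | (AclEntryType.USER.ordinal() << 3)          // type       -> bits 3-4
        | (AclEntryScope.ACCESS.ordinal() << 5)       // scope      -> bit 5
        | (42 << 6);                                  // name serial id -> bits 6-29
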
+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -60,7 +60,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshotFeature;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.security.AccessControlException;
@@ -223,6 +222,8 @@ public class FSDirectory implements Closeable {
   private final NameCache<ByteArray> nameCache;
 
   FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
+    // used to enable/disable the use of expanded string tables.
+    SerialNumberManager.initialize(conf);
     this.dirLock = new ReentrantReadWriteLock(true); // fair
     this.inodeId = new INodeId();
     rootDir = createRoot(ns);

+ 25 - 89
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java

@@ -31,10 +31,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclEntryScope;
-import org.apache.hadoop.fs.permission.AclEntryType;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.XAttr;
@@ -56,6 +52,8 @@ import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.XAttrCom
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.XAttrFeatureProto;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.QuotaByStorageTypeEntryProto;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.QuotaByStorageTypeFeatureProto;
+import org.apache.hadoop.hdfs.server.namenode.INodeWithAdditionalFields.PermissionStatusFormat;
+import org.apache.hadoop.hdfs.server.namenode.SerialNumberManager.StringTable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -66,80 +64,35 @@ import com.google.protobuf.ByteString;
 
 @InterfaceAudience.Private
 public final class FSImageFormatPBINode {
-  private final static long USER_GROUP_STRID_MASK = (1 << 24) - 1;
-  private final static int USER_STRID_OFFSET = 40;
-  private final static int GROUP_STRID_OFFSET = 16;
   private static final Log LOG = LogFactory.getLog(FSImageFormatPBINode.class);
 
-  private static final int ACL_ENTRY_NAME_MASK = (1 << 24) - 1;
-  private static final int ACL_ENTRY_NAME_OFFSET = 6;
-  private static final int ACL_ENTRY_TYPE_OFFSET = 3;
-  private static final int ACL_ENTRY_SCOPE_OFFSET = 5;
-  private static final int ACL_ENTRY_PERM_MASK = 7;
-  private static final int ACL_ENTRY_TYPE_MASK = 3;
-  private static final int ACL_ENTRY_SCOPE_MASK = 1;
-  private static final FsAction[] FSACTION_VALUES = FsAction.values();
-  private static final AclEntryScope[] ACL_ENTRY_SCOPE_VALUES = AclEntryScope
-      .values();
-  private static final AclEntryType[] ACL_ENTRY_TYPE_VALUES = AclEntryType
-      .values();
-  
-  private static final int XATTR_NAMESPACE_MASK = 3;
-  private static final int XATTR_NAMESPACE_OFFSET = 30;
-  private static final int XATTR_NAME_MASK = (1 << 24) - 1;
-  private static final int XATTR_NAME_OFFSET = 6;
-
-  /* See the comments in fsimage.proto for an explanation of the following. */
-  private static final int XATTR_NAMESPACE_EXT_OFFSET = 5;
-  private static final int XATTR_NAMESPACE_EXT_MASK = 1;
-
-  private static final XAttr.NameSpace[] XATTR_NAMESPACE_VALUES =
-      XAttr.NameSpace.values();
-  
-
+  // the loader must decode all fields referencing serial number based fields
+  // via to<Item> methods with the string table.
   public final static class Loader {
     public static PermissionStatus loadPermission(long id,
-        final String[] stringTable) {
-      short perm = (short) (id & ((1 << GROUP_STRID_OFFSET) - 1));
-      int gsid = (int) ((id >> GROUP_STRID_OFFSET) & USER_GROUP_STRID_MASK);
-      int usid = (int) ((id >> USER_STRID_OFFSET) & USER_GROUP_STRID_MASK);
-      return new PermissionStatus(stringTable[usid], stringTable[gsid],
-          new FsPermission(perm));
+        final StringTable stringTable) {
+      return PermissionStatusFormat.toPermissionStatus(id, stringTable);
     }
 
     public static ImmutableList<AclEntry> loadAclEntries(
-        AclFeatureProto proto, final String[] stringTable) {
+        AclFeatureProto proto, final StringTable stringTable) {
       ImmutableList.Builder<AclEntry> b = ImmutableList.builder();
       for (int v : proto.getEntriesList()) {
-        int p = v & ACL_ENTRY_PERM_MASK;
-        int t = (v >> ACL_ENTRY_TYPE_OFFSET) & ACL_ENTRY_TYPE_MASK;
-        int s = (v >> ACL_ENTRY_SCOPE_OFFSET) & ACL_ENTRY_SCOPE_MASK;
-        int nid = (v >> ACL_ENTRY_NAME_OFFSET) & ACL_ENTRY_NAME_MASK;
-        String name = stringTable[nid];
-        b.add(new AclEntry.Builder().setName(name)
-            .setPermission(FSACTION_VALUES[p])
-            .setScope(ACL_ENTRY_SCOPE_VALUES[s])
-            .setType(ACL_ENTRY_TYPE_VALUES[t]).build());
+        b.add(AclEntryStatusFormat.toAclEntry(v, stringTable));
       }
       return b.build();
     }
 
     public static ImmutableList<XAttr> loadXAttrs(
-        XAttrFeatureProto proto, final String[] stringTable) {
+        XAttrFeatureProto proto, final StringTable stringTable) {
       ImmutableList.Builder<XAttr> b = ImmutableList.builder();
       for (XAttrCompactProto xAttrCompactProto : proto.getXAttrsList()) {
         int v = xAttrCompactProto.getName();
-        int nid = (v >> XATTR_NAME_OFFSET) & XATTR_NAME_MASK;
-        int ns = (v >> XATTR_NAMESPACE_OFFSET) & XATTR_NAMESPACE_MASK;
-        ns |=
-            ((v >> XATTR_NAMESPACE_EXT_OFFSET) & XATTR_NAMESPACE_EXT_MASK) << 2;
-        String name = stringTable[nid];
         byte[] value = null;
         if (xAttrCompactProto.getValue() != null) {
           value = xAttrCompactProto.getValue().toByteArray();
         }
-        b.add(new XAttr.Builder().setNameSpace(XATTR_NAMESPACE_VALUES[ns])
-            .setName(name).setValue(value).build());
+        b.add(XAttrFormat.toXAttr(v, value, stringTable));
       }
 
       return b.build();
@@ -408,44 +361,28 @@ public final class FSImageFormatPBINode {
     }
   }
 
+  // the saver can directly write out fields referencing serial numbers.
+  // the serial number maps will be compacted when loading.
   public final static class Saver {
-    private static long buildPermissionStatus(INodeAttributes n,
-        final SaverContext.DeduplicationMap<String> stringMap) {
-      long userId = stringMap.getId(n.getUserName());
-      long groupId = stringMap.getId(n.getGroupName());
-      return ((userId & USER_GROUP_STRID_MASK) << USER_STRID_OFFSET)
-          | ((groupId & USER_GROUP_STRID_MASK) << GROUP_STRID_OFFSET)
-          | n.getFsPermissionShort();
+    private static long buildPermissionStatus(INodeAttributes n) {
+      return n.getPermissionLong();
     }
 
-    private static AclFeatureProto.Builder buildAclEntries(AclFeature f,
-        final SaverContext.DeduplicationMap<String> map) {
+    private static AclFeatureProto.Builder buildAclEntries(AclFeature f) {
       AclFeatureProto.Builder b = AclFeatureProto.newBuilder();
       for (int pos = 0, e; pos < f.getEntriesSize(); pos++) {
         e = f.getEntryAt(pos);
-        int nameId = map.getId(AclEntryStatusFormat.getName(e));
-        int v = ((nameId & ACL_ENTRY_NAME_MASK) << ACL_ENTRY_NAME_OFFSET)
-            | (AclEntryStatusFormat.getType(e).ordinal() << ACL_ENTRY_TYPE_OFFSET)
-            | (AclEntryStatusFormat.getScope(e).ordinal() << ACL_ENTRY_SCOPE_OFFSET)
-            | (AclEntryStatusFormat.getPermission(e).ordinal());
-        b.addEntries(v);
+        b.addEntries(e);
       }
       return b;
     }
 
-    private static XAttrFeatureProto.Builder buildXAttrs(XAttrFeature f,
-        final SaverContext.DeduplicationMap<String> stringMap) {
+    private static XAttrFeatureProto.Builder buildXAttrs(XAttrFeature f) {
       XAttrFeatureProto.Builder b = XAttrFeatureProto.newBuilder();
       for (XAttr a : f.getXAttrs()) {
         XAttrCompactProto.Builder xAttrCompactBuilder = XAttrCompactProto.
             newBuilder();
-        int nsOrd = a.getNameSpace().ordinal();
-        Preconditions.checkArgument(nsOrd < 8, "Too many namespaces.");
-        int v = ((nsOrd & XATTR_NAMESPACE_MASK) << XATTR_NAMESPACE_OFFSET)
-            | ((stringMap.getId(a.getName()) & XATTR_NAME_MASK) <<
-                XATTR_NAME_OFFSET);
-        v |= (((nsOrd >> 2) & XATTR_NAMESPACE_EXT_MASK) <<
-            XATTR_NAMESPACE_EXT_OFFSET);
+        int v = XAttrFormat.toInt(a);
         xAttrCompactBuilder.setName(v);
         if (a.getValue() != null) {
           xAttrCompactBuilder.setValue(PBHelper.getByteString(a.getValue()));
@@ -477,18 +414,18 @@ public final class FSImageFormatPBINode {
       INodeSection.INodeFile.Builder b = INodeSection.INodeFile.newBuilder()
           .setAccessTime(file.getAccessTime())
           .setModificationTime(file.getModificationTime())
-          .setPermission(buildPermissionStatus(file, state.getStringMap()))
+          .setPermission(buildPermissionStatus(file))
           .setPreferredBlockSize(file.getPreferredBlockSize())
           .setReplication(file.getFileReplication())
           .setStoragePolicyID(file.getLocalStoragePolicyID());
 
       AclFeature f = file.getAclFeature();
       if (f != null) {
-        b.setAcl(buildAclEntries(f, state.getStringMap()));
+        b.setAcl(buildAclEntries(f));
       }
       XAttrFeature xAttrFeature = file.getXAttrFeature();
       if (xAttrFeature != null) {
-        b.setXAttrs(buildXAttrs(xAttrFeature, state.getStringMap()));
+        b.setXAttrs(buildXAttrs(xAttrFeature));
       }
       return b;
     }
@@ -500,7 +437,7 @@ public final class FSImageFormatPBINode {
           .newBuilder().setModificationTime(dir.getModificationTime())
           .setNsQuota(quota.getNameSpace())
           .setDsQuota(quota.getStorageSpace())
-          .setPermission(buildPermissionStatus(dir, state.getStringMap()));
+          .setPermission(buildPermissionStatus(dir));
 
       if (quota.getTypeSpaces().anyGreaterOrEqual(0)) {
         b.setTypeQuotas(buildQuotaByStorageTypeEntries(quota));
@@ -508,11 +445,11 @@
 
       AclFeature f = dir.getAclFeature();
       if (f != null) {
-        b.setAcl(buildAclEntries(f, state.getStringMap()));
+        b.setAcl(buildAclEntries(f));
       }
       XAttrFeature xAttrFeature = dir.getXAttrFeature();
       if (xAttrFeature != null) {
-        b.setXAttrs(buildXAttrs(xAttrFeature, state.getStringMap()));
+        b.setXAttrs(buildXAttrs(xAttrFeature));
       }
       return b;
     }
@@ -645,10 +582,9 @@ public final class FSImageFormatPBINode {
     }
 
     private void save(OutputStream out, INodeSymlink n) throws IOException {
-      SaverContext state = parent.getSaverContext();
       INodeSection.INodeSymlink.Builder b = INodeSection.INodeSymlink
           .newBuilder()
-          .setPermission(buildPermissionStatus(n, state.getStringMap()))
+          .setPermission(buildPermissionStatus(n))
           .setTarget(ByteString.copyFrom(n.getSymlink()))
           .setModificationTime(n.getModificationTime())
           .setAccessTime(n.getAccessTime());

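The net effect in this file: the saver no longer re-encodes permissions, ACL entries, or xattr names through the per-image DeduplicationMap; it writes the in-memory words verbatim, and only the loader consults a string table. A hedged sketch of the resulting round trip (stringTable stands in for LoaderContext.getStringTable(); inode is any INodeAttributes):

    long word = inode.getPermissionLong();      // save path: the raw 64-bit word
    PermissionStatus ps = FSImageFormatPBINode.Loader
        .loadPermission(word, stringTable);     // load path: decode via the table
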
+ 12 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java

@@ -79,10 +79,10 @@ public final class FSImageFormatProtobuf {
       .getLogger(FSImageFormatProtobuf.class);
 
   public static final class LoaderContext {
-    private String[] stringTable;
+    private SerialNumberManager.StringTable stringTable;
     private final ArrayList<INodeReference> refList = Lists.newArrayList();
 
-    public String[] getStringTable() {
+    public SerialNumberManager.StringTable getStringTable() {
       return stringTable;
     }
 
@@ -121,14 +121,8 @@
         return map.entrySet();
       }
     }
-    private final ArrayList<INodeReference> refList = Lists.newArrayList();
-
-    private final DeduplicationMap<String> stringMap = DeduplicationMap
-        .newMap();
 
-    public DeduplicationMap<String> getStringMap() {
-      return stringMap;
-    }
+    private final ArrayList<INodeReference> refList = Lists.newArrayList();
 
     public ArrayList<INodeReference> getRefList() {
       return refList;
@@ -311,11 +305,12 @@
 
     private void loadStringTableSection(InputStream in) throws IOException {
       StringTableSection s = StringTableSection.parseDelimitedFrom(in);
-      ctx.stringTable = new String[s.getNumEntry() + 1];
+      ctx.stringTable =
+          SerialNumberManager.newStringTable(s.getNumEntry(), s.getMaskBits());
       for (int i = 0; i < s.getNumEntry(); ++i) {
         StringTableSection.Entry e = StringTableSection.Entry
             .parseDelimitedFrom(in);
-        ctx.stringTable[e.getId()] = e.getStr();
+        ctx.stringTable.put(e.getId(), e.getStr());
       }
     }
 
@@ -568,12 +563,15 @@
     private void saveStringTableSection(FileSummary.Builder summary)
         throws IOException {
       OutputStream out = sectionOutputStream;
+      SerialNumberManager.StringTable stringTable =
+          SerialNumberManager.getStringTable();
       StringTableSection.Builder b = StringTableSection.newBuilder()
-          .setNumEntry(saverContext.stringMap.size());
+          .setNumEntry(stringTable.size())
+          .setMaskBits(stringTable.getMaskBits());
       b.build().writeDelimitedTo(out);
-      for (Entry<String, Integer> e : saverContext.stringMap.entrySet()) {
+      for (Entry<Integer, String> e : stringTable) {
         StringTableSection.Entry.Builder eb = StringTableSection.Entry
-            .newBuilder().setId(e.getValue()).setStr(e.getKey());
+            .newBuilder().setId(e.getKey()).setStr(e.getValue());
         eb.build().writeDelimitedTo(out);
       }
       commitSection(summary, SectionName.STRING_TABLE);

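Because the section header now records maskBits, serial ids issued by different managers can share one table without colliding. A conceptual view of a saved STRING_TABLE section (strings hypothetical; the masks are ordinal << 29 when maskBits = 3, see SerialNumberManager below):

    // numEntry = 3, maskBits = 3
    //   id = 0x20000001 -> "hdfs"      (USER,  serial 1, mask 1 << 29)
    //   id = 0x40000002 -> "staff"     (GROUP, serial 2, mask 2 << 29)
    //   id = 0x60000001 -> "user.foo"  (XATTR, serial 1, mask 3 << 29)
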
+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -118,7 +118,7 @@ public class INodeDirectory extends INodeWithAdditionalFields
   @Override
   public byte getLocalStoragePolicyID() {
     XAttrFeature f = getXAttrFeature();
-    ImmutableList<XAttr> xattrs = f == null ? ImmutableList.<XAttr> of() : f
+    List<XAttr> xattrs = f == null ? ImmutableList.<XAttr> of() : f
         .getXAttrs();
     for (XAttr xattr : xattrs) {
       if (BlockStoragePolicySuite.isStoragePolicyXAttr(xattr)) {

+ 28 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java

@@ -33,7 +33,9 @@ import com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 public abstract class INodeWithAdditionalFields extends INode
     implements LinkedElement {
-  static enum PermissionStatusFormat {
+  // Note: this format is used both in-memory and on-disk.  Changes will be
+  // incompatible.
+  enum PermissionStatusFormat implements LongBitFormat.Enum {
     MODE(null, 16),
     GROUP(MODE.BITS, 24),
     USER(GROUP.BITS, 24);
@@ -46,12 +48,14 @@ public abstract class INodeWithAdditionalFields extends INode
 
     static String getUser(long permission) {
       final int n = (int)USER.BITS.retrieve(permission);
-      return SerialNumberManager.INSTANCE.getUser(n);
+      String s = SerialNumberManager.USER.getString(n);
+      assert s != null;
+      return s;
     }
 
     static String getGroup(long permission) {
       final int n = (int)GROUP.BITS.retrieve(permission);
-      return SerialNumberManager.INSTANCE.getGroup(n);
+      return SerialNumberManager.GROUP.getString(n);
     }
 
     static short getMode(long permission) {
@@ -61,16 +65,33 @@ public abstract class INodeWithAdditionalFields extends INode
     /** Encode the {@link PermissionStatus} to a long. */
     static long toLong(PermissionStatus ps) {
       long permission = 0L;
-      final int user = SerialNumberManager.INSTANCE.getUserSerialNumber(
+      final int user = SerialNumberManager.USER.getSerialNumber(
           ps.getUserName());
       permission = USER.BITS.combine(user, permission);
-      final int group = SerialNumberManager.INSTANCE.getGroupSerialNumber(
+      // ideally should assert on group but inodes are created with null
+      // group and then updated only when added to a directory.
+      final int group = SerialNumberManager.GROUP.getSerialNumber(
          ps.getGroupName());
       permission = GROUP.BITS.combine(group, permission);
       final int mode = ps.getPermission().toShort();
       permission = MODE.BITS.combine(mode, permission);
       return permission;
     }
+
+    static PermissionStatus toPermissionStatus(long id,
+        SerialNumberManager.StringTable stringTable) {
+      int uid = (int)USER.BITS.retrieve(id);
+      int gid = (int)GROUP.BITS.retrieve(id);
+      return new PermissionStatus(
+        SerialNumberManager.USER.getString(uid, stringTable),
+        SerialNumberManager.GROUP.getString(gid, stringTable),
+        new FsPermission(getMode(id)));
+    }
+
+    @Override
+    public int getLength() {
+      return BITS.getLength();
+    }
   }
 
   /** The inode id. */
@@ -175,7 +196,7 @@ public abstract class INodeWithAdditionalFields extends INode
 
   @Override
   final void setUser(String user) {
-    int n = SerialNumberManager.INSTANCE.getUserSerialNumber(user);
+    int n = SerialNumberManager.USER.getSerialNumber(user);
     updatePermissionStatus(PermissionStatusFormat.USER, n);
   }
 
@@ -189,7 +210,7 @@ public abstract class INodeWithAdditionalFields extends INode
 
   @Override
   final void setGroup(String group) {
-    int n = SerialNumberManager.INSTANCE.getGroupSerialNumber(group);
+    int n = SerialNumberManager.GROUP.getSerialNumber(group);
     updatePermissionStatus(PermissionStatusFormat.GROUP, n);
   }
 

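For reference, the 64-bit permission word declared here places MODE in bits [0,16), GROUP in [16,40), and USER in [40,64), exactly the layout that the hand-rolled USER_STRID_OFFSET = 40 and GROUP_STRID_OFFSET = 16 constants deleted from FSImageFormatPBINode used to encode. A hand-packed equivalent (serial numbers illustrative):

    long word = 0755L        // MODE: the FsPermission short, bits 0-15
        | (2L << 16)         // GROUP serial number, bits 16-39
        | (1L << 40);        // USER serial number, bits 40-63
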
+ 173 - 45
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SerialNumberManager.java

@@ -17,67 +17,195 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.INodeWithAdditionalFields.PermissionStatusFormat;
+import org.apache.hadoop.hdfs.util.LongBitFormat;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_EXPANDED_STRING_TABLES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_EXPANDED_STRING_TABLES_KEY;
 
-/** Manage name-to-serial-number maps for users and groups. */
-class SerialNumberManager {
-  /** This is the only instance of {@link SerialNumberManager}.*/
-  static final SerialNumberManager INSTANCE = new SerialNumberManager();
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
 
-  private final SerialNumberMap<String> usermap = new SerialNumberMap<String>();
-  private final SerialNumberMap<String> groupmap = new SerialNumberMap<String>();
+/** Manage name-to-serial-number maps for various string tables. */
+public enum SerialNumberManager {
+  GLOBAL(), // NEVER EVER directly access!
+  USER(PermissionStatusFormat.USER, AclEntryStatusFormat.NAME),
+  GROUP(PermissionStatusFormat.GROUP, AclEntryStatusFormat.NAME),
+  XATTR(XAttrFormat.NAME);
 
-  private SerialNumberManager() {}
+  private static final SerialNumberManager[] values = values();
+  private static final int maxEntryBits;
+  private static final int maxEntryNumber;
+  private static final int maskBits;
+  private static boolean initialized;
 
-  int getUserSerialNumber(String u) {return usermap.get(u);}
-  int getGroupSerialNumber(String g) {return groupmap.get(g);}
-  String getUser(int n) {return usermap.get(n);}
-  String getGroup(int n) {return groupmap.get(n);}
+  private SerialNumberMap<String> serialMap;
+  private int bitLength = Integer.SIZE;
+  private boolean enabled;
 
-  {
-    getUserSerialNumber(null);
-    getGroupSerialNumber(null);
+  static {
+    maxEntryBits = Integer.numberOfLeadingZeros(values.length);
+    maxEntryNumber = (1 << maxEntryBits) - 1;
+    maskBits = Integer.SIZE - maxEntryBits;
+    for (SerialNumberManager snm : values) {
+      // account for string table mask bits.
+      snm.updateLength(maxEntryBits);
+      // find max allowed length in case global is enabled.
+      GLOBAL.updateLength(snm.getLength());
+    }
+    // can reinitialize once later.
+    initializeSerialMaps(DFS_IMAGE_EXPANDED_STRING_TABLES_DEFAULT);
   }
 
-  private static class SerialNumberMap<T> {
-    private final AtomicInteger max = new AtomicInteger(1);
-    private final ConcurrentMap<T, Integer> t2i = new ConcurrentHashMap<T, Integer>();
-    private final ConcurrentMap<Integer, T> i2t = new ConcurrentHashMap<Integer, T>();
+  static synchronized void initialize(Configuration conf) {
+    boolean useExpanded = conf.getBoolean(
+      DFS_IMAGE_EXPANDED_STRING_TABLES_KEY,
+      DFS_IMAGE_EXPANDED_STRING_TABLES_DEFAULT);
+    if (initialized) {
+      if (useExpanded ^ !GLOBAL.enabled) {
+        throw new IllegalStateException("Cannot change serial maps");
+      }
+      return;
+    }
+    initializeSerialMaps(useExpanded);
+    for (SerialNumberManager snm : values) {
+      if (snm.enabled) {
+        FSDirectory.LOG.info(snm + " serial map: bits=" + snm.getLength() +
+          " maxEntries=" + snm.serialMap.getMax());
+      }
+    }
+    initialized = true;
+  }
 
-    int get(T t) {
-      if (t == null) {
-        return 0;
+  private static void initializeSerialMaps(boolean useExpanded) {
+    if (useExpanded) {
+      // initialize per-manager serial maps for all but global.
+      for (SerialNumberManager snm : values) {
+        snm.enabled = (snm != GLOBAL);
+        snm.serialMap = snm.enabled ? new SerialNumberMap<String>(snm) : null;
       }
-      Integer sn = t2i.get(t);
-      if (sn == null) {
-        sn = max.getAndIncrement();
-        Integer old = t2i.putIfAbsent(t, sn);
-        if (old != null) {
-          return old;
-        }
-        i2t.put(sn, t);
+    } else {
+      // initialize all managers to use the global serial map.
+      SerialNumberMap<String> globalSerialMap = new SerialNumberMap<>(GLOBAL);
+      for (SerialNumberManager snm : values) {
+        snm.enabled = (snm == GLOBAL);
+        snm.serialMap = globalSerialMap;
       }
-      return sn;
     }
+  }
+
+  SerialNumberManager(LongBitFormat.Enum... elements) {
+    // compute the smallest bit length registered with the serial manager.
+    for (LongBitFormat.Enum element : elements) {
+      updateLength(element.getLength());
+    }
+  }
+
+  int getLength() {
+    return bitLength;
+  }
+
+  private void updateLength(int maxLength) {
+    bitLength = Math.min(bitLength, maxLength);
+  }
+
+  public int getSerialNumber(String u) {
+    return serialMap.get(u);
+  }
+
+  public String getString(int id) {
+    return serialMap.get(id);
+  }
+
+  public String getString(int id, StringTable stringTable) {
+    return (stringTable != null)
+        ? stringTable.get(this, id) : getString(id);
+  }
 
 
-    T get(int i) {
-      if (i == 0) {
-        return null;
+  private int getMask(int bits) {
+    return ordinal() << (Integer.SIZE - bits);
+  }
+
+  private static int getMaskBits() {
+    return GLOBAL.enabled ? 0 : maskBits;
+  }
+
+  private int size() {
+    return enabled ? serialMap.size() : 0;
+  }
+
+  private Iterable<Entry<Integer, String>> entrySet() {
+    if (!enabled) {
+      return Collections.emptySet();
+    }
+    return serialMap.entrySet();
+  }
+
+  // returns snapshot of current values for a save.
+  public static StringTable getStringTable() {
+    // approximate size for capacity.
+    int size = 0;
+    for (final SerialNumberManager snm : values) {
+      size += snm.size();
+    }
+    int tableMaskBits = getMaskBits();
+    StringTable map = new StringTable(size, tableMaskBits);
+    for (final SerialNumberManager snm : values) {
+      final int mask = snm.getMask(tableMaskBits);
+      for (Entry<Integer, String> entry : snm.entrySet()) {
+        map.put(entry.getKey() | mask, entry.getValue());
       }
       }
-      if (t == null) {
-        throw new IllegalStateException("!i2t.containsKey(" + i
-            + "), this=" + this);
+    }
+    return map;
+  }
+
+  // returns an empty table for load.
+  public static StringTable newStringTable(int size, int bits) {
+    if (bits > maskBits) {
+      throw new IllegalArgumentException(
+        "String table bits " + bits + " > " + maskBits);
+    }
+    return new StringTable(size, bits);
+  }
+
+  public static class StringTable implements Iterable<Entry<Integer, String>> {
+    private final int tableMaskBits;
+    private final Map<Integer,String> map;
+
+    private StringTable(int size, int loadingMaskBits) {
+      this.tableMaskBits = loadingMaskBits;
+      this.map = new HashMap<>(size);
+    }
+
+    public String get(SerialNumberManager snm, int id) {
+      if (tableMaskBits != 0) {
+        if (id > maxEntryNumber) {
+          throw new IllegalStateException(
+            "serial id " + id + " > " + maxEntryNumber);
+        }
+        id |= snm.getMask(tableMaskBits);
       }
       }
+      return map.get(id);
+    }
+
+    public void put(int id, String str) {
+      map.put(id, str);
+    }
+
+    public Iterator<Entry<Integer, String>> iterator() {
+      return map.entrySet().iterator();
+    }
+
+    public int size() {
+      return map.size();
     }
     }
 
-    public String toString() {
-      return "max=" + max + ",\n  t2i=" + t2i + ",\n  i2t=" + i2t;
+    public int getMaskBits() {
+      return tableMaskBits;
     }
     }
   }
-}
+}

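The static initializer's arithmetic with the four constants actually declared (GLOBAL, USER, GROUP, XATTR) works out as follows; this sketch only restates the code above with concrete numbers:

    int maxEntryBits   = Integer.numberOfLeadingZeros(4); // 29
    int maxEntryNumber = (1 << maxEntryBits) - 1;         // largest storable serial id
    int maskBits       = Integer.SIZE - maxEntryBits;     // 3 tag bits
    // getMask(3): USER -> 1 << 29, GROUP -> 2 << 29, XATTR -> 3 << 29.
    // In legacy (non-expanded) mode GLOBAL is the lone enabled manager and
    // getMaskBits() returns 0, so old images stay readable.
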
+ 107 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SerialNumberMap.java

@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Maps an object to a serial number.
+ * 
+ * <p>Looking up an object returns its serial number; if the object is not
+ * yet in the map, the next serial number is allocated and bound to it. The
+ * object can also be retrieved by its serial number.
+ * 
+ * <p>The map is thread-safe.
+ */
+@InterfaceAudience.Private
+public class SerialNumberMap<T> {
+  private String name;
+  private final int max;
+  private final AtomicInteger current = new AtomicInteger(1);
+  private final ConcurrentMap<T, Integer> t2i =
+      new ConcurrentHashMap<T, Integer>();
+  private final ConcurrentMap<Integer, T> i2t =
+      new ConcurrentHashMap<Integer, T>();
+
+  SerialNumberMap(SerialNumberManager snm) {
+    this(snm.name(), snm.getLength());
+  }
+
+  SerialNumberMap(String name, int bitLength) {
+    this.name = name;
+    this.max = (1 << bitLength) - 1;
+  }
+
+  public int get(T t) {
+    if (t == null) {
+      return 0;
+    }
+    Integer sn = t2i.get(t);
+    if (sn == null) {
+      sn = current.getAndIncrement();
+      if (sn > max) {
+        current.getAndDecrement();
+        throw new IllegalStateException(name + ": serial number map is full");
+      }
+      Integer old = t2i.putIfAbsent(t, sn);
+      if (old != null) {
+        return old;
+      }
+      i2t.put(sn, t);
+    }
+    return sn;
+  }
+
+  public T get(int i) {
+    if (i == 0) {
+      return null;
+    }
+    T t = i2t.get(i);
+    if (t == null) {
+      throw new IllegalStateException(
+        name + ": serial number " + i + " does not exist");
+    }
+    return t;
+  }
+
+  int getMax() {
+    return max;
+  }
+
+  Set<Map.Entry<Integer, T>> entrySet() {
+    return new HashSet<>(i2t.entrySet());
+  }
+
+  public int size() {
+    return i2t.size();
+  }
+
+  @Override
+  public String toString() {
+    return "current=" + current + ",\n" +
+        "max=" + max + ",\n  t2i=" + t2i + ",\n  i2t=" + i2t;
+  }
+}

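A minimal usage sketch of the extracted class (the name and the 24-bit width are illustrative; the constructors are package-private, so this only compiles inside the namenode package):

    SerialNumberMap<String> users = new SerialNumberMap<>("USER", 24);
    int id = users.get("hdfs");   // first lookup allocates serial 1
    users.get("hdfs");            // idempotent: still 1
    users.get(1);                 // reverse lookup -> "hdfs"
    users.get((String) null);     // null is always serial 0

The behavioral change versus the old inner class is the bounds check: get() now throws once the map would exceed the bit length it was sized for, instead of handing out serials that silently truncate when packed into the fsimage, which is the corruption this commit guards against.
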
+ 70 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrFeature.java

@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.XAttr;
-import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.XAttrHelper;
 
 import com.google.common.collect.ImmutableList;
 
@@ -28,16 +31,75 @@ import com.google.common.collect.ImmutableList;
  */
 @InterfaceAudience.Private
 public class XAttrFeature implements INode.Feature {
-  public static final ImmutableList<XAttr> EMPTY_ENTRY_LIST =
-      ImmutableList.of();
+  static final int PACK_THRESHOLD = 1024;
+
+  /** The packed bytes for small size XAttrs. */
+  private byte[] attrs;
 
-  private final ImmutableList<XAttr> xAttrs;
+  /**
+   * List storing large XAttrs.
+   * XAttr values are typically small, so this list is usually null.
+   */
+  private ImmutableList<XAttr> xAttrs;
+
+  public XAttrFeature(List<XAttr> xAttrs) {
+    if (xAttrs != null && !xAttrs.isEmpty()) {
+      List<XAttr> toPack = new ArrayList<XAttr>();
+      ImmutableList.Builder<XAttr> b = null;
+      for (XAttr attr : xAttrs) {
+        if (attr.getValue() == null ||
+            attr.getValue().length <= PACK_THRESHOLD) {
+          toPack.add(attr);
+        } else {
+          if (b == null) {
+            b = ImmutableList.builder();
+          }
+          b.add(attr);
+        }
+      }
+      this.attrs = XAttrFormat.toBytes(toPack);
+      if (b != null) {
+        this.xAttrs = b.build();
+      }
+    }
+  }
 
-  public XAttrFeature(ImmutableList<XAttr> xAttrs) {
-    this.xAttrs = xAttrs;
+  /**
+   * Get the XAttrs.
+   * @return the XAttrs
+   */
+  public List<XAttr> getXAttrs() {
+    if (xAttrs == null) {
+      return XAttrFormat.toXAttrs(attrs);
+    } else {
+      if (attrs == null) {
+        return xAttrs;
+      } else {
+        List<XAttr> result = new ArrayList<>();
+        result.addAll(XAttrFormat.toXAttrs(attrs));
+        result.addAll(xAttrs);
+        return result;
+      }
+    }
   }
   }
 
-    return xAttrs;
+  /**
+   * Get XAttr by name with prefix.
+   * @param prefixedName xAttr name with prefix
+   * @return the XAttr
+   */
+  public XAttr getXAttr(String prefixedName) {
+    XAttr attr = XAttrFormat.getXAttr(attrs, prefixedName);
+    if (attr == null && xAttrs != null) {
+      XAttr toFind = XAttrHelper.buildXAttr(prefixedName);
+      for (XAttr a : xAttrs) {
+        if (a.equalsIgnoreValue(toFind)) {
+          attr = a;
+          break;
+        }
+      }
+    }
+    return attr;
   }
 }

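A hedged sketch of the PACK_THRESHOLD split (XAttrHelper.buildXAttr is an existing HDFS helper; names and sizes are illustrative):

    import java.util.Arrays;

    XAttr small = XAttrHelper.buildXAttr("user.tag", "v".getBytes());   // <= 1024 bytes: packed into attrs
    XAttr large = XAttrHelper.buildXAttr("user.blob", new byte[4096]);  // > 1024 bytes: kept in xAttrs list
    XAttrFeature f = new XAttrFeature(Arrays.asList(small, large));
    XAttr found = f.getXAttr("user.tag");  // decoded straight from the packed byte[]
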
+ 193 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrFormat.java

@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.hdfs.XAttrHelper;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.hdfs.util.LongBitFormat;
+
+/**
+ * Class to pack XAttrs into byte[].<br>
+ *
+ * Note:  this format is used both in-memory and on-disk.  Changes will be
+ * incompatible.
+ *
+ */
+public enum XAttrFormat implements LongBitFormat.Enum {
+  RESERVED(null, 5),
+  NS_EXT(RESERVED.BITS, 1),
+  NAME(NS_EXT.BITS, 24),
+  NS(NAME.BITS, 2);
+
+  private static final int NS_EXT_SHIFT = NS.BITS.getLength();
+  private static final int NS_MASK = (1 << NS_EXT_SHIFT) - 1;
+  private static final int XATTR_VALUE_LEN_MAX = 1 << 16;
+  private static final XAttr.NameSpace[] XATTR_NAMESPACE_VALUES =
+      XAttr.NameSpace.values();
+
+  private final LongBitFormat BITS;
+
+  XAttrFormat(LongBitFormat previous, int length) {
+    BITS = new LongBitFormat(name(), previous, length, 0);
+  }
+
+  @Override
+  public int getLength() {
+    return BITS.getLength();
+  }
+
+  static XAttr.NameSpace getNamespace(int record) {
+    long nid = NS.BITS.retrieve(record);
+    nid |= NS_EXT.BITS.retrieve(record) << 2;
+    return XATTR_NAMESPACE_VALUES[(int) nid];
+  }
+
+  public static String getName(int record) {
+    int nid = (int)NAME.BITS.retrieve(record);
+    return SerialNumberManager.XATTR.getString(nid);
+  }
+
+  static int toInt(XAttr a) {
+    int nid = SerialNumberManager.XATTR.getSerialNumber(a.getName());
+    int nsOrd = a.getNameSpace().ordinal();
+    long value = NS.BITS.combine(nsOrd & NS_MASK, 0L);
+    value = NS_EXT.BITS.combine(nsOrd >>> NS_EXT_SHIFT, value);
+    value = NAME.BITS.combine(nid, value);
+    return (int)value;
+  }
+
+  static XAttr toXAttr(int record, byte[] value,
+      SerialNumberManager.StringTable stringTable) {
+    int nid = (int)NAME.BITS.retrieve(record);
+    String name = SerialNumberManager.XATTR.getString(nid, stringTable);
+    return new XAttr.Builder()
+        .setNameSpace(getNamespace(record))
+        .setName(name)
+        .setValue(value)
+        .build();
+  }
+
+  /**
+   * Unpack byte[] to XAttrs.
+   * 
+   * @param attrs the packed bytes of XAttrs
+   * @return XAttrs list
+   */
+  static List<XAttr> toXAttrs(byte[] attrs) {
+    List<XAttr> xAttrs = new ArrayList<>();
+    if (attrs == null || attrs.length == 0) {
+      return xAttrs;
+    }
+    for (int i = 0; i < attrs.length;) {
+      XAttr.Builder builder = new XAttr.Builder();
+      // big-endian
+      int v = Ints.fromBytes(attrs[i], attrs[i + 1],
+          attrs[i + 2], attrs[i + 3]);
+      i += 4;
+      builder.setNameSpace(XAttrFormat.getNamespace(v));
+      builder.setName(XAttrFormat.getName(v));
+      int vlen = ((0xff & attrs[i]) << 8) | (0xff & attrs[i + 1]);
+      i += 2;
+      if (vlen > 0) {
+        byte[] value = new byte[vlen];
+        System.arraycopy(attrs, i, value, 0, vlen);
+        builder.setValue(value);
+        i += vlen;
+      }
+      xAttrs.add(builder.build());
+    }
+    return xAttrs;
+  }
+
+  /**
+   * Get XAttr by name with prefix.
+   * Unpacks the byte[] only until it finds the requested XAttr.
+   * 
+   * @param attrs the packed bytes of XAttrs
+   * @param prefixedName the XAttr name with prefix
+   * @return the XAttr
+   */
+  static XAttr getXAttr(byte[] attrs, String prefixedName) {
+    if (prefixedName == null || attrs == null) {
+      return null;
+    }
+
+    XAttr xAttr = XAttrHelper.buildXAttr(prefixedName);
+    for (int i = 0; i < attrs.length;) {
+      // big-endian
+      int v = Ints.fromBytes(attrs[i], attrs[i + 1],
+          attrs[i + 2], attrs[i + 3]);
+      i += 4;
+      XAttr.NameSpace namespace = XAttrFormat.getNamespace(v);
+      String name = XAttrFormat.getName(v);
+      int vlen = ((0xff & attrs[i]) << 8) | (0xff & attrs[i + 1]);
+      i += 2;
+      if (xAttr.getNameSpace() == namespace &&
+          xAttr.getName().equals(name)) {
+        if (vlen > 0) {
+          byte[] value = new byte[vlen];
+          System.arraycopy(attrs, i, value, 0, vlen);
+          return new XAttr.Builder().setNameSpace(namespace).
+              setName(name).setValue(value).build();
+        }
+        return xAttr;
+      }
+      i += vlen;
+    }
+    return null;
+  }
+
+  /**
+   * Pack the XAttrs to byte[].
+   * 
+   * @param xAttrs the XAttrs
+   * @return the packed bytes
+   */
+  static byte[] toBytes(List<XAttr> xAttrs) {
+    if (xAttrs == null || xAttrs.isEmpty()) {
+      return null;
+    }
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try {
+      for (XAttr a : xAttrs) {
+        // big-endian
+        int v = XAttrFormat.toInt(a);
+        out.write(Ints.toByteArray(v));
+        int vlen = a.getValue() == null ? 0 : a.getValue().length;
+        Preconditions.checkArgument(vlen < XATTR_VALUE_LEN_MAX,
+            "The length of xAttr values is too long.");
+        out.write((byte)(vlen >> 8));
+        out.write((byte)(vlen));
+        if (vlen > 0) {
+          out.write(a.getValue());
+        }
+      }
+    } catch (IOException e) {
+      // in fact, no exception
+    }
+    return out.toByteArray();
+  }
+}

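For reference, each entry that toBytes() appends to the byte[] is a 4-byte big-endian header int, a 2-byte big-endian value length, then the value bytes. The header's bit fields (lowest bits first, per the enum declaration) line up with the constants deleted from FSImageFormatPBINode:

    // bits [0,5)   RESERVED
    // bit  5       NS_EXT  (third namespace bit; ex XATTR_NAMESPACE_EXT_OFFSET = 5)
    // bits [6,30)  NAME    (name serial id;     ex XATTR_NAME_OFFSET = 6)
    // bits [30,32) NS      (low namespace bits; ex XATTR_NAMESPACE_OFFSET = 30)
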
+ 35 - 43
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java

@@ -18,12 +18,9 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
@@ -34,26 +31,26 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 @InterfaceAudience.Private
 @InterfaceAudience.Private
 public class XAttrStorage {
 public class XAttrStorage {
 
 
-  private static final Map<String, String> internedNames = Maps.newHashMap();
-
   /**
   /**
-   * Reads the existing extended attributes of an inode. If the 
-   * inode does not have an <code>XAttr</code>, then this method
-   * returns an empty list.
+   * Reads the extended attribute of an inode by name with prefix.
    * <p/>
    * <p/>
-   * Must be called while holding the FSDirectory read lock.
    *
    *
    * @param inode INode to read
    * @param inode INode to read
    * @param snapshotId
    * @param snapshotId
-   * @return List<XAttr> <code>XAttr</code> list. 
+   * @param prefixedName xAttr name with prefix
+   * @return the xAttr
    */
    */
-  public static List<XAttr> readINodeXAttrs(INode inode, int snapshotId) {
-    XAttrFeature f = inode.getXAttrFeature(snapshotId);
-    return f == null ? ImmutableList.<XAttr> of() : f.getXAttrs();
+  public static XAttr readINodeXAttrByPrefixedName(INodesInPath iip,
+      String prefixedName) {
+    XAttrFeature f =
+        iip.getLastINode().getXAttrFeature(iip.getPathSnapshotId());
+    return f == null ? null : f.getXAttr(prefixedName);
   }
   }
-  
+
   /**
   /**
-   * Reads the existing extended attributes of an inode.
+   * Reads the existing extended attributes of an inode. If the 
+   * inode does not have an <code>XAttr</code>, then this method
+   * returns an empty list.
    * <p/>
    * <p/>
    * Must be called while holding the FSDirectory read lock.
    * Must be called while holding the FSDirectory read lock.
    *
    *
@@ -62,9 +59,25 @@ public class XAttrStorage {
    */
    */
   public static List<XAttr> readINodeXAttrs(INodeAttributes inodeAttr) {
   public static List<XAttr> readINodeXAttrs(INodeAttributes inodeAttr) {
     XAttrFeature f = inodeAttr.getXAttrFeature();
     XAttrFeature f = inodeAttr.getXAttrFeature();
-    return f == null ? ImmutableList.<XAttr> of() : f.getXAttrs();
+    return f == null ? new ArrayList<XAttr>(0) : f.getXAttrs();
+  }
+
+  /**
+   * Reads the existing extended attributes of an inode. If the 
+   * inode does not have an <code>XAttr</code>, then this method
+   * returns an empty list.
+   * <p/>
+   * Must be called while holding the FSDirectory read lock.
+   *
+   * @param inode INode to read
+   * @param snapshotId snapshot identifier
+   * @return List<XAttr> <code>XAttr</code> list. 
+   */
+  public static List<XAttr> readINodeXAttrs(INode inode, int snapshotId) {
+    XAttrFeature f = inode.getXAttrFeature(snapshotId);
+    return f == null ? new ArrayList<XAttr>(0) : f.getXAttrs();
   }
   }
-  
+
   /**
   /**
    * Update xattrs of inode.
    * Update xattrs of inode.
    * <p/>
    * <p/>
@@ -76,33 +89,12 @@ public class XAttrStorage {
    */
    */
   public static void updateINodeXAttrs(INode inode, 
   public static void updateINodeXAttrs(INode inode, 
       List<XAttr> xAttrs, int snapshotId) throws QuotaExceededException {
       List<XAttr> xAttrs, int snapshotId) throws QuotaExceededException {
-    if (xAttrs == null || xAttrs.isEmpty()) {
-      if (inode.getXAttrFeature() != null) {
-        inode.removeXAttrFeature(snapshotId);
-      }
-      return;
-    }
-    // Dedupe the xAttr name and save them into a new interned list
-    List<XAttr> internedXAttrs = Lists.newArrayListWithCapacity(xAttrs.size());
-    for (XAttr xAttr : xAttrs) {
-      final String name = xAttr.getName();
-      String internedName = internedNames.get(name);
-      if (internedName == null) {
-        internedName = name;
-        internedNames.put(internedName, internedName);
-      }
-      XAttr internedXAttr = new XAttr.Builder()
-          .setName(internedName)
-          .setNameSpace(xAttr.getNameSpace())
-          .setValue(xAttr.getValue())
-          .build();
-      internedXAttrs.add(internedXAttr);
-    }
-    // Save the list of interned xattrs
-    ImmutableList<XAttr> newXAttrs = ImmutableList.copyOf(internedXAttrs);
     if (inode.getXAttrFeature() != null) {
     if (inode.getXAttrFeature() != null) {
       inode.removeXAttrFeature(snapshotId);
       inode.removeXAttrFeature(snapshotId);
     }
     }
-    inode.addXAttrFeature(new XAttrFeature(newXAttrs), snapshotId);
+    if (xAttrs == null || xAttrs.isEmpty()) {
+      return;
+    }
+    inode.addXAttrFeature(new XAttrFeature(xAttrs), snapshotId);
   }
   }
 }
 }
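
With attributes now kept in XAttrFeature's packed representation (see
XAttrFormat above), readINodeXAttrByPrefixedName can resolve a single
attribute without materializing the whole list. A hypothetical call site
follows; the iip variable and the attribute name are placeholders, not part
of this patch:

// Resolve one attribute directly; returns null when the inode has no
// XAttrFeature or the name is absent.
XAttr xattr = XAttrStorage.readINodeXAttrByPrefixedName(iip, "user.owner-tag");
byte[] value = (xattr == null) ? null : xattr.getValue();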

+ 10 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FSImageLoader.java

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.tools.offlineImageViewer;
 
 import java.io.BufferedInputStream;
-import java.io.EOFException;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -48,6 +47,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf;
 import org.apache.hadoop.hdfs.server.namenode.FSImageUtil;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto;
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
+import org.apache.hadoop.hdfs.server.namenode.SerialNumberManager;
 import org.apache.hadoop.hdfs.web.JsonUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.LimitInputStream;
@@ -64,7 +64,7 @@ import com.google.common.collect.Maps;
 class FSImageLoader {
   public static final Log LOG = LogFactory.getLog(FSImageHandler.class);
 
-  private final String[] stringTable;
+  private final SerialNumberManager.StringTable stringTable;
   // byte representation of inodes, sorted by id
   private final byte[][] inodes;
   private final Map<Long, long[]> dirmap;
@@ -90,8 +90,8 @@
    }
  };
 
-  private FSImageLoader(String[] stringTable, byte[][] inodes,
-                        Map<Long, long[]> dirmap) {
+  private FSImageLoader(SerialNumberManager.StringTable stringTable,
+      byte[][] inodes, Map<Long, long[]> dirmap) {
    this.stringTable = stringTable;
    this.inodes = inodes;
    this.dirmap = dirmap;
@@ -112,11 +112,10 @@
 
    FsImageProto.FileSummary summary = FSImageUtil.loadSummary(file);
 
-
    try (FileInputStream fin = new FileInputStream(file.getFD())) {
      // Map to record INodeReference to the referred id
      ImmutableList<Long> refIdList = null;
-      String[] stringTable = null;
+      SerialNumberManager.StringTable stringTable = null;
      byte[][] inodes = null;
      Map<Long, long[]> dirmap = null;
 
@@ -237,16 +236,17 @@
    return inodes;
  }
 
-  static String[] loadStringTable(InputStream in) throws
-  IOException {
+  static SerialNumberManager.StringTable loadStringTable(InputStream in)
+      throws IOException {
    FsImageProto.StringTableSection s = FsImageProto.StringTableSection
        .parseDelimitedFrom(in);
    LOG.info("Loading " + s.getNumEntry() + " strings");
-    String[] stringTable = new String[s.getNumEntry() + 1];
+    SerialNumberManager.StringTable stringTable =
+        SerialNumberManager.newStringTable(s.getNumEntry(), s.getMaskBits());
    for (int i = 0; i < s.getNumEntry(); ++i) {
      FsImageProto.StringTableSection.Entry e = FsImageProto
          .StringTableSection.Entry.parseDelimitedFrom(in);
-      stringTable[e.getId()] = e.getStr();
+      stringTable.put(e.getId(), e.getStr());
    }
    return stringTable;
  }
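
The loader change above matters because entry ids in an expanded string table
are no longer a dense 1..numEntry range that can index a flat String[]: the
id also encodes which SerialNumberManager the string belongs to. As a rough
illustration only (the real bit assignment lives in SerialNumberManager, and
these method names are invented), an id can be thought of as a manager tag
packed above maskBits' worth of serial-number bits:

public class MaskedIdSketch {
  // Compose an id from a manager tag and a serial number.
  static int compose(int tag, int serial, int maskBits) {
    return (tag << maskBits) | serial;
  }

  static int serialOf(int id, int maskBits) {
    return id & ((1 << maskBits) - 1);   // low bits: serial number
  }

  static int tagOf(int id, int maskBits) {
    return id >>> maskBits;              // high bits: which manager
  }

  public static void main(String[] args) {
    int id = compose(2, 1000, 24);
    System.out.println(tagOf(id, 24) + " " + serialOf(id, 24)); // 2 1000
  }
}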

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
+import org.apache.hadoop.hdfs.server.namenode.SerialNumberManager;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.LimitInputStream;
 import org.apache.hadoop.util.Time;
@@ -380,7 +381,7 @@ abstract class PBImageTextWriter implements Closeable {
    }
  }
 
-  private String[] stringTable;
+  private SerialNumberManager.StringTable stringTable;
  private PrintStream out;
  private MetadataMap metadataMap = null;
 

+ 3 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageXmlWriter.java

@@ -48,7 +48,7 @@ import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotDiffSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotSection;
-import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection;
+import org.apache.hadoop.hdfs.server.namenode.SerialNumberManager;
 import org.apache.hadoop.hdfs.util.XMLUtils;
 import org.apache.hadoop.util.LimitInputStream;
 
@@ -62,7 +62,7 @@ import com.google.common.collect.Lists;
 public final class PBImageXmlWriter {
  private final Configuration conf;
  private final PrintStream out;
-  private String[] stringTable;
+  private SerialNumberManager.StringTable stringTable;
 
  public PBImageXmlWriter(Configuration conf, PrintStream out) {
    this.conf = conf;
@@ -396,13 +396,7 @@
  }
 
  private void loadStringTable(InputStream in) throws IOException {
-    StringTableSection s = StringTableSection.parseDelimitedFrom(in);
-    stringTable = new String[s.getNumEntry() + 1];
-    for (int i = 0; i < s.getNumEntry(); ++i) {
-      StringTableSection.Entry e = StringTableSection.Entry
-          .parseDelimitedFrom(in);
-      stringTable[e.getId()] = e.getStr();
-    }
+    stringTable = FSImageLoader.loadStringTable(in);
  }
 
  private PBImageXmlWriter o(final String e, final Object v) {

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java

@@ -26,6 +26,10 @@ import java.io.Serializable;
 public class LongBitFormat implements Serializable {
  private static final long serialVersionUID = 1L;
 
+  public interface Enum {
+    int getLength();
+  }
+
  private final String NAME;
  /** Bit offset */
  private final int OFFSET;
@@ -68,4 +72,8 @@
  public long getMin() {
    return MIN;
  }
+
+  public int getLength() {
+    return LENGTH;
+  }
 }
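
The new LongBitFormat.Enum interface and getLength() accessor let callers
(for example SerialNumberManager) ask a field how many bits it occupies and
size their id spaces accordingly. A condensed, self-contained sketch of the
packing idea follows; the real class also range-checks values and tracks a
minimum, which is omitted here:

public class BitFieldSketch {
  final int offset;  // first bit of this field in the long
  final int length;  // number of bits
  final long mask;

  BitFieldSketch(BitFieldSketch previous, int length) {
    this.offset = previous == null ? 0 : previous.offset + previous.length;
    this.length = length;
    this.mask = ((-1L) >>> (64 - length)) << offset;
  }

  long retrieve(long record) {
    return (record & mask) >>> offset;
  }

  long combine(long value, long record) {
    return (record & ~mask) | (value << offset);
  }

  public static void main(String[] args) {
    BitFieldSketch mode = new BitFieldSketch(null, 16);
    BitFieldSketch user = new BitFieldSketch(mode, 24);
    long record = user.combine(42, mode.combine(0755, 0L));
    System.out.println(mode.retrieve(record)); // 493 (0755 octal)
    System.out.println(user.retrieve(record)); // 42
  }
}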

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto

@@ -308,6 +308,7 @@ message StringTableSection {
    optional string str = 2;
  }
  optional uint32 numEntry = 1;
+  optional uint32 maskBits = 2 [default = 0];
  // repeated Entry
 }
 

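Because maskBits defaults to 0, images written before this change parse
unchanged, and an old reader simply ignores the unknown field at the protobuf
wire level, so compatibility hinges on writing a nonzero maskBits only when
the cluster opts in to expanded tables. On the writer side the section header
would be emitted roughly like this fragment (builder methods follow
protobuf's generated naming; numEntry, maskBits, and out are placeholder
locals):

FsImageProto.StringTableSection.newBuilder()
    .setNumEntry(numEntry)
    .setMaskBits(maskBits)   // stays 0 unless expanded string tables are on
    .build()
    .writeDelimitedTo(out);
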
+ 1 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java

@@ -54,12 +54,8 @@ public class TestCommitBlockSynchronization {
    // set file's parent as root and put the file to inodeMap, so
    // FSNamesystem's isFileDeleted() method will return false on this file
    if (file.getParent() == null) {
-      INodeDirectory mparent = mock(INodeDirectory.class);
-      INodeDirectory parent = new INodeDirectory(mparent.getId(), new byte[0],
-          mparent.getPermissionStatus(), mparent.getAccessTime());
-      parent.setLocalName(new byte[0]);
+      INodeDirectory parent = namesystem.getFSDirectory().getRoot();
      parent.addChild(file);
-      file.setParent(parent);
    }
    namesystem.dir.getINodeMap().put(file);