Browse source code

HDFS-5743. Use protobuf to serialize snapshot information. Contributed by Jing Zhao.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5698@1559868 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao 11 years ago
parent
commit
ecc6989bd6

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt

@@ -12,3 +12,5 @@ HDFS-5698 subtasks
 
     HDFS-5793. Optimize the serialization of PermissionStatus. (Haohui Mai via
     jing9)
+
+    HDFS-5743. Use protobuf to serialize snapshot information. (jing9)

+ 153 - 68
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java

@@ -21,10 +21,13 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -33,28 +36,80 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.StringMap;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FilesUnderConstructionSection.FileUnderConstructionEntry;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.DstReference;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 
-final class FSImageFormatPBINode {
-  private final static int USER_GROUP_STRID_MASK = (1 << 24) - 1;
+@InterfaceAudience.Private
+public final class FSImageFormatPBINode {
+  private final static long USER_GROUP_STRID_MASK = (1 << 24) - 1;
   private final static int USER_STRID_OFFSET = 40;
   private final static int GROUP_STRID_OFFSET = 16;
 
-  final static class Loader {
-    private PermissionStatus loadPermission(long id) {
+  public final static class Loader {
+    public static PermissionStatus loadPermission(long id,
+        final String[] stringTable) {
       short perm = (short) (id & ((1 << GROUP_STRID_OFFSET) - 1));
       int gsid = (int) ((id >> GROUP_STRID_OFFSET) & USER_GROUP_STRID_MASK);
       int usid = (int) ((id >> USER_STRID_OFFSET) & USER_GROUP_STRID_MASK);
-      return new PermissionStatus(parent.stringTable[usid],
-          parent.stringTable[gsid], new FsPermission(perm));
+      return new PermissionStatus(stringTable[usid], stringTable[gsid],
+          new FsPermission(perm));
+    }
+
+    public static INodeReference loadINodeReference(
+        INodeSection.INodeReference r, FSDirectory dir) throws IOException {
+      long referredId = r.getReferredId();
+      INode referred = dir.getInode(referredId);
+      WithCount withCount = (WithCount) referred.getParentReference();
+      if (withCount == null) {
+        withCount = new INodeReference.WithCount(null, referred);
+      }
+      final INodeReference ref;
+      if (r.hasDstSnapshotId()) { // DstReference
+        ref = new INodeReference.DstReference(null, withCount,
+            r.getDstSnapshotId());
+      } else {
+        ref = new INodeReference.WithName(null, withCount, r.getName()
+            .toByteArray(), r.getLastSnapshotId());
+      }
+      return ref;
+    }
+
+    public static INodeDirectory loadINodeDirectory(INodeSection.INode n,
+        final String[] stringTable) {
+      assert n.getType() == INodeSection.INode.Type.DIRECTORY;
+      INodeSection.INodeDirectory d = n.getDirectory();
+
+      final PermissionStatus permissions = loadPermission(d.getPermission(),
+          stringTable);
+      final INodeDirectory dir = new INodeDirectory(n.getId(), n.getName()
+          .toByteArray(), permissions, d.getModificationTime());
+
+      final long nsQuota = d.getNsQuota(), dsQuota = d.getDsQuota();
+      if (nsQuota >= 0 || dsQuota >= 0) {
+        dir.addDirectoryWithQuotaFeature(nsQuota, dsQuota);
+      }
+      return dir;
+    }
+
+    public static void updateBlocksMap(INodeFile file, BlockManager bm) {
+      // Add file->block mapping
+      final BlockInfo[] blocks = file.getBlocks();
+      if (blocks != null) {
+        for (int i = 0; i < blocks.length; i++) {
+          file.setBlock(i, bm.addBlockCollection(blocks[i], file));
+        }
+      }
     }
 
     private final FSDirectory dir;
@@ -80,9 +135,20 @@ final class FSImageFormatPBINode {
           INode child = dir.getInode(id);
           addToParent(p, child);
         }
+        for (int i = 0; i < e.getNumOfRef(); i++) {
+          INodeReference ref = loadINodeReference(in);
+          addToParent(p, ref);
+        }
       }
     }
 
+    private INodeReference loadINodeReference(InputStream in)
+        throws IOException {
+      INodeSection.INodeReference ref = INodeSection.INodeReference
+          .parseDelimitedFrom(in);
+      return loadINodeReference(ref, dir);
+    }
+
     void loadINodeSection(InputStream in) throws IOException {
       INodeSection s = INodeSection.parseDelimitedFrom(in);
       fsn.resetLastInodeId(s.getLastInodeId());
@@ -108,7 +174,7 @@ final class FSImageFormatPBINode {
           break;
         }
         // update the lease manager
-        INodeFile file = fsn.dir.getInode(entry.getInodeId()).asFile();
+        INodeFile file = dir.getInode(entry.getInodeId()).asFile();
         FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
         Preconditions.checkState(uc != null); // file must be under-construction
         fsn.leaseManager.addLease(uc.getClientName(), entry.getFullPath());
@@ -116,8 +182,7 @@ final class FSImageFormatPBINode {
     }
 
     private void addToParent(INodeDirectory parent, INode child) {
-      FSDirectory fsDir = fsn.dir;
-      if (parent == fsDir.rootDir && FSDirectory.isReservedName(child)) {
+      if (parent == dir.rootDir && FSDirectory.isReservedName(child)) {
         throw new HadoopIllegalArgumentException("File name \""
             + child.getLocalName() + "\" is reserved. Please "
             + " change the name of the existing file or directory to another "
@@ -127,10 +192,10 @@ final class FSImageFormatPBINode {
       if (!parent.addChild(child)) {
         return;
       }
-      fsn.dir.cacheName(child);
+      dir.cacheName(child);
 
       if (child.isFile()) {
-        updateBlocksMap(child.asFile());
+        updateBlocksMap(child.asFile(), fsn.getBlockManager());
       }
     }
 
@@ -139,7 +204,7 @@ final class FSImageFormatPBINode {
       case FILE:
         return loadINodeFile(n);
       case DIRECTORY:
-        return loadINodeDirectory(n);
+        return loadINodeDirectory(n, parent.getStringTable());
       case SYMLINK:
         return loadINodeSymlink(n);
       default:
@@ -148,21 +213,6 @@ final class FSImageFormatPBINode {
       return null;
     }
 
-    private INodeDirectory loadINodeDirectory(INodeSection.INode n) {
-      assert n.getType() == INodeSection.INode.Type.DIRECTORY;
-      INodeSection.INodeDirectory d = n.getDirectory();
-
-      final PermissionStatus permissions = loadPermission(d.getPermission());
-      final INodeDirectory dir = new INodeDirectory(n.getId(), n.getName()
-          .toByteArray(), permissions, d.getModificationTime());
-
-      final long nsQuota = d.getNsQuota(), dsQuota = d.getDsQuota();
-      if (nsQuota >= 0 || dsQuota >= 0) {
-        dir.addDirectoryWithQuotaFeature(nsQuota, dsQuota);
-      }
-      return dir;
-    }
-
     private INodeFile loadINodeFile(INodeSection.INode n) {
       assert n.getType() == INodeSection.INode.Type.FILE;
       INodeSection.INodeFile f = n.getFile();
@@ -173,7 +223,8 @@ final class FSImageFormatPBINode {
       for (int i = 0, e = bp.size(); i < e; ++i) {
         blocks[i] = new BlockInfo(PBHelper.convert(bp.get(i)), replication);
       }
-      final PermissionStatus permissions = loadPermission(f.getPermission());
+      final PermissionStatus permissions = loadPermission(f.getPermission(),
+          parent.getStringTable());
 
       final INodeFile file = new INodeFile(n.getId(),
           n.getName().toByteArray(), permissions, f.getModificationTime(),
@@ -197,13 +248,14 @@ final class FSImageFormatPBINode {
     private INodeSymlink loadINodeSymlink(INodeSection.INode n) {
       assert n.getType() == INodeSection.INode.Type.SYMLINK;
       INodeSection.INodeSymlink s = n.getSymlink();
-      final PermissionStatus permissions = loadPermission(s.getPermission());
+      final PermissionStatus permissions = loadPermission(s.getPermission(),
+          parent.getStringTable());
       return new INodeSymlink(n.getId(), n.getName().toByteArray(), permissions,
           0, 0, s.getTarget().toStringUtf8());
     }
 
     private void loadRootINode(INodeSection.INode p) {
-      INodeDirectory root = loadINodeDirectory(p);
+      INodeDirectory root = loadINodeDirectory(p, parent.getStringTable());
       final Quota.Counts q = root.getQuotaCounts();
       final long nsQuota = q.get(Quota.NAMESPACE);
       final long dsQuota = q.get(Quota.DISKSPACE);
@@ -213,20 +265,53 @@ final class FSImageFormatPBINode {
       dir.rootDir.cloneModificationTime(root);
       dir.rootDir.clonePermissionStatus(root);
     }
+  }
 
-    private void updateBlocksMap(INodeFile file) {
-      // Add file->block mapping
-      final BlockInfo[] blocks = file.getBlocks();
-      if (blocks != null) {
-        final BlockManager bm = fsn.getBlockManager();
-        for (int i = 0; i < blocks.length; i++) {
-          file.setBlock(i, bm.addBlockCollection(blocks[i], file));
-        }
+  public final static class Saver {
+    private static long buildPermissionStatus(INodeAttributes n,
+        final StringMap stringMap) {
+      long userId = stringMap.getStringId(n.getUserName());
+      long groupId = stringMap.getStringId(n.getGroupName());
+      return ((userId & USER_GROUP_STRID_MASK) << USER_STRID_OFFSET)
+          | ((groupId & USER_GROUP_STRID_MASK) << GROUP_STRID_OFFSET)
+          | n.getFsPermissionShort();
+    }
+
+    public static INodeSection.INodeFile.Builder buildINodeFile(
+        INodeFileAttributes file, final StringMap stringMap) {
+      INodeSection.INodeFile.Builder b = INodeSection.INodeFile.newBuilder()
+          .setAccessTime(file.getAccessTime())
+          .setModificationTime(file.getModificationTime())
+          .setPermission(buildPermissionStatus(file, stringMap))
+          .setPreferredBlockSize(file.getPreferredBlockSize())
+          .setReplication(file.getFileReplication());
+      return b;
+    }
+
+    public static INodeSection.INodeDirectory.Builder buildINodeDirectory(
+        INodeDirectoryAttributes dir, final StringMap stringMap) {
+      Quota.Counts quota = dir.getQuotaCounts();
+      INodeSection.INodeDirectory.Builder b = INodeSection.INodeDirectory
+          .newBuilder().setModificationTime(dir.getModificationTime())
+          .setNsQuota(quota.get(Quota.NAMESPACE))
+          .setDsQuota(quota.get(Quota.DISKSPACE))
+          .setPermission(buildPermissionStatus(dir, stringMap));
+      return b;
+    }
+
+    public static INodeSection.INodeReference.Builder buildINodeReference(
+        INodeReference ref) throws IOException {
+      INodeSection.INodeReference.Builder rb = INodeSection.INodeReference
+          .newBuilder().setReferredId(ref.getId());
+      if (ref instanceof WithName) {
+        rb.setLastSnapshotId(((WithName) ref).getLastSnapshotId()).setName(
+            ByteString.copyFrom(ref.getLocalNameBytes()));
+      } else if (ref instanceof DstReference) {
+        rb.setDstSnapshotId(((DstReference) ref).getDstSnapshotId());
       }
+      return rb;
     }
-  }
 
-  final static class Saver {
     private final FSNamesystem fsn;
     private final FileSummary.Builder summary;
     private final FSImageFormatProtobuf.Saver parent;
@@ -238,20 +323,33 @@ final class FSImageFormatPBINode {
     }
 
     void serializeINodeDirectorySection(OutputStream out) throws IOException {
-      for (INodeWithAdditionalFields n : fsn.dir.getINodeMap().getMap()) {
-        if (!n.isDirectory())
+      Iterator<INodeWithAdditionalFields> iter = fsn.getFSDirectory()
+          .getINodeMap().getMapIterator();
+      while (iter.hasNext()) {
+        INodeWithAdditionalFields n = iter.next();
+        if (!n.isDirectory()) {
           continue;
-
+        }
         ReadOnlyList<INode> children = n.asDirectory().getChildrenList(
             Snapshot.CURRENT_STATE_ID);
         if (children.size() > 0) {
           INodeDirectorySection.DirEntry.Builder b = INodeDirectorySection.
               DirEntry.newBuilder().setParent(n.getId());
+          List<INodeReference> refs = new ArrayList<INodeReference>();
           for (INode inode : children) {
-            b.addChildren(inode.getId());
+            if (!inode.isReference()) {
+              b.addChildren(inode.getId());
+            } else {
+              refs.add(inode.asReference());
+            }
           }
+          b.setNumOfRef(refs.size());
           INodeDirectorySection.DirEntry e = b.build();
           e.writeDelimitedTo(out);
+          for (INodeReference ref : refs) {
+            INodeSection.INodeReference.Builder rb = buildINodeReference(ref);
+            rb.build().writeDelimitedTo(out);
+          }
         }
       }
       parent.commitSection(summary,
@@ -266,7 +364,9 @@ final class FSImageFormatPBINode {
       INodeSection s = b.build();
       s.writeDelimitedTo(out);
 
-      for (INodeWithAdditionalFields n : inodesMap.getMap()) {
+      Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
+      while (iter.hasNext()) {
+        INodeWithAdditionalFields n = iter.next();
         save(out, n);
       }
       parent.commitSection(summary, FSImageFormatProtobuf.SectionName.INODE);
@@ -286,14 +386,6 @@ final class FSImageFormatPBINode {
           FSImageFormatProtobuf.SectionName.FILES_UNDERCONSTRUCTION);
     }
 
-    private long buildPermissionStatus(INode n) {
-      int userId = parent.getStringId(n.getUserName());
-      int groupId = parent.getStringId(n.getGroupName());
-      return ((userId & USER_GROUP_STRID_MASK) << USER_STRID_OFFSET)
-          | ((groupId & USER_GROUP_STRID_MASK) << GROUP_STRID_OFFSET)
-          | n.getFsPermissionShort();
-    }
-
     private void save(OutputStream out, INode n) throws IOException {
       if (n.isDirectory()) {
         save(out, n.asDirectory());
@@ -305,25 +397,16 @@ final class FSImageFormatPBINode {
     }
 
     private void save(OutputStream out, INodeDirectory n) throws IOException {
-      Quota.Counts quota = n.getQuotaCounts();
-      INodeSection.INodeDirectory.Builder b = INodeSection.INodeDirectory
-          .newBuilder().setModificationTime(n.getModificationTime())
-          .setNsQuota(quota.get(Quota.NAMESPACE))
-          .setDsQuota(quota.get(Quota.DISKSPACE))
-          .setPermission(buildPermissionStatus(n));
-
+      INodeSection.INodeDirectory.Builder b = buildINodeDirectory(n,
+          parent.getStringMap());
       INodeSection.INode r = buildINodeCommon(n)
           .setType(INodeSection.INode.Type.DIRECTORY).setDirectory(b).build();
       r.writeDelimitedTo(out);
     }
 
     private void save(OutputStream out, INodeFile n) throws IOException {
-      INodeSection.INodeFile.Builder b = INodeSection.INodeFile.newBuilder()
-          .setAccessTime(n.getAccessTime())
-          .setModificationTime(n.getModificationTime())
-          .setPermission(buildPermissionStatus(n))
-          .setPreferredBlockSize(n.getPreferredBlockSize())
-          .setReplication(n.getFileReplication());
+      INodeSection.INodeFile.Builder b = buildINodeFile(n,
+          parent.getStringMap());
 
       for (Block block : n.getBlocks()) {
         b.addBlocks(PBHelper.convert(block));
@@ -331,7 +414,8 @@ final class FSImageFormatPBINode {
 
       FileUnderConstructionFeature uc = n.getFileUnderConstructionFeature();
       if (uc != null) {
-        INodeSection.FileUnderConstructionFeature f = INodeSection.FileUnderConstructionFeature
+        INodeSection.FileUnderConstructionFeature f =
+            INodeSection.FileUnderConstructionFeature
             .newBuilder().setClientName(uc.getClientName())
             .setClientMachine(uc.getClientMachine()).build();
         b.setFileUC(f);
@@ -344,7 +428,8 @@ final class FSImageFormatPBINode {
 
     private void save(OutputStream out, INodeSymlink n) throws IOException {
       INodeSection.INodeSymlink.Builder b = INodeSection.INodeSymlink
-          .newBuilder().setPermission(buildPermissionStatus(n))
+          .newBuilder()
+          .setPermission(buildPermissionStatus(n, parent.getStringMap()))
           .setTarget(ByteString.copyFrom(n.getSymlink()));
       INodeSection.INode r = buildINodeCommon(n)
           .setType(INodeSection.INode.Type.SYMLINK).setSymlink(b).build();

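For readers following the Loader/Saver changes above: loadPermission and buildPermissionStatus pack a full PermissionStatus into a single long, with the permission short in bits 0-15, the group string-table id in bits 16-39, and the user string-table id in bits 40-63. The following is a self-contained sketch of that encoding in plain Java (an illustration, not the Hadoop classes themselves):

// Standalone sketch of the permission encoding used above: user/group
// string-table ids and the permission short packed into one 64-bit value.
public class PermissionPacking {
  private static final long USER_GROUP_STRID_MASK = (1 << 24) - 1; // 24-bit ids
  private static final int USER_STRID_OFFSET = 40;  // bits 40..63: user id
  private static final int GROUP_STRID_OFFSET = 16; // bits 16..39: group id

  static long pack(long userId, long groupId, short perm) {
    return ((userId & USER_GROUP_STRID_MASK) << USER_STRID_OFFSET)
        | ((groupId & USER_GROUP_STRID_MASK) << GROUP_STRID_OFFSET)
        | perm;
  }

  public static void main(String[] args) {
    long id = pack(3, 7, (short) 0755);
    // unpack exactly as Loader.loadPermission does
    short perm = (short) (id & ((1 << GROUP_STRID_OFFSET) - 1)); // low 16 bits
    int gsid = (int) ((id >> GROUP_STRID_OFFSET) & USER_GROUP_STRID_MASK);
    int usid = (int) ((id >> USER_STRID_OFFSET) & USER_GROUP_STRID_MASK);
    System.out.printf("user=%d group=%d perm=%o%n", usid, gsid, perm); // 3 7 755
  }
}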
+ 66 - 18
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java

@@ -32,20 +32,24 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.security.DigestOutputStream;
 import java.security.MessageDigest;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FSImageFormatPBSnapshot;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -59,18 +63,19 @@ import com.google.protobuf.CodedOutputStream;
 /**
  * Utility class to read / write fsimage in protobuf format.
  */
-final class FSImageFormatProtobuf {
+@InterfaceAudience.Private
+public final class FSImageFormatProtobuf {
   private static final Log LOG = LogFactory.getLog(FSImageFormatProtobuf.class);
 
   static final byte[] MAGIC_HEADER = "HDFSIMG1".getBytes();
   private static final int FILE_VERSION = 1;
 
-  static final class Loader implements FSImageFormat.AbstractLoader {
+  public static final class Loader implements FSImageFormat.AbstractLoader {
     private static final int MINIMUM_FILE_LENGTH = 8;
     private final Configuration conf;
     private final FSNamesystem fsn;
 
-    String[] stringTable;
+    private String[] stringTable;
 
     /** The MD5 sum of the loaded file */
     private MD5Hash imgDigest;
@@ -92,6 +97,10 @@ final class FSImageFormatProtobuf {
       return imgTxId;
     }
 
+    public String[] getStringTable() {
+      return stringTable;
+    }
+
     void load(File file) throws IOException {
       imgDigest = MD5FileUtils.computeMd5ForFile(file);
       RandomAccessFile raFile = new RandomAccessFile(file, "r");
@@ -153,6 +162,8 @@ final class FSImageFormatProtobuf {
 
       FSImageFormatPBINode.Loader inodeLoader = new FSImageFormatPBINode.Loader(
           fsn, this);
+      FSImageFormatPBSnapshot.Loader snapshotLoader =
+          new FSImageFormatPBSnapshot.Loader(fsn, this);
 
       ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary
           .getSectionsList());
@@ -203,6 +214,12 @@ final class FSImageFormatProtobuf {
         case FILES_UNDERCONSTRUCTION:
           inodeLoader.loadFilesUnderConstructionSection(in);
           break;
+        case SNAPSHOT:
+          snapshotLoader.loadSnapshotsSection(in);
+          break;
+        case SNAPSHOT_DIFF:
+          snapshotLoader.loadSnapshotDiffSection(in);
+          break;
         default:
           LOG.warn("Unregconized section " + n);
           break;
@@ -210,8 +227,8 @@ final class FSImageFormatProtobuf {
       }
     }
 
-    private void loadNameSystemSection(InputStream in, FileSummary.Section sections)
-        throws IOException {
+    private void loadNameSystemSection(InputStream in,
+        FileSummary.Section sections) throws IOException {
       NameSystemSection s = NameSystemSection.parseDelimitedFrom(in);
       fsn.setGenerationStampV1(s.getGenstampV1());
       fsn.setGenerationStampV2(s.getGenstampV2());
@@ -231,11 +248,11 @@ final class FSImageFormatProtobuf {
     }
   }
 
-  static final class Saver {
+  public static final class Saver {
     final SaveNamespaceContext context;
     private long currentOffset = MAGIC_HEADER.length;
     private MD5Hash savedDigest;
-    private HashMap<String, Integer> stringMap = Maps.newHashMap();
+    private StringMap stringMap = new StringMap();
 
     private FileChannel fileChannel;
     // OutputStream for the section data
@@ -251,7 +268,7 @@ final class FSImageFormatProtobuf {
       return savedDigest;
     }
 
-    void commitSection(FileSummary.Builder summary, SectionName name)
+    public void commitSection(FileSummary.Builder summary, SectionName name)
         throws IOException {
       long oldOffset = currentOffset;
       flushSectionOutputStream();
@@ -294,11 +311,19 @@ final class FSImageFormatProtobuf {
     }
 
     private void saveInodes(FileSummary.Builder summary) throws IOException {
-      FSImageFormatPBINode.Saver saver = new FSImageFormatPBINode.Saver(this,
-          summary);
-      saver.serializeINodeSection(sectionOutputStream);
-      saver.serializeINodeDirectorySection(sectionOutputStream);
-      saver.serializeFilesUCSection(sectionOutputStream);
+      FSImageFormatPBINode.Saver inodeSaver = new FSImageFormatPBINode.Saver(
+          this, summary);
+      inodeSaver.serializeINodeSection(sectionOutputStream);
+      inodeSaver.serializeINodeDirectorySection(sectionOutputStream);
+      inodeSaver.serializeFilesUCSection(sectionOutputStream);
+    }
+
+    private void saveSnapshots(FileSummary.Builder summary) throws IOException {
+      FSImageFormatPBSnapshot.Saver snapshotSaver =
+          new FSImageFormatPBSnapshot.Saver(this, summary,
+              context.getSourceNamesystem());
+      snapshotSaver.serializeSnapshotsSection(sectionOutputStream);
+      snapshotSaver.serializeSnapshotDiffSection(sectionOutputStream);
     }
 
     private void saveInternal(FileOutputStream fout,
@@ -324,6 +349,7 @@ final class FSImageFormatProtobuf {
 
       saveNameSystemSection(b);
       saveInodes(b);
+      saveSnapshots(b);
       saveStringTableSection(b);
 
       // Flush the buffered data into the file before appending the header
@@ -371,11 +397,22 @@ final class FSImageFormatProtobuf {
       commitSection(summary, SectionName.STRING_TABLE);
     }
 
+    public StringMap getStringMap() {
+      return stringMap;
+    }
+  }
+
+  public static class StringMap {
+    private final Map<String, Integer> stringMap;
+
+    public StringMap() {
+      stringMap = Maps.newHashMap();
+    }
+
     int getStringId(String str) {
       if (str == null) {
         return 0;
       }
-
       Integer v = stringMap.get(str);
       if (v == null) {
         int nv = stringMap.size() + 1;
@@ -384,17 +421,28 @@ final class FSImageFormatProtobuf {
       }
       return v;
     }
+
+    int size() {
+      return stringMap.size();
+    }
+
+    Set<Entry<String, Integer>> entrySet() {
+      return stringMap.entrySet();
+    }
   }
 
   /**
    * Supported section name. The order of the enum determines the order of
    * loading.
    */
-  enum SectionName {
+  public enum SectionName {
     NS_INFO("NS_INFO"),
     STRING_TABLE("STRING_TABLE"),
-    INODE("INODE"), INODE_DIR("INODE_DIR"),
-    FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION");
+    INODE("INODE"),
+    SNAPSHOT("SNAPSHOT"),
+    INODE_DIR("INODE_DIR"),
+    FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"),
+    SNAPSHOT_DIFF("SNAPSHOT_DIFF"),
     SECRET_MANAGER("SECRET_MANAGER"),
     CACHE_MANAGER("CACHE_MANAGER");
 

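The StringMap type extracted above is a simple interning table: id 0 is reserved for null, and each new string receives size() + 1 in first-seen order, so user and group names are written once into the STRING_TABLE section and referenced by id from every inode. A minimal standalone sketch of that behavior (plain Java, outside Hadoop):

import java.util.HashMap;
import java.util.Map;

// Sketch of the StringMap interning scheme: dense ids starting at 1,
// assigned in first-seen order; 0 means "no string".
public class StringInterner {
  private final Map<String, Integer> map = new HashMap<String, Integer>();

  int getStringId(String s) {
    if (s == null) {
      return 0; // id 0 is reserved for null
    }
    Integer v = map.get(s);
    if (v == null) {
      v = map.size() + 1; // next dense id
      map.put(s, v);
    }
    return v;
  }

  public static void main(String[] args) {
    StringInterner t = new StringInterner();
    System.out.println(t.getStringId("hdfs"));       // 1
    System.out.println(t.getStringId("supergroup")); // 2
    System.out.println(t.getStringId("hdfs"));       // 1 again, no new entry
  }
}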
+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -171,7 +171,7 @@ public class INodeDirectory extends INodeWithAdditionalFields
     return children == null? -1: Collections.binarySearch(children, name);
   }
   
-  protected DirectoryWithSnapshotFeature addSnapshotFeature(
+  public DirectoryWithSnapshotFeature addSnapshotFeature(
       DirectoryDiffList diffs) {
     Preconditions.checkState(!isWithSnapshot(), 
         "Directory is already with snapshot");

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -252,7 +252,7 @@ public class INodeFile extends INodeWithAdditionalFields
   
   /* Start of Snapshot Feature */
 
-  private FileWithSnapshotFeature addSnapshotFeature(FileDiffList diffs) {
+  public FileWithSnapshotFeature addSnapshotFeature(FileDiffList diffs) {
     Preconditions.checkState(!isWithSnapshot(), 
         "File is already with snapshot");
     FileWithSnapshotFeature sf = new FileWithSnapshotFeature(diffs);

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -46,8 +47,8 @@ public class INodeMap {
   /** Synchronized by external lock. */
   private final GSet<INode, INodeWithAdditionalFields> map;
   
-  GSet<INode, INodeWithAdditionalFields> getMap() {
-    return map;
+  public Iterator<INodeWithAdditionalFields> getMapIterator() {
+    return map.iterator();
   }
 
   private INodeMap(GSet<INode, INodeWithAdditionalFields> map) {

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java

@@ -244,7 +244,7 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
       this.isSnapshotRoot = isSnapshotRoot;
     }
 
-    ChildrenDiff getChildrenDiff() {
+    public ChildrenDiff getChildrenDiff() {
       return diff;
     }
     
@@ -343,6 +343,10 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
       return super.toString() + " childrenSize=" + childrenSize + ", " + diff;
     }
 
+    int getChildrenSize() {
+      return childrenSize;
+    }
+
     @Override
     void write(DataOutput out, ReferenceMap referenceMap) throws IOException {
       writeSnapshot(out);

+ 422 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java

@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.snapshot;
+
+import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Loader.loadINodeDirectory;
+import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Loader.loadINodeReference;
+import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Loader.loadPermission;
+import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Loader.updateBlocksMap;
+import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Saver.buildINodeDirectory;
+import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Saver.buildINodeFile;
+import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Saver.buildINodeReference;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotDiffSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotDiffSection.CreatedListEntry;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotsSection;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryAttributes;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
+import org.apache.hadoop.hdfs.server.namenode.INodeMap;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference;
+import org.apache.hadoop.hdfs.server.namenode.INodeWithAdditionalFields;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiffList;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
+import org.apache.hadoop.hdfs.util.Diff.ListType;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+
+@InterfaceAudience.Private
+public class FSImageFormatPBSnapshot {
+  /**
+   * Loading snapshot related information from protobuf based FSImage
+   */
+  public final static class Loader {
+    private final FSNamesystem fsn;
+    private final FSDirectory fsDir;
+    private final FSImageFormatProtobuf.Loader parent;
+    private final Map<Integer, Snapshot> snapshotMap;
+
+    public Loader(FSNamesystem fsn, FSImageFormatProtobuf.Loader parent) {
+      this.fsn = fsn;
+      this.fsDir = fsn.getFSDirectory();
+      this.snapshotMap = new HashMap<Integer, Snapshot>();
+      this.parent = parent;
+    }
+
+    /**
+     * Load the snapshots section from fsimage. Also convert snapshottable
+     * directories into {@link INodeDirectorySnapshottable}.
+     */
+    public void loadSnapshotsSection(InputStream in) throws IOException {
+      SnapshotManager sm = fsn.getSnapshotManager();
+      SnapshotsSection section = SnapshotsSection.parseDelimitedFrom(in);
+      int snum = section.getNumSnapshots();
+      sm.setNumSnapshots(snum);
+      sm.setSnapshotCounter(section.getSnapshotCounter());
+      for (long sdirId : section.getSnapshottableDirList()) {
+        INodeDirectory dir = fsDir.getInode(sdirId).asDirectory();
+        final INodeDirectorySnapshottable sdir;
+        if (!dir.isSnapshottable()) {
+          sdir = new INodeDirectorySnapshottable(dir);
+          fsDir.addToInodeMap(sdir);
+        } else {
+          // dir is root, and admin set root to snapshottable before
+          sdir = (INodeDirectorySnapshottable) dir;
+          sdir.setSnapshotQuota(INodeDirectorySnapshottable.SNAPSHOT_LIMIT);
+        }
+        sm.addSnapshottable(sdir);
+      }
+      loadSnapshots(in, snum);
+    }
+
+    private void loadSnapshots(InputStream in, int size) throws IOException {
+      for (int i = 0; i < size; i++) {
+        SnapshotsSection.Snapshot pbs = SnapshotsSection.Snapshot
+            .parseDelimitedFrom(in);
+        INodeDirectory root = loadINodeDirectory(pbs.getRoot(),
+            parent.getStringTable());
+        int sid = pbs.getSnapshotId();
+        INodeDirectorySnapshottable parent = (INodeDirectorySnapshottable) fsDir
+            .getInode(root.getId()).asDirectory();
+        Snapshot snapshot = new Snapshot(sid, root, parent);
+        // add the snapshot to parent, since we follow the sequence of
+        // snapshotsByNames when saving, we do not need to sort when loading
+        parent.addSnapshot(snapshot);
+        snapshotMap.put(sid, snapshot);
+      }
+    }
+
+    /**
+     * Load the snapshot diff section from fsimage.
+     */
+    public void loadSnapshotDiffSection(InputStream in) throws IOException {
+      while (true) {
+        SnapshotDiffSection.DiffEntry entry = SnapshotDiffSection.DiffEntry
+            .parseDelimitedFrom(in);
+        if (entry == null) {
+          break;
+        }
+        long inodeId = entry.getInodeId();
+        INode inode = fsDir.getInode(inodeId);
+        if (inode.isFile()) {
+          loadFileDiffList(in, inode.asFile(), entry.getNumOfDiff());
+        } else if (inode.isDirectory()) {
+          loadDirectoryDiffList(in, inode.asDirectory(), entry.getNumOfDiff());
+        }
+      }
+    }
+
+    /** Load FileDiff list for a file with snapshot feature */
+    private void loadFileDiffList(InputStream in, INodeFile file, int size)
+        throws IOException {
+      final FileDiffList diffs = new FileDiffList();
+      for (int i = 0; i < size; i++) {
+        SnapshotDiffSection.FileDiff pbf = SnapshotDiffSection.FileDiff
+            .parseDelimitedFrom(in);
+        INodeFileAttributes copy = null;
+        if (pbf.hasSnapshotCopy()) {
+          INodeSection.INodeFile fileInPb = pbf.getSnapshotCopy();
+          PermissionStatus permission = loadPermission(
+              fileInPb.getPermission(), parent.getStringTable());
+          copy = new INodeFileAttributes.SnapshotCopy(pbf.getName()
+              .toByteArray(), permission, fileInPb.getModificationTime(),
+              fileInPb.getAccessTime(), (short) fileInPb.getReplication(),
+              fileInPb.getPreferredBlockSize());
+        }
+
+        FileDiff diff = new FileDiff(pbf.getSnapshotId(), copy, null,
+            pbf.getFileSize());
+        diffs.addFirst(diff);
+      }
+      file.addSnapshotFeature(diffs);
+    }
+
+    /** Load the created list in a DirectoryDiff */
+    private List<INode> loadCreatedList(InputStream in, INodeDirectory dir,
+        int size) throws IOException {
+      List<INode> clist = new ArrayList<INode>(size);
+      for (long c = 0; c < size; c++) {
+        CreatedListEntry entry = CreatedListEntry.parseDelimitedFrom(in);
+        INode created = SnapshotFSImageFormat.loadCreated(entry.getName()
+            .toByteArray(), dir);
+        clist.add(created);
+      }
+      return clist;
+    }
+
+    private void addToDeletedList(INode dnode, INodeDirectory parent) {
+      dnode.setParent(parent);
+      if (dnode.isFile()) {
+        updateBlocksMap(dnode.asFile(), fsn.getBlockManager());
+      }
+    }
+
+    /**
+     * Load the deleted list in a DirectoryDiff
+     * @param totalSize the total size of the deleted list
+     * @param deletedNodes non-reference inodes in the deleted list. These
+     *        inodes' ids are directly recorded in protobuf
+     */
+    private List<INode> loadDeletedList(InputStream in, INodeDirectory dir,
+        int totalSize, List<Long> deletedNodes) throws IOException {
+      List<INode> dlist = new ArrayList<INode>(totalSize);
+      // load non-reference inodes
+      for (long deletedId : deletedNodes) {
+        INode deleted = fsDir.getInode(deletedId);
+        dlist.add(deleted);
+        addToDeletedList(deleted, dir);
+      }
+      // load reference nodes in the deleted list
+      int refNum = totalSize - deletedNodes.size();
+      for (int r = 0; r < refNum; r++) {
+        INodeSection.INodeReference ref = INodeSection.INodeReference
+            .parseDelimitedFrom(in);
+        INodeReference refNode = loadINodeReference(ref, fsDir);
+        dlist.add(refNode);
+        addToDeletedList(refNode, dir);
+      }
+      Collections.sort(dlist, new Comparator<INode>() {
+        @Override
+        public int compare(INode n1, INode n2) {
+          return n1.compareTo(n2.getLocalNameBytes());
+        }
+      });
+      return dlist;
+    }
+
+    /** Load DirectoryDiff list for a directory with snapshot feature */
+    private void loadDirectoryDiffList(InputStream in, INodeDirectory dir,
+        int size) throws IOException {
+      if (!dir.isWithSnapshot()) {
+        dir.addSnapshotFeature(null);
+      }
+      DirectoryDiffList diffs = dir.getDiffs();
+      for (int i = 0; i < size; i++) {
+        // load a directory diff
+        SnapshotDiffSection.DirectoryDiff diffInPb = SnapshotDiffSection.
+            DirectoryDiff.parseDelimitedFrom(in);
+        final int snapshotId = diffInPb.getSnapshotId();
+        final Snapshot snapshot = snapshotMap.get(snapshotId);
+        int childrenSize = diffInPb.getChildrenSize();
+        boolean useRoot = diffInPb.getIsSnapshotRoot();
+        INodeDirectoryAttributes copy = null;
+        if (useRoot) {
+          copy = snapshot.getRoot();
+        } else if (diffInPb.hasSnapshotCopy()) {
+          INodeSection.INodeDirectory dirCopyInPb = diffInPb.getSnapshotCopy();
+          final byte[] name = diffInPb.getName().toByteArray();
+          PermissionStatus permission = loadPermission(dirCopyInPb
+              .getPermission(), parent.getStringTable());
+          long modTime = dirCopyInPb.getModificationTime();
+          boolean noQuota = dirCopyInPb.getNsQuota() == -1
+              && dirCopyInPb.getDsQuota() == -1;
+          copy = noQuota ? new INodeDirectoryAttributes.SnapshotCopy(name,
+              permission, modTime)
+              : new INodeDirectoryAttributes.CopyWithQuota(name, permission,
+                  modTime, dirCopyInPb.getNsQuota(), dirCopyInPb.getDsQuota());
+        }
+        // load created list
+        List<INode> clist = loadCreatedList(in, dir, diffInPb.getClistSize());
+        // load deleted list
+        List<INode> dlist = loadDeletedList(in, dir, diffInPb.getDlistSize(),
+            diffInPb.getDeletedINodeList());
+        // create the directory diff
+        DirectoryDiff diff = new DirectoryDiff(snapshotId, copy, null,
+            childrenSize, clist, dlist, useRoot);
+        diffs.addFirst(diff);
+      }
+    }
+  }
+
+  /**
+   * Saving snapshot related information to protobuf based FSImage
+   */
+  public final static class Saver {
+    private final FSNamesystem fsn;
+    private final FileSummary.Builder headers;
+    private final FSImageFormatProtobuf.Saver parent;
+
+    public Saver(FSImageFormatProtobuf.Saver parent,
+        FileSummary.Builder headers, FSNamesystem fsn) {
+      this.parent = parent;
+      this.headers = headers;
+      this.fsn = fsn;
+    }
+
+    /**
+     * save all the snapshottable directories and snapshots to fsimage
+     */
+    public void serializeSnapshotsSection(OutputStream out) throws IOException {
+      SnapshotManager sm = fsn.getSnapshotManager();
+      SnapshotsSection.Builder b = SnapshotsSection.newBuilder()
+          .setSnapshotCounter(sm.getSnapshotCounter())
+          .setNumSnapshots(sm.getNumSnapshots());
+
+      INodeDirectorySnapshottable[] snapshottables = sm.getSnapshottableDirs();
+      for (INodeDirectorySnapshottable sdir : snapshottables) {
+        b.addSnapshottableDir(sdir.getId());
+      }
+      b.build().writeDelimitedTo(out);
+      int i = 0;
+      for(INodeDirectorySnapshottable sdir : snapshottables) {
+        for(Snapshot s : sdir.getSnapshotsByNames()) {
+          Root sroot = s.getRoot();
+          SnapshotsSection.Snapshot.Builder sb = SnapshotsSection.Snapshot
+              .newBuilder().setSnapshotId(s.getId());
+          INodeSection.INodeDirectory.Builder db = buildINodeDirectory(sroot,
+              parent.getStringMap());
+          INodeSection.INode r = INodeSection.INode.newBuilder()
+              .setId(sroot.getId())
+              .setType(INodeSection.INode.Type.DIRECTORY)
+              .setName(ByteString.copyFrom(sroot.getLocalNameBytes()))
+              .setDirectory(db).build();
+          sb.setRoot(r).build().writeDelimitedTo(out);
+          i++;
+        }
+      }
+      Preconditions.checkState(i == sm.getNumSnapshots());
+      parent.commitSection(headers, FSImageFormatProtobuf.SectionName.SNAPSHOT);
+    }
+
+    /**
+     * save all the snapshot diff to fsimage
+     */
+    public void serializeSnapshotDiffSection(OutputStream out)
+        throws IOException {
+      INodeMap inodesMap = fsn.getFSDirectory().getINodeMap();
+      Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
+      while (iter.hasNext()) {
+        INodeWithAdditionalFields inode = iter.next();
+        if (inode.isFile()) {
+          serializeFileDiffList(inode.asFile(), out);
+        } else if (inode.isDirectory()) {
+          serializeDirDiffList(inode.asDirectory(), out);
+        }
+      }
+      parent.commitSection(headers,
+          FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF);
+    }
+
+    private void serializeFileDiffList(INodeFile file, OutputStream out)
+        throws IOException {
+      FileWithSnapshotFeature sf = file.getFileWithSnapshotFeature();
+      if (sf != null) {
+        List<FileDiff> diffList = sf.getDiffs().asList();
+        SnapshotDiffSection.DiffEntry entry = SnapshotDiffSection.DiffEntry
+            .newBuilder().setInodeId(file.getId())
+            .setNumOfDiff(diffList.size()).build();
+        entry.writeDelimitedTo(out);
+        for (int i = diffList.size() - 1; i >= 0; i--) {
+          FileDiff diff = diffList.get(i);
+          SnapshotDiffSection.FileDiff.Builder fb = SnapshotDiffSection.FileDiff
+              .newBuilder().setSnapshotId(diff.getSnapshotId())
+              .setFileSize(diff.getFileSize());
+          INodeFileAttributes copy = diff.snapshotINode;
+          if (copy != null) {
+            fb.setName(ByteString.copyFrom(copy.getLocalNameBytes()))
+                .setSnapshotCopy(buildINodeFile(copy, parent.getStringMap()));
+          }
+          fb.build().writeDelimitedTo(out);
+        }
+      }
+    }
+
+    private void saveCreatedDeletedList(List<INode> created,
+        List<INodeReference> deletedRefs, OutputStream out) throws IOException {
+      // local names of the created list member
+      for (INode c : created) {
+        SnapshotDiffSection.CreatedListEntry.newBuilder()
+            .setName(ByteString.copyFrom(c.getLocalNameBytes())).build()
+            .writeDelimitedTo(out);
+      }
+      // reference nodes in deleted list
+      for (INodeReference ref : deletedRefs) {
+        INodeSection.INodeReference.Builder rb = buildINodeReference(ref);
+        rb.build().writeDelimitedTo(out);
+      }
+    }
+
+    private void serializeDirDiffList(INodeDirectory dir, OutputStream out)
+        throws IOException {
+      DirectoryWithSnapshotFeature sf = dir.getDirectoryWithSnapshotFeature();
+      if (sf != null) {
+        List<DirectoryDiff> diffList = sf.getDiffs().asList();
+        SnapshotDiffSection.DiffEntry entry = SnapshotDiffSection.DiffEntry
+            .newBuilder().setInodeId(dir.getId())
+            .setNumOfDiff(diffList.size()).build();
+        entry.writeDelimitedTo(out);
+        for (int i = diffList.size() - 1; i >= 0; i--) { // reverse order!
+          DirectoryDiff diff = diffList.get(i);
+          SnapshotDiffSection.DirectoryDiff.Builder db = SnapshotDiffSection.
+              DirectoryDiff.newBuilder().setSnapshotId(diff.getSnapshotId())
+                           .setChildrenSize(diff.getChildrenSize())
+                           .setIsSnapshotRoot(diff.isSnapshotRoot());
+          INodeDirectoryAttributes copy = diff.snapshotINode;
+          if (!diff.isSnapshotRoot() && copy != null) {
+            db.setName(ByteString.copyFrom(copy.getLocalNameBytes()))
+                .setSnapshotCopy(
+                    buildINodeDirectory(copy, parent.getStringMap()));
+          }
+          // process created list and deleted list
+          List<INode> created = diff.getChildrenDiff()
+              .getList(ListType.CREATED);
+          db.setClistSize(created.size());
+          List<INode> deleted = diff.getChildrenDiff().getList(ListType.DELETED);
+          db.setDlistSize(deleted.size());
+          List<INodeReference> refs = new ArrayList<INodeReference>();
+          for (INode d : deleted) {
+            if (d.isReference()) {
+              refs.add(d.asReference());
+            } else {
+              db.addDeletedINode(d.getId());
+            }
+          }
+          db.build().writeDelimitedTo(out);
+          saveCreatedDeletedList(created, refs, out);
+        }
+      }
+    }
+  }
+
+  private FSImageFormatPBSnapshot() {}
+}

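Both the Loader and Saver above lean on protobuf's length-delimited framing: the saver calls writeDelimitedTo() per record, and the loader calls parseDelimitedFrom(), which returns null at end of stream — that is how loadSnapshotDiffSection detects the end of the SNAPSHOT_DIFF section. A round-trip sketch, assuming the FsImageProto classes generated from the fsimage.proto changes below are on the classpath:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotDiffSection;

// Each message is preceded by a varint length; parseDelimitedFrom returns
// null once the stream is exhausted, which terminates the read loop.
public class DelimitedRoundTrip {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    SnapshotDiffSection.DiffEntry.newBuilder()
        .setInodeId(16385).setNumOfDiff(2).build().writeDelimitedTo(out);
    SnapshotDiffSection.DiffEntry.newBuilder()
        .setInodeId(16386).setNumOfDiff(1).build().writeDelimitedTo(out);

    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
    SnapshotDiffSection.DiffEntry e;
    while ((e = SnapshotDiffSection.DiffEntry.parseDelimitedFrom(in)) != null) {
      System.out.println(e.getInodeId() + " diffs=" + e.getNumOfDiff());
    }
  }
}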
+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java

@@ -136,7 +136,7 @@ public class SnapshotFSImageFormat {
    * @param parent The directory that the created list belongs to.
    * @return The created node.
    */
-  private static INode loadCreated(byte[] createdNodeName,
+  public static INode loadCreated(byte[] createdNodeName,
       INodeDirectory parent) throws IOException {
     // the INode in the created list should be a reference to another INode
     // in posterior SnapshotDiffs or one of the current children

+ 17 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java

@@ -270,6 +270,23 @@ public class SnapshotManager implements SnapshotStats {
     return numSnapshots.get();
   }
   
+  void setNumSnapshots(int num) {
+    numSnapshots.set(num);
+  }
+
+  int getSnapshotCounter() {
+    return snapshotCounter;
+  }
+
+  void setSnapshotCounter(int counter) {
+    snapshotCounter = counter;
+  }
+
+  INodeDirectorySnapshottable[] getSnapshottableDirs() {
+    return snapshottables.values().toArray(
+        new INodeDirectorySnapshottable[snapshottables.size()]);
+  }
+
   /**
    * Write {@link #snapshotCounter}, {@link #numSnapshots},
    * and all snapshots to the DataOutput.

+ 70 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto

@@ -112,6 +112,17 @@ message INodeSection {
     optional bytes target = 2;
   }
 
+  message INodeReference {
+    // id of the referred inode 
+    optional uint64 referredId = 1;
+    // local name recorded in WithName
+    optional bytes name = 2;
+    // recorded in DstReference
+    optional uint32 dstSnapshotId = 3;
+    // recorded in WithName
+    optional uint32 lastSnapshotId = 4;
+  }
+
   message INode {
     enum Type {
       FILE = 1;
@@ -153,14 +164,72 @@ message INodeDirectorySection {
   message DirEntry {
     optional uint64 parent = 1;
     repeated uint64 children = 2;
+    optional uint64 numOfRef = 3;
+    // repeated INodeReference...
   }
   // repeated DirEntry, ended at the boundary of the section.
 }
 
+/**
+ * This section records the information about snapshot
+ * NAME: SNAPSHOT
+ */
+message SnapshotsSection {
+  message Snapshot {
+    optional uint32 snapshotId = 1;
+    // Snapshot root
+    optional INodeSection.INode root = 2;
+  }
+
+  optional uint32 snapshotCounter = 1;
+  repeated uint64 snapshottableDir = 2;
+  // total number of snapshots
+  optional uint32 numSnapshots = 3;
+  // repeated Snapshots...
+}
+
+/**
+ * This section records information about snapshot diffs
+ * NAME: SNAPSHOT_DIFF
+ */
+message SnapshotDiffSection {
+  message CreatedListEntry {
+    optional bytes name = 1;
+  }
+
+  message DirectoryDiff {
+    optional uint32 snapshotId = 1;
+    optional uint32 childrenSize = 2;
+    optional bool isSnapshotRoot = 3;
+    optional bytes name = 4;
+    optional INodeSection.INodeDirectory snapshotCopy = 5;
+    optional uint32 clistSize = 6;
+    optional uint32 dlistSize = 7;
+    repeated uint64 deletedINode = 8; // id of deleted inode 
+    // repeated CreatedListEntry
+    // repeated INodeReference (number of ref: dlistSize - dlist.size)
+  }
+
+  message FileDiff {
+    optional uint32 snapshotId = 1;
+    optional uint64 fileSize = 2;
+    optional bytes name = 3;
+    optional INodeSection.INodeFile snapshotCopy = 4;
+  }
+
+  message DiffEntry {
+    optional uint64 inodeId = 1;
+    optional uint32 numOfDiff = 2;
+    // repeated DirectoryDiff or FileDiff
+  }
+
+  // repeated DiffEntry
+}
+
 /**
  * This section maps string to id
  * NAME: STRING_TABLE
- **/
+ */
 message StringTableSection {
   message Entry {
     optional uint32 id = 1;

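The trailing comments in SnapshotDiffSection describe an implicit record order that serializeDirDiffList and saveCreatedDeletedList enforce: a DiffEntry header, then numOfDiff diffs in reverse chronological order, with each DirectoryDiff followed by clistSize CreatedListEntry records and (dlistSize - deletedINode count) INodeReference records. A sketch of one directory's layout, with made-up ids, again assuming the generated FsImageProto classes from this patch:

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotDiffSection;

import com.google.protobuf.ByteString;

// Sketch of the on-disk ordering produced for one directory with one diff.
public class DirDiffLayout {
  static void writeOneDir(OutputStream out) throws IOException {
    // 1. header: which inode, and how many diffs follow
    SnapshotDiffSection.DiffEntry.newBuilder()
        .setInodeId(16385).setNumOfDiff(1).build().writeDelimitedTo(out);
    // 2. the diff itself: 2 created entries; 2 deleted (1 plain + 1 reference)
    SnapshotDiffSection.DirectoryDiff.newBuilder()
        .setSnapshotId(1).setChildrenSize(4).setIsSnapshotRoot(false)
        .setClistSize(2).setDlistSize(2)
        .addDeletedINode(16390) // plain deleted inode, recorded by id
        .build().writeDelimitedTo(out);
    // 3. clistSize created-list names
    for (String name : new String[] { "a", "b" }) {
      SnapshotDiffSection.CreatedListEntry.newBuilder()
          .setName(ByteString.copyFromUtf8(name)).build().writeDelimitedTo(out);
    }
    // 4. dlistSize - deletedINode.size() reference nodes from the deleted list
    INodeSection.INodeReference.newBuilder()
        .setReferredId(16391).setLastSnapshotId(1)
        .setName(ByteString.copyFromUtf8("c")).build().writeDelimitedTo(out);
  }
}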
+ 0 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java

@@ -73,7 +73,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
-;
 
 /** Testing rename with snapshots. */
 public class TestRenameWithSnapshots {