
HDFS-5738. Serialize INode information in protobuf. Contributed by Haohui Mai.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5698@1558171 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao committed 11 years ago
Parent
Commit: e9f1a76a2d

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt

@@ -1,3 +1,5 @@
 HDFS-5698 subtasks
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9) 
+
+    HDFS-5738. Serialize INode information in protobuf. (Haohui Mai via jing9)

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -216,8 +216,7 @@ public class FSImageFormat {
           FSImageFormatProtobuf.Loader loader = new FSImageFormatProtobuf.Loader(
               conf, fsn);
           impl = loader;
-          is.getChannel().position(0);
-          loader.load(is);
+          loader.load(file);
         } else {
           Loader loader = new Loader(conf, fsn);
           impl = loader;

+ 278 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java

@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.Permission;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+
+final class FSImageFormatPBINode {
+  final static class Loader {
+    private static PermissionStatus loadPermission(Permission p) {
+      return new PermissionStatus(p.getUser(), p.getGroup(), new FsPermission(
+          (short) p.getPermission()));
+    }
+
+    private final FSDirectory dir;
+    private final FSNamesystem fsn;
+
+    Loader(FSNamesystem fsn) {
+      this.fsn = fsn;
+      this.dir = fsn.dir;
+    }
+
+    void loadINodeDirectorySection(InputStream in) throws IOException {
+      final INodeMap inodeMap = dir.getINodeMap();
+      while (true) {
+        INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
+            .parseDelimitedFrom(in);
+
+        if (e == null)
+          break;
+
+        INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
+        for (long id : e.getChildrenList()) {
+          INode child = dir.getInode(id);
+          addToParent(p, child);
+        }
+      }
+    }
+
+    void loadINodeSection(InputStream in) throws IOException {
+      INodeSection s = INodeSection.parseDelimitedFrom(in);
+      fsn.resetLastInodeId(s.getLastInodeId());
+      for (int i = 0; i < s.getNumInodes(); ++i) {
+        INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
+        if (p.getId() == INodeId.ROOT_INODE_ID) {
+          loadRootINode(p);
+        } else {
+          INode n = loadINode(p);
+          dir.addToInodeMap(n);
+        }
+      }
+    }
+
+    private void addToParent(INodeDirectory parent, INode child) {
+      FSDirectory fsDir = fsn.dir;
+      if (parent == fsDir.rootDir && FSDirectory.isReservedName(child)) {
+        throw new HadoopIllegalArgumentException("File name \""
+            + child.getLocalName() + "\" is reserved. Please "
+            + " change the name of the existing file or directory to another "
+            + "name before upgrading to this release.");
+      }
+      // NOTE: This does not update space counts for parents
+      if (!parent.addChild(child)) {
+        return;
+      }
+      fsn.dir.cacheName(child);
+
+      if (child.isFile()) {
+        updateBlocksMap(child.asFile());
+      }
+    }
+
+    private INode loadINode(INodeSection.INode n) {
+      switch (n.getType()) {
+      case FILE:
+        return loadINodeFile(n);
+      case DIRECTORY:
+        return loadINodeDirectory(n);
+      default:
+        break;
+      }
+      return null;
+    }
+
+    private INodeDirectory loadINodeDirectory(INodeSection.INode n) {
+      assert n.getType() == INodeSection.INode.Type.DIRECTORY;
+      INodeSection.INodeDirectory d = n.getDirectory();
+
+      final PermissionStatus permissions = loadPermission(d.getPermission());
+      final INodeDirectory dir = new INodeDirectory(n.getId(), n.getName()
+          .toByteArray(), permissions, d.getModificationTime());
+
+      final long nsQuota = d.getNsQuota(), dsQuota = d.getDsQuota();
+      if (nsQuota >= 0 || dsQuota >= 0) {
+        dir.addDirectoryWithQuotaFeature(nsQuota, dsQuota);
+      }
+      return dir;
+    }
+
+    private INodeFile loadINodeFile(INodeSection.INode n) {
+      assert n.getType() == INodeSection.INode.Type.FILE;
+      INodeSection.INodeFile f = n.getFile();
+      List<BlockProto> bp = f.getBlocksList();
+      short replication = (short) f.getReplication();
+
+      BlockInfo[] blocks = new BlockInfo[bp.size()];
+      for (int i = 0, e = bp.size(); i < e; ++i) {
+        blocks[i] = new BlockInfo(PBHelper.convert(bp.get(i)), replication);
+      }
+
+      final PermissionStatus permissions = loadPermission(f.getPermission());
+      final INodeFile file = new INodeFile(n.getId(),
+          n.getName().toByteArray(), permissions, f.getModificationTime(),
+          f.getAccessTime(), blocks, replication, f.getPreferredBlockSize());
+      return file;
+    }
+
+    private void loadRootINode(INodeSection.INode p) {
+      INodeDirectory root = loadINodeDirectory(p);
+      final Quota.Counts q = root.getQuotaCounts();
+      final long nsQuota = q.get(Quota.NAMESPACE);
+      final long dsQuota = q.get(Quota.DISKSPACE);
+      if (nsQuota != -1 || dsQuota != -1) {
+        dir.rootDir.getDirectoryWithQuotaFeature().setQuota(nsQuota, dsQuota);
+      }
+      dir.rootDir.cloneModificationTime(root);
+      dir.rootDir.clonePermissionStatus(root);
+    }
+
+    private void updateBlocksMap(INodeFile file) {
+      // Add file->block mapping
+      final BlockInfo[] blocks = file.getBlocks();
+      if (blocks != null) {
+        final BlockManager bm = fsn.getBlockManager();
+        for (int i = 0; i < blocks.length; i++) {
+          file.setBlock(i, bm.addBlockCollection(blocks[i], file));
+        }
+      }
+    }
+  }
+
+  final static class Saver {
+    private final FSNamesystem fsn;
+    private final FileSummary.Builder headers;
+    private final OutputStream out;
+    private final FSImageFormatProtobuf.Saver parent;
+
+    Saver(FSImageFormatProtobuf.Saver parent, OutputStream out,
+        FileSummary.Builder headers) {
+      this.parent = parent;
+      this.out = out;
+      this.headers = headers;
+      this.fsn = parent.context.getSourceNamesystem();
+    }
+
+    void serializeINodeDirectorySection() throws IOException {
+      for (INodeWithAdditionalFields n : fsn.dir.getINodeMap().getMap()) {
+        if (!n.isDirectory())
+          continue;
+
+        INodeDirectory d = n.asDirectory();
+
+        INodeDirectorySection.DirEntry.Builder b = INodeDirectorySection.DirEntry
+            .newBuilder().setParent(n.getId());
+
+        for (INode inode : d.getChildrenList(Snapshot.CURRENT_STATE_ID))
+          b.addChildren(inode.getId());
+
+        if (b.getChildrenCount() != 0) {
+          INodeDirectorySection.DirEntry e = b.build();
+          e.writeDelimitedTo(out);
+        }
+      }
+      parent.commitSection(headers,
+          FSImageFormatProtobuf.SectionName.INODE_DIR);
+    }
+
+    void serializeINodeSection() throws IOException {
+      final INodeDirectory rootDir = fsn.dir.rootDir;
+      final long numINodes = rootDir.getDirectoryWithQuotaFeature()
+          .getSpaceConsumed().get(Quota.NAMESPACE);
+      INodeSection.Builder b = INodeSection.newBuilder()
+          .setLastInodeId(fsn.getLastInodeId()).setNumInodes(numINodes);
+      INodeSection s = b.build();
+      s.writeDelimitedTo(out);
+
+      long i = 0;
+      for (INodeWithAdditionalFields n : fsn.dir.getINodeMap().getMap()) {
+        save(n);
+        ++i;
+      }
+      Preconditions.checkState(numINodes == i);
+      parent.commitSection(headers, FSImageFormatProtobuf.SectionName.INODE);
+    }
+
+    private INodeSection.Permission.Builder buildPermissionStatus(INode n) {
+      return INodeSection.Permission.newBuilder().setUser(n.getUserName())
+          .setGroup(n.getGroupName()).setPermission(n.getFsPermissionShort());
+    }
+
+    private void save(INode n) throws IOException {
+      if (n.isDirectory()) {
+        save(n.asDirectory());
+      } else if (n.isFile()) {
+        save(n.asFile());
+      }
+    }
+
+    private void save(INodeDirectory n) throws IOException {
+      Quota.Counts quota = n.getQuotaCounts();
+      INodeSection.INodeDirectory.Builder b = INodeSection.INodeDirectory
+          .newBuilder().setModificationTime(n.getModificationTime())
+          .setNsQuota(quota.get(Quota.NAMESPACE))
+          .setDsQuota(quota.get(Quota.DISKSPACE))
+          .setPermission(buildPermissionStatus(n));
+
+      INodeSection.INode r = INodeSection.INode.newBuilder()
+          .setType(INodeSection.INode.Type.DIRECTORY).setId(n.getId())
+          .setName(ByteString.copyFrom(n.getLocalNameBytes())).setDirectory(b).build();
+      r.writeDelimitedTo(out);
+    }
+
+    private void save(INodeFile n) throws IOException {
+      INodeSection.INodeFile.Builder b = INodeSection.INodeFile.newBuilder()
+          .setAccessTime(n.getAccessTime())
+          .setModificationTime(n.getModificationTime())
+          .setPermission(buildPermissionStatus(n))
+          .setPreferredBlockSize(n.getPreferredBlockSize())
+          .setReplication(n.getFileReplication());
+
+      for (Block block : n.getBlocks())
+        b.addBlocks(PBHelper.convert(block));
+
+      INodeSection.INode r = INodeSection.INode.newBuilder()
+          .setType(INodeSection.INode.Type.FILE).setId(n.getId())
+          .setName(ByteString.copyFrom(n.getLocalNameBytes())).setFile(b).build();
+      r.writeDelimitedTo(out);
+    }
+  }
+
+  private FSImageFormatPBINode() {
+  }
+}
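
The loader and saver above rely on protobuf's length-delimited encoding: each INode is written with writeDelimitedTo and read back with parseDelimitedFrom, so a section is simply a concatenation of self-describing messages. The following standalone sketch is not part of the patch; it only assumes the FsImageProto classes generated from fsimage.proto are on the classpath, and the inode id and file name it uses are made up. It round-trips two INode messages to illustrate that framing.

// Minimal sketch of the delimited framing used by FSImageFormatPBINode.
// Assumes the generated FsImageProto classes; ids/names are hypothetical.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;

import com.google.protobuf.ByteString;

public class INodeDelimitedRoundTrip {
  public static void main(String[] args) throws Exception {
    INodeSection.INode file = INodeSection.INode.newBuilder()
        .setType(INodeSection.INode.Type.FILE)
        .setId(16386)                                  // hypothetical inode id
        .setName(ByteString.copyFromUtf8("e"))
        .setFile(INodeSection.INodeFile.newBuilder()
            .setReplication(3)
            .setPreferredBlockSize(128 * 1024 * 1024))
        .build();

    // writeDelimitedTo prefixes each message with a varint length, so several
    // INodes can be concatenated in one section and read back one by one.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    file.writeDelimitedTo(out);
    file.writeDelimitedTo(out);

    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
    INodeSection.INode first = INodeSection.INode.parseDelimitedFrom(in);
    INodeSection.INode second = INodeSection.INode.parseDelimitedFrom(in);
    System.out.println(first.getId() + " " + second.getId());
  }
}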

+ 192 - 121
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -27,6 +28,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.security.DigestInputStream;
 import java.security.DigestOutputStream;
@@ -38,60 +41,32 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
-import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileHeader;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.compress.CompressionCodec;
 
-import com.google.common.base.Preconditions;
+import com.google.common.io.LimitInputStream;
 import com.google.protobuf.CodedOutputStream;
 
 /**
  * Utility class to read / write fsimage in protobuf format.
  */
 final class FSImageFormatProtobuf {
-  private static final Log LOG = LogFactory
-      .getLog(DelegationTokenSecretManager.class);
-
+  private static final Log LOG = LogFactory.getLog(FSImageFormatProtobuf.class);
+  
   static final byte[] MAGIC_HEADER = "HDFSIMG1".getBytes();
   private static final int FILE_VERSION = 1;
-  private static final int PRE_ALLOCATED_HEADER_SIZE = 1024;
-
-  /**
-   * Supported section name
-   */
-  private enum SectionName {
-    NS_INFO("NS_INFO");
-
-    private static final SectionName[] values = SectionName.values();
-    private final String name;
-
-    private SectionName(String name) {
-      this.name = name;
-    }
-
-    private static SectionName fromString(String name) {
-      for (SectionName n : values) {
-        if (n.name.equals(name))
-          return n;
-      }
-      return null;
-    }
-  }
-
-  // Buffer size of when reading / writing fsimage
-  public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024;
-
+  
   static final class Loader implements FSImageFormat.AbstractLoader {
+    private static final int MINIMUM_FILE_LENGTH = 8;
     private final Configuration conf;
     private final FSNamesystem fsn;
 
-    /** The transaction ID of the last edit represented by the loaded file */
-    private long imgTxId;
     /** The MD5 sum of the loaded file */
     private MD5Hash imgDigest;
+    /** The transaction ID of the last edit represented by the loaded file */
+    private long imgTxId;
 
     Loader(Configuration conf, FSNamesystem fsn) {
       this.conf = conf;
@@ -108,65 +83,106 @@ final class FSImageFormatProtobuf {
       return imgTxId;
     }
 
+    void load(File file) throws IOException {
+      RandomAccessFile raFile = new RandomAccessFile(file, "r");
+      FileInputStream fin = new FileInputStream(file);
+      try {
+        loadInternal(raFile, fin);
+      } finally {
+        fin.close();
+        raFile.close();
+      }
+    }
+
+    private boolean checkFileFormat(RandomAccessFile file) throws IOException {
+      if (file.length() < MINIMUM_FILE_LENGTH)
+        return false;
+
+      byte[] magic = new byte[MAGIC_HEADER.length];
+      file.readFully(magic);
+      if (!Arrays.equals(MAGIC_HEADER, magic))
+        return false;
+
+      return true;
+    }
+
+    private FileSummary loadSummary(RandomAccessFile file) throws IOException {
+      final int FILE_LENGTH_FIELD_SIZE = 4;
+      long fileLength = file.length();
+      file.seek(fileLength - FILE_LENGTH_FIELD_SIZE);
+      int summaryLength = file.readInt();
+      file.seek(fileLength - FILE_LENGTH_FIELD_SIZE - summaryLength);
+
+      byte[] summaryBytes = new byte[summaryLength];
+      file.readFully(summaryBytes);
+
+      FileSummary summary = FileSummary
+          .parseDelimitedFrom(new ByteArrayInputStream(summaryBytes));
+      if (summary.getOndiskVersion() != FILE_VERSION) {
+        throw new IOException("Unsupported file version "
+            + summary.getOndiskVersion());
+      }
+
+      if (!LayoutVersion.supports(Feature.PROTOBUF_FORMAT,
+          summary.getLayoutVersion())) {
+        throw new IOException("Unsupported layout version "
+            + summary.getLayoutVersion());
+      }
+      return summary;
+    }
+
     @SuppressWarnings("resource")
-    public void load(FileInputStream fin) throws IOException {
-      FileHeader header = loadHeader(new BufferedInputStream(fin));
+    private void loadInternal(RandomAccessFile raFile, FileInputStream fin)
+        throws IOException {
+      if (!checkFileFormat(raFile)) {
+        throw new IOException("Unrecognized file format");
+      }
+      FileSummary summary = loadSummary(raFile);
 
-      fin.getChannel().position(header.getDataOffset());
       MessageDigest digester = MD5Hash.getDigester();
-      InputStream in = new DigestInputStream(new BufferedInputStream(fin,
-          DEFAULT_BUFFER_SIZE), digester);
-
-      if (header.hasCodec()) {
-        // read compression related info
-        FSImageCompression compression = FSImageCompression.createCompression(
-            conf, header.getCodec());
-        CompressionCodec imageCodec = compression.getImageCodec();
-        if (header.getCodec() != null) {
-          in = imageCodec.createInputStream(in);
+      FileChannel channel = fin.getChannel();
+
+      FSImageFormatPBINode.Loader inodeLoader = new FSImageFormatPBINode.Loader(
+          fsn);
+
+      for (FileSummary.Section s : summary.getSectionsList()) {
+        channel.position(s.getOffset());
+        InputStream in = new DigestInputStream(new BufferedInputStream(
+            new LimitInputStream(fin, s.getLength())), digester);
+
+        if (summary.hasCodec()) {
+          // read compression related info
+          FSImageCompression compression = FSImageCompression
+              .createCompression(conf, summary.getCodec());
+          CompressionCodec imageCodec = compression.getImageCodec();
+          if (summary.getCodec() != null) {
+            in = imageCodec.createInputStream(in);
+          }
         }
-      }
 
-      for (FileHeader.Section s : header.getSectionsList()) {
         String n = s.getName();
         switch (SectionName.fromString(n)) {
         case NS_INFO:
           loadNameSystemSection(in, s);
           break;
+        case INODE:
+          inodeLoader.loadINodeSection(in);
+          break;
+        case INODE_DIR:
+          inodeLoader.loadINodeDirectorySection(in);
+          break;
         default:
           LOG.warn("Unregconized section " + n);
           break;
         }
       }
 
-      updateDigestForFileHeader(header, digester);
+      updateDigestForFileSummary(summary, digester);
 
       imgDigest = new MD5Hash(digester.digest());
-      in.close();
     }
 
-    private FileHeader loadHeader(InputStream fin) throws IOException {
-      byte[] magic = new byte[MAGIC_HEADER.length];
-      if (fin.read(magic) != magic.length
-          || !Arrays.equals(magic, FSImageFormatProtobuf.MAGIC_HEADER)) {
-        throw new IOException("Unrecognized FSImage");
-      }
-
-      FileHeader header = FileHeader.parseDelimitedFrom(fin);
-      if (header.getOndiskVersion() != FILE_VERSION) {
-        throw new IOException("Unsupported file version "
-            + header.getOndiskVersion());
-      }
-
-      if (!LayoutVersion.supports(Feature.PROTOBUF_FORMAT,
-          header.getLayoutVersion())) {
-        throw new IOException("Unsupported layout version "
-            + header.getLayoutVersion());
-      }
-      return header;
-    }
-
-    private void loadNameSystemSection(InputStream in, FileHeader.Section header)
+    private void loadNameSystemSection(InputStream in, FileSummary.Section sections)
         throws IOException {
       NameSystemSection s = NameSystemSection.parseDelimitedFrom(in);
       fsn.setGenerationStampV1(s.getGenstampV1());
@@ -174,62 +190,98 @@ final class FSImageFormatProtobuf {
       fsn.setGenerationStampV1Limit(s.getGenstampV1Limit());
       fsn.setLastAllocatedBlockId(s.getLastAllocatedBlockId());
       imgTxId = s.getTransactionId();
-      long offset = header.getLength() - getOndiskTrunkSize(s);
-      Preconditions.checkArgument(offset == 0);
-      in.skip(offset);
     }
   }
 
   static final class Saver {
-    private final SaveNamespaceContext context;
+    final SaveNamespaceContext context;
+    private long currentOffset = MAGIC_HEADER.length;
     private MD5Hash savedDigest;
 
+    private FileChannel fileChannel;
+    // OutputStream for the section data
+    private OutputStream sectionOutputStream;
+
     Saver(SaveNamespaceContext context) {
       this.context = context;
     }
 
+    public MD5Hash getSavedDigest() {
+      return savedDigest;
+    }
+
+    void commitSection(FileSummary.Builder summary, SectionName name)
+        throws IOException {
+      long oldOffset = currentOffset;
+      sectionOutputStream.flush();
+      long length = fileChannel.position() - oldOffset;
+      summary.addSections(FileSummary.Section.newBuilder().setName(name.name)
+          .setLength(length).setOffset(currentOffset));
+      currentOffset += length;
+    }
+
     void save(File file, FSImageCompression compression) throws IOException {
-      FileHeader.Builder b = FileHeader.newBuilder()
-          .setOndiskVersion(FILE_VERSION)
-          .setLayoutVersion(LayoutVersion.getCurrentLayoutVersion())
-          .setDataOffset(PRE_ALLOCATED_HEADER_SIZE);
-      MessageDigest digester = MD5Hash.getDigester();
-      OutputStream out = null;
+      FileOutputStream fout = new FileOutputStream(file);
       try {
-        FileOutputStream fout = new FileOutputStream(file);
-        FileChannel channel = fout.getChannel();
+        saveInternal(fout, compression);
+      } finally {
+        fout.close();
+      }
+    }
+
+    private void saveFileSummary(FileOutputStream fout, FileSummary summary)
+        throws IOException {
+      summary.writeDelimitedTo(fout);
+      int length = getOndiskTrunkSize(summary);
+      byte[] lengthBytes = new byte[4];
+      ByteBuffer.wrap(lengthBytes).asIntBuffer().put(length);
+      fout.write(lengthBytes);
+    }
 
-        channel.position(PRE_ALLOCATED_HEADER_SIZE);
-        out = new DigestOutputStream(new BufferedOutputStream(fout,
-            DEFAULT_BUFFER_SIZE), digester);
+    private void saveInodes(OutputStream out, FileSummary.Builder summary)
+        throws IOException {
+      FSImageFormatPBINode.Saver saver = new FSImageFormatPBINode.Saver(this,
+          out, summary);
+      saver.serializeINodeSection();
+      saver.serializeINodeDirectorySection();
+    }
 
-        CompressionCodec codec = compression.getImageCodec();
-        if (codec != null) {
-          b.setCodec(codec.getClass().getCanonicalName());
-          out = codec.createOutputStream(out);
-        }
+    private void saveInternal(FileOutputStream fout,
+        FSImageCompression compression) throws IOException {
+      fout.write(MAGIC_HEADER);
+      fileChannel = fout.getChannel();
 
-        save(out, b);
-        out.flush();
-        channel.position(0);
-        FileHeader header = b.build();
-        Preconditions.checkState(MAGIC_HEADER.length
-            + getOndiskTrunkSize(header) < PRE_ALLOCATED_HEADER_SIZE,
-            "Insufficient space to write file header");
-        fout.write(MAGIC_HEADER);
-        header.writeDelimitedTo(fout);
-        updateDigestForFileHeader(header, digester);
-        savedDigest = new MD5Hash(digester.digest());
-      } finally {
-        IOUtils.cleanup(LOG, out);
+      MessageDigest digester = MD5Hash.getDigester();
+      OutputStream out = new DigestOutputStream(new BufferedOutputStream(fout),
+          digester);
+
+      FileSummary.Builder b = FileSummary.newBuilder()
+          .setOndiskVersion(FILE_VERSION)
+          .setLayoutVersion(LayoutVersion.getCurrentLayoutVersion());
+
+      CompressionCodec codec = compression.getImageCodec();
+      if (codec != null) {
+        b.setCodec(codec.getClass().getCanonicalName());
+        sectionOutputStream = codec.createOutputStream(out);
+      } else {
+        sectionOutputStream = out;
       }
+
+      saveNameSystemSection(sectionOutputStream, b);
+      saveInodes(sectionOutputStream, b);
+
+      // Flush the buffered data into the file before appending the header
+      out.flush();
+
+      FileSummary summary = b.build();
+      saveFileSummary(fout, summary);
+      updateDigestForFileSummary(summary, digester);
+      savedDigest = new MD5Hash(digester.digest());
     }
 
-    private void save(OutputStream out, FileHeader.Builder headers)
-        throws IOException {
+    private void saveNameSystemSection(OutputStream out,
+        FileSummary.Builder summary) throws IOException {
       final FSNamesystem fsn = context.getSourceNamesystem();
-      FileHeader.Section.Builder sectionHeader = FileHeader.Section
-          .newBuilder().setName(SectionName.NS_INFO.name);
       NameSystemSection.Builder b = NameSystemSection.newBuilder()
           .setGenstampV1(fsn.getGenerationStampV1())
           .setGenstampV1Limit(fsn.getGenerationStampV1Limit())
@@ -245,33 +297,52 @@ final class FSImageFormatProtobuf {
       b.setNamespaceId(fsn.unprotectedGetNamespaceInfo().getNamespaceID());
       NameSystemSection s = b.build();
       s.writeDelimitedTo(out);
-      sectionHeader.setLength(getOndiskTrunkSize(s));
-      headers.addSections(sectionHeader);
-    }
 
-    public MD5Hash getSavedDigest() {
-      return savedDigest;
+      commitSection(summary, SectionName.NS_INFO);
     }
   }
 
+  /**
+   * Supported section name
+   */
+  enum SectionName {
+    INODE("INODE"), INODE_DIR("INODE_DIR"), NS_INFO("NS_INFO");
+
+    private static final SectionName[] values = SectionName.values();
+
+    private static SectionName fromString(String name) {
+      for (SectionName n : values) {
+        if (n.name.equals(name))
+          return n;
+      }
+      return null;
+    }
+
+    private final String name;
+
+    private SectionName(String name) {
+      this.name = name;
+    }
+  }
+  
   private static int getOndiskTrunkSize(com.google.protobuf.GeneratedMessage s) {
     return CodedOutputStream.computeRawVarint32Size(s.getSerializedSize())
         + s.getSerializedSize();
   }
 
   /**
-   * Include the FileHeader when calculating the digest. This is required as the
+   * Include the FileSummary when calculating the digest. This is required as the
    * code does not access the FSImage strictly in sequential order.
    */
-  private static void updateDigestForFileHeader(FileHeader header,
+  private static void updateDigestForFileSummary(FileSummary summary,
       MessageDigest digester) throws IOException {
     ByteArrayOutputStream o = new ByteArrayOutputStream();
     o.write(MAGIC_HEADER);
-    header.writeDelimitedTo(o);
+    summary.writeDelimitedTo(o);
     digester.update(o.toByteArray());
   }
 
   private FSImageFormatProtobuf() {
   }
 
-}
+}
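
With this change the image is written front-to-back (magic header, then the sections), and the FileSummary that indexes the sections is appended at the end, followed by a 4-byte big-endian length field. A reader therefore opens the file with RandomAccessFile, reads the trailing length, and seeks backwards to the summary before it can locate any section. The sketch below mirrors Loader.loadSummary for illustration only; the fsimage path passed in args[0] is a placeholder assumption.

// Illustrative sketch of locating the trailing FileSummary; assumes an
// fsimage written by the Saver above exists at the path given in args[0].
import java.io.ByteArrayInputStream;
import java.io.RandomAccessFile;

import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;

public class ReadFileSummary {
  public static void main(String[] args) throws Exception {
    try (RandomAccessFile raf = new RandomAccessFile(args[0], "r")) {
      long fileLength = raf.length();
      raf.seek(fileLength - 4);              // trailing 4-byte length field
      int summaryLength = raf.readInt();
      raf.seek(fileLength - 4 - summaryLength);

      byte[] bytes = new byte[summaryLength];
      raf.readFully(bytes);
      FileSummary summary = FileSummary
          .parseDelimitedFrom(new ByteArrayInputStream(bytes));

      // Each Section entry carries the (offset, length) needed to seek to
      // and bound the corresponding section in the file body.
      for (FileSummary.Section s : summary.getSectionsList()) {
        System.out.println(s.getName() + " offset=" + s.getOffset()
            + " length=" + s.getLength());
      }
    }
  }
}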

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java

@@ -46,6 +46,10 @@ public class INodeMap {
   /** Synchronized by external lock. */
   private final GSet<INode, INodeWithAdditionalFields> map;
   
+  GSet<INode, INodeWithAdditionalFields> getMap() {
+    return map;
+  }
+
   private INodeMap(GSet<INode, INodeWithAdditionalFields> map) {
     Preconditions.checkArgument(map != null);
     this.map = map;

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java

@@ -126,7 +126,7 @@ class ImageLoaderCurrent implements ImageLoader {
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
   private static int[] versions = { -16, -17, -18, -19, -20, -21, -22, -23,
       -24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39,
-      -40, -41, -42, -43, -44, -45, -46, -47, -48, -49 };
+      -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50 };
   private int imageVersion = 0;
   
   private final Map<Long, Boolean> subtreeMap = new HashMap<Long, Boolean>();

+ 77 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto

@@ -21,18 +21,21 @@ option java_outer_classname = "FsImageProto";
 
 package hadoop.hdfs.fsimage;
 
+import "hdfs.proto";
+
 /**
  * This file defines the on-disk layout of the file system image. The
  * layout is defined by the following EBNF grammar, in which angle
- * brackets mark protobuf definitions. (e.g., <FileHeader>)
+ * brackets mark protobuf definitions. (e.g., <FileSummary>)
  *
- * FILE := MAGIC <FileHeader> [blank] SECTION*
+ * FILE := MAGIC SECTION* <FileSummary> FileSummaryLength
  * MAGIC := 'HDFSIMG1'
- * SECTION := NameSystemSection | ...
+ * SECTION := <NameSystemSection> | ...
+ * FileSummaryLength := 4 byte int
  *
  * Some notes:
  *
- * The codec field in FileHeader describes the compression codec used
+ * The codec field in FileSummary describes the compression codec used
  * for all sections. The fileheader is always uncompressed.
  *
  * All protobuf messages are serialized in delimited form, which means
@@ -41,22 +44,24 @@ package hadoop.hdfs.fsimage;
  *
  **/
 
-message FileHeader {
-  // The on-disk layout version of the file.
+message FileSummary {
+  // The version of the above EBNF grammars.
   required uint32 ondiskVersion = 1;
   // layoutVersion describes which features are available in the
   // FSImage.
   required uint32 layoutVersion = 2;
-  optional string codec   = 3;
-  // The offset of the first data section
-  required uint64 dataOffset = 4;
+  optional string codec         = 3;
   message Section {
     optional string name = 1;
     optional uint64 length = 2;
+    optional uint64 offset = 3;
   }
-  repeated Section sections = 5;
+  repeated Section sections = 4;
 }
 
+/**
+ * Name: NS_INFO
+ **/
 message NameSystemSection {
   optional uint32 namespaceId = 1;
   optional uint64 genstampV1 = 2;
@@ -64,4 +69,66 @@ message NameSystemSection {
   optional uint64 genstampV1Limit = 4;
   optional uint64 lastAllocatedBlockId = 5;
   optional uint64 transactionId = 6;
+}
+
+/**
+ * Name: INODE
+ **/
+message INodeSection {
+  message Permission {
+    optional string user = 1;
+    optional string group = 2;
+    optional uint32 permission = 3;
+  }
+
+  message INodeFile {
+    optional uint32 replication = 1;
+    optional uint64 modificationTime = 2;
+    optional uint64 accessTime = 3;
+    optional uint64 preferredBlockSize = 4;
+    optional Permission permission = 5;
+    repeated BlockProto blocks = 6;
+  }
+
+  message INodeDirectory {
+    optional uint64 modificationTime = 1;
+    // namespace quota
+    optional uint64 nsQuota = 2;
+    // diskspace quota
+    optional uint64 dsQuota = 3;
+    optional Permission permission = 4;
+  }
+
+  message INode {
+    enum Type {
+      FILE = 1;
+      DIRECTORY = 2;
+      REFERENCE = 3;
+      SYMLINK = 4;
+    };
+    required Type type = 1;
+    required uint64 id = 2;
+
+    optional INodeFile file = 3;
+    optional INodeDirectory directory = 4;
+
+    optional bytes name = 16;
+    repeated bytes features = 17;
+  }
+
+  optional uint64 lastInodeId = 1;
+  optional uint64 numInodes = 2;
+  // repeated INodes..
+}
+
+/**
+ * This section records the children of each directory
+ * Name: INODE_DIR
+ **/
+message INodeDirectorySection {
+  message DirEntry {
+    optional uint64 parent = 1;
+    repeated uint64 children = 2;
+  }
+  // repeated DirEntry, ended at the boundary of the section.
 }
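
Per the grammar above, the INODE and INODE_DIR sections are open-ended streams of delimited messages whose extent is given only by the section length recorded in FileSummary. The short sketch below is illustrative only (the inode ids are made up): it serializes one DirEntry and reads entries back until parseDelimitedFrom returns null at the stream boundary, which is how loadINodeDirectorySection consumes the section.

// Illustrative DirEntry round trip; assumes the generated FsImageProto
// classes, and the inode ids are hypothetical.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection.DirEntry;

public class DirEntryExample {
  public static void main(String[] args) throws Exception {
    // 16385 plays the role of the parent directory, 16386/16387 its children.
    DirEntry entry = DirEntry.newBuilder()
        .setParent(16385)
        .addChildren(16386)
        .addChildren(16387)
        .build();

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    entry.writeDelimitedTo(out);

    // The loader keeps parsing entries until the section is exhausted.
    InputStream in = new ByteArrayInputStream(out.toByteArray());
    DirEntry read;
    while ((read = DirEntry.parseDelimitedFrom(in)) != null) {
      System.out.println(read.getParent() + " -> " + read.getChildrenList());
    }
  }
}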

+ 49 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFSImage {
+  @Test
+  public void testINode() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    fs.mkdirs(new Path("/abc/def"));
+    fs.create(new Path("/abc/def/e")).close();
+    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    fs.saveNamespace();
+    fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+    cluster.restartNameNode();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    Assert.assertTrue(fs.exists(new Path("/abc/def/e")));
+    Assert.assertTrue(fs.isDirectory(new Path("/abc/def")));
+    cluster.shutdown();
+  }
+}