HDFS-5783. Compute the digest before loading FSImage. Contributed by Haohui Mai.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5698@1559298 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao, 11 years ago
commit e5ccec944a
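
What the patch does, in short: the loader used to fold each section through a DigestInputStream and then mix the magic header and FileSummary into the digest by hand (the removed updateDigestForFileSummary helper), which only works if sections are read strictly in file order. Now the loader hashes the entire image file up front and is free to seek between sections; on the save side the digest is taken over everything written, FileSummary included. A minimal sketch of the new load-side ordering, with illustrative names that are not part of the patch:

    import java.io.File;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.util.MD5FileUtils;
    import org.apache.hadoop.io.MD5Hash;

    class DigestFirstSketch {
      // Hash the raw bytes of the whole file before parsing it, so the
      // parser may later jump between sections in any order without
      // affecting the digest.
      static MD5Hash digestThenLoad(File imageFile) throws IOException {
        MD5Hash imgDigest = MD5FileUtils.computeMd5ForFile(imageFile);
        // ... open imageFile and load its sections via random access (elided)
        return imgDigest;
      }
    }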

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt

@@ -5,3 +5,5 @@ HDFS-5698 subtasks
     HDFS-5738. Serialize INode information in protobuf. (Haohui Mai via jing9)
 
     HDFS-5772. Serialize under-construction file information in FSImage. (jing9)
+
+    HDFS-5783. Compute the digest before loading FSImage. (Haohui Mai via jing9)

+ 16 - 18
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java

@@ -209,19 +209,16 @@ final class FSImageFormatPBINode {
 
   final static class Saver {
     private final FSNamesystem fsn;
-    private final FileSummary.Builder headers;
-    private final OutputStream out;
+    private final FileSummary.Builder summary;
     private final FSImageFormatProtobuf.Saver parent;
 
-    Saver(FSImageFormatProtobuf.Saver parent, OutputStream out,
-        FileSummary.Builder headers) {
+    Saver(FSImageFormatProtobuf.Saver parent, FileSummary.Builder summary) {
       this.parent = parent;
-      this.out = out;
-      this.headers = headers;
+      this.summary = summary;
       this.fsn = parent.context.getSourceNamesystem();
     }
 
-    void serializeINodeDirectorySection() throws IOException {
+    void serializeINodeDirectorySection(OutputStream out) throws IOException {
       for (INodeWithAdditionalFields n : fsn.dir.getINodeMap().getMap()) {
         if (!n.isDirectory())
           continue;
@@ -238,24 +235,25 @@ final class FSImageFormatPBINode {
           e.writeDelimitedTo(out);
         }
       }
-      parent.commitSection(headers,
+      parent.commitSection(summary,
           FSImageFormatProtobuf.SectionName.INODE_DIR);
     }
 
-    void serializeINodeSection() throws IOException {
+    void serializeINodeSection(OutputStream out) throws IOException {
       INodeMap inodesMap = fsn.dir.getINodeMap();
+
       INodeSection.Builder b = INodeSection.newBuilder()
           .setLastInodeId(fsn.getLastInodeId()).setNumInodes(inodesMap.size());
       INodeSection s = b.build();
       s.writeDelimitedTo(out);
 
       for (INodeWithAdditionalFields n : inodesMap.getMap()) {
-        save(n);
+        save(out, n);
       }
-      parent.commitSection(headers, FSImageFormatProtobuf.SectionName.INODE);
+      parent.commitSection(summary, FSImageFormatProtobuf.SectionName.INODE);
     }
 
-    void serializeFilesUCSection() throws IOException {
+    void serializeFilesUCSection(OutputStream out) throws IOException {
       Map<String, INodeFile> ucMap = fsn.getFilesUnderConstruction();
       for (Map.Entry<String, INodeFile> entry : ucMap.entrySet()) {
         String path = entry.getKey();
@@ -265,7 +263,7 @@ final class FSImageFormatPBINode {
         FileUnderConstructionEntry e = b.build();
         e.writeDelimitedTo(out);
       }
-      parent.commitSection(headers,
+      parent.commitSection(summary,
           FSImageFormatProtobuf.SectionName.FILES_UNDERCONSTRUCTION);
     }
 
@@ -274,15 +272,15 @@ final class FSImageFormatPBINode {
           .setGroup(n.getGroupName()).setPermission(n.getFsPermissionShort());
     }
 
-    private void save(INode n) throws IOException {
+    private void save(OutputStream out, INode n) throws IOException {
       if (n.isDirectory()) {
-        save(n.asDirectory());
+        save(out, n.asDirectory());
       } else if (n.isFile()) {
-        save(n.asFile());
+        save(out, n.asFile());
       }
     }
 
-    private void save(INodeDirectory n) throws IOException {
+    private void save(OutputStream out, INodeDirectory n) throws IOException {
       Quota.Counts quota = n.getQuotaCounts();
       INodeSection.INodeDirectory.Builder b = INodeSection.INodeDirectory
           .newBuilder().setModificationTime(n.getModificationTime())
@@ -296,7 +294,7 @@ final class FSImageFormatPBINode {
       r.writeDelimitedTo(out);
     }
 
-    private void save(INodeFile n) throws IOException {
+    private void save(OutputStream out, INodeFile n) throws IOException {
       INodeSection.INodeFile.Builder b = INodeSection.INodeFile.newBuilder()
           .setAccessTime(n.getAccessTime())
           .setModificationTime(n.getModificationTime())

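The theme of this file's changes: FSImageFormatPBINode.Saver no longer pins an OutputStream at construction time; every serialize and save method takes the stream as a parameter, so the parent saver can hand each section a different stream (a fresh compressor stream per section, as the next file shows). A condensed, hypothetical illustration of that calling convention:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    class StreamOwnershipSketch {
      // The sub-saver only writes to whatever stream it is handed;
      // it never owns or caches one.
      static void serializeSection(OutputStream out, byte[] payload)
          throws IOException {
        out.write(payload);
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        // The parent may pass a different stream on every call.
        serializeSection(file, "inode section".getBytes("UTF-8"));
        serializeSection(file, "inode-dir section".getBytes("UTF-8"));
      }
    }
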
+ 45 - 46
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -31,7 +30,6 @@ import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.security.DigestInputStream;
 import java.security.DigestOutputStream;
 import java.security.MessageDigest;
 import java.util.Arrays;
@@ -43,8 +41,10 @@ import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressorStream;
 
 import com.google.common.io.LimitInputStream;
 import com.google.protobuf.CodedOutputStream;
@@ -84,6 +84,7 @@ final class FSImageFormatProtobuf {
     }
 
     void load(File file) throws IOException {
+      imgDigest = MD5FileUtils.computeMd5ForFile(file);
       RandomAccessFile raFile = new RandomAccessFile(file, "r");
       FileInputStream fin = new FileInputStream(file);
       try {
@@ -139,7 +140,6 @@ final class FSImageFormatProtobuf {
       }
       FileSummary summary = loadSummary(raFile);
 
-      MessageDigest digester = MD5Hash.getDigester();
       FileChannel channel = fin.getChannel();
 
       FSImageFormatPBINode.Loader inodeLoader = new FSImageFormatPBINode.Loader(
@@ -147,8 +147,8 @@ final class FSImageFormatProtobuf {
 
       for (FileSummary.Section s : summary.getSectionsList()) {
         channel.position(s.getOffset());
-        InputStream in = new DigestInputStream(new BufferedInputStream(
-            new LimitInputStream(fin, s.getLength())), digester);
+        InputStream in = new BufferedInputStream(new LimitInputStream(fin,
+            s.getLength()));
 
         if (summary.hasCodec()) {
           // read compression related info
@@ -179,10 +179,6 @@ final class FSImageFormatProtobuf {
           break;
         }
       }
-
-      updateDigestForFileSummary(summary, digester);
-
-      imgDigest = new MD5Hash(digester.digest());
     }
 
     private void loadNameSystemSection(InputStream in, FileSummary.Section sections)
@@ -204,6 +200,8 @@ final class FSImageFormatProtobuf {
     private FileChannel fileChannel;
     // OutputStream for the section data
     private OutputStream sectionOutputStream;
+    private CompressionCodec codec;
+    private OutputStream underlyingOutputStream;
 
     Saver(SaveNamespaceContext context) {
       this.context = context;
@@ -216,15 +214,29 @@ final class FSImageFormatProtobuf {
     void commitSection(FileSummary.Builder summary, SectionName name)
         throws IOException {
       long oldOffset = currentOffset;
-      sectionOutputStream.flush();
+      flushSectionOutputStream();
+
+      if (codec != null) {
+        sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
+      } else {
+        sectionOutputStream = underlyingOutputStream;
+      }
       long length = fileChannel.position() - oldOffset;
       summary.addSections(FileSummary.Section.newBuilder().setName(name.name)
           .setLength(length).setOffset(currentOffset));
       currentOffset += length;
     }
 
+    private void flushSectionOutputStream() throws IOException {
+      if (codec != null) {
+        ((CompressorStream) sectionOutputStream).finish();
+      }
+      sectionOutputStream.flush();
+    }
+
     void save(File file, FSImageCompression compression) throws IOException {
       FileOutputStream fout = new FileOutputStream(file);
+      fileChannel = fout.getChannel();
       try {
         saveInternal(fout, compression);
       } finally {
@@ -232,60 +244,60 @@ final class FSImageFormatProtobuf {
       }
     }
 
-    private void saveFileSummary(FileOutputStream fout, FileSummary summary)
+    private static void saveFileSummary(OutputStream out, FileSummary summary)
         throws IOException {
-      summary.writeDelimitedTo(fout);
+      summary.writeDelimitedTo(out);
       int length = getOndiskTrunkSize(summary);
       byte[] lengthBytes = new byte[4];
       ByteBuffer.wrap(lengthBytes).asIntBuffer().put(length);
-      fout.write(lengthBytes);
+      out.write(lengthBytes);
     }
 
-    private void saveInodes(OutputStream out, FileSummary.Builder summary)
-        throws IOException {
+    private void saveInodes(FileSummary.Builder summary) throws IOException {
       FSImageFormatPBINode.Saver saver = new FSImageFormatPBINode.Saver(this,
-          out, summary);
-      saver.serializeINodeSection();
-      saver.serializeINodeDirectorySection();
-      saver.serializeFilesUCSection();
+          summary);
+      saver.serializeINodeSection(sectionOutputStream);
+      saver.serializeINodeDirectorySection(sectionOutputStream);
+      saver.serializeFilesUCSection(sectionOutputStream);
     }
 
     private void saveInternal(FileOutputStream fout,
         FSImageCompression compression) throws IOException {
-      fout.write(MAGIC_HEADER);
-      fileChannel = fout.getChannel();
-
       MessageDigest digester = MD5Hash.getDigester();
-      OutputStream out = new DigestOutputStream(new BufferedOutputStream(fout),
-          digester);
+      underlyingOutputStream = new DigestOutputStream(new BufferedOutputStream(
+          fout), digester);
+      underlyingOutputStream.write(MAGIC_HEADER);
+
+      fileChannel = fout.getChannel();
 
       FileSummary.Builder b = FileSummary.newBuilder()
           .setOndiskVersion(FILE_VERSION)
           .setLayoutVersion(LayoutVersion.getCurrentLayoutVersion());
 
-      CompressionCodec codec = compression.getImageCodec();
+      codec = compression.getImageCodec();
       if (codec != null) {
         b.setCodec(codec.getClass().getCanonicalName());
-        sectionOutputStream = codec.createOutputStream(out);
+        sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
       } else {
-        sectionOutputStream = out;
+        sectionOutputStream = underlyingOutputStream;
       }
 
-      saveNameSystemSection(sectionOutputStream, b);
-      saveInodes(sectionOutputStream, b);
+      saveNameSystemSection(b);
+      saveInodes(b);
 
       // Flush the buffered data into the file before appending the header
-      out.flush();
+      flushSectionOutputStream();
 
       FileSummary summary = b.build();
-      saveFileSummary(fout, summary);
-      updateDigestForFileSummary(summary, digester);
+      saveFileSummary(underlyingOutputStream, summary);
+      underlyingOutputStream.close();
       savedDigest = new MD5Hash(digester.digest());
     }
 
-    private void saveNameSystemSection(OutputStream out,
+    private void saveNameSystemSection(
         FileSummary.Builder summary) throws IOException {
       final FSNamesystem fsn = context.getSourceNamesystem();
+      OutputStream out = sectionOutputStream;
       NameSystemSection.Builder b = NameSystemSection.newBuilder()
           .setGenstampV1(fsn.getGenerationStampV1())
           .setGenstampV1Limit(fsn.getGenerationStampV1Limit())
@@ -335,19 +347,6 @@ final class FSImageFormatProtobuf {
         + s.getSerializedSize();
   }
 
-  /**
-   * Include the FileSummary when calculating the digest. This is required as the
-   * code does not access the FSImage strictly in sequential order.
-   */
-  private static void updateDigestForFileSummary(FileSummary summary,
-      MessageDigest digester) throws IOException {
-    ByteArrayOutputStream o = new ByteArrayOutputStream();
-    o.write(MAGIC_HEADER);
-    summary.writeDelimitedTo(o);
-    digester.update(o.toByteArray());
-  }
-
   private FSImageFormatProtobuf() {
   }
-
 }

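The heart of the saver-side change is commitSection plus flushSectionOutputStream: with compression enabled, each section is written through its own compressor stream, and that stream must be finished, not merely flushed, before fileChannel.position() is sampled; otherwise the recorded section length misses the compressor's buffered tail. A self-contained sketch of the same bookkeeping, using java.util.zip as a stand-in for Hadoop's CompressionCodec (all names illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPOutputStream;

    public class SectionCommitSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        long offset = 0;
        for (String name : new String[] { "NS_INFO", "INODE", "INODE_DIR" }) {
          // Fresh compressor per section, mirroring
          // codec.createOutputStream(underlyingOutputStream) above.
          GZIPOutputStream section = new GZIPOutputStream(file);
          section.write(name.getBytes("UTF-8"));
          // finish() drains the compressor's internal buffer into the file;
          // only then is the byte count below accurate.
          section.finish();
          file.flush();
          long length = file.size() - offset;
          System.out.println(name + ": offset=" + offset + " length=" + length);
          offset += length;
        }
      }
    }
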
+ 88 - 55
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.EnumSet;
 
@@ -28,6 +29,7 @@ import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -36,70 +38,101 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.junit.Test;
 
 public class TestFSImage {
 
-  private MiniDFSCluster cluster;
-  private Configuration conf;
-  private DistributedFileSystem fs;
-  private FSNamesystem fsn;
-
-  @Before
-  public void setUp() throws IOException {
-    conf = new Configuration();
-    cluster = new MiniDFSCluster.Builder(conf).build();
-    cluster.waitActive();
-    fsn = cluster.getNamesystem();
-    fs = cluster.getFileSystem();
+  @Test
+  public void testPersist() throws IOException {
+    Configuration conf = new Configuration();
+    testPersistHelper(conf);
+  }
+
+  @Test
+  public void testCompression() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
+    conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
+        "org.apache.hadoop.io.compress.GzipCodec");
+    testPersistHelper(conf);
   }
 
-  @After
-  public void tearDown() throws IOException {
-    if (cluster != null) {
-      cluster.shutdown();
+  private void testPersistHelper(Configuration conf) throws IOException {
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      cluster.waitActive();
+      FSNamesystem fsn = cluster.getNamesystem();
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      final Path dir = new Path("/abc/def");
+      final Path file1 = new Path(dir, "f1");
+      final Path file2 = new Path(dir, "f2");
+
+      // create an empty file f1
+      fs.create(file1).close();
+
+      // create an under-construction file f2
+      FSDataOutputStream out = fs.create(file2);
+      out.writeBytes("hello");
+      ((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet
+          .of(SyncFlag.UPDATE_LENGTH));
+
+      // checkpoint
+      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      fs.saveNamespace();
+      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+
+      cluster.restartNameNode();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+
+      assertTrue(fs.isDirectory(dir));
+      assertTrue(fs.exists(file1));
+      assertTrue(fs.exists(file2));
+
+      // check internals of file2
+      INodeFile file2Node = fsn.dir.getINode4Write(file2.toString()).asFile();
+      assertEquals("hello".length(), file2Node.computeFileSize());
+      assertTrue(file2Node.isUnderConstruction());
+      BlockInfo[] blks = file2Node.getBlocks();
+      assertEquals(1, blks.length);
+      assertEquals(BlockUCState.UNDER_CONSTRUCTION, blks[0].getBlockUCState());
+      // check lease manager
+      Lease lease = fsn.leaseManager.getLeaseByPath(file2.toString());
+      Assert.assertNotNull(lease);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 
+  /**
+   * Ensure that the digest written by the saver equals the digest of the
+   * file.
+   */
   @Test
-  public void testINode() throws IOException {
-    final Path dir = new Path("/abc/def");
-    final Path file1 = new Path(dir, "f1");
-    final Path file2 = new Path(dir, "f2");
-
-    // create an empty file f1
-    fs.create(file1).close();
-
-    // create an under-construction file f2
-    FSDataOutputStream out = fs.create(file2);
-    out.writeBytes("hello");
-    ((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet
-        .of(SyncFlag.UPDATE_LENGTH));
-
-    // checkpoint
-    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-    fs.saveNamespace();
-    fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
-
-    cluster.restartNameNode();
-    cluster.waitActive();
-    fs = cluster.getFileSystem();
-
-    assertTrue(fs.isDirectory(dir));
-    assertTrue(fs.exists(file1));
-    assertTrue(fs.exists(file2));
-
-    // check internals of file2
-    INodeFile file2Node = fsn.dir.getINode4Write(file2.toString()).asFile();
-    assertEquals("hello".length(), file2Node.computeFileSize());
-    assertTrue(file2Node.isUnderConstruction());
-    BlockInfo[] blks = file2Node.getBlocks();
-    assertEquals(1, blks.length);
-    assertEquals(BlockUCState.UNDER_CONSTRUCTION, blks[0].getBlockUCState());
-    // check lease manager
-    Lease lease = fsn.leaseManager.getLeaseByPath(file2.toString());
-    Assert.assertNotNull(lease);
+  public void testDigest() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      fs.saveNamespace();
+      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+      File currentDir = FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0).get(
+          0);
+      File fsimage = FSImageTestUtil.findNewestImageFile(currentDir
+          .getAbsolutePath());
+      assertEquals(MD5FileUtils.readStoredMd5ForFile(fsimage),
+          MD5FileUtils.computeMd5ForFile(fsimage));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
   }
 }
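
The new testDigest case ties the two sides together: MD5FileUtils.readStoredMd5ForFile reads the digest that was stored alongside the image (the .md5 side file written from the saver's savedDigest), MD5FileUtils.computeMd5ForFile re-hashes the image bytes on disk, and the two must now agree because both cover the complete file, FileSummary and magic header included.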