
HDFS-17573. Allow turn on both FSImage parallelization and compression (#6929). Contributed by Sung Dong Kim.

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
Sung Dong Kim, 8 months ago
parent
commit
89e38f08ae
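
This change lets fsimage compression and parallel save/load be enabled together instead of silently disabling parallelism. A minimal sketch of the now-legal configuration combination, using the keys exercised by the tests in this change (the wrapper class name is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.compress.GzipCodec;

// Minimal sketch of the configuration combination this commit allows.
// The class name is hypothetical; the keys are the real ones used by
// the tests below.
public final class ParallelCompressedImageConf {
  public static Configuration create() {
    Configuration conf = new Configuration();
    // fsimage compression, previously incompatible with parallel save/load
    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
    conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
        GzipCodec.class.getCanonicalName());
    // parallel fsimage save and load
    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, true);
    return conf;
  }
}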

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java

@@ -790,6 +790,7 @@ public final class FSImageFormatPBINode {
           outputInodes = 0;
           parent.commitSubSection(summary,
               FSImageFormatProtobuf.SectionName.INODE_DIR_SUB);
+          out = parent.getSectionOutputStream();
         }
       }
       parent.commitSectionAndSubSection(summary,
@@ -817,6 +818,7 @@ public final class FSImageFormatPBINode {
         if (i % parent.getInodesPerSubSection() == 0) {
           parent.commitSubSection(summary,
               FSImageFormatProtobuf.SectionName.INODE_SUB);
+          out = parent.getSectionOutputStream();
         }
       }
       parent.commitSectionAndSubSection(summary,
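
Both hunks add the same one-line fix: commitSubSection() may now replace the saver's output stream (with compression enabled it finishes one CompressionOutputStream per sub-section and opens a fresh one), so a writer that caches the stream must re-fetch it after each commit. A condensed sketch of the loop shape, assuming a hypothetical saveINode() serializer:

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

// Condensed sketch of the writer loop both hunks above establish.
// saveINode is a placeholder; the point is the re-fetch after every
// commitSubSection() call, because the commit may swap in a fresh
// CompressionOutputStream when compression is enabled.
void saveINodes(FSImageFormatProtobuf.Saver parent,
    FileSummary.Builder summary, List<INode> inodes) throws IOException {
  OutputStream out = parent.getSectionOutputStream();
  int i = 0;
  for (INode n : inodes) {
    saveINode(out, n); // hypothetical per-inode serializer
    if (++i % parent.getInodesPerSubSection() == 0) {
      parent.commitSubSection(summary,
          FSImageFormatProtobuf.SectionName.INODE_SUB);
      out = parent.getSectionOutputStream(); // stale after the commit
    }
  }
}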

+ 33 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java

@@ -584,18 +584,6 @@ public final class FSImageFormatProtobuf {
 
   private static boolean enableParallelSaveAndLoad(Configuration conf) {
     boolean loadInParallel = enableParallelLoad;
-    boolean compressionEnabled = conf.getBoolean(
-        DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
-        DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);
-
-    if (loadInParallel) {
-      if (compressionEnabled) {
-        LOG.warn("Parallel Image loading and saving is not supported when {}" +
-                " is set to true. Parallel will be disabled.",
-            DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY);
-        loadInParallel = false;
-      }
-    }
     return loadInParallel;
   }
 
@@ -653,7 +641,11 @@ public final class FSImageFormatProtobuf {
       return inodesPerSubSection;
     }
 
+    public OutputStream getSectionOutputStream() {
+      return sectionOutputStream;
+    }
+
    /**
      * Commit the length and offset of a fsimage section to the summary index,
      * including the sub section, which will be committed before the section is
      * committed.
@@ -664,14 +656,22 @@ public final class FSImageFormatProtobuf {
      */
     public void commitSectionAndSubSection(FileSummary.Builder summary,
         SectionName name, SectionName subSectionName) throws IOException {
-      commitSubSection(summary, subSectionName);
-      commitSection(summary, name);
+      commitSubSection(summary, subSectionName, true);
+      commitSection(summary, name, true);
     }
 
     public void commitSection(FileSummary.Builder summary, SectionName name)
        throws IOException {
+      commitSection(summary, name, false);
+    }
+
+    public void commitSection(FileSummary.Builder summary, SectionName name,
+        boolean afterSubSectionCommit) throws IOException {
       long oldOffset = currentOffset;
-      flushSectionOutputStream();
+      boolean subSectionCommitted = afterSubSectionCommit && writeSubSections;
+      if (!subSectionCommitted) {
+        flushSectionOutputStream();
+      }
 
       if (codec != null) {
         sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
@@ -685,14 +685,20 @@ public final class FSImageFormatProtobuf {
       subSectionOffset = currentOffset;
     }
 
+    public void commitSubSection(FileSummary.Builder summary, SectionName name)
+        throws IOException {
+      this.commitSubSection(summary, name, false);
+    }
+
     /**
      * Commit the length and offset of a fsimage sub-section to the summary
      * index.
      * @param summary The image summary object
      * @param name The name of the sub-section to commit
+     * @param isLast True if this sub-section is the last one in its parent section
      * @throws IOException
      */
-    public void commitSubSection(FileSummary.Builder summary, SectionName name)
+    public void commitSubSection(FileSummary.Builder summary, SectionName name, boolean isLast)
         throws IOException {
       if (!writeSubSections) {
         return;
@@ -701,7 +707,15 @@ public final class FSImageFormatProtobuf {
       LOG.debug("Saving a subsection for {}", name.toString());
       // The output stream must be flushed before the length is obtained
       // as the flush can move the length forward.
-      sectionOutputStream.flush();
+      flushSectionOutputStream();
+
+      if (codec == null || isLast) {
+        // To avoid an empty sub-section, do not create a new
+        // CompressionOutputStream if this is the last sub-section of its section.
+        sectionOutputStream = underlyingOutputStream;
+      } else {
+        sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
+      }
       long length = fileChannel.position() - subSectionOffset;
       if (length == 0) {
         LOG.warn("The requested section for {} is empty. It will not be " +
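
This hunk is the core mechanism: each sub-section is now written through its own compression stream, finished at commit time, so every compressed sub-section is independently decodable and a parallel loader can seek straight to a sub-section's offset. A standalone sketch of the same property using java.util.zip instead of Hadoop's codec API:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Standalone sketch: finishing one compression stream per segment yields
// concatenated gzip members, and a reader can start decoding at any
// member's offset, which is the property the parallel loader relies on.
public final class SegmentedGzipDemo {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream file = new ByteArrayOutputStream();

    try (OutputStream seg = new GZIPOutputStream(file)) {
      seg.write("sub-section-1".getBytes(StandardCharsets.UTF_8));
    } // closing finishes the member, like commitSubSection()
    int offset2 = file.size(); // where the second member begins

    try (OutputStream seg = new GZIPOutputStream(file)) {
      seg.write("sub-section-2".getBytes(StandardCharsets.UTF_8));
    }

    byte[] image = file.toByteArray();
    // Seek directly to the second segment and decode only it.
    GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(
        image, offset2, image.length - offset2));
    System.out.println(new String(in.readAllBytes(),
        StandardCharsets.UTF_8)); // prints "sub-section-2"
  }
}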

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java

@@ -530,8 +530,8 @@ public class FSImageFormatPBSnapshot {
           context.checkCancelled();
         }
         if (i % parent.getInodesPerSubSection() == 0) {
-          parent.commitSubSection(headers,
-              FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB);
+          parent.commitSubSection(headers, FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB);
+          out = parent.getSectionOutputStream();
         }
       }
       parent.commitSectionAndSubSection(headers,

+ 11 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java

@@ -1120,7 +1120,7 @@ public class TestFSImage {
   }
 
   @Test
-  public void testNoParallelSectionsWithCompressionEnabled()
+  public void testParallelSaveAndLoadWithCompression()
       throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
@@ -1137,16 +1137,21 @@ public class TestFSImage {
           getLatestImageSummary(cluster);
       ArrayList<Section> sections = Lists.newArrayList(
           summary.getSectionsList());
+      Section inodeSection =
+          getSubSectionsOfName(sections, SectionName.INODE).get(0);
+      Section dirSection = getSubSectionsOfName(sections,
+          SectionName.INODE_DIR).get(0);
 
       ArrayList<Section> inodeSubSections =
           getSubSectionsOfName(sections, SectionName.INODE_SUB);
       ArrayList<Section> dirSubSections =
           getSubSectionsOfName(sections, SectionName.INODE_DIR_SUB);
+      // Compression and parallel loading can now be enabled at the same time.
+      assertEquals(4, inodeSubSections.size());
+      assertEquals(4, dirSubSections.size());
 
-      // As compression is enabled, there should be no sub-sections in the
-      // image header
-      assertEquals(0, inodeSubSections.size());
-      assertEquals(0, dirSubSections.size());
+      ensureSubSectionsAlignWithParent(inodeSubSections, inodeSection);
+      ensureSubSectionsAlignWithParent(dirSubSections, dirSection);
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -1229,4 +1234,4 @@ public class TestFSImage {
     SnapshotTestHelper.compareDumpedTreeInFile(
         preRestartTree, postRestartTree, true);
   }
-}
+}
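
The rewritten test asserts that the four sub-sections tile their parent sections exactly. The body of ensureSubSectionsAlignWithParent is not shown in this diff; a hypothetical version of such a check:

import java.util.List;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary.Section;
import static org.junit.Assert.assertEquals;

// Hypothetical version of the alignment check (the real helper's body is
// not part of this diff): sub-sections must start at the parent section's
// offset, be contiguous, and end exactly where the parent ends.
final class SubSectionAlignment {
  static void check(List<Section> subSections, Section parent) {
    long expected = parent.getOffset();
    for (Section sub : subSections) {
      assertEquals(expected, sub.getOffset());
      expected += sub.getLength();
    }
    assertEquals(parent.getOffset() + parent.getLength(), expected);
  }
}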

+ 35 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.SafeModeAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -77,15 +78,18 @@ public class TestFSImageWithSnapshot {
   MiniDFSCluster cluster;
   FSNamesystem fsn;
   DistributedFileSystem hdfs;
+
+  public void createCluster() throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    hdfs = cluster.getFileSystem();
+  }
   
   @Before
   public void setUp() throws Exception {
     conf = new Configuration();
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
-        .build();
-    cluster.waitActive();
-    fsn = cluster.getNamesystem();
-    hdfs = cluster.getFileSystem();
+    createCluster();
   }
 
   @After
@@ -512,6 +516,32 @@ public class TestFSImageWithSnapshot {
     hdfs = cluster.getFileSystem();
   }
 
+  /**
+   * Test that a parallel, compressed fsimage can be loaded serially.
+   */
+  @Test
+  public void testLoadParallelCompressedImageSerial() throws Exception {
+    int s = 0;
+    cluster.shutdown();
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    hdfs = cluster.getFileSystem();
+    hdfs.mkdirs(dir);
+    SnapshotTestHelper.createSnapshot(hdfs, dir, "s");
+
+    Path sub1 = new Path(dir, "sub1");
+    Path sub1file1 = new Path(sub1, "sub1file1");
+    Path sub1file2 = new Path(sub1, "sub1file2");
+    DFSTestUtil.createFile(hdfs, sub1file1, BLOCKSIZE, (short) 1, seed);
+    DFSTestUtil.createFile(hdfs, sub1file2, BLOCKSIZE, (short) 1, seed);
+
+    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
+    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, false);
+    checkImage(s);
+  }
+
   void rename(Path src, Path dst) throws Exception {
     printTree("Before rename " + src + " -> " + dst);
     hdfs.rename(src, dst);
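
The extracted createCluster() hook is what the new subclass below overrides: setUp() creates conf and then delegates cluster construction, so image settings can be changed before the NameNode starts. In testLoadParallelCompressedImageSerial the data is written under whatever settings createCluster() chose (parallel plus compressed in the subclass), and both flags are switched off just before checkImage(), forcing the restarted NameNode to load that image serially. A condensed sketch of the hook shape (names follow the diff, bodies trimmed, the NUM_DATANODES value is a placeholder):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Before;

// Condensed shape of the hook this hunk extracts: conf is created in
// setUp() and cluster construction is delegated, so a subclass can flip
// image settings before the NameNode starts.
public class SnapshotImageTestBase { // stands in for TestFSImageWithSnapshot
  static final int NUM_DATANODES = 1; // placeholder value
  Configuration conf;
  MiniDFSCluster cluster;

  @Before
  public void setUp() throws Exception {
    conf = new Configuration();
    createCluster();
  }

  public void createCluster() throws IOException {
    cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(NUM_DATANODES).build();
    cluster.waitActive();
  }
}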

+ 58 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshotParallelAndCompress.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.slf4j.event.Level;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.test.GenericTestUtils;
+
+/**
+ * This test extends TestFSImageWithSnapshot to verify that fsimage
+ * parallel loading and fsimage compression can be enabled together.
+ */
+public class TestFSImageWithSnapshotParallelAndCompress extends TestFSImageWithSnapshot {
+  {
+    SnapshotTestHelper.disableLogs();
+    GenericTestUtils.setLogLevel(INode.LOG, Level.TRACE);
+  }
+
+  @Override
+  public void createCluster() throws IOException {
+
+    // turn on both parallelization and compression
+    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
+    conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY, GzipCodec.class.getCanonicalName());
+    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, true);
+    conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, 2);
+    conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, 2);
+    conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, 2);
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    hdfs = cluster.getFileSystem();
+  }
+}