HDFS-7929. inotify unable fetch pre-upgrade edit log segments once upgrade starts (Zhe Zhang via Colin P. McCabe)
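
During an upgrade, doPreUpgrade renames each storage directory's current/ to
previous.tmp/ and creates a fresh, empty current/, which left the pre-upgrade
edit log segments unreadable to inotify clients while the upgrade was in
progress. This change copies the edit log segments from previous.tmp/ back
into the new current/ directory so they remain available, and threads a
Configuration through FSImage and FileJournalManager down to
NNUpgradeUtil.doPreUpgrade, where it is needed to construct the
EditLogFileOutputStream that rewrites each segment.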

(cherry picked from commit 43b41f22411439c5e23629197fb2fde45dcf0f0f)
(cherry picked from commit 219eb22c1571f76df32967a930049d983cbf5024)
Colin Patrick Mccabe 10 years ago
parent
commit 9e61835678

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -885,6 +885,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7587. Edit log corruption can happen if append fails with a quota
     violation. (jing9)
 
+    HDFS-7929. inotify unable fetch pre-upgrade edit log segments once upgrade
+    starts (Zhe Zhang via Colin P. McCabe)
+
     BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
       HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -406,7 +406,7 @@ public class FSImage implements Closeable {
     for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
       StorageDirectory sd = it.next();
       try {
-        NNUpgradeUtil.doPreUpgrade(sd);
+        NNUpgradeUtil.doPreUpgrade(conf, sd);
       } catch (Exception e) {
         LOG.error("Failed to move aside pre-upgrade storage " +
             "in image directory " + sd.getRoot(), e);

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java

@@ -585,7 +585,7 @@ public class FileJournalManager implements JournalManager {
   public void doPreUpgrade() throws IOException {
     LOG.info("Starting upgrade of edits directory " + sd.getRoot());
     try {
-     NNUpgradeUtil.doPreUpgrade(sd);
+     NNUpgradeUtil.doPreUpgrade(conf, sd);
     } catch (IOException ioe) {
      LOG.error("Failed to move aside pre-upgrade storage " +
          "in image directory " + sd.getRoot(), ioe);

+ 38 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNUpgradeUtil.java

@@ -18,15 +18,19 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.IOUtils;
 
 abstract class NNUpgradeUtil {
   
@@ -99,15 +103,17 @@ abstract class NNUpgradeUtil {
    * a call to any JM's or local storage dir's doPreUpgrade method fails, then
    * doUpgrade will not be called for any JM. The existing current dir is
    * renamed to previous.tmp, and then a new, empty current dir is created.
-   * 
+   *
+   * @param conf configuration for creating {@link EditLogFileOutputStream}
    * @param sd the storage directory to perform the pre-upgrade procedure.
    * @throws IOException in the event of error
    */
-  static void doPreUpgrade(StorageDirectory sd) throws IOException {
+  static void doPreUpgrade(Configuration conf, StorageDirectory sd)
+      throws IOException {
     LOG.info("Starting upgrade of storage directory " + sd.getRoot());
     File curDir = sd.getCurrentDir();
     File prevDir = sd.getPreviousDir();
-    File tmpDir = sd.getPreviousTmp();
+    final File tmpDir = sd.getPreviousTmp();
 
     Preconditions.checkState(curDir.exists(),
         "Current directory must exist for preupgrade.");
@@ -123,6 +129,35 @@ abstract class NNUpgradeUtil {
     if (!curDir.mkdir()) {
       throw new IOException("Cannot create directory " + curDir);
     }
+
+    List<String> fileNameList = IOUtils.listDirectory(tmpDir, new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        return dir.equals(tmpDir)
+            && name.startsWith(NNStorage.NameNodeFile.EDITS.getName());
+      }
+    });
+
+    for (String s : fileNameList) {
+      File prevFile = new File(tmpDir, s);
+      Preconditions.checkState(prevFile.canRead(),
+          "Edits log file " + s + " is not readable.");
+      File newFile = new File(curDir, prevFile.getName());
+      Preconditions.checkState(newFile.createNewFile(),
+          "Cannot create new edits log file in " + curDir);
+      EditLogFileInputStream in = new EditLogFileInputStream(prevFile);
+      EditLogFileOutputStream out =
+          new EditLogFileOutputStream(conf, newFile, 512*1024);
+      FSEditLogOp logOp = in.nextValidOp();
+      while (logOp != null) {
+        out.write(logOp);
+        logOp = in.nextOp();
+      }
+      out.setReadyToFlush();
+      out.flushAndSync(true);
+      out.close();
+      in.close();
+    }
   }
   
   /**
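
Read in isolation, the copy loop added above amounts to the following. This is
a minimal sketch, not part of the patch: the class and method names are
hypothetical, the 512 KB output buffer size mirrors the hard-coded value in
the hunk, and the streams are closed in finally blocks for clarity.

    // Hypothetical helper isolating the segment-copy step above. Assumes the
    // same package as NNUpgradeUtil so the edit log stream classes resolve.
    package org.apache.hadoop.hdfs.server.namenode;

    import java.io.File;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    class EditSegmentCopy {
      /** Re-write every valid op from a pre-upgrade segment into a fresh file. */
      static void copyEditLogSegment(Configuration conf, File prevFile, File newFile)
          throws IOException {
        EditLogFileInputStream in = new EditLogFileInputStream(prevFile);
        try {
          EditLogFileOutputStream out =
              new EditLogFileOutputStream(conf, newFile, 512 * 1024);
          try {
            FSEditLogOp op = in.nextValidOp();  // skips past any leading corruption
            while (op != null) {
              out.write(op);
              op = in.nextOp();
            }
            out.setReadyToFlush();
            out.flushAndSync(true);  // make the copy durable before the upgrade proceeds
          } finally {
            out.close();
          }
        } finally {
          in.close();
        }
      }
    }

Note that the committed code closes both streams on the success path only; the
finally blocks here are an addition for illustration.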

+ 49 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java

@@ -28,7 +28,10 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
@@ -42,7 +45,9 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.TestParallelImageWrite;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.BeforeClass;
@@ -450,7 +455,50 @@ public class TestDFSUpgrade {
       assertTrue(Storage.is203LayoutVersion(lv));
     }
   }
-  
+
+  @Test
+  public void testPreserveEditLogs() throws Exception {
+    conf = new HdfsConfiguration();
+    conf = UpgradeUtilities.initializeStorageStateConf(1, conf);
+    String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, false);
+
+    log("Normal NameNode upgrade", 1);
+    File[] created =
+        UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+    List<String> beforeUpgrade = new LinkedList<>();
+    for (final File createdDir : created) {
+      List<String> fileNameList =
+          IOUtils.listDirectory(createdDir, EditLogsFilter.INSTANCE);
+      beforeUpgrade.addAll(fileNameList);
+    }
+
+    cluster = createCluster();
+
+    List<String> afterUpgrade = new LinkedList<>();
+    for (final File createdDir : created) {
+      List<String> fileNameList =
+          IOUtils.listDirectory(createdDir, EditLogsFilter.INSTANCE);
+      afterUpgrade.addAll(fileNameList);
+    }
+
+    for (String s : beforeUpgrade) {
+      assertTrue(afterUpgrade.contains(s));
+    }
+
+    cluster.shutdown();
+    UpgradeUtilities.createEmptyDirs(nameNodeDirs);
+  }
+
+  private static enum EditLogsFilter implements FilenameFilter {
+    INSTANCE;
+
+    @Override
+    public boolean accept(File dir, String name) {
+      return name.startsWith(NNStorage.NameNodeFile.EDITS.getName());
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     TestDFSUpgrade t = new TestDFSUpgrade();
     TestDFSUpgrade.initialize();
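
The new test snapshots the names of all edit log files in each name directory
before the upgrade, runs the upgrade via createCluster(), and asserts that
every pre-upgrade segment name is still present afterwards. EditLogsFilter is
an enum-based singleton so the same stateless FilenameFilter instance can be
shared by both listDirectory passes; it matches any file whose name begins
with NNStorage.NameNodeFile.EDITS.getName(), i.e. "edits".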