
HDFS-12248. SNN will not upload fsimage on IOE and Interrupted exceptions. (Brahma Reddy Battula)

Brahma Reddy Battula · 7 years ago · parent commit bb6a3c8330

+ 13 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointFaultInjector.java

@@ -24,13 +24,17 @@ import java.io.IOException;
  * Utility class to facilitate some fault injection tests for the checkpointing
  * process.
  */
-class CheckpointFaultInjector {
-  static CheckpointFaultInjector instance = new CheckpointFaultInjector();
-  
-  static CheckpointFaultInjector getInstance() {
+public class CheckpointFaultInjector {
+  public static CheckpointFaultInjector instance =
+      new CheckpointFaultInjector();
+
+  public static CheckpointFaultInjector getInstance() {
     return instance;
   }
-  
+
+  public static void set(CheckpointFaultInjector instance) {
+    CheckpointFaultInjector.instance = instance;
+  }
   public void beforeGetImageSetsHeaders() throws IOException {}
   public void afterSecondaryCallsRollEditLog() throws IOException {}
   public void duringMerge() throws IOException {}
@@ -46,4 +50,8 @@ class CheckpointFaultInjector {
   
   public void afterMD5Rename() throws IOException {}
   public void beforeEditsRename() throws IOException {}
+
+  public void duringUploadInProgess() throws InterruptedException, IOException {
+  }
+
 }
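
The new public set(...) hook is what makes the injector swappable from test code. A usage sketch (not part of this commit; the injected IOException is illustrative) of the override-then-restore pattern this enables:

    // Swap in a throwing injector for one test, then restore the original.
    CheckpointFaultInjector old = CheckpointFaultInjector.getInstance();
    CheckpointFaultInjector.set(new CheckpointFaultInjector() {
      @Override
      public void duringUploadInProgess()
          throws IOException, InterruptedException {
        throw new IOException("injected upload failure"); // simulate a dying upload
      }
    });
    try {
      // ... drive a standby checkpoint so it attempts the upload ...
    } finally {
      CheckpointFaultInjector.set(old); // never leak the fault into later tests
    }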

+ 10 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.ha.ServiceFailedException;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointConf;
+import org.apache.hadoop.hdfs.server.namenode.CheckpointFaultInjector;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
@@ -228,7 +229,9 @@ public class StandbyCheckpointer {
       Future<TransferFsImage.TransferResult> upload =
           executor.submit(new Callable<TransferFsImage.TransferResult>() {
             @Override
-            public TransferFsImage.TransferResult call() throws IOException {
+            public TransferFsImage.TransferResult call()
+                throws IOException, InterruptedException {
+              CheckpointFaultInjector.getInstance().duringUploadInProgess();
               return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
                   .getFSImage().getStorage(), imageType, txid, canceler);
             }
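
The hook sits inside call(), so anything the injector throws, or any time it blocks, travels through the same Future as the real upload result; a sleeping injector stalls upload.get() in the checkpointer loop. A generic, self-contained illustration of that propagation (hypothetical demo class, plain java.util.concurrent, independent of the HDFS types):

    import java.io.IOException;
    import java.util.concurrent.*;

    class FuturePropagationDemo {
      public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Boolean> upload = executor.submit((Callable<Boolean>) () -> {
          // stand-in for the injector firing inside call()
          throw new IOException("injected during upload");
        });
        try {
          upload.get(); // blocks until the task ends; rethrows the failure wrapped
        } catch (ExecutionException e) {
          System.out.println("upload failed: " + e.getCause()); // the injected IOE
        }
        executor.shutdown();
      }
    }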
@@ -258,11 +261,12 @@ public class StandbyCheckpointer {
         break;
       }
     }
-    lastUploadTime = monotonicNow();
-
-    // we are primary if we successfully updated the ANN
-    this.isPrimaryCheckPointer = success;
-
+    if (ie == null && ioe == null) {
+      // Update only when we got a response from the remote about success or failure
+      lastUploadTime = monotonicNow();
+      // we are primary if we successfully updated the ANN
+      this.isPrimaryCheckPointer = success;
+    }
     // cleaner than copying code for multiple catch statements and better than catching all
     // exceptions, so we just handle the ones we expect.
     if (ie != null || ioe != null) {
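
This is the heart of the fix: the removed lines ran unconditionally, so an upload round that died with an IOException or an interrupt still refreshed lastUploadTime (pushing the retry out by a full checkpoint period) and could wrongly flip isPrimaryCheckPointer. A minimal sketch of the corrected shape, with upload() standing in for uploadImageFromStorage(...):

    boolean success = false;
    IOException ioe = null;
    try {
      success = upload(); // hypothetical stand-in for the image transfer
    } catch (IOException e) {
      ioe = e; // remember the failure instead of updating bookkeeping
    }
    if (ioe == null) {
      lastUploadTime = monotonicNow(); // count only rounds the remote answered
      isPrimaryCheckPointer = success; // primary iff the ANN really took the image
    } else {
      throw ioe; // surface it so the next cycle retries promptly
    }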

+ 48 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 
 import javax.management.AttributeNotFoundException;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.CheckpointFaultInjector;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
@@ -568,6 +570,52 @@ public class TestRollingUpgrade {
     testCheckpoint(3);
   }
 
+  @Test(timeout = 60000)
+  public void testRollBackImage() throws Exception {
+    final Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 10);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 2);
+    MiniQJMHACluster cluster = null;
+    CheckpointFaultInjector old = CheckpointFaultInjector.getInstance();
+    try {
+      cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2).build();
+      MiniDFSCluster dfsCluster = cluster.getDfsCluster();
+      dfsCluster.waitActive();
+      dfsCluster.transitionToActive(0);
+      DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
+      for (int i = 0; i <= 10; i++) {
+        Path foo = new Path("/foo" + i);
+        dfs.mkdirs(foo);
+      }
+      cluster.getDfsCluster().getNameNodeRpc(0).rollEdits();
+      CountDownLatch ruEdit = new CountDownLatch(1);
+      CheckpointFaultInjector.set(new CheckpointFaultInjector() {
+        @Override
+        public void duringUploadInProgess()
+            throws IOException, InterruptedException {
+          if (ruEdit.getCount() == 1) {
+            ruEdit.countDown();
+            Thread.sleep(180000);
+          }
+        }
+      });
+      ruEdit.await();
+      RollingUpgradeInfo info = dfs
+          .rollingUpgrade(RollingUpgradeAction.PREPARE);
+      Assert.assertTrue(info.isStarted());
+      FSImage fsimage = dfsCluster.getNamesystem(0).getFSImage();
+      queryForPreparation(dfs);
+      // The NN should have a copy of the fsimage in case of rollbacks.
+      Assert.assertTrue(fsimage.hasRollbackFSImage());
+    } finally {
+      CheckpointFaultInjector.set(old);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   public void testCheckpoint(int nnCount) throws IOException, InterruptedException {
     final Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
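
A note on the new test's coordination: duringUploadInProgess() runs on the checkpointer's upload thread, so the 180-second sleep pins exactly one upload in flight without blocking the JUnit thread (which is why the 60-second test timeout still holds), and ruEdit.await() releases the test to issue the rolling-upgrade PREPARE only once the upload is genuinely stalled. The general shape of that handoff, reduced to plain java.util.concurrent (hypothetical demo class):

    import java.util.concurrent.CountDownLatch;

    class LatchHandoffDemo {
      public static void main(String[] args) throws InterruptedException {
        CountDownLatch entered = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
          entered.countDown(); // signal: now inside the critical section
          try {
            Thread.sleep(180_000); // hold the section open, off the main thread
          } catch (InterruptedException ignored) {
            // demo exits by interrupting the worker
          }
        });
        worker.start();
        entered.await(); // proceed only once the worker is really inside
        System.out.println("worker is stalled; run assertions here");
        worker.interrupt(); // unblock so the demo terminates
      }
    }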