
HDFS-2010. Fix NameNode to exit if all edit streams become inaccessible. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1073@1141753 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon, 14 years ago
Parent commit: f1ed926d38
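
The commit follows a small dependency-injection pattern, visible in the FSEditLog diff below: the JVM exit goes through a Runtime member that defaults to Runtime.getRuntime() but can be swapped out by tests, and runtime.exit(1) is called only when every journal fails to sync. The following is a minimal, self-contained sketch of that pattern only; the class and helper names (EditLogSketch, JournalSink, trySync) are illustrative, not Hadoop's.

import java.util.List;

// Sketch of the exit-on-total-journal-failure pattern; not the actual
// FSEditLog code. JournalSink stands in for Hadoop's JournalAndStream.
class EditLogSketch {
  interface JournalSink { boolean trySync(); }

  // Member variable so tests can swap in a stubbed Runtime.
  private Runtime runtime = Runtime.getRuntime();

  void setRuntimeForTesting(Runtime runtime) {
    this.runtime = runtime;
  }

  void sync(List<JournalSink> journals) {
    int bad = 0;
    for (JournalSink j : journals) {
      if (!j.trySync()) {
        bad++;
      }
    }
    // If no journal accepted the edits, durability is lost: abort the process.
    if (bad >= journals.size()) {
      System.err.println("FATAL: could not sync any journal to persistent storage");
      runtime.exit(1);
    }
  }
}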

+ 2 - 0
hdfs/CHANGES.HDFS-1073.txt

@@ -64,3 +64,5 @@ HDFS-2093. Handle case where an entirely empty log is left during NN crash
            (todd)
 HDFS-2102. Zero-pad edits filename to make them lexically sortable. (Ivan
            Kelly via todd)
+HDFS-2010. Fix NameNode to exit if all edit streams become inaccessible. (atm
+           via todd)

+ 23 - 7
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -17,10 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.zip.Checksum;
 
@@ -104,6 +101,10 @@ public class FSEditLog  {
 
   // is an automatic sync scheduled?
   private volatile boolean isAutoSyncScheduled = false;
+  
+  // Used to exit in the event of a failure to sync to all journals. It's a
+  // member variable so it can be swapped out for testing.
+  private Runtime runtime = Runtime.getRuntime();
 
   // these are statistics counters.
   private long numTransactions;        // number of transactions
@@ -394,7 +395,7 @@ public class FSEditLog  {
     // Fetch the transactionId of this thread. 
     long mytxid = myTransactionId.get().txid;
     
-    List<JournalAndStream> stillGoodJournals =
+    List<JournalAndStream> candidateJournals =
       Lists.newArrayListWithCapacity(journals.size());
     List<JournalAndStream> badJournals = Lists.newArrayList();
     
@@ -434,7 +435,7 @@ public class FSEditLog  {
           if (!jas.isActive()) continue;
           try {
             jas.getCurrentStream().setReadyToFlush();
-            stillGoodJournals.add(jas);
+            candidateJournals.add(jas);
           } catch (IOException ie) {
             LOG.error("Unable to get ready to flush.", ie);
             badJournals.add(jas);
@@ -448,7 +449,7 @@ public class FSEditLog  {
   
       // do the sync
       long start = now();
-      for (JournalAndStream jas : stillGoodJournals) {
+      for (JournalAndStream jas : candidateJournals) {
         if (!jas.isActive()) continue;
         try {
           jas.getCurrentStream().flush();
@@ -463,8 +464,15 @@ public class FSEditLog  {
       long elapsed = now() - start;
       disableAndReportErrorOnJournals(badJournals);
   
-      if (metrics != null) // Metrics non-null only when used inside name node
+      if (metrics != null) { // Metrics non-null only when used inside name node
         metrics.addSync(elapsed);
+      }
+      
+      if (badJournals.size() >= journals.size()) {
+        LOG.fatal("Could not sync any journal to persistent storage. " +
+            "Unsynced transactions: " + (txid - synctxid));
+        runtime.exit(1);
+      }
     } finally {
       // Prevent RuntimeException from blocking other log edit sync 
       synchronized (this) {
@@ -755,6 +763,14 @@ public class FSEditLog  {
     return journals;
   }
   
+  /**
+   * Used only by unit tests.
+   */
+  @VisibleForTesting
+  void setRuntimeForTesting(Runtime runtime) {
+    this.runtime = runtime;
+  }
+  
   /**
    * Return a manifest of what finalized edit logs are available
    */

+ 181 - 0
hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java

@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestEditLogJournalFailures {
+
+  private int editsPerformed = 0;
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private FileSystem fs;
+  private Runtime runtime;
+
+  /**
+   * Create the mini cluster for testing and sub in a custom runtime so that
+   * edit log journal failures don't actually cause the JVM to exit.
+   */
+  @Before
+  public void setUpMiniCluster() throws IOException {
+    conf = new HdfsConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    
+    runtime = Runtime.getRuntime();
+    runtime = spy(runtime);
+    doNothing().when(runtime).exit(anyInt());
+    
+    cluster.getNameNode().getFSImage().getEditLog().setRuntimeForTesting(runtime);
+  }
+   
+  @After
+  public void shutDownMiniCluster() throws IOException {
+    fs.close();
+    cluster.shutdown();
+  }
+   
+  @Test
+  public void testSingleFailedEditsDirOnFlush() throws IOException {
+    assertTrue(doAnEdit());
+    // Invalidate one edits journal.
+    invalidateEditsDirAtIndex(0, true);
+    // Make sure runtime.exit(...) hasn't been called at all yet.
+    assertExitInvocations(0);
+    assertTrue(doAnEdit());
+    // A single journal failure should not result in a call to runtime.exit(...).
+    assertExitInvocations(0);
+    assertFalse(cluster.getNameNode().isInSafeMode());
+  }
+   
+  @Test
+  public void testAllEditsDirsFailOnFlush() throws IOException {
+    assertTrue(doAnEdit());
+    // Invalidate both edits journals.
+    invalidateEditsDirAtIndex(0, true);
+    EditLogOutputStream elos = invalidateEditsDirAtIndex(1, true);
+    // Make sure runtime.exit(...) hasn't been called at all yet.
+    assertExitInvocations(0);
+    assertTrue(doAnEdit());
+    // The previous edit could not be synced to any persistent storage, which
+    // should have halted the NN.
+    assertExitInvocations(1);
+    // Restore an edits journal to working order.
+    restoreEditsDirAtIndex(1, elos);
+    assertTrue(doAnEdit());
+    // Make sure we didn't make another call to runtime.exit(...).
+    assertExitInvocations(1);
+  }
+  
+  @Test
+  public void testSingleFailedEditsDirOnSetReadyToFlush() throws IOException {
+    assertTrue(doAnEdit());
+    // Invalidate one edits journal.
+    invalidateEditsDirAtIndex(0, false);
+    // Make sure runtime.exit(...) hasn't been called at all yet.
+    assertExitInvocations(0);
+    assertTrue(doAnEdit());
+    // A single journal failure should not result in a call to runtime.exit(...).
+    assertExitInvocations(0);
+    assertFalse(cluster.getNameNode().isInSafeMode());
+  }
+
+  /**
+   * Replace the journal at index <code>index</code> with one that throws an
+   * exception on flush() or on setReadyToFlush(), per <code>failOnFlush</code>.
+   * @param index the index of the journal to take offline.
+   * @param failOnFlush whether to fail on flush() rather than setReadyToFlush().
+   * @return the original <code>EditLogOutputStream</code> of the journal.
+   */
+  private EditLogOutputStream invalidateEditsDirAtIndex(int index,
+      boolean failOnFlush) throws IOException {
+    FSImage fsimage = cluster.getNamesystem().getFSImage();
+    FSEditLog editLog = fsimage.getEditLog();
+    
+
+    FSEditLog.JournalAndStream jas = editLog.getJournals().get(index);
+    EditLogFileOutputStream elos =
+      (EditLogFileOutputStream) jas.getCurrentStream();
+    EditLogFileOutputStream spyElos = spy(elos);
+    
+    if (failOnFlush) {
+      doThrow(new IOException("fail on flush()")).when(spyElos).flush();
+    } else {
+      doThrow(new IOException("fail on setReadyToFlush()")).when(spyElos)
+        .setReadyToFlush();
+    }
+    doNothing().when(spyElos).abort();
+     
+    jas.setCurrentStreamForTests(spyElos);
+     
+    return elos;
+  }
+
+  /**
+   * Restore the journal at index <code>index</code> with the passed
+   * {@link EditLogOutputStream}.
+   * 
+   * @param index index of the journal to restore.
+   * @param elos the {@link EditLogOutputStream} to put at that index.
+   */
+  private void restoreEditsDirAtIndex(int index, EditLogOutputStream elos) {
+    FSImage fsimage = cluster.getNamesystem().getFSImage();
+    FSEditLog editLog = fsimage.getEditLog();
+
+    FSEditLog.JournalAndStream jas = editLog.getJournals().get(index);
+    jas.setCurrentStreamForTests(elos);
+  }
+
+  /**
+   * Do a mutative metadata operation on the file system.
+   * 
+   * @return true if the operation was successful, false otherwise.
+   */
+  private boolean doAnEdit() throws IOException {
+    return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++)));
+  }
+
+  /**
+   * Make sure that Runtime.exit(...) has been called
+   * <code>expectedExits</code> number of times.
+   * 
+   * @param expectedExits the number of times Runtime.exit(...) should have been called.
+   */
+  private void assertExitInvocations(int expectedExits) {
+    verify(runtime, times(expectedExits)).exit(anyInt());
+  }
+}
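
The test's setUpMiniCluster() relies on a Mockito spy of the real Runtime with exit() stubbed out, so the NameNode's fatal path can be exercised and asserted on without actually killing the test JVM. The snippet below isolates just that spy/verify pattern; the class name RuntimeSpyExample and the direct call to runtime.exit(1) are illustrative stand-ins for the NameNode code path exercised above.

import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.junit.Test;

public class RuntimeSpyExample {
  @Test
  public void exitIsInterceptedAndCounted() {
    // Spy the real Runtime and stub exit() so the JVM is not actually killed.
    Runtime runtime = spy(Runtime.getRuntime());
    doNothing().when(runtime).exit(anyInt());

    // Code under test would call runtime.exit(1); here it is invoked directly.
    runtime.exit(1);

    // The spy records the call, which can then be asserted on.
    verify(runtime, times(1)).exit(anyInt());
  }
}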