
HDFS-192. Fix TestBackupNode failures. Contributed by Konstantin Shvachko.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@888145 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko 15 years ago
parent
commit
03cbc87e0c

+ 2 - 0
CHANGES.txt

@@ -550,6 +550,8 @@ Release 0.21.0 - Unreleased
     HDFS-781. Namenode metrics PendingDeletionBlocks is not decremented.
     (Suresh)
 
+    HDFS-192. Fix TestBackupNode failures. (shv)
+
 Release 0.20.2 - Unreleased
 
   IMPROVEMENTS

+ 22 - 10
src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java

@@ -37,7 +37,6 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.Daemon;
 
 /**
  * BackupNode.
@@ -66,8 +65,6 @@ public class BackupNode extends NameNode {
   String nnHttpAddress;
   /** Checkpoint manager */
   Checkpointer checkpointManager;
-  /** Checkpoint daemon */
-  private Daemon cpDaemon;
 
   BackupNode(Configuration conf, NamenodeRole role) throws IOException {
     super(conf, role);
@@ -133,9 +130,17 @@ public class BackupNode extends NameNode {
 
   @Override // NameNode
   public void stop() {
-    if(checkpointManager != null) checkpointManager.shouldRun = false;
-    if(cpDaemon != null) cpDaemon.interrupt();
+    if(checkpointManager != null) {
+      // Prevent starting a new checkpoint.
+      // Checkpoints that have already been started may proceed until
+      // the error reporting to the name-node is complete.
+      // The checkpoint manager should not be interrupted yet, because an
+      // interrupt would close storage file channels and the checkpoint
+      // could fail with ClosedByInterruptException.
+      checkpointManager.shouldRun = false;
+    }
     if(namenode != null && getRegistration() != null) {
+      // Exclude this node from the list of backup streams on the name-node
       try {
         namenode.errorReport(getRegistration(), NamenodeProtocol.FATAL,
             "Shutting down.");
@@ -143,7 +148,15 @@ public class BackupNode extends NameNode {
         LOG.error("Failed to report to name-node.", e);
       }
     }
-    RPC.stopProxy(namenode); // stop the RPC threads
+    // Stop the RPC client
+    RPC.stopProxy(namenode);
+    namenode = null;
+    // Stop the checkpoint manager
+    if(checkpointManager != null) {
+      checkpointManager.interrupt();
+      checkpointManager = null;
+    }
+    // Stop name-node threads
     super.stop();
   }
 
@@ -224,7 +237,7 @@ public class BackupNode extends NameNode {
     this.nnHttpAddress = getHostPortString(super.getHttpServerAddress(conf));
     // get version and id info from the name-node
     NamespaceInfo nsInfo = null;
-    while(!stopRequested) {
+    while(!isStopRequested()) {
       try {
         nsInfo = handshake(namenode);
         break;
@@ -243,8 +256,7 @@ public class BackupNode extends NameNode {
    */
   private void runCheckpointDaemon(Configuration conf) throws IOException {
     checkpointManager = new Checkpointer(conf, this);
-    cpDaemon = new Daemon(checkpointManager);
-    cpDaemon.start();
+    checkpointManager.start();
   }
 
   /**
@@ -281,7 +293,7 @@ public class BackupNode extends NameNode {
 
     setRegistration();
     NamenodeRegistration nnReg = null;
-    while(!stopRequested) {
+    while(!isStopRequested()) {
       try {
         nnReg = namenode.register(getRegistration());
         break;
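
The shutdown ordering above is the heart of this fix: the checkpoint thread is only signalled (shouldRun = false) before the error report, and interrupted only after the RPC client has been stopped, so an in-flight checkpoint is not killed mid-write. A minimal, hypothetical sketch of that ordering (class and method names below are illustrative, not the actual BackupNode code):

    // Sketch only: illustrates the signal -> report -> interrupt ordering.
    class BackupService {
      private volatile boolean shouldRun = true;   // read by the checkpoint loop
      private Thread checkpointThread;             // runs the checkpoint loop

      void stop() {
        // 1. Ask the checkpoint loop to stop, but do not interrupt it yet:
        //    an interrupt here could close open file channels and make an
        //    in-flight checkpoint fail with ClosedByInterruptException.
        shouldRun = false;
        // 2. Report the shutdown to the active name-node while RPC is still up
        //    (in the real code: namenode.errorReport(...), then RPC.stopProxy).
        reportShutdownToNameNode();
        // 3. Only now interrupt and discard the checkpoint thread.
        if (checkpointThread != null) {
          checkpointThread.interrupt();
          checkpointThread = null;
        }
      }

      void reportShutdownToNameNode() { /* placeholder for the RPC call */ }
    }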

+ 4 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.util.Daemon;
 
 /**
  * The Checkpointer is responsible for supporting periodic checkpoints 
@@ -49,7 +50,7 @@ import org.apache.hadoop.http.HttpServer;
  * The start of a checkpoint is triggered by one of the two factors:
  * (1) time or (2) the size of the edits file.
  */
-class Checkpointer implements Runnable {
+class Checkpointer extends Daemon {
   public static final Log LOG = 
     LogFactory.getLog(Checkpointer.class.getName());
 
@@ -144,7 +145,8 @@ class Checkpointer implements Runnable {
         LOG.error("Exception in doCheckpoint: ", e);
       } catch(Throwable e) {
         LOG.error("Throwable Exception in doCheckpoint: ", e);
-        Runtime.getRuntime().exit(-1);
+        shutdown();
+        break;
       }
       try {
         Thread.sleep(periodMSec);
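
Two things change in Checkpointer: it is now itself a Daemon thread, so BackupNode can start() and interrupt() it directly instead of wrapping it, and a Throwable in the checkpoint loop no longer terminates the JVM via Runtime.exit(-1) but shuts the loop down and breaks out. A rough sketch of that run-loop pattern, with hypothetical names:

    // Sketch only: a daemon checkpoint loop that fails soft instead of exiting the JVM.
    class CheckpointLoop extends Thread {          // stands in for org.apache.hadoop.util.Daemon
      volatile boolean shouldRun = true;
      private final long periodMSec = 60000L;

      CheckpointLoop() { setDaemon(true); }

      @Override
      public void run() {
        while (shouldRun) {
          try {
            doCheckpoint();
          } catch (Throwable t) {
            shouldRun = false;                     // stop this loop only, not the process
            break;
          }
          try {
            Thread.sleep(periodMSec);
          } catch (InterruptedException ie) {
            return;                                // interrupted by the owner's stop()
          }
        }
      }

      void doCheckpoint() { /* placeholder for the actual checkpoint work */ }
    }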

+ 0 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java

@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;

+ 11 - 10
src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -24,7 +24,6 @@ import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -216,7 +215,7 @@ public class FSEditLog {
   /**
    * Shutdown the file store.
    */
-  public synchronized void close() {
+  synchronized void close() {
     while (isSyncRunning) {
       try {
         wait(1000);
@@ -275,12 +274,6 @@ public class FSEditLog {
 
     String lsd = fsimage.listStorageDirectories();
     FSNamesystem.LOG.info("current list of storage dirs:" + lsd);
-    //EditLogOutputStream
-    if (editStreams == null || editStreams.size() <= 1) {
-      FSNamesystem.LOG.fatal(
-      "Fatal Error : All storage directories are inaccessible."); 
-      Runtime.getRuntime().exit(-1);
-    }
 
     ArrayList<StorageDirectory> al = null;
     for (EditLogOutputStream eStream : errorStreams) {
@@ -311,6 +304,12 @@ public class FSEditLog {
       } 
     }
     
+    if (editStreams == null || editStreams.size() <= 0) {
+      String msg = "Fatal Error: All storage directories are inaccessible.";
+      FSNamesystem.LOG.fatal(msg, new IOException(msg)); 
+      Runtime.getRuntime().exit(-1);
+    }
+
     // removed failed SDs
     if(propagate && al != null) fsimage.processIOError(al, false);
     
@@ -867,6 +866,7 @@ public class FSEditLog {
         try {
           eStream.flush();
         } catch (IOException ie) {
+          FSNamesystem.LOG.error("Unable to sync edit log.", ie);
           //
           // remember the streams that encountered an error.
           //
@@ -874,8 +874,6 @@ public class FSEditLog {
             errorStreams = new ArrayList<EditLogOutputStream>(1);
           }
           errorStreams.add(eStream);
-          FSNamesystem.LOG.error("Unable to sync edit log. " +
-                                 "Fatal Error.");
         }
       }
       long elapsed = FSNamesystem.now() - start;
@@ -1165,6 +1163,7 @@ public class FSEditLog {
         // replace by the new stream
         itE.replace(eStream);
       } catch (IOException e) {
+        FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), e);
         if(errorStreams == null)
           errorStreams = new ArrayList<EditLogOutputStream>(1);
         errorStreams.add(eStream);
@@ -1225,6 +1224,7 @@ public class FSEditLog {
         // replace by the new stream
         itE.replace(eStream);
       } catch (IOException e) {
+        FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), e);
         if(errorStreams == null)
           errorStreams = new ArrayList<EditLogOutputStream>(1);
         errorStreams.add(eStream);
@@ -1390,6 +1390,7 @@ public class FSEditLog {
       try {
         eStream.write(data, 0, length);
       } catch (IOException ie) {
+        FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), ie);
         if(errorStreams == null)
           errorStreams = new ArrayList<EditLogOutputStream>(1);
         errorStreams.add(eStream);
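
The FSEditLog change also moves the "all storage directories are inaccessible" check so it runs after the failed streams have been removed, and it now fires only when no output stream is left at all (size() <= 0 rather than <= 1). A generic, hypothetical sketch of that remove-then-check ordering:

    import java.util.List;

    // Sketch only: drop the failed streams first, then decide whether the
    // situation is fatal, instead of aborting before the cleanup runs.
    class EditStreamCleanup {
      static <T> void removeFailed(List<T> streams, List<T> failed) {
        streams.removeAll(failed);                 // prune the failed streams first
        if (streams.isEmpty()) {                   // fatal only if nothing remains
          throw new IllegalStateException(
              "All storage directories are inaccessible.");
        }
      }
    }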

+ 2 - 3
src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -1708,7 +1708,7 @@ public class FSImage extends Storage {
     ckptState = CheckpointStates.UPLOAD_DONE;
   }
 
-  void close() throws IOException {
+  synchronized void close() throws IOException {
     getEditLog().close();
     unlockAll();
   }
@@ -1919,8 +1919,7 @@ public class FSImage extends Storage {
         checkSchemeConsistency(u);
         dirs.add(u);
       } catch (Exception e) {
-        LOG.error("Error while processing URI: " + name + 
-            ". The error message was: " + e.getMessage());
+        LOG.error("Error while processing URI: " + name, e);
       }
     }
     return dirs;

+ 4 - 6
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -3814,14 +3814,12 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     getFSImage().rollFSImage();
   }
 
-  NamenodeCommand startCheckpoint(NamenodeRegistration bnReg, // backup node
-                                  NamenodeRegistration nnReg) // active name-node
+  synchronized NamenodeCommand startCheckpoint(
+                                NamenodeRegistration bnReg, // backup node
+                                NamenodeRegistration nnReg) // active name-node
   throws IOException {
-    NamenodeCommand cmd;
-    synchronized(this) {
-      cmd = getFSImage().startCheckpoint(bnReg, nnReg);
-    }
     LOG.info("Start checkpoint for " + bnReg.getAddress());
+    NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
     getEditLog().logSync();
     return cmd;
   }

+ 10 - 4
src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -431,9 +431,11 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
    * Stop all NameNode threads and wait for all to finish.
    */
   public void stop() {
-    if (stopRequested)
-      return;
-    stopRequested = true;
+    synchronized(this) {
+      if (stopRequested)
+        return;
+      stopRequested = true;
+    }
     if (plugins != null) {
       for (ServicePlugin p : plugins) {
         try {
@@ -458,7 +460,11 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
       namesystem.shutdown();
     }
   }
-  
+
+  synchronized boolean isStopRequested() {
+    return stopRequested;
+  }
+
   /////////////////////////////////////////////////////
   // NamenodeProtocol
   /////////////////////////////////////////////////////
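
In NameNode, stopRequested is now checked and set atomically: stop() does its check-and-set inside a synchronized block, so only the first caller proceeds, and other classes read the flag through the new synchronized isStopRequested() accessor, which is what BackupNode now uses in its retry loops. A small, hypothetical sketch of the pattern:

    // Sketch only: idempotent stop() with a synchronized check-and-set flag.
    class StoppableService {
      private boolean stopRequested = false;

      public void stop() {
        synchronized (this) {
          if (stopRequested) return;               // only the first caller continues
          stopRequested = true;
        }
        // ... shut down plugins, RPC servers and worker threads outside the lock ...
      }

      synchronized boolean isStopRequested() {     // safe read from other threads
        return stopRequested;
      }
    }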

+ 1 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java

@@ -589,6 +589,7 @@ public class SecondaryNameNode implements Runnable {
         sdEdits = it.next();
       if ((sdName == null) || (sdEdits == null))
         throw new IOException("Could not locate checkpoint directories");
+      this.layoutVersion = -1; // to avoid assert in loadFSImage()
       loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
       loadFSEdits(sdEdits);
       sig.validateStorageInfo(this);

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java

@@ -215,12 +215,12 @@ public class TestBackupNode extends TestCase {
     try {
       // start name-node and backup node 1
       cluster = new MiniDFSCluster(conf1, 0, true, null);
-      conf1.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY, "0.0.0.0:7770");
+      conf1.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY, "0.0.0.0:7771");
       conf1.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY, "0.0.0.0:7775");
       backup1 = startBackupNode(conf1, StartupOption.BACKUP, 1);
       // try to start backup node 2
       conf2 = new HdfsConfiguration(conf1);
-      conf2.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY, "0.0.0.0:7771");
+      conf2.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY, "0.0.0.0:7772");
       conf2.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY, "0.0.0.0:7776");
       try {
         backup2 = startBackupNode(conf2, StartupOption.BACKUP, 2);