
Merge r1590941 from trunk, HDFS-2882. DN continues to start up, even if block pool fails to initialize (Contributed by Vinayakumar B)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.4@1590943 13f79535-47bb-0310-9956-ffa450edef68
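
In short: instead of exiting on the first failed handshake, each BPServiceActor now loops on connectToNNAndHandshake(), tracking its lifecycle in an explicit RunningState; the DataNode gives up on a block pool only once every namenode of that nameservice has failed initialization, and keeps serving whichever pools did come up.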
Vinayakumar B 11 years ago
parent
commit
63d7a7b327

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -55,6 +55,9 @@ Release 2.4.1 - UNRELEASED
     HDFS-6245. datanode fails to start with a bad disk even when failed
     volumes is set. (Arpit Agarwal)
 
+    HDFS-2882. DN continues to start up, even if block pool fails to initialize
+    (vinayakumarb)
+
 Release 2.4.0 - 2014-04-07 
 
   INCOMPATIBLE CHANGES

+ 18 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

@@ -145,7 +145,11 @@ class BPOfferService {
       return null;
     }
   }
-  
+
+  boolean hasBlockPoolId() {
+    return getNamespaceInfo() != null;
+  }
+
   synchronized NamespaceInfo getNamespaceInfo() {
     return bpNSInfo;
   }
@@ -680,4 +684,17 @@ class BPOfferService {
     return true;
   }
 
+  /*
+   * Let the actor retry initialization until all namenodes of the cluster
+   * have failed.
+   */
+  boolean shouldRetryInit() {
+    if (hasBlockPoolId()) {
+      // One of the namenodes registered successfully; keep retrying for
+      // the others.
+      return true;
+    }
+    return isAlive();
+  }
+
 }
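
Taken together, hasBlockPoolId() and the two shouldRetryInit() methods give each actor one combined rule after a failed handshake. A minimal sketch of that rule, condensed into a single hypothetical method (not part of the patch):

    // Sketch only: the combined retry rule a BPServiceActor applies after
    // connectToNNAndHandshake() throws.
    private boolean keepRetrying() {
      return shouldRun()               // DN and this service still running
          && (bpos.hasBlockPoolId()    // a peer NN already registered, or
              || bpos.isAlive());      // some actor of this BPOS is still viable
    }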

+ 40 - 23
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -90,8 +90,13 @@ class BPServiceActor implements Runnable {
   Thread bpThread;
   DatanodeProtocolClientSideTranslatorPB bpNamenode;
   private volatile long lastHeartbeat = 0;
-  private volatile boolean initialized = false;
-  
+
+  static enum RunningState {
+    CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED;
+  }
+
+  private volatile RunningState runningState = RunningState.CONNECTING;
+
   /**
    * Between block reports (which happen on the order of once an hour) the
    * DN reports smaller incremental changes to its block list. This map,
@@ -118,17 +123,12 @@ class BPServiceActor implements Runnable {
     this.dnConf = dn.getDnConf();
   }
 
-  /**
-   * returns true if BP thread has completed initialization of storage
-   * and has registered with the corresponding namenode
-   * @return true if initialized
-   */
-  boolean isInitialized() {
-    return initialized;
-  }
-  
   boolean isAlive() {
-    return shouldServiceRun && bpThread.isAlive();
+    if (!shouldServiceRun || !bpThread.isAlive()) {
+      return false;
+    }
+    return runningState == BPServiceActor.RunningState.RUNNING
+        || runningState == BPServiceActor.RunningState.CONNECTING;
   }
 
   @Override
@@ -809,19 +809,30 @@ class BPServiceActor implements Runnable {
     LOG.info(this + " starting to offer service");
 
     try {
-      // init stuff
-      try {
-        // setup storage
-        connectToNNAndHandshake();
-      } catch (IOException ioe) {
-        // Initial handshake, storage recovery or registration failed
-        // End BPOfferService thread
-        LOG.fatal("Initialization failed for block pool " + this, ioe);
-        return;
+      while (true) {
+        // init stuff
+        try {
+          // setup storage
+          connectToNNAndHandshake();
+          break;
+        } catch (IOException ioe) {
+          // Initial handshake, storage recovery or registration failed
+          runningState = RunningState.INIT_FAILED;
+          if (shouldRetryInit()) {
+            // Retry until all namenodes of this BPOS have failed initialization
+            LOG.error("Initialization failed for " + this + " "
+                + ioe.getLocalizedMessage());
+            sleepAndLogInterrupts(5000, "initializing");
+          } else {
+            runningState = RunningState.FAILED;
+            LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
+            return;
+          }
+        }
       }
 
-      initialized = true; // bp is initialized;
-      
+      runningState = RunningState.RUNNING;
+
       while (shouldRun()) {
         try {
           offerService();
@@ -830,14 +841,20 @@ class BPServiceActor implements Runnable {
           sleepAndLogInterrupts(5000, "offering service");
         }
       }
+      runningState = RunningState.EXITED;
     } catch (Throwable ex) {
       LOG.warn("Unexpected exception in block pool " + this, ex);
+      runningState = RunningState.FAILED;
     } finally {
       LOG.warn("Ending block pool service for: " + this);
       cleanUp();
     }
   }
 
+  private boolean shouldRetryInit() {
+    return shouldRun() && bpos.shouldRetryInit();
+  }
+
   private boolean shouldRun() {
     return shouldServiceRun && dn.shouldRun();
   }
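
The explicit RunningState replaces the old initialized boolean: actors start in CONNECTING, drop to INIT_FAILED while a handshake retry is pending, move to RUNNING once registered, and end in EXITED (clean stop) or FAILED (retries exhausted, or an unexpected Throwable). Note that isAlive() reports false for an actor parked in INIT_FAILED between retries, which is how callers can count an actor as failed even though its thread is still looping. A sketch of the resulting truth table, assuming shouldServiceRun is set and bpThread.isAlive() holds:

    // Sketch only: what isAlive() reports per state.
    for (BPServiceActor.RunningState s : BPServiceActor.RunningState.values()) {
      boolean alive = s == BPServiceActor.RunningState.RUNNING
          || s == BPServiceActor.RunningState.CONNECTING;
      System.out.println(s + " -> isAlive() == " + alive);
      // CONNECTING -> true, INIT_FAILED -> false, RUNNING -> true,
      // EXITED -> false, FAILED -> false
    }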

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java

@@ -88,7 +88,11 @@ class BlockPoolManager {
   
   synchronized void remove(BPOfferService t) {
     offerServices.remove(t);
-    bpByBlockPoolId.remove(t.getBlockPoolId());
+    if (t.hasBlockPoolId()) {
+      // It's possible that the block pool never successfully registered
+      // with any NN, so it was never added to this map
+      bpByBlockPoolId.remove(t.getBlockPoolId());
+    }
     
     boolean removed = false;
     for (Iterator<BPOfferService> it = bpByNameserviceId.values().iterator();

+ 19 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -857,19 +857,24 @@ public class DataNode extends Configured
    */
   void shutdownBlockPool(BPOfferService bpos) {
     blockPoolManager.remove(bpos);
+    if (bpos.hasBlockPoolId()) {
+      // It's possible that this DN is shutting down before it ever
+      // registered successfully anywhere; in that case there is no
+      // block pool id to clean up.
+      String bpId = bpos.getBlockPoolId();
+      if (blockScanner != null) {
+        blockScanner.removeBlockPool(bpId);
+      }
 
-    String bpId = bpos.getBlockPoolId();
-    if (blockScanner != null) {
-      blockScanner.removeBlockPool(bpId);
-    }
-  
-    if (data != null) { 
-      data.shutdownBlockPool(bpId);
-    }
+      if (data != null) {
+        data.shutdownBlockPool(bpId);
+      }
 
-    if (storage != null) {
-      storage.removeBlockPoolStorage(bpId);
+      if (storage != null) {
+        storage.removeBlockPoolStorage(bpId);
+      }
     }
+
   }
 
   /**
@@ -890,10 +895,10 @@ public class DataNode extends Configured
           + " should have retrieved namespace info before initBlockPool.");
     }
     
+    setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
+
     // Register the new block pool with the BP manager.
     blockPoolManager.addBlockPool(bpos);
-
-    setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
     
     // In the case that this is the first block pool to connect, initialize
     // the dataset, block scanners, etc.
@@ -1078,6 +1083,7 @@ public class DataNode extends Configured
       Token<BlockTokenIdentifier> token) throws IOException {
     checkBlockLocalPathAccess();
     checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ);
+    Preconditions.checkNotNull(data, "Storage not yet initialized");
     BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
     if (LOG.isDebugEnabled()) {
       if (info != null) {
@@ -2437,6 +2443,7 @@ public class DataNode extends Configured
    */
   @Override // DataNodeMXBean
   public String getVolumeInfo() {
+    Preconditions.checkNotNull(data, "Storage not yet initialized");
     return JSON.toString(data.getVolumeInfoMap());
   }
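
Both new Preconditions.checkNotNull guards matter because the DN can now be up while no block pool has finished initializing, so the dataset field may legitimately still be null when a client or JMX call arrives. A sketch of the pattern using Guava's Preconditions (already on the hadoop-hdfs classpath); the class and field here are hypothetical stand-ins:

    import com.google.common.base.Preconditions;

    class StorageGuard {
      static Object data; // stand-in for the DN's dataset field; null until init

      static String volumeInfo() {
        // Fails fast with NullPointerException("Storage not yet initialized")
        // instead of an anonymous NPE deeper in the call.
        Preconditions.checkNotNull(data, "Storage not yet initialized");
        return data.toString();
      }
    }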
   

+ 9 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java

@@ -221,11 +221,16 @@ public class DataStorage extends Storage {
     // Each storage directory is treated individually.
     // During startup some of them can upgrade or rollback 
     // while others could be uptodate for the regular startup.
-    for(int idx = 0; idx < getNumStorageDirs(); idx++) {
-      doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
-      createStorageID(getStorageDir(idx));
+    try {
+      for (int idx = 0; idx < getNumStorageDirs(); idx++) {
+        doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
+        createStorageID(getStorageDir(idx));
+      }
+    } catch (IOException e) {
+      unlockAll();
+      throw e;
     }
-    
+
     // 3. Update all storages. Some of them might have just been formatted.
     this.writeAll();
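
Releasing the storage locks before rethrowing is what makes retrying safe: without unlockAll(), the next handshake attempt would trip over the locks left behind by the failed one. The shape of the fix in isolation (a sketch; doTransitionAll() is a hypothetical stand-in for the per-directory loop):

    try {
      // Any storage directory may fail its upgrade/rollback transition.
      doTransitionAll();
    } catch (IOException e) {
      // Undo partial work so a later retry can re-acquire the locks.
      unlockAll();
      throw e;
    }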
     

+ 8 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -844,6 +844,13 @@ public class MiniDFSCluster {
         
         nnCounterForFormat++;
         if (formatThisOne) {
+          // Allow overriding clusterID for specific NNs to test
+          // misconfiguration.
+          if (nn.getClusterId() == null) {
+            StartupOption.FORMAT.setClusterId(clusterId);
+          } else {
+            StartupOption.FORMAT.setClusterId(nn.getClusterId());
+          }
           DFSTestUtil.formatNameNode(conf);
         }
         prevNNDirs = namespaceDirs;
@@ -905,7 +912,7 @@ public class MiniDFSCluster {
     }
   }
 
-  private void copyNameDirs(Collection<URI> srcDirs, Collection<URI> dstDirs,
+  public static void copyNameDirs(Collection<URI> srcDirs, Collection<URI> dstDirs,
       Configuration dstConf) throws IOException {
     URI srcDir = Lists.newArrayList(srcDirs).get(0);
     FileSystem dstFS = FileSystem.getLocal(dstConf).getRaw();

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java

@@ -196,6 +196,7 @@ public class MiniDFSNNTopology {
     private final String nnId;
     private int httpPort;
     private int ipcPort;
+    private String clusterId;
     
     public NNConf(String nnId) {
       this.nnId = nnId;
@@ -213,6 +214,10 @@ public class MiniDFSNNTopology {
       return httpPort;
     }
 
+    String getClusterId() {
+      return clusterId;
+    }
+
     public NNConf setHttpPort(int httpPort) {
       this.httpPort = httpPort;
       return this;
@@ -222,6 +227,11 @@ public class MiniDFSNNTopology {
       this.ipcPort = ipcPort;
       return this;
     }
+
+    public NNConf setClusterId(String clusterId) {
+      this.clusterId = clusterId;
+      return this;
+    }
   }
 
 }
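
The new setter chains like the existing port setters, letting a topology hand individual namenodes a deliberately mismatched ID; the HA tests below use it like this:

    MiniDFSNNTopology top = new MiniDFSNNTopology()
        .addNameservice(new MiniDFSNNTopology.NSConf("ns2")
            .addNN(new MiniDFSNNTopology.NNConf("nn2").setClusterId("bad-cid")));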

+ 11 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java

@@ -329,7 +329,9 @@ public class TestBPOfferService {
     try {
       waitForInitialization(bpos);
       List<BPServiceActor> actors = bpos.getBPServiceActors();
-      assertEquals(1, actors.size());
+      // Even if one actor's initialization fails, the other keeps
+      // running until both have failed.
+      assertEquals(2, actors.size());
       BPServiceActor actor = actors.get(0);
       waitForBlockReport(actor.getNameNodeProxy());
     } finally {
@@ -342,7 +344,14 @@ public class TestBPOfferService {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        return bpos.countNameNodes() == 1;
+        List<BPServiceActor> actors = bpos.getBPServiceActors();
+        int failedcount = 0;
+        for (BPServiceActor actor : actors) {
+          if (!actor.isAlive()) {
+            failedcount++;
+          }
+        }
+        return failedcount == 1;
       }
     }, 100, 10000);
   }

+ 82 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java

@@ -19,8 +19,10 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -30,11 +32,14 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
@@ -189,7 +194,7 @@ public class TestDataNodeMultipleRegistrations {
   }
   
   @Test
-  public void testClusterIdMismatch() throws IOException {
+  public void testClusterIdMismatch() throws Exception {
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
         .build();
@@ -203,6 +208,7 @@ public class TestDataNodeMultipleRegistrations {
       
       // add another namenode
       cluster.addNameNode(conf, 9938);
+      Thread.sleep(500); // let's wait for the registration to happen
       bposs = dn.getAllBpOs(); 
       LOG.info("dn bpos len (should be 3):" + bposs.length);
       Assert.assertEquals("should've registered with three namenodes", bposs.length,3);
@@ -212,15 +218,89 @@ public class TestDataNodeMultipleRegistrations {
       cluster.addNameNode(conf, 9948);
       NameNode nn4 = cluster.getNameNode(3);
       assertNotNull("cannot create nn4", nn4);
-      
+
+      Thread.sleep(500); // let's wait for the registration to happen
       bposs = dn.getAllBpOs(); 
       LOG.info("dn bpos len (still should be 3):" + bposs.length);
       Assert.assertEquals("should've registered with three namenodes", 3, bposs.length);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 20000)
+  public void testClusterIdMismatchAtStartupWithHA() throws Exception {
+    MiniDFSNNTopology top = new MiniDFSNNTopology()
+      .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
+        .addNN(new MiniDFSNNTopology.NNConf("nn0"))
+        .addNN(new MiniDFSNNTopology.NNConf("nn1")))
+      .addNameservice(new MiniDFSNNTopology.NSConf("ns2")
+        .addNN(new MiniDFSNNTopology.NNConf("nn2").setClusterId("bad-cid"))
+        .addNN(new MiniDFSNNTopology.NNConf("nn3").setClusterId("bad-cid")));
+
+    top.setFederation(true);
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(top)
+        .numDataNodes(0).build();
+    
+    try {
+      cluster.startDataNodes(conf, 1, true, null, null);
+      // wait for initialization to complete
+      Thread.sleep(10000);
+      DataNode dn = cluster.getDataNodes().get(0);
+      assertTrue("Datanode should be running", dn.isDatanodeUp());
+      assertEquals("Only one BPOfferService should be running", 1,
+          dn.getAllBpOs().length);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testDNWithInvalidStorageWithHA() throws Exception {
+    MiniDFSNNTopology top = new MiniDFSNNTopology()
+      .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
+        .addNN(new MiniDFSNNTopology.NNConf("nn0").setClusterId("cluster-1"))
+        .addNN(new MiniDFSNNTopology.NNConf("nn1").setClusterId("cluster-1")));
+
+    top.setFederation(true);
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(top)
+        .numDataNodes(0).build();
+    try {
+      cluster.startDataNodes(conf, 1, true, null, null);
+      // wait for initialization to complete
+      Thread.sleep(10000);
+      DataNode dn = cluster.getDataNodes().get(0);
+      assertTrue("Datanode should be running", dn.isDatanodeUp());
+      assertEquals("BPOfferService should be running", 1,
+          dn.getAllBpOs().length);
+      DataNodeProperties dnProp = cluster.stopDataNode(0);
+
+      cluster.getNameNode(0).stop();
+      cluster.getNameNode(1).stop();
+      Configuration nn1 = cluster.getConfiguration(0);
+      Configuration nn2 = cluster.getConfiguration(1);
+      // setting up invalid cluster
+      StartupOption.FORMAT.setClusterId("cluster-2");
+      DFSTestUtil.formatNameNode(nn1);
+      MiniDFSCluster.copyNameDirs(FSNamesystem.getNamespaceDirs(nn1),
+          FSNamesystem.getNamespaceDirs(nn2), nn2);
+      cluster.restartNameNode(0, false);
+      cluster.restartNameNode(1, false);
+      cluster.restartDataNode(dnProp);
+      
+      // wait for initialization to complete
+      Thread.sleep(10000);
+      dn = cluster.getDataNodes().get(0);
+      assertFalse("Datanode should have shut down as its only service failed",
+          dn.isDatanodeUp());
     } finally {
       cluster.shutdown();
     }
   }
 
+
   @Test
   public void testMiniDFSClusterWithMultipleNN() throws IOException {
     Configuration conf = new HdfsConfiguration();
@@ -231,7 +311,6 @@ public class TestDataNodeMultipleRegistrations {
     
     // add a node
     try {
-      Assert.assertNotNull(cluster);
       cluster.waitActive();
       Assert.assertEquals("(1)Should be 2 namenodes", 2, cluster.getNumNameNodes());