
HDFS-7374. Allow decommissioning of dead DataNodes. Contributed by Zhe Zhang.

Andrew Wang 10 years ago
parent
commit
5bd048e837
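
In short: before this patch, startDecommission sent a dead DataNode down the same replication-driven path as a live one, so (per the apparent motivation behind the JIRA and the new branch below) it could sit in DECOMMISSION_IN_PROGRESS indefinitely, since blocks resident on a dead node cannot be re-replicated from it. After the patch a dead node is marked DECOMMISSIONED immediately, while a live node still goes through DECOMMISSION_IN_PROGRESS. A minimal sketch of the intended observable behavior (illustrative only, not part of the commit; assumes a DatanodeManager dm and descriptors deadNode/liveNode obtained from it):

    // Sketch of post-patch behavior; `dm` is a DatanodeManager,
    // deadNode.isAlive == false and liveNode.isAlive == true.
    dm.startDecommission(deadNode);
    assert deadNode.isDecommissioned();          // dead: marked DECOMMISSIONED at once

    dm.startDecommission(liveNode);
    assert liveNode.isDecommissionInProgress();  // live: normal replication-driven path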

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -456,6 +456,8 @@ Release 2.7.0 - UNRELEASED
     HDFS-7225. Remove stale block invalidation work when DN re-registers with
     different UUID. (Zhe Zhang and Andrew Wang)
 
+    HDFS-7374. Allow decommissioning of dead DataNodes. (Zhe Zhang)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 16 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -845,16 +845,21 @@ public class DatanodeManager {
   @InterfaceAudience.Private
   @VisibleForTesting
   public void startDecommission(DatanodeDescriptor node) {
-    if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-      for (DatanodeStorageInfo storage : node.getStorageInfos()) {
-        LOG.info("Start Decommissioning " + node + " " + storage
-            + " with " + storage.numBlocks() + " blocks");
+    if (!node.isDecommissionInProgress()) {
+      if (!node.isAlive) {
+        LOG.info("Dead node " + node + " is decommissioned immediately.");
+        node.setDecommissioned();
+      } else if (!node.isDecommissioned()) {
+        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+          LOG.info("Start Decommissioning " + node + " " + storage
+              + " with " + storage.numBlocks() + " blocks");
+        }
+        heartbeatManager.startDecommission(node);
+        node.decommissioningStatus.setStartTime(now());
+
+        // all the blocks that reside on this node have to be replicated.
+        checkDecommissionState(node);
       }
-      heartbeatManager.startDecommission(node);
-      node.decommissioningStatus.setStartTime(now());
-      
-      // all the blocks that reside on this node have to be replicated.
-      checkDecommissionState(node);
     }
   }
 
@@ -1009,14 +1014,13 @@ public class DatanodeManager {
   
         // register new datanode
         addDatanode(nodeDescr);
-        checkDecommissioning(nodeDescr);
-        
         // also treat the registration message as a heartbeat
         // no need to update its timestamp
        // because it is done when the descriptor is created
         heartbeatManager.addDatanode(nodeDescr);
-        success = true;
         incrementVersionCount(nodeReg.getSoftwareVersion());
+        checkDecommissioning(nodeDescr);
+        success = true;
       } finally {
         if (!success) {
           removeDatanode(nodeDescr);
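
A note on the second hunk (inferred from the reordering; the commit message does not spell it out): checkDecommissioning(nodeDescr) now runs after heartbeatManager.addDatanode(nodeDescr), so by the time the decommission check consults the node it has already been registered with the heartbeat manager, and a re-registering excluded node takes the normal in-progress path rather than the new immediate dead-node path; success = true also moves last, so any failure in the added steps still triggers the removeDatanode rollback. Condensed, the post-patch registration order is:

    // Condensed from the hunk above (registerDatanode, post-patch order):
    addDatanode(nodeDescr);                              // make the node known to the manager
    heartbeatManager.addDatanode(nodeDescr);             // registration doubles as a heartbeat
    incrementVersionCount(nodeReg.getSoftwareVersion());
    checkDecommissioning(nodeDescr);                     // decommission check sees a registered node
    success = true;                                      // set last so failures roll back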

+ 19 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -25,7 +25,6 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -1612,4 +1611,23 @@ public class DFSTestUtil {
     LayoutVersion.updateMap(DataNodeLayoutVersion.FEATURES,
                             new LayoutVersion.LayoutFeature[] { feature });
   }
+
+  /**
+   * Wait for the datanode identified by nodeID to reach the given alive or
+   * dead state, polling for up to waitTime milliseconds.
+   */
+  public static void waitForDatanodeState(
+      final MiniDFSCluster cluster, final String nodeID,
+      final boolean alive, int waitTime)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        FSNamesystem namesystem = cluster.getNamesystem();
+        final DatanodeDescriptor dd = BlockManagerTestUtil.getDatanode(
+            namesystem, nodeID);
+        return (dd.isAlive == alive);
+      }
+    }, 100, waitTime);
+  }
 }
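
This promotes what was previously a private helper of TestDeadDatanode (removed below) into a shared test utility. Instead of a hand-rolled loop sleeping 1 s between checks, it delegates to GenericTestUtils.waitFor, which polls the supplier every 100 ms and throws TimeoutException on expiry. A typical call, as used by the updated tests further down:

    // Wait up to 30 s for the NameNode to mark the DataNode with this UUID dead.
    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), false, 30000);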

+ 3 - 29
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java

@@ -21,17 +21,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -43,7 +41,6 @@ import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
-import org.apache.hadoop.util.Time;
 import org.junit.After;
 import org.junit.Test;
 
@@ -60,29 +57,6 @@ public class TestDeadDatanode {
     cluster.shutdown();
   }
 
-  /**
-   * wait for datanode to reach alive or dead state for waitTime given in
-   * milliseconds.
-   */
-  private void waitForDatanodeState(String nodeID, boolean alive, int waitTime)
-      throws TimeoutException, InterruptedException {
-    long stopTime = Time.now() + waitTime;
-    FSNamesystem namesystem = cluster.getNamesystem();
-    String state = alive ? "alive" : "dead";
-    while (Time.now() < stopTime) {
-      final DatanodeDescriptor dd = BlockManagerTestUtil.getDatanode(
-          namesystem, nodeID);
-      if (dd.isAlive == alive) {
-        LOG.info("datanode " + nodeID + " is " + state);
-        return;
-      }
-      LOG.info("Waiting for datanode " + nodeID + " to become " + state);
-      Thread.sleep(1000);
-    }
-    throw new TimeoutException("Timedout waiting for datanode reach state "
-        + state);
-  }
-
   /**
    * Test to ensure namenode rejects request from dead datanode
    * - Start a cluster
@@ -104,11 +78,11 @@ public class TestDeadDatanode {
     DatanodeRegistration reg = 
       DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
       
-    waitForDatanodeState(reg.getDatanodeUuid(), true, 20000);
+    DFSTestUtil.waitForDatanodeState(cluster, reg.getDatanodeUuid(), true, 20000);
 
     // Shutdown and wait for datanode to be marked dead
     dn.shutdown();
-    waitForDatanodeState(reg.getDatanodeUuid(), false, 20000);
+    DFSTestUtil.waitForDatanodeState(cluster, reg.getDatanodeUuid(), false, 20000);
 
     DatanodeProtocol dnp = cluster.getNameNodeRpc();
     

+ 32 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java

@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.hadoop.conf.Configuration;
@@ -360,4 +361,35 @@ public class TestDecommissioningStatus {
     dm.refreshNodes(conf);
     cleanupFile(fileSys, f);
   }
+
+  /**
+   * Verify the support for decommissioning a datanode that is already dead.
+   * In this scenario the datanode should immediately be marked as
+   * DECOMMISSIONED.
+   */
+  @Test(timeout=120000)
+  public void testDecommissionDeadDN()
+      throws IOException, InterruptedException, TimeoutException {
+    DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId();
+    String dnName = dnID.getXferAddr();
+    DataNodeProperties stoppedDN = cluster.stopDataNode(0);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+    FSNamesystem fsn = cluster.getNamesystem();
+    final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
+    DatanodeDescriptor dnDescriptor = dm.getDatanode(dnID);
+    decommissionNode(fsn, localFileSys, dnName);
+    dm.refreshNodes(conf);
+    BlockManagerTestUtil.checkDecommissionState(dm, dnDescriptor);
+    assertTrue(dnDescriptor.isDecommissioned());
+
+    // Add the node back
+    cluster.restartDataNode(stoppedDN, true);
+    cluster.waitActive();
+
+    // Call refreshNodes on FSNamesystem with empty exclude file to remove the
+    // datanode from decommissioning list and make it available again.
+    writeConfigFile(localFileSys, excludeFile, null);
+    dm.refreshNodes(conf);
+  }
 }
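
The new test exercises the feature end to end: stop a DataNode, wait for the NameNode to declare it dead, decommission it via refreshNodes, and assert that it lands directly in DECOMMISSIONED without passing through DECOMMISSION_IN_PROGRESS; the node is then restarted and the exclude file cleared so later tests see a healthy cluster. Note that decommissionNode and writeConfigFile are pre-existing private helpers of TestDecommissioningStatus that do not appear in this diff; judging from their call sites, decommissionNode adds dnName to the exclude file consumed by refreshNodes, and writeConfigFile(localFileSys, excludeFile, null) empties it.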