浏览代码

HDFS-12703. Exceptions are fatal to decommissioning monitor. Contributed by He Xiaoqiao.

(cherry picked from commit 3d396786cf6eaab49c1c9b8b2a4652c2e440b9e3)
Inigo Goiri 5 年之前
父节点
当前提交
950aa74d5f

+ 81 - 64
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java

@@ -485,6 +485,7 @@ public class DatanodeAdminManager {
 
     @Override
     public void run() {
+      LOG.debug("DatanodeAdminMonitor is running.");
       if (!namesystem.isRunning()) {
         LOG.info("Namesystem is not running, skipping " +
             "decommissioning/maintenance checks.");
@@ -499,6 +500,9 @@ public class DatanodeAdminManager {
       try {
         processPendingNodes();
         check();
+      } catch (Exception e) {
+        LOG.warn("DatanodeAdminMonitor caught exception when processing node.",
+            e);
       } finally {
         namesystem.writeUnlock();
       }
@@ -532,83 +536,96 @@ public class DatanodeAdminManager {
         final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>
             entry = it.next();
         final DatanodeDescriptor dn = entry.getKey();
-        AbstractList<BlockInfo> blocks = entry.getValue();
-        boolean fullScan = false;
-        if (dn.isMaintenance() && dn.maintenanceExpired()) {
-          // If maintenance expires, stop tracking it.
-          stopMaintenance(dn);
-          toRemove.add(dn);
-          continue;
-        }
-        if (dn.isInMaintenance()) {
-          // The dn is IN_MAINTENANCE and the maintenance hasn't expired yet.
-          continue;
-        }
-        if (blocks == null) {
-          // This is a newly added datanode, run through its list to schedule
-          // under-replicated blocks for replication and collect the blocks
-          // that are insufficiently replicated for further tracking
-          LOG.debug("Newly-added node {}, doing full scan to find " +
-              "insufficiently-replicated blocks.", dn);
-          blocks = handleInsufficientlyStored(dn);
-          outOfServiceNodeBlocks.put(dn, blocks);
-          fullScan = true;
-        } else {
-          // This is a known datanode, check if its # of insufficiently
-          // replicated blocks has dropped to zero and if it can move
-          // to the next state.
-          LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
-          pruneReliableBlocks(dn, blocks);
-        }
-        if (blocks.size() == 0) {
-          if (!fullScan) {
-            // If we didn't just do a full scan, need to re-check with the
-            // full block map.
-            //
-            // We've replicated all the known insufficiently replicated
-            // blocks. Re-check with the full block map before finally
-            // marking the datanode as DECOMMISSIONED or IN_MAINTENANCE.
-            LOG.debug("Node {} has finished replicating current set of "
-                + "blocks, checking with the full block map.", dn);
+        try {
+          AbstractList<BlockInfo> blocks = entry.getValue();
+          boolean fullScan = false;
+          if (dn.isMaintenance() && dn.maintenanceExpired()) {
+            // If maintenance expires, stop tracking it.
+            stopMaintenance(dn);
+            toRemove.add(dn);
+            continue;
+          }
+          if (dn.isInMaintenance()) {
+            // The dn is IN_MAINTENANCE and the maintenance hasn't expired yet.
+            continue;
+          }
+          if (blocks == null) {
+            // This is a newly added datanode, run through its list to schedule
+            // under-replicated blocks for replication and collect the blocks
+            // that are insufficiently replicated for further tracking
+            LOG.debug("Newly-added node {}, doing full scan to find " +
+                "insufficiently-replicated blocks.", dn);
             blocks = handleInsufficientlyStored(dn);
             outOfServiceNodeBlocks.put(dn, blocks);
+            fullScan = true;
+          } else {
+            // This is a known datanode, check if its # of insufficiently
+            // replicated blocks has dropped to zero and if it can move
+            // to the next state.
+            LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
+            pruneReliableBlocks(dn, blocks);
           }
-          // If the full scan is clean AND the node liveness is okay,
-          // we can finally mark as DECOMMISSIONED or IN_MAINTENANCE.
-          final boolean isHealthy =
-              blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
-          if (blocks.size() == 0 && isHealthy) {
-            if (dn.isDecommissionInProgress()) {
-              setDecommissioned(dn);
-              toRemove.add(dn);
-            } else if (dn.isEnteringMaintenance()) {
-              // IN_MAINTENANCE node remains in the outOfServiceNodeBlocks to
-              // to track maintenance expiration.
-              setInMaintenance(dn);
+          if (blocks.size() == 0) {
+            if (!fullScan) {
+              // If we didn't just do a full scan, need to re-check with the
+              // full block map.
+              //
+              // We've replicated all the known insufficiently replicated
+              // blocks. Re-check with the full block map before finally
+              // marking the datanode as DECOMMISSIONED or IN_MAINTENANCE.
+              LOG.debug("Node {} has finished replicating current set of "
+                  + "blocks, checking with the full block map.", dn);
+              blocks = handleInsufficientlyStored(dn);
+              outOfServiceNodeBlocks.put(dn, blocks);
+            }
+            // If the full scan is clean AND the node liveness is okay,
+            // we can finally mark as DECOMMISSIONED or IN_MAINTENANCE.
+            final boolean isHealthy =
+                blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
+            if (blocks.size() == 0 && isHealthy) {
+              if (dn.isDecommissionInProgress()) {
+                setDecommissioned(dn);
+                toRemove.add(dn);
+              } else if (dn.isEnteringMaintenance()) {
+                // IN_MAINTENANCE node remains in the outOfServiceNodeBlocks to
+                // to track maintenance expiration.
+                setInMaintenance(dn);
+              } else {
+                Preconditions.checkState(false,
+                    "Node %s is in an invalid state! "
+                      + "Invalid state: %s %s blocks are on this dn.",
+                        dn, dn.getAdminState(), blocks.size());
+              }
+              LOG.debug("Node {} is sufficiently replicated and healthy, "
+                  + "marked as {}.", dn, dn.getAdminState());
             } else {
-              Preconditions.checkState(false,
-                  "A node is in an invalid state!");
+              LOG.debug("Node {} {} healthy."
+                  + " It needs to replicate {} more blocks."
+                  + " {} is still in progress.", dn,
+                  isHealthy ? "is": "isn't", blocks.size(), dn.getAdminState());
             }
-            LOG.debug("Node {} is sufficiently replicated and healthy, "
-                + "marked as {}.", dn, dn.getAdminState());
           } else {
-            LOG.debug("Node {} {} healthy."
-                + " It needs to replicate {} more blocks."
-                + " {} is still in progress.", dn,
-                isHealthy ? "is": "isn't", blocks.size(), dn.getAdminState());
+            LOG.debug("Node {} still has {} blocks to replicate "
+                + "before it is a candidate to finish {}.",
+                dn, blocks.size(), dn.getAdminState());
           }
-        } else {
-          LOG.debug("Node {} still has {} blocks to replicate "
-              + "before it is a candidate to finish {}.",
-              dn, blocks.size(), dn.getAdminState());
+        } catch (Exception e) {
+          // Log and postpone to process node when meet exception since it is in
+          // an invalid state.
+          LOG.warn("DatanodeAdminMonitor caught exception when processing node "
+              + "{}.", dn, e);
+          pendingNodes.add(dn);
+          toRemove.add(dn);
+        } finally {
+          iterkey = dn;
         }
-        iterkey = dn;
       }
       // Remove the datanodes that are DECOMMISSIONED or in service after
       // maintenance expiration.
       for (DatanodeDescriptor dn : toRemove) {
         Preconditions.checkState(dn.isDecommissioned() || dn.isInService(),
-            "Removing a node that is not yet decommissioned or in service!");
+            "Removing node %s that is not yet decommissioned or in service!",
+                dn);
         outOfServiceNodeBlocks.remove(dn);
       }
     }

+ 52 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java

@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -35,6 +36,7 @@ import java.util.Map;
 import java.util.Scanner;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
@@ -1181,6 +1183,56 @@ public class TestDecommission extends AdminStatesBaseTest {
     }
   }
 
+  /**
+   * Test DatanodeAdminManager#monitor can swallow any exceptions by default.
+   */
+  @Test(timeout=120000)
+  public void testPendingNodeButDecommissioned() throws Exception {
+    // Only allow one node to be decom'd at a time
+    getConf().setInt(
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
+        1);
+    // Disable the normal monitor runs
+    getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
+        Integer.MAX_VALUE);
+    startCluster(1, 2);
+    final DatanodeManager datanodeManager =
+        getCluster().getNamesystem().getBlockManager().getDatanodeManager();
+    final DatanodeAdminManager decomManager =
+        datanodeManager.getDatanodeAdminManager();
+
+    ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
+    List<DataNode> dns = getCluster().getDataNodes();
+    // Try to decommission 2 datanodes
+    for (int i = 0; i < 2; i++) {
+      DataNode d = dns.get(i);
+      DatanodeInfo dn = takeNodeOutofService(0, d.getDatanodeUuid(), 0,
+          decommissionedNodes, AdminStates.DECOMMISSION_INPROGRESS);
+      decommissionedNodes.add(dn);
+    }
+
+    assertEquals(2, decomManager.getNumPendingNodes());
+
+    // Set one datanode state to Decommissioned after decommission ops.
+    DatanodeDescriptor dn = datanodeManager.getDatanode(dns.get(0)
+        .getDatanodeId());
+    dn.setDecommissioned();
+
+    try {
+      // Trigger DatanodeAdminManager#monitor
+      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+
+      // Wait for OutOfServiceNodeBlocks to be 0
+      GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == 0,
+          500, 30000);
+      assertTrue(GenericTestUtils.anyThreadMatching(
+          Pattern.compile("DatanodeAdminMonitor-.*")));
+    } catch (ExecutionException e) {
+      GenericTestUtils.assertExceptionContains("in an invalid state!", e);
+      fail("DatanodeAdminManager#monitor does not swallow exceptions.");
+    }
+  }
+
   @Test(timeout=120000)
   public void testPendingNodes() throws Exception {
     org.apache.log4j.Logger.getLogger(DatanodeAdminManager.class)