
HDFS-16811. Support DecommissionBackoffMonitor parameters reconfigurable (#5122)

Signed-off-by: Tao Li <tomscut@apache.org>
huhaiyang 2 years ago
parent
current commit
033ceca090
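
This commit makes two tuning parameters of the DatanodeAdminBackoffMonitor reconfigurable at runtime: dfs.namenode.decommission.backoff.monitor.pending.limit and dfs.namenode.decommission.backoff.monitor.pending.blocks.per.lock. Assuming the standard Hadoop reconfiguration workflow (the host and port below are placeholders), an operator edits hdfs-site.xml on the NameNode and then triggers and verifies the reload with dfsadmin:

hdfs dfsadmin -reconfig namenode nn_host:8020 start
hdfs dfsadmin -reconfig namenode nn_host:8020 status
hdfs dfsadmin -reconfig namenode nn_host:8020 properties

The properties subcommand lists every key the NameNode accepts for reconfiguration, which is what the TestDFSAdmin change at the end of this diff asserts against.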

+ 24 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.HashMap;
@@ -70,10 +71,10 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
       outOfServiceNodeBlocks = new HashMap<>();
 
   /**
-   * The numbe of blocks to process when moving blocks to pendingReplication
+   * The number of blocks to process when moving blocks to pendingReplication
    * before releasing and reclaiming the namenode lock.
    */
-  private int blocksPerLock;
+  private volatile int blocksPerLock;
 
   /**
    * The number of blocks that have been checked on this tick.
@@ -82,7 +83,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
   /**
    * The maximum number of blocks to hold in PendingRep at any time.
    */
-  private int pendingRepLimit;
+  private volatile int pendingRepLimit;
 
   /**
    * The list of blocks which have been placed onto the replication queue
@@ -801,6 +802,26 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
     return false;
   }
 
+  @VisibleForTesting
+  @Override
+  public int getPendingRepLimit() {
+    return pendingRepLimit;
+  }
+
+  public void setPendingRepLimit(int pendingRepLimit) {
+    this.pendingRepLimit = pendingRepLimit;
+  }
+
+  @VisibleForTesting
+  @Override
+  public int getBlocksPerLock() {
+    return blocksPerLock;
+  }
+
+  public void setBlocksPerLock(int blocksPerLock) {
+    this.blocksPerLock = blocksPerLock;
+  }
+
   static class BlockStats {
     private LightWeightHashSet<Long> openFiles =
         new LightWeightLinkedSet<>();
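
Both fields become volatile because they are now written by the NameNode's reconfiguration handler thread while being read by the decommission monitor thread. A minimal standalone sketch of the visibility guarantee this relies on (hypothetical class and values, not Hadoop code):

public class VolatileVisibilitySketch {
  // Written by one thread, read by another; volatile makes writes visible.
  private volatile int pendingRepLimit = 1000;

  public void setPendingRepLimit(int limit) { // called from the "reconfig" thread
    this.pendingRepLimit = limit;
  }

  public static void main(String[] args) throws InterruptedException {
    VolatileVisibilitySketch sketch = new VolatileVisibilitySketch();
    Thread monitor = new Thread(() -> {
      // Simulates the periodic monitor tick reading the current limit.
      while (sketch.pendingRepLimit == 1000) {
        // spin until the write from the other thread becomes visible
      }
      System.out.println("Monitor observed new limit: " + sketch.pendingRepLimit);
    });
    monitor.start();
    Thread.sleep(100);
    sketch.setPendingRepLimit(20000); // visible to the monitor thread via volatile
    monitor.join();
  }
}

Without volatile, the JVM would be free to keep the field cached in the monitor thread, so a reconfigured value might never be observed between ticks.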

+ 23 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.util.CyclicIteration;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.util.ChunkedArrayList;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,6 +138,28 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase
     return numNodesChecked;
   }
 
+  @VisibleForTesting
+  @Override
+  public int getPendingRepLimit() {
+    return 0;
+  }
+
+  @Override
+  public void setPendingRepLimit(int pendingRepLimit) {
+    // nothing.
+  }
+
+  @VisibleForTesting
+  @Override
+  public int getBlocksPerLock() {
+    return 0;
+  }
+
+  @Override
+  public void setBlocksPerLock(int blocksPerLock) {
+    // nothing.
+  }
+
   @Override
   public void run() {
     LOG.debug("DatanodeAdminMonitor is running.");

+ 26 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java

@@ -416,4 +416,30 @@ public class DatanodeAdminManager {
     executor.submit(monitor).get();
   }
 
+  public void refreshPendingRepLimit(int pendingRepLimit, String key) {
+    ensurePositiveInt(pendingRepLimit, key);
+    this.monitor.setPendingRepLimit(pendingRepLimit);
+  }
+
+  @VisibleForTesting
+  public int getPendingRepLimit() {
+    return this.monitor.getPendingRepLimit();
+  }
+
+  public void refreshBlocksPerLock(int blocksPerLock, String key) {
+    ensurePositiveInt(blocksPerLock, key);
+    this.monitor.setBlocksPerLock(blocksPerLock);
+  }
+
+  @VisibleForTesting
+  public int getBlocksPerLock() {
+    return this.monitor.getBlocksPerLock();
+  }
+
+  private void ensurePositiveInt(int val, String key) {
+    checkArgument(
+        (val > 0),
+        key + " = '" + val + "' is invalid. " +
+            "It should be a positive, non-zero integer value.");
+  }
 }

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminMonitorInterface.java

@@ -37,4 +37,12 @@ public interface DatanodeAdminMonitorInterface extends Runnable {
   void setBlockManager(BlockManager bm);
   void setDatanodeAdminManager(DatanodeAdminManager dnm);
   void setNameSystem(Namesystem ns);
+
+  int getPendingRepLimit();
+
+  void setPendingRepLimit(int pendingRepLimit);
+
+  int getBlocksPerLock();
+
+  void setBlocksPerLock(int blocksPerLock);
 }

+ 41 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -197,6 +197,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPO
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK_DEFAULT;
 
 import static org.apache.hadoop.util.ExitUtil.terminate;
 import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
@@ -343,7 +347,9 @@ public class NameNode extends ReconfigurableBase implements
           DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
           DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
           DFS_BLOCK_INVALIDATE_LIMIT_KEY,
-          DFS_DATANODE_PEER_STATS_ENABLED_KEY));
+          DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+          DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
+          DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK));
 
   private static final String USAGE = "Usage: hdfs namenode ["
       + StartupOption.BACKUP.getName() + "] | \n\t["
@@ -1848,7 +1854,7 @@ public class NameNode extends ReconfigurableBase implements
     }
   }
 
-  synchronized void monitorHealth() 
+  synchronized void monitorHealth()
       throws HealthCheckFailedException, AccessControlException {
     namesystem.checkSuperuserPrivilege();
     if (!haEnabled) {
@@ -1872,7 +1878,7 @@ public class NameNode extends ReconfigurableBase implements
     }
   }
   
-  synchronized void transitionToActive() 
+  synchronized void transitionToActive()
       throws ServiceFailedException, AccessControlException {
     namesystem.checkSuperuserPrivilege();
     if (!haEnabled) {
@@ -2216,6 +2222,10 @@ public class NameNode extends ReconfigurableBase implements
       return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
     } else if (property.equals(DFS_BLOCK_INVALIDATE_LIMIT_KEY)) {
       return reconfigureBlockInvalidateLimit(datanodeManager, property, newVal);
+    } else if (property.equals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT) ||
+        (property.equals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK))) {
+      return reconfigureDecommissionBackoffMonitorParameters(datanodeManager, property,
+          newVal);
     } else {
       throw new ReconfigurationException(property, newVal, getConf().get(
           property));
@@ -2481,6 +2491,34 @@ public class NameNode extends ReconfigurableBase implements
     }
   }
 
+  private String reconfigureDecommissionBackoffMonitorParameters(
+      final DatanodeManager datanodeManager, final String property, final String newVal)
+      throws ReconfigurationException {
+    String newSetting = null;
+    try {
+      if (property.equals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT)) {
+        int pendingRepLimit = (newVal == null ?
+            DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT_DEFAULT :
+            Integer.parseInt(newVal));
+        datanodeManager.getDatanodeAdminManager().refreshPendingRepLimit(pendingRepLimit,
+            DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT);
+        newSetting = String.valueOf(datanodeManager.getDatanodeAdminManager().getPendingRepLimit());
+      } else if (property.equals(
+          DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK)) {
+        int blocksPerLock = (newVal == null ?
+            DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK_DEFAULT :
+            Integer.parseInt(newVal));
+        datanodeManager.getDatanodeAdminManager().refreshBlocksPerLock(blocksPerLock,
+            DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK);
+        newSetting = String.valueOf(datanodeManager.getDatanodeAdminManager().getBlocksPerLock());
+      }
+      LOG.info("RECONFIGURE* changed reconfigureDecommissionBackoffMonitorParameters {} to {}",
+          property, newSetting);
+      return newSetting;
+    } catch (IllegalArgumentException e) {
+      throw new ReconfigurationException(property, newVal, getConf().get(property), e);
+    }
+  }
 
   @Override  // ReconfigurableBase
   protected Configuration getNewConf() {
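
Note that in reconfigureDecommissionBackoffMonitorParameters above, a null newVal (i.e. the property being removed rather than changed) falls back to the compiled-in *_DEFAULT constant, so clearing either key through reconfiguration restores the shipped default instead of keeping the previous value.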

+ 85 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java

@@ -41,6 +41,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminBackoffMonitor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminMonitorInterface;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 
@@ -57,6 +59,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KE
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT;
 
 public class TestNameNodeReconfigure {
@@ -509,6 +513,87 @@ public class TestNameNodeReconfigure {
 
   }
 
+  @Test
+  public void testReconfigureDecommissionBackoffMonitorParameters()
+      throws ReconfigurationException, IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setClass(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MONITOR_CLASS,
+        DatanodeAdminBackoffMonitor.class, DatanodeAdminMonitorInterface.class);
+    int defaultPendingRepLimit = 1000;
+    conf.setInt(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT, defaultPendingRepLimit);
+    int defaultBlocksPerLock = 1000;
+    conf.setInt(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK,
+        defaultBlocksPerLock);
+
+    try (MiniDFSCluster newCluster = new MiniDFSCluster.Builder(conf).build()) {
+      newCluster.waitActive();
+      final NameNode nameNode = newCluster.getNameNode();
+      final DatanodeManager datanodeManager = nameNode.namesystem
+          .getBlockManager().getDatanodeManager();
+
+      // verify defaultPendingRepLimit.
+      assertEquals(datanodeManager.getDatanodeAdminManager().getPendingRepLimit(),
+          defaultPendingRepLimit);
+
+      // try invalid pendingRepLimit.
+      try {
+        nameNode.reconfigureProperty(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
+            "non-numeric");
+        fail("Should not reach here");
+      } catch (ReconfigurationException e) {
+        assertEquals("Could not change property " +
+            "dfs.namenode.decommission.backoff.monitor.pending.limit from '" +
+            defaultPendingRepLimit + "' to 'non-numeric'", e.getMessage());
+      }
+
+      try {
+        nameNode.reconfigureProperty(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
+            "-1");
+        fail("Should not reach here");
+      } catch (ReconfigurationException e) {
+        assertEquals("Could not change property " +
+            "dfs.namenode.decommission.backoff.monitor.pending.limit from '" +
+            defaultPendingRepLimit + "' to '-1'", e.getMessage());
+      }
+
+      // try correct pendingRepLimit.
+      nameNode.reconfigureProperty(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
+          "20000");
+      assertEquals(datanodeManager.getDatanodeAdminManager().getPendingRepLimit(), 20000);
+
+      // verify defaultBlocksPerLock.
+      assertEquals(datanodeManager.getDatanodeAdminManager().getBlocksPerLock(),
+          defaultBlocksPerLock);
+
+      // try invalid blocksPerLock.
+      try {
+        nameNode.reconfigureProperty(
+            DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK,
+            "non-numeric");
+        fail("Should not reach here");
+      } catch (ReconfigurationException e) {
+        assertEquals("Could not change property " +
+            "dfs.namenode.decommission.backoff.monitor.pending.blocks.per.lock from '" +
+            defaultBlocksPerLock + "' to 'non-numeric'", e.getMessage());
+      }
+
+      try {
+        nameNode.reconfigureProperty(
+            DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK, "-1");
+        fail("Should not reach here");
+      } catch (ReconfigurationException e) {
+        assertEquals("Could not change property " +
+            "dfs.namenode.decommission.backoff.monitor.pending.blocks.per.lock from '" +
+            defaultBlocksPerLock + "' to '-1'", e.getMessage());
+      }
+
+      // try correct blocksPerLock.
+      nameNode.reconfigureProperty(
+          DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK, "10000");
+      assertEquals(datanodeManager.getDatanodeAdminManager().getBlocksPerLock(), 10000);
+    }
+  }
+
   @After
   public void shutDown() throws IOException {
     if (cluster != null) {

+ 7 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java

@@ -29,6 +29,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSN
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK;
 
 import java.util.function.Supplier;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
@@ -429,7 +431,7 @@ public class TestDFSAdmin {
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("namenode", address, outs, errs);
-    assertEquals(18, outs.size());
+    assertEquals(20, outs.size());
     assertTrue(outs.get(0).contains("Reconfigurable properties:"));
     assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1));
     assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));
@@ -439,8 +441,10 @@ public class TestDFSAdmin {
     assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(6));
     assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(7));
     assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(8));
-    assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(9));
-    assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(10));
+    assertEquals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK, outs.get(9));
+    assertEquals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT, outs.get(10));
+    assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(11));
+    assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(12));
     assertEquals(errs.size(), 0);
   }