
HDFS-16613. EC: Improve performance of decommissioning dn with many ec blocks (#4398)

caozhiqiang 3 years ago
parent
commit
9e3fc40ecb
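
This commit moves the per-heartbeat maxTransfers computation from FSNamesystem into DatanodeManager#handleHeartbeat, so it can depend on the state of the reporting datanode: a node that is decommissioning now gets its task budget from dfs.namenode.replication.max-streams-hard-limit instead of dfs.namenode.replication.max-streams, letting it drain its pending replication and EC reconstruction queues faster.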

+ 10 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -1790,8 +1790,8 @@ public class DatanodeManager {
   /** Handle heartbeat from datanodes. */
   public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       StorageReport[] reports, final String blockPoolId,
-      long cacheCapacity, long cacheUsed, int xceiverCount, 
-      int maxTransfers, int failedVolumes,
+      long cacheCapacity, long cacheUsed, int xceiverCount,
+      int xmitsInProgress, int failedVolumes,
       VolumeFailureSummary volumeFailureSummary,
       @Nonnull SlowPeerReports slowPeers,
       @Nonnull SlowDiskReports slowDisks) throws IOException {
@@ -1835,6 +1835,14 @@ public class DatanodeManager {
     int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
     int totalBlocks = totalReplicateBlocks + totalECBlocks;
     if (totalBlocks > 0) {
+      int maxTransfers;
+      if (nodeinfo.isDecommissionInProgress()) {
+        maxTransfers = blockManager.getReplicationStreamsHardLimit()
+            - xmitsInProgress;
+      } else {
+        maxTransfers = blockManager.getMaxReplicationStreams()
+            - xmitsInProgress;
+      }
       int numReplicationTasks = (int) Math.ceil(
           (double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
       int numECTasks = (int) Math.ceil(
@@ -2249,4 +2257,3 @@ public class DatanodeManager {
     return datanodeMap;
   }
 }
-
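
The heart of the change is the new maxTransfers computation above. Below is a minimal, self-contained sketch of that math; the class and method names are hypothetical, the real code reads both limits from BlockManager, and since the EC branch of the split is cut off by the hunk boundary above, the sketch assumes it mirrors the replication branch.

// Sketch of the task-split math in DatanodeManager#handleHeartbeat after
// this patch. Hypothetical names; limits are passed in rather than read
// from BlockManager as the real code does.
public class MaxTransfersSketch {

  static int[] splitTasks(int totalReplicateBlocks, int totalECBlocks,
      int maxReplicationStreams, int replicationStreamsHardLimit,
      int xmitsInProgress, boolean decommissionInProgress) {
    int totalBlocks = totalReplicateBlocks + totalECBlocks;
    if (totalBlocks <= 0) {
      return new int[] {0, 0};
    }
    // Decommissioning nodes are allowed the larger hard limit so they can
    // drain their replication and EC queues faster.
    int maxTransfers = (decommissionInProgress
        ? replicationStreamsHardLimit
        : maxReplicationStreams) - xmitsInProgress;
    // Split maxTransfers across the two queues in proportion to their lengths.
    int numReplicationTasks = (int) Math.ceil(
        (double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
    int numECTasks = (int) Math.ceil(
        (double) (totalECBlocks * maxTransfers) / totalBlocks);
    return new int[] {numReplicationTasks, numECTasks};
  }

  public static void main(String[] args) {
    // Mirrors the new test case verifyPendingRecoveryTasks(30, 30, 20, 30,
    // 15, 15, true): a decommissioning node with 30 replication and 30 EC
    // blocks queued, max-streams 20, hard limit 30, nothing in flight.
    int[] tasks = splitTasks(30, 30, 20, 30, 0, true);
    // Prints "15 replication / 15 EC": 30 * 30 / 60 = 15 for each queue.
    System.out.println(tasks[0] + " replication / " + tasks[1] + " EC");
  }
}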

+ 1 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -4393,11 +4393,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     readLock();
     try {
       //get datanode commands
-      final int maxTransfer = blockManager.getMaxReplicationStreams()
-          - xmitsInProgress;
       DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
           nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
-          xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
+          xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary,
           slowPeers, slowDisks);
       long blockReportLeaseId = 0;
       if (requestFullBlockReportLease) {
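
The caller-side change mirrors this: FSNamesystem no longer pre-computes a single maxTransfer = getMaxReplicationStreams() - xmitsInProgress for every node, but forwards the raw xmitsInProgress so DatanodeManager can subtract it from whichever limit applies to the node. With illustrative values of max-streams = 2 and max-streams-hard-limit = 4, a decommissioning datanode with one transfer in flight now receives 4 - 1 = 3 task slots per heartbeat instead of 2 - 1 = 1.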

+ 20 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java

@@ -86,6 +86,10 @@ public class TestDatanodeManager {
   private static DatanodeManager mockDatanodeManager(
       FSNamesystem fsn, Configuration conf) throws IOException {
     BlockManager bm = Mockito.mock(BlockManager.class);
+    Mockito.when(bm.getMaxReplicationStreams()).thenReturn(
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2));
+    Mockito.when(bm.getReplicationStreamsHardLimit()).thenReturn(
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 2));
     BlockReportLeaseManager blm = new BlockReportLeaseManager(conf);
     Mockito.when(bm.getBlockReportLeaseManager()).thenReturn(blm);
     DatanodeManager dm = new DatanodeManager(bm, fsn, conf);
@@ -965,25 +969,33 @@ public class TestDatanodeManager {
    * @param numReplicationBlocks the number of replication blocks in the queue.
    * @param numECBlocks number of EC blocks in the queue.
    * @param maxTransfers the maxTransfer value.
+   * @param maxTransfersHardLimit the maxTransfer hard limit value.
    * @param numReplicationTasks the number of replication tasks polled from
    *                            the queue.
    * @param numECTasks the number of EC tasks polled from the queue.
+   * @param isDecommissioning if the node is in the decommissioning process.
    *
    * @throws IOException
    */
   private void verifyPendingRecoveryTasks(
       int numReplicationBlocks, int numECBlocks,
-      int maxTransfers, int numReplicationTasks, int numECTasks)
+      int maxTransfers, int maxTransfersHardLimit,
+      int numReplicationTasks, int numECTasks, boolean isDecommissioning)
       throws IOException {
     FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
     Mockito.when(fsn.hasWriteLock()).thenReturn(true);
     Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, maxTransfers);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+        maxTransfersHardLimit);
     DatanodeManager dm = Mockito.spy(mockDatanodeManager(fsn, conf));
 
     DatanodeDescriptor nodeInfo = Mockito.mock(DatanodeDescriptor.class);
     Mockito.when(nodeInfo.isRegistered()).thenReturn(true);
     Mockito.when(nodeInfo.getStorageInfos())
         .thenReturn(new DatanodeStorageInfo[0]);
+    Mockito.when(nodeInfo.isDecommissionInProgress())
+        .thenReturn(isDecommissioning);
 
     if (numReplicationBlocks > 0) {
       Mockito.when(nodeInfo.getNumberOfReplicateBlocks())
@@ -1010,7 +1022,7 @@ public class TestDatanodeManager {
     DatanodeRegistration dnReg = Mockito.mock(DatanodeRegistration.class);
     Mockito.when(dm.getDatanode(dnReg)).thenReturn(nodeInfo);
     DatanodeCommand[] cmds = dm.handleHeartbeat(
-        dnReg, new StorageReport[1], "bp-123", 0, 0, 10, maxTransfers, 0, null,
+        dnReg, new StorageReport[1], "bp-123", 0, 0, 10, 0, 0, null,
         SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
 
     long expectedNumCmds = Arrays.stream(
@@ -1042,11 +1054,14 @@ public class TestDatanodeManager {
   @Test
   public void testPendingRecoveryTasks() throws IOException {
    // Tasks are split according to the ratio between queue lengths.
-    verifyPendingRecoveryTasks(20, 20, 20, 10, 10);
-    verifyPendingRecoveryTasks(40, 10, 20, 16, 4);
+    verifyPendingRecoveryTasks(20, 20, 20, 30, 10, 10, false);
+    verifyPendingRecoveryTasks(40, 10, 20, 30, 16, 4, false);
 
    // Approximately allocate tasks if the ratio between queue lengths is large.
-    verifyPendingRecoveryTasks(400, 1, 20, 20, 1);
+    verifyPendingRecoveryTasks(400, 1, 20, 30, 20, 1, false);
+
+    // Tasks use dfs.namenode.replication.max-streams-hard-limit for decommissioning nodes
+    verifyPendingRecoveryTasks(30, 30, 20, 30, 15, 15, true);
   }
 
   @Test
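
For reference, the expected counts in testPendingRecoveryTasks follow directly from the ceiling split. With (40, 10) queued blocks and maxTransfers = 20: ceil(40 * 20 / 50) = 16 replication and ceil(10 * 20 / 50) = 4 EC tasks. With (400, 1) blocks: ceil(400 * 20 / 401) = 20 and ceil(1 * 20 / 401) = 1, i.e. 21 tasks in total, one more than maxTransfers, which is what the "approximately" comment alludes to. In the decommissioning case (30, 30), the hard limit of 30 is used instead of 20, yielding 15 tasks for each queue.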