소스 검색

HDFS-16846. EC: Only EC blocks should be affected by max-streams-hard-limit configuration (#5143)

Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
caozhiqiang 2 년 전
부모
커밋
35c65005d0

+ 38 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

@@ -197,8 +197,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /** A queue of blocks to be replicated by this datanode */
   private final BlockQueue<BlockTargetPair> replicateBlocks =
       new BlockQueue<>();
-  /** A queue of blocks to be erasure coded by this datanode */
-  private final BlockQueue<BlockECReconstructionInfo> erasurecodeBlocks =
+  /** A queue of ec blocks to be replicated by this datanode. */
+  private final BlockQueue<BlockTargetPair> ecBlocksToBeReplicated = new BlockQueue<>();
+  /** A queue of ec blocks to be erasure coded by this datanode. */
+  private final BlockQueue<BlockECReconstructionInfo> ecBlocksToBeErasureCoded =
       new BlockQueue<>();
   /** A queue of blocks to be recovered by this datanode */
   private final BlockQueue<BlockInfo> recoverBlocks = new BlockQueue<>();
@@ -358,7 +360,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
     this.recoverBlocks.clear();
     this.replicateBlocks.clear();
-    this.erasurecodeBlocks.clear();
+    this.ecBlocksToBeReplicated.clear();
+    this.ecBlocksToBeErasureCoded.clear();
     // pendingCached, cached, and pendingUncached are protected by the
     // FSN lock.
     this.pendingCached.clear();
@@ -678,6 +681,15 @@ public class DatanodeDescriptor extends DatanodeInfo {
     replicateBlocks.offer(new BlockTargetPair(block, targets));
   }
 
+  /**
+   * Store EC block replication work.
+   */
+  @VisibleForTesting
+  public void addECBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
+    assert (block != null && targets != null && targets.length > 0);
+    ecBlocksToBeReplicated.offer(new BlockTargetPair(block, targets));
+  }
+
   /**
    * Store block erasure coding work.
    */
@@ -687,9 +699,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
     assert (block != null && sources != null && sources.length > 0);
     BlockECReconstructionInfo task = new BlockECReconstructionInfo(block,
         sources, targets, liveBlockIndices, excludeReconstrutedIndices, ecPolicy);
-    erasurecodeBlocks.offer(task);
+    ecBlocksToBeErasureCoded.offer(task);
     BlockManager.LOG.debug("Adding block reconstruction task " + task + "to "
-        + getName() + ", current queue size is " + erasurecodeBlocks.size());
+        + getName() + ", current queue size is " + ecBlocksToBeErasureCoded.size());
   }
 
   /**
@@ -720,7 +732,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
    * The number of work items that are pending to be replicated.
    */
   int getNumberOfBlocksToBeReplicated() {
-    return pendingReplicationWithoutTargets + replicateBlocks.size();
+    return pendingReplicationWithoutTargets + replicateBlocks.size()
+        + ecBlocksToBeReplicated.size();
   }
 
   /**
@@ -728,7 +741,15 @@ public class DatanodeDescriptor extends DatanodeInfo {
    */
   @VisibleForTesting
   public int getNumberOfBlocksToBeErasureCoded() {
-    return erasurecodeBlocks.size();
+    return ecBlocksToBeErasureCoded.size();
+  }
+
+  /**
+   * The number of ec work items that are pending to be replicated.
+   */
+  @VisibleForTesting
+  public int getNumberOfECBlocksToBeReplicated() {
+    return ecBlocksToBeReplicated.size();
   }
 
   @VisibleForTesting
@@ -740,9 +761,13 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return replicateBlocks.poll(maxTransfers);
   }
 
+  List<BlockTargetPair> getECReplicatedCommand(int maxTransfers) {
+    return ecBlocksToBeReplicated.poll(maxTransfers);
+  }
+
   public List<BlockECReconstructionInfo> getErasureCodeCommand(
       int maxTransfers) {
-    return erasurecodeBlocks.poll(maxTransfers);
+    return ecBlocksToBeErasureCoded.poll(maxTransfers);
   }
 
   public BlockInfo[] getLeaseRecoveryCommand(int maxTransfers) {
@@ -994,7 +1019,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
     if (repl > 0) {
       sb.append(" ").append(repl).append(" blocks to be replicated;");
     }
-    int ec = erasurecodeBlocks.size();
+    int ecRepl = ecBlocksToBeReplicated.size();
+    if (ecRepl > 0) {
+      sb.append(" ").append(ecRepl).append(" ec blocks to be replicated;");
+    }
+    int ec = ecBlocksToBeErasureCoded.size();
     if(ec > 0) {
       sb.append(" ").append(ec).append(" blocks to be erasure coded;");
     }

+ 28 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -1825,28 +1825,41 @@ public class DatanodeManager {
     // Allocate _approximately_ maxTransfers pending tasks to DataNode.
     // NN chooses pending tasks based on the ratio between the lengths of
     // replication and erasure-coded block queues.
-    int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks();
-    int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
-    int totalBlocks = totalReplicateBlocks + totalECBlocks;
+    int replicationBlocks = nodeinfo.getNumberOfReplicateBlocks();
+    int ecBlocksToBeReplicated = nodeinfo.getNumberOfECBlocksToBeReplicated();
+    int ecBlocksToBeErasureCoded = nodeinfo.getNumberOfBlocksToBeErasureCoded();
+    int totalBlocks = replicationBlocks + ecBlocksToBeReplicated + ecBlocksToBeErasureCoded;
     if (totalBlocks > 0) {
-      int maxTransfers;
+      int maxTransfers = blockManager.getMaxReplicationStreams() - xmitsInProgress;
+      int maxECReplicatedTransfers;
       if (nodeinfo.isDecommissionInProgress()) {
-        maxTransfers = blockManager.getReplicationStreamsHardLimit()
+        maxECReplicatedTransfers = blockManager.getReplicationStreamsHardLimit()
             - xmitsInProgress;
       } else {
-        maxTransfers = blockManager.getMaxReplicationStreams()
-            - xmitsInProgress;
+        maxECReplicatedTransfers = maxTransfers;
       }
       int numReplicationTasks = (int) Math.ceil(
-          (double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
-      int numECTasks = (int) Math.ceil(
-          (double) (totalECBlocks * maxTransfers) / totalBlocks);
-      LOG.debug("Pending replication tasks: {} erasure-coded tasks: {}.",
-          numReplicationTasks, numECTasks);
+          (double) (replicationBlocks * maxTransfers) / totalBlocks);
+      int numEcReplicatedTasks = (int) Math.ceil(
+              (double) (ecBlocksToBeReplicated * maxECReplicatedTransfers) / totalBlocks);
+      int numECReconstructedTasks = (int) Math.ceil(
+          (double) (ecBlocksToBeErasureCoded * maxTransfers) / totalBlocks);
+      LOG.debug("Pending replication tasks: {} ec to be replicated tasks: {} " +
+                      "ec reconstruction tasks: {}.",
+          numReplicationTasks, numEcReplicatedTasks, numECReconstructedTasks);
       // check pending replication tasks
-      List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
+      List<BlockTargetPair> pendingReplicationList = nodeinfo.getReplicationCommand(
           numReplicationTasks);
-      if (pendingList != null && !pendingList.isEmpty()) {
+      List<BlockTargetPair> pendingECReplicatedList = nodeinfo.getECReplicatedCommand(
+              numEcReplicatedTasks);
+      List<BlockTargetPair> pendingList = new ArrayList<BlockTargetPair>();
+      if(pendingReplicationList != null && !pendingReplicationList.isEmpty()) {
+        pendingList.addAll(pendingReplicationList);
+      }
+      if(pendingECReplicatedList != null && !pendingECReplicatedList.isEmpty()) {
+        pendingList.addAll(pendingECReplicatedList);
+      }
+      if (!pendingList.isEmpty()) {
         // If the block is deleted, the block size will become
         // BlockCommand.NO_ACK (LONG.MAX_VALUE) . This kind of block we don't
         // need
@@ -1868,7 +1881,7 @@ public class DatanodeManager {
       }
       // check pending erasure coding tasks
       List<BlockECReconstructionInfo> pendingECList = nodeinfo
-          .getErasureCodeCommand(numECTasks);
+          .getErasureCodeCommand(numECReconstructedTasks);
       if (pendingECList != null && !pendingECList.isEmpty()) {
         cmds.add(new BlockECReconstructionCommand(
             DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java

@@ -164,7 +164,7 @@ class ErasureCodingWork extends BlockReconstructionWork {
         stripedBlk.getDataBlockNum(), blockIndex);
     final Block targetBlk = new Block(stripedBlk.getBlockId() + blockIndex,
         internBlkLen, stripedBlk.getGenerationStamp());
-    source.addBlockToBeReplicated(targetBlk,
+    source.addECBlockToBeReplicated(targetBlk,
         new DatanodeStorageInfo[] {target});
     LOG.debug("Add replication task from source {} to "
         + "target {} for EC block {}", source, target, targetBlk);

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java

@@ -759,7 +759,7 @@ public class TestDecommissionWithStriped {
     DatanodeInfo extraDn = getDatanodeOutOfTheBlock(blk);
     DatanodeDescriptor target = bm.getDatanodeManager()
         .getDatanode(extraDn.getDatanodeUuid());
-    dn0.addBlockToBeReplicated(targetBlk,
+    dn0.addECBlockToBeReplicated(targetBlk,
         new DatanodeStorageInfo[] {target.getStorageInfos()[0]});
 
     // dn0 replicates in success
@@ -883,7 +883,7 @@ public class TestDecommissionWithStriped {
         .getDatanode(extraDn.getDatanodeUuid());
     DatanodeDescriptor dnStartIndexDecommission = bm.getDatanodeManager()
         .getDatanode(dnLocs[decommNodeIndex].getDatanodeUuid());
-    dnStartIndexDecommission.addBlockToBeReplicated(targetBlk,
+    dnStartIndexDecommission.addECBlockToBeReplicated(targetBlk,
         new DatanodeStorageInfo[] {target.getStorageInfos()[0]});
 
     // Wait for replication success.
@@ -972,7 +972,7 @@ public class TestDecommissionWithStriped {
     DatanodeInfo extraDn = getDatanodeOutOfTheBlock(blk);
     DatanodeDescriptor target = bm.getDatanodeManager()
         .getDatanode(extraDn.getDatanodeUuid());
-    dn0.addBlockToBeReplicated(targetBlk,
+    dn0.addECBlockToBeReplicated(targetBlk,
         new DatanodeStorageInfo[] {target.getStorageInfos()[0]});
 
     // dn0 replicates in success

+ 37 - 22
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java

@@ -967,20 +967,22 @@ public class TestDatanodeManager {
    * Verify the correctness of pending recovery process.
    *
    * @param numReplicationBlocks the number of replication blocks in the queue.
-   * @param numECBlocks number of EC blocks in the queue.
+   * @param numEcBlocksToBeReplicated the number of EC blocks to be replicated in the queue.
+   * @param numBlocksToBeErasureCoded number of EC blocks to be erasure coded in the queue.
    * @param maxTransfers the maxTransfer value.
    * @param maxTransfersHardLimit the maxTransfer hard limit value.
-   * @param numReplicationTasks the number of replication tasks polled from
-   *                            the queue.
-   * @param numECTasks the number of EC tasks polled from the queue.
+   * @param numReplicationTasks the number of replication tasks polled from the queue.
+   * @param numECTasksToBeReplicated the number of EC tasks to be replicated polled from the queue.
+   * @param numECTasksToBeErasureCoded the number of EC tasks to be erasure coded polled from
+   *                                   the queue.
    * @param isDecommissioning if the node is in the decommissioning process.
    *
    * @throws IOException
    */
   private void verifyPendingRecoveryTasks(
-      int numReplicationBlocks, int numECBlocks,
-      int maxTransfers, int maxTransfersHardLimit,
-      int numReplicationTasks, int numECTasks, boolean isDecommissioning)
+      int numReplicationBlocks, int numEcBlocksToBeReplicated, int numBlocksToBeErasureCoded,
+      int maxTransfers, int maxTransfersHardLimit, int numReplicationTasks,
+      int numECTasksToBeReplicated, int numECTasksToBeErasureCoded, boolean isDecommissioning)
       throws IOException {
     FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
     Mockito.when(fsn.hasWriteLock()).thenReturn(true);
@@ -1009,13 +1011,25 @@ public class TestDatanodeManager {
           .thenReturn(tasks);
     }
 
-    if (numECBlocks > 0) {
+    if (numEcBlocksToBeReplicated > 0) {
+      Mockito.when(nodeInfo.getNumberOfECBlocksToBeReplicated())
+              .thenReturn(numEcBlocksToBeReplicated);
+
+      List<BlockTargetPair> ecReplicatedTasks =
+              Collections.nCopies(
+                      Math.min(numECTasksToBeReplicated, numEcBlocksToBeReplicated),
+                      new BlockTargetPair(null, null));
+      Mockito.when(nodeInfo.getECReplicatedCommand(numECTasksToBeReplicated))
+              .thenReturn(ecReplicatedTasks);
+    }
+
+    if (numBlocksToBeErasureCoded > 0) {
       Mockito.when(nodeInfo.getNumberOfBlocksToBeErasureCoded())
-          .thenReturn(numECBlocks);
+          .thenReturn(numBlocksToBeErasureCoded);
 
       List<BlockECReconstructionInfo> tasks =
-          Collections.nCopies(numECTasks, null);
-      Mockito.when(nodeInfo.getErasureCodeCommand(numECTasks))
+          Collections.nCopies(numECTasksToBeErasureCoded, null);
+      Mockito.when(nodeInfo.getErasureCodeCommand(numECTasksToBeErasureCoded))
           .thenReturn(tasks);
     }
 
@@ -1026,42 +1040,43 @@ public class TestDatanodeManager {
         SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
 
     long expectedNumCmds = Arrays.stream(
-        new int[]{numReplicationTasks, numECTasks})
+        new int[]{numReplicationTasks + numECTasksToBeReplicated, numECTasksToBeErasureCoded})
         .filter(x -> x > 0)
         .count();
     assertEquals(expectedNumCmds, cmds.length);
 
     int idx = 0;
-    if (numReplicationTasks > 0) {
+    if (numReplicationTasks > 0 || numECTasksToBeReplicated > 0) {
       assertTrue(cmds[idx] instanceof BlockCommand);
       BlockCommand cmd = (BlockCommand) cmds[0];
-      assertEquals(numReplicationTasks, cmd.getBlocks().length);
-      assertEquals(numReplicationTasks, cmd.getTargets().length);
+      assertEquals(numReplicationTasks + numECTasksToBeReplicated, cmd.getBlocks().length);
+      assertEquals(numReplicationTasks + numECTasksToBeReplicated, cmd.getTargets().length);
       idx++;
     }
 
-    if (numECTasks > 0) {
+    if (numECTasksToBeErasureCoded > 0) {
       assertTrue(cmds[idx] instanceof BlockECReconstructionCommand);
       BlockECReconstructionCommand cmd =
           (BlockECReconstructionCommand) cmds[idx];
-      assertEquals(numECTasks, cmd.getECTasks().size());
+      assertEquals(numECTasksToBeErasureCoded, cmd.getECTasks().size());
     }
 
     Mockito.verify(nodeInfo).getReplicationCommand(numReplicationTasks);
-    Mockito.verify(nodeInfo).getErasureCodeCommand(numECTasks);
+    Mockito.verify(nodeInfo).getECReplicatedCommand(numECTasksToBeReplicated);
+    Mockito.verify(nodeInfo).getErasureCodeCommand(numECTasksToBeErasureCoded);
   }
 
   @Test
   public void testPendingRecoveryTasks() throws IOException {
     // Tasks are split according to the ratio between queue lengths.
-    verifyPendingRecoveryTasks(20, 20, 20, 30, 10, 10, false);
-    verifyPendingRecoveryTasks(40, 10, 20, 30, 16, 4, false);
+    verifyPendingRecoveryTasks(20, 0, 20, 20, 30, 10, 0, 10, false);
+    verifyPendingRecoveryTasks(40, 0, 10, 20, 30, 16, 0, 4, false);
 
     // Approximately load tasks if the ratio between queue length is large.
-    verifyPendingRecoveryTasks(400, 1, 20, 30, 20, 1, false);
+    verifyPendingRecoveryTasks(400, 0, 1, 20, 30, 20, 0, 1, false);
 
     // Decommissioning: only EC replication uses dfs.namenode.replication.max-streams-hard-limit
-    verifyPendingRecoveryTasks(30, 30, 20, 30, 15, 15, true);
+    verifyPendingRecoveryTasks(20, 10, 10, 20, 40, 10, 10, 5, true);
   }
 
   @Test