Bläddra i källkod

HDFS-16839 It should consider EC reconstruction work when we determine if a node is busy (#5128)

Co-authored-by: Takanobu Asanuma <tasanuma@apache.org>
Reviewed-by: Tao Li <tomscut@apache.org>
Kidd5368 2 år sedan
förälder
incheckning
72749a4ff8

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -2599,7 +2599,8 @@ public class BlockManager implements BlockStatsMXBean {
 
       if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
           && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
-          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
+          && node.getNumberOfBlocksToBeReplicated() +
+          node.getNumberOfBlocksToBeErasureCoded() >= maxReplicationStreams) {
         if (isStriped && (state == StoredReplicaState.LIVE
             || state == StoredReplicaState.DECOMMISSIONING)) {
           liveBusyBlockIndices.add(blockIndex);
@@ -2609,7 +2610,8 @@ public class BlockManager implements BlockStatsMXBean {
         continue; // already reached replication limit
       }
 
-      if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) {
+      if (node.getNumberOfBlocksToBeReplicated() +
+          node.getNumberOfBlocksToBeErasureCoded() >= replicationStreamsHardLimit) {
         if (isStriped && (state == StoredReplicaState.LIVE
             || state == StoredReplicaState.DECOMMISSIONING)) {
           liveBusyBlockIndices.add(blockIndex);

+ 52 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

@@ -957,6 +957,58 @@ public class TestBlockManager {
     assertNull(work);
   }
 
+  @Test
+  public void testSkipReconstructionWithManyBusyNodes3() {
+    NameNode.initMetrics(new Configuration(), HdfsServerConstants.NamenodeRole.NAMENODE);
+    long blockId = -9223372036854775776L; // Real ec block id
+    // RS-3-2 EC policy
+    ErasureCodingPolicy ecPolicy =
+            SystemErasureCodingPolicies.getPolicies().get(1);
+
+    // Create an EC block group: 3 data blocks + 2 parity blocks.
+    Block aBlockGroup = new Block(blockId, ecPolicy.getCellSize() * ecPolicy.getNumDataUnits(), 0);
+    BlockInfoStriped aBlockInfoStriped = new BlockInfoStriped(aBlockGroup, ecPolicy);
+
+    // Create 4 storageInfo, which means 1 block is missing.
+    DatanodeStorageInfo ds1 = DFSTestUtil.createDatanodeStorageInfo(
+            "storage1", "1.1.1.1", "rack1", "host1");
+    DatanodeStorageInfo ds2 = DFSTestUtil.createDatanodeStorageInfo(
+            "storage2", "2.2.2.2", "rack2", "host2");
+    DatanodeStorageInfo ds3 = DFSTestUtil.createDatanodeStorageInfo(
+            "storage3", "3.3.3.3", "rack3", "host3");
+    DatanodeStorageInfo ds4 = DFSTestUtil.createDatanodeStorageInfo(
+            "storage4", "4.4.4.4", "rack4", "host4");
+
+    // Link block with storage.
+    aBlockInfoStriped.addStorage(ds1, aBlockGroup);
+    aBlockInfoStriped.addStorage(ds2, new Block(blockId + 1, 0, 0));
+    aBlockInfoStriped.addStorage(ds3, new Block(blockId + 2, 0, 0));
+    aBlockInfoStriped.addStorage(ds4, new Block(blockId + 3, 0, 0));
+
+    addEcBlockToBM(blockId, ecPolicy);
+    aBlockInfoStriped.setBlockCollectionId(mockINodeId);
+
+    // Reconstruction should be scheduled.
+    BlockReconstructionWork work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
+    assertNotNull(work);
+
+    ExtendedBlock dummyBlock = new ExtendedBlock("bpid", 1, 1, 1);
+    DatanodeDescriptor dummyDD = ds1.getDatanodeDescriptor();
+    DatanodeDescriptor[] dummyDDArray = new DatanodeDescriptor[]{dummyDD};
+    DatanodeStorageInfo[] dummyDSArray = new DatanodeStorageInfo[]{ds1};
+    // Simulate the 2 nodes reach maxReplicationStreams.
+    for(int i = 0; i < bm.maxReplicationStreams; i++){ //Add some dummy EC reconstruction task.
+      ds3.getDatanodeDescriptor().addBlockToBeErasureCoded(dummyBlock, dummyDDArray,
+              dummyDSArray, new byte[0], new byte[0], ecPolicy);
+      ds4.getDatanodeDescriptor().addBlockToBeErasureCoded(dummyBlock, dummyDDArray,
+              dummyDSArray, new byte[0], new byte[0], ecPolicy);
+    }
+
+    // Reconstruction should be skipped since the number of non-busy nodes are not enough.
+    work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
+    assertNull(work);
+  }
+
   @Test
   public void testFavorDecomUntilHardLimit() throws Exception {
     bm.maxReplicationStreams = 0;