瀏覽代碼

HDFS-17564. Erasure Coding: Fix the issue of inaccurate metrics when decommission mark busy DN

Haiyang.Hu 10 月之前
父節點
當前提交
b7d6356894

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -2396,7 +2396,9 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // Add block to the datanode's task list
-    rw.addTaskToDatanode(numReplicas);
+    if (!rw.addTaskToDatanode(numReplicas)) {
+      return false;
+    }
     DatanodeStorageInfo.incrementBlocksScheduled(targets);
 
     // Move the block-replication into a "pending" state.

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java

@@ -145,5 +145,5 @@ abstract class BlockReconstructionWork {
    *
    * @param numberReplicas replica details
    */
-  abstract void addTaskToDatanode(NumberReplicas numberReplicas);
+  abstract boolean addTaskToDatanode(NumberReplicas numberReplicas);
 }

+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java

@@ -136,11 +136,11 @@ class ErasureCodingWork extends BlockReconstructionWork {
   }
 
   @Override
-  void addTaskToDatanode(NumberReplicas numberReplicas) {
+  boolean addTaskToDatanode(NumberReplicas numberReplicas) {
     final DatanodeStorageInfo[] targets = getTargets();
     assert targets.length > 0;
     BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();
-
+    boolean flag = true;
     if (hasNotEnoughRack()) {
       // if we already have all the internal blocks, but not enough racks,
       // we only need to replicate one internal block to a new rack
@@ -152,6 +152,9 @@ class ErasureCodingWork extends BlockReconstructionWork {
       List<Integer> leavingServiceSources = findLeavingServiceSources();
       // decommissioningSources.size() should be >= targets.length
       final int num = Math.min(leavingServiceSources.size(), targets.length);
+      if (num == 0) {
+        flag = false;
+      }
       for (int i = 0; i < num; i++) {
         createReplicationWork(leavingServiceSources.get(i), targets[i]);
       }
@@ -160,6 +163,7 @@ class ErasureCodingWork extends BlockReconstructionWork {
           new ExtendedBlock(blockPoolId, stripedBlk), getSrcNodes(), targets,
           liveBlockIndices, excludeReconstructedIndices, stripedBlk.getErasureCodingPolicy());
     }
+    return flag;
   }
 
   private void createReplicationWork(int sourceIndex,

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java

@@ -61,7 +61,8 @@ class ReplicationWork extends BlockReconstructionWork {
   }
 
   @Override
-  void addTaskToDatanode(NumberReplicas numberReplicas) {
+  boolean addTaskToDatanode(NumberReplicas numberReplicas) {
     getSrcNodes()[0].addBlockToBeReplicated(getBlock(), getTargets());
+    return true;
   }
 }

+ 53 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java

@@ -462,6 +462,59 @@ public class TestDecommissionWithStriped {
         fileChecksum1.equals(fileChecksum2));
   }
 
+  /**
+   * Test decommission when DN marked as busy.
+   * @throws Exception if the test fails
+   */
+  @Test(timeout = 120000)
+  public void testBusyAfterDecommissionNode() throws Exception {
+    byte busyDNIndex = 0;
+    //1. create EC file
+    final Path ecFile = new Path(ecDir, "testBusyAfterDecommissionNode");
+    int writeBytes = cellSize * dataBlocks;
+    writeStripedFile(dfs, ecFile, writeBytes);
+    Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
+    FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);
+
+    //2. make one DN busy
+    final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+        .getINode4Write(ecFile.toString()).asFile();
+    BlockInfo firstBlock = fileNode.getBlocks()[0];
+    DatanodeStorageInfo[] dnStorageInfos = bm.getStorages(firstBlock);
+    DatanodeDescriptor busyNode =
+        dnStorageInfos[busyDNIndex].getDatanodeDescriptor();
+    for (int j = 0; j < replicationStreamsHardLimit; j++) {
+      busyNode.incrementPendingReplicationWithoutTargets();
+    }
+
+    //3. decommission one node
+    List<DatanodeInfo> decommisionNodes = new ArrayList<>();
+    decommisionNodes.add(busyNode);
+    decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSION_INPROGRESS);
+
+    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+    bm.getDatanodeManager().fetchDatanodes(live, null, false);
+    int liveDecommissioning = 0;
+    for (DatanodeDescriptor node : live) {
+      liveDecommissioning += node.isDecommissionInProgress() ? 1 : 0;
+    }
+    assertEquals(decommisionNodes.size(), liveDecommissioning);
+
+    //4. wait for decommission block to replicate
+    Thread.sleep(3000);
+
+    int blocksScheduled = 0;
+    final List<DatanodeDescriptor> dnList = new ArrayList<>();
+    fsn.getBlockManager().getDatanodeManager().fetchDatanodes(dnList, null,
+        false);
+    for (DatanodeDescriptor dn : dnList) {
+      blocksScheduled += dn.getBlocksScheduled();
+    }
+    assertEquals(0, blocksScheduled);
+    assertEquals(0, bm.getPendingReconstructionBlocksCount());
+    assertEquals(1, bm.getLowRedundancyBlocksCount());
+  }
+
   private void testDecommission(int writeBytes, int storageCount,
       int decomNodeCount, String filename) throws IOException, Exception {
     Path ecFile = new Path(ecDir, filename);