
HDFS-16182. numOfReplicas is given the wrong value in BlockPlacementPolicyDefault$chooseTarget, which can cause DataStreamer to fail with Heterogeneous Storage. (#3320)

(cherry picked from commit 5626734a36b9794f4b1af8b9a7d8d0055914a094)
Neil, 3 years ago
commit c182bf65e0

+ 7 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -435,7 +435,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * @param storageTypes storage type to be considered for target
    * @return local node of writer (not chosen node)
    */
-  private Node chooseTarget(int numOfReplicas,
+  private Node chooseTarget(final int numOfReplicas,
                             Node writer,
                             final Set<Node> excludedNodes,
                             final long blocksize,
@@ -469,7 +469,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     LOG.trace("storageTypes={}", storageTypes);
 
     try {
-      if ((numOfReplicas = requiredStorageTypes.size()) == 0) {
+      if (requiredStorageTypes.size() == 0) {
         throw new NotEnoughReplicasException(
             "All required storage types are unavailable: "
             + " unavailableStorages=" + unavailableStorages
@@ -498,10 +498,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         for (DatanodeStorageInfo resultStorage : results) {
           addToExcludedNodes(resultStorage.getDatanodeDescriptor(), oldExcludedNodes);
         }
-        // Set numOfReplicas, since it can get out of sync with the result list
+        // Set newNumOfReplicas, since it can get out of sync with the result list
         // if the NotEnoughReplicasException was thrown in chooseRandom().
-        numOfReplicas = totalReplicasExpected - results.size();
-        return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
+        int newNumOfReplicas = totalReplicasExpected - results.size();
+        return chooseTarget(newNumOfReplicas, writer, oldExcludedNodes, blocksize,
             maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
             newBlock, null);
       }
@@ -520,8 +520,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
           addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
               oldExcludedNodes);
         }
-        numOfReplicas = totalReplicasExpected - results.size();
-        return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
+        int newNumOfReplicas = totalReplicasExpected - results.size();
+        return chooseTarget(newNumOfReplicas, writer, oldExcludedNodes, blocksize,
             maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
             newBlock, null);
       }

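The hunks above remove a side effect: chooseTarget used to overwrite its own numOfReplicas parameter with requiredStorageTypes.size(), so a caller such as DataStreamer#addDatanode2ExistingPipeline() that asked for exactly one extra node could end up with more targets than requested whenever the required storage types (for example three SSDs under the All_SSD policy with only DISK nodes chosen) outnumbered the request. Making the parameter final and moving the retry arithmetic into a local newNumOfReplicas preserves the caller's count. Below is a minimal, self-contained Java sketch of that counting difference; the class and method names are illustrative, not Hadoop's, and only the arithmetic mirrors the diff.

// Standalone sketch: how overwriting the parameter inflates the count.
// All names here are illustrative; only the arithmetic mirrors the patch.
public class NumOfReplicasSketch {

  // Pre-fix shape: the parameter is reassigned inside the emptiness check,
  // so the method goes on to look for requiredStorageTypes nodes (3)
  // instead of the single node the caller asked for.
  static int additionalNodesBuggy(int numOfReplicas, int requiredStorageTypes) {
    if ((numOfReplicas = requiredStorageTypes) == 0) {
      throw new IllegalStateException("All required storage types are unavailable");
    }
    return numOfReplicas;
  }

  // Post-fix shape: the parameter stays final, the check no longer assigns,
  // and any recomputed count lives in its own local variable.
  static int additionalNodesFixed(final int numOfReplicas, int requiredStorageTypes) {
    if (requiredStorageTypes == 0) {
      throw new IllegalStateException("All required storage types are unavailable");
    }
    return numOfReplicas;
  }

  public static void main(String[] args) {
    int requested = 1;              // DataStreamer wants one replacement node
    int requiredStorageTypes = 3;   // All_SSD policy, none of the chosen nodes are SSD
    System.out.println("buggy: " + additionalNodesBuggy(requested, requiredStorageTypes)); // prints 3
    System.out.println("fixed: " + additionalNodesFixed(requested, requiredStorageTypes)); // prints 1
  }
}

Declaring the parameter final also turns any future reassignment into a compile-time error, which is the same guard the patch adds to the production method.
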
+ 53 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java

@@ -1300,6 +1300,59 @@ public class TestBlockStoragePolicy {
     Assert.assertEquals(StorageType.DISK, targets[1].getStorageType());
   }
 
+  /**
+   * Consider a file with the All_SSD storage policy.
+   * 1. Choose 3 DISK DNs for the pipeline because there are not enough
+   * SSD DNs at the beginning.
+   * 2. One of the DISK DNs fails, and a new DN must be chosen for the
+   * existing pipeline, see {@link DataStreamer#addDatanode2ExistingPipeline()}.
+   * Make sure the number of target DNs is 3.
+   * See HDFS-16182.
+   */
+  @Test
+  public void testAddDatanode2ExistingPipelineInSsd() throws Exception {
+    BlockStoragePolicy policy = POLICY_SUITE.getPolicy(ALLSSD);
+
+    final String[] racks = {"/d1/r1", "/d2/r2", "/d3/r3", "/d4/r4", "/d5/r5",
+        "/d6/r6", "/d7/r7"};
+    final String[] hosts = {"host1", "host2", "host3", "host4", "host5",
+        "host6", "host7"};
+    final StorageType[] disks = {StorageType.DISK, StorageType.DISK, StorageType.DISK};
+
+    final DatanodeStorageInfo[] diskStorages
+        = DFSTestUtil.createDatanodeStorageInfos(7, racks, hosts, disks);
+    final DatanodeDescriptor[] dataNodes
+        = DFSTestUtil.toDatanodeDescriptor(diskStorages);
+    for (int i = 0; i < dataNodes.length; i++) {
+      BlockManagerTestUtil.updateStorage(dataNodes[i],
+          new DatanodeStorage("ssd" + i + 1, DatanodeStorage.State.NORMAL,
+              StorageType.SSD));
+    }
+
+    FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        new File(baseDir, "name").getPath());
+    DFSTestUtil.formatNameNode(conf);
+    NameNode namenode = new NameNode(conf);
+
+    final BlockManager bm = namenode.getNamesystem().getBlockManager();
+    BlockPlacementPolicy replicator = bm.getBlockPlacementPolicy();
+    NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology();
+    for (DatanodeDescriptor datanode : dataNodes) {
+      cluster.add(datanode);
+    }
+    // chsenDs use DISK StorageType to simulate that there is not enough SSD storage.
+    List<DatanodeStorageInfo> chsenDs = new ArrayList<>();
+    chsenDs.add(diskStorages[0]);
+    chsenDs.add(diskStorages[1]);
+    DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 1,
+        null, chsenDs, true,
+        new HashSet<Node>(), 0, policy, null);
+    Assert.assertEquals(3, targets.length);
+  }
+
   @Test
   public void testGetFileStoragePolicyAfterRestartNN() throws Exception {
     //HDFS8219
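
A note on the assertion in testAddDatanode2ExistingPipelineInSsd: the test passes true for the returnChosenNodes flag of chooseTarget, so the two chosen DISK storages are included in the returned array alongside the single replacement node, giving exactly three targets. The trivial check below (plain Java, no Hadoop dependencies, names chosen for illustration) just restates that arithmetic.

// Back-of-the-envelope check for the assertion above.
public class ExpectedTargetCount {
  public static void main(String[] args) {
    int chosenDiskStorages = 2;   // chsenDs in the test
    int additionalRequested = 1;  // second argument of chooseTarget("/foo", 1, ...)
    int expectedTargets = chosenDiskStorages + additionalRequested;
    System.out.println(expectedTargets); // 3, matching Assert.assertEquals(3, targets.length)
  }
}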