Browse Source

HDFS-16456. EC: Decommission a rack with only one dn will fail when the rack number is equal with replication. (#4358)

Wei-Chiu Chuang 2 years ago
parent
commit
4ebdc38634

+ 113 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java

@@ -97,6 +97,13 @@ public class NetworkTopology {
   private int depthOfAllLeaves = -1;
   /** rack counter */
   protected int numOfRacks = 0;
+  /** rack map: rack name -> names of the live nodes on that rack. */
+  private HashMap<String, Set<String>> rackMap =
+      new HashMap<String, Set<String>>();
+  /** decommissioned nodes, including stopped nodes. */
+  private HashSet<String> decommissionNodes = new HashSet<>();
+  /** empty rack counter. */
+  private int numOfEmptyRacks = 0;
 
   /**
    * Whether or not this cluster has ever consisted of more than 1 rack,
@@ -146,6 +153,7 @@ public class NetworkTopology {
         if (rack == null) {
           incrementRacks();
         }
+        interAddNodeWithEmptyRack(node);
         if (depthOfAllLeaves == -1) {
           depthOfAllLeaves = node.getLevel();
         }
@@ -222,6 +230,7 @@ public class NetworkTopology {
         if (rack == null) {
           numOfRacks--;
         }
+        interRemoveNodeWithEmptyRack(node);
       }
       LOG.debug("NetworkTopology became:\n{}", this);
     } finally {
@@ -946,4 +955,108 @@ public class NetworkTopology {
     Preconditions.checkState(idx == activeLen,
         "Sorted the wrong number of nodes!");
   }
+
+  /** @return the number of nonempty racks */
+  public int getNumOfNonEmptyRacks() {
+    return numOfRacks - numOfEmptyRacks;
+  }
+
+  /**
+   * Update the empty-rack count when a node is added back,
+   * e.g. on recommission.
+   * @param node node to be added; can be null
+   */
+  public void recommissionNode(Node node) {
+    if (node == null) {
+      return;
+    }
+    if (node instanceof InnerNode) {
+      throw new IllegalArgumentException(
+          "Not allow to remove an inner node: " + NodeBase.getPath(node));
+    }
+    netlock.writeLock().lock();
+    try {
+      decommissionNodes.remove(node.getName());
+      interAddNodeWithEmptyRack(node);
+    } finally {
+      netlock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Update the empty-rack count when a node is removed,
+   * e.g. on decommission.
+   * @param node node to be removed; can be null
+   */
+  public void decommissionNode(Node node) {
+    if (node == null) {
+      return;
+    }
+    if (node instanceof InnerNode) {
+      throw new IllegalArgumentException(
+          "Not allow to remove an inner node: " + NodeBase.getPath(node));
+    }
+    netlock.writeLock().lock();
+    try {
+      decommissionNodes.add(node.getName());
+      interRemoveNodeWithEmptyRack(node);
+    } finally {
+      netlock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Internal helper that updates the empty-rack count when a node
+   * is added or recommissioned.
+   * @param node node to be added; can be null
+   */
+  private void interAddNodeWithEmptyRack(Node node) {
+    if (node == null) {
+      return;
+    }
+    String rackname = node.getNetworkLocation();
+    Set<String> nodes = rackMap.get(rackname);
+    if (nodes == null) {
+      nodes = new HashSet<String>();
+    }
+    if (!decommissionNodes.contains(node.getName())) {
+      nodes.add(node.getName());
+    }
+    rackMap.put(rackname, nodes);
+    countEmptyRacks();
+  }
+
+  /**
+   * Internal helper that updates the empty-rack count when a node
+   * is removed or decommissioned.
+   * @param node node to be removed; can be null
+   */
+  private void interRemoveNodeWithEmptyRack(Node node) {
+    if (node == null) {
+      return;
+    }
+    String rackname = node.getNetworkLocation();
+    Set<String> nodes = rackMap.get(rackname);
+    if (nodes != null) {
+      InnerNode rack = (InnerNode) getNode(node.getNetworkLocation());
+      if (rack == null) {
+        // this node and its rack are both removed.
+        rackMap.remove(rackname);
+      } else if (nodes.contains(node.getName())) {
+        // this node is decommissioned or removed.
+        nodes.remove(node.getName());
+        rackMap.put(rackname, nodes);
+      }
+      countEmptyRacks();
+    }
+  }
+
+  private void countEmptyRacks() {
+    int count = 0;
+    for (Set<String> nodes : rackMap.values()) {
+      if (nodes != null && nodes.isEmpty()) {
+        count++;
+      }
+    }
+    numOfEmptyRacks = count;
+    LOG.debug("Current numOfEmptyRacks is {}", numOfEmptyRacks);
+  }
 }
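
The new bookkeeping leaves numOfRacks untouched and instead tracks, per rack, which node names are still live; getNumOfNonEmptyRacks() is what the placement policies below switch to. A minimal usage sketch of the new API (class and host names are hypothetical, not part of the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;

public class EmptyRackSketch {
  public static void main(String[] args) {
    NetworkTopology topo = NetworkTopology.getInstance(new Configuration());
    Node n1 = new NodeBase("host1", "/d1/r1"); // only node on /d1/r1
    Node n2 = new NodeBase("host2", "/d1/r2");
    topo.add(n1);
    topo.add(n2);
    System.out.println(topo.getNumOfNonEmptyRacks()); // 2

    // Decommissioning the only node on /d1/r1 keeps the rack in the
    // topology (numOfRacks stays 2) but counts it as empty.
    topo.decommissionNode(n1);
    System.out.println(topo.getNumOfNonEmptyRacks()); // 1

    // Recommissioning restores the non-empty count.
    topo.recommissionNode(n1);
    System.out.println(topo.getNumOfNonEmptyRacks()); // 2
  }
}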

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -346,7 +346,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       totalNumOfReplicas = clusterSize;
     }
     // No calculation needed when there is only one rack or picking one node.
-    int numOfRacks = clusterMap.getNumOfRacks();
+    int numOfRacks = clusterMap.getNumOfNonEmptyRacks();
     // HDFS-14527 return default when numOfRacks = 0 to avoid
     // ArithmeticException when calc maxNodesPerRack at following logic.
     if (numOfRacks <= 1 || totalNumOfReplicas <= 1) {
@@ -1090,7 +1090,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     for (DatanodeInfo dn : locs)
       racks.add(dn.getNetworkLocation());
     return new BlockPlacementStatusDefault(racks.size(), minRacks,
-        clusterMap.getNumOfRacks());
+        clusterMap.getNumOfNonEmptyRacks());
   }
   /**
    * Decide whether deleting the specified replica of the block still makes
@@ -1277,4 +1277,3 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     this.preferLocalNode = prefer;
   }
 }
-
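
Of the two call sites, the verifyBlockPlacement change is what lets a stuck decommission complete: BlockPlacementStatusDefault treats a block as correctly placed either when it spans the required number of racks or when it already spans every rack in the cluster, so the total-racks argument controls whether that second condition can ever hold. A simplified sketch of the check (hypothetical method, not the verbatim class):

public class PlacementCheckSketch {
  // Roughly what BlockPlacementStatusDefault evaluates.
  static boolean satisfied(int currentRacks, int requiredRacks, int totalRacks) {
    return currentRacks >= requiredRacks || currentRacks >= totalRacks;
  }

  public static void main(String[] args) {
    // RS-3-2 stores 5 blocks and wants 5 racks. With 5 racks where one
    // holds only a decommissioning DN, replicas can reach at most 4 racks.
    System.out.println(satisfied(4, 5, 5)); // false: never satisfied, decommission hangs
    // Counting only non-empty racks lets the every-rack fallback fire.
    System.out.println(satisfied(4, 5, 4)); // true: placement accepted
  }
}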

+ 30 - 29
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java

@@ -42,7 +42,7 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
       totalNumOfReplicas = clusterSize;
     }
     // No calculation needed when there is only one rack or picking one node.
-    int numOfRacks = clusterMap.getNumOfRacks();
+    int numOfRacks = clusterMap.getNumOfNonEmptyRacks();
     // HDFS-14527 return default when numOfRacks = 0 to avoid
     // ArithmeticException when calc maxNodesPerRack at following logic.
     if (numOfRacks <= 1 || totalNumOfReplicas <= 1) {
@@ -90,38 +90,39 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
                                  EnumMap<StorageType, Integer> storageTypes)
                                  throws NotEnoughReplicasException {
     int totalReplicaExpected = results.size() + numOfReplicas;
-    int numOfRacks = clusterMap.getNumOfRacks();
-    if (totalReplicaExpected < numOfRacks ||
-        totalReplicaExpected % numOfRacks == 0) {
-      writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
-      return writer;
-    }
+    int numOfRacks = clusterMap.getNumOfNonEmptyRacks();
 
-    assert totalReplicaExpected > (maxNodesPerRack -1) * numOfRacks;
+    try {
+      if (totalReplicaExpected < numOfRacks ||
+          totalReplicaExpected % numOfRacks == 0) {
+        writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+        return writer;
+      }
 
-    // Calculate numOfReplicas for filling each rack exactly (maxNodesPerRack-1)
-    // replicas.
-    HashMap<String, Integer> rackCounts = new HashMap<>();
-    for (DatanodeStorageInfo dsInfo : results) {
-      String rack = dsInfo.getDatanodeDescriptor().getNetworkLocation();
-      Integer count = rackCounts.get(rack);
-      if (count != null) {
-        rackCounts.put(rack, count + 1);
-      } else {
-        rackCounts.put(rack, 1);
+      assert totalReplicaExpected > (maxNodesPerRack -1) * numOfRacks;
+
+      // Calculate numOfReplicas for filling each rack exactly (maxNodesPerRack-1)
+      // replicas.
+      HashMap<String, Integer> rackCounts = new HashMap<>();
+      for (DatanodeStorageInfo dsInfo : results) {
+        String rack = dsInfo.getDatanodeDescriptor().getNetworkLocation();
+        Integer count = rackCounts.get(rack);
+        if (count != null) {
+          rackCounts.put(rack, count + 1);
+        } else {
+          rackCounts.put(rack, 1);
+        }
       }
-    }
-    int excess = 0; // Sum of the above (maxNodesPerRack-1) part of nodes in results
-    for (int count : rackCounts.values()) {
-      if (count > maxNodesPerRack -1) {
-        excess += count - (maxNodesPerRack -1);
+      int excess = 0; // Sum of the above (maxNodesPerRack-1) part of nodes in results
+      for (int count : rackCounts.values()) {
+        if (count > maxNodesPerRack -1) {
+          excess += count - (maxNodesPerRack -1);
+        }
       }
-    }
-    numOfReplicas = Math.min(totalReplicaExpected - results.size(),
-        (maxNodesPerRack -1) * numOfRacks - (results.size() - excess));
+      numOfReplicas = Math.min(totalReplicaExpected - results.size(),
+          (maxNodesPerRack -1) * numOfRacks - (results.size() - excess));
 
-    try {
       // Try to spread the replicas as evenly as possible across racks.
       // This is done by first placing with (maxNodesPerRack-1), then spreading
       // the remainder by calling again with maxNodesPerRack.
@@ -243,7 +244,7 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
       racks.add(dn.getNetworkLocation());
     }
     return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas,
-        clusterMap.getNumOfRacks());
+        clusterMap.getNumOfNonEmptyRacks());
   }
 
   @Override

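Two things change in this hunk: the rack count now excludes empty racks, and the fast path (the single chooseOnce call) moves inside the try block, so a NotEnoughReplicasException from it reaches the same fallback handling as the even-spread path. A back-of-the-envelope view of the branch condition, using the 5-rack cluster from the new test below (numbers only, hypothetical sketch):

public class BranchSketch {
  public static void main(String[] args) {
    int totalReplicaExpected = 5; // RS-3-2 writes 5 blocks
    int oldRacks = 5;             // empty rack still counted
    int newRacks = 4;             // clusterMap.getNumOfNonEmptyRacks()

    // Old: 5 % 5 == 0 selects the single chooseOnce pass, which expects
    // every rack to host a block; the rack emptied by the decommissioning
    // DN cannot, so placement fails.
    System.out.println(totalReplicaExpected % oldRacks == 0); // true

    // New: 5 % 4 != 0 falls through to the even-spread logic, which fills
    // the four usable racks (e.g. 2+1+1+1) and still satisfies rack
    // fault tolerance.
    System.out.println(totalReplicaExpected % newRacks == 0); // false
  }
}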
+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java

@@ -226,6 +226,8 @@ public class DatanodeAdminManager {
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
       // Update DN stats maintained by HeartbeatManager
       hbManager.startDecommission(node);
+      // Update the cluster's empty-rack count
+      blockManager.getDatanodeManager().getNetworkTopology().decommissionNode(node);
       // hbManager.startDecommission will set dead node to decommissioned.
       if (node.isDecommissionInProgress()) {
         for (DatanodeStorageInfo storage : node.getStorageInfos()) {
@@ -250,6 +252,8 @@ public class DatanodeAdminManager {
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
       // Update DN stats maintained by HeartbeatManager
       hbManager.stopDecommission(node);
+      // Update the cluster's empty-rack count
+      blockManager.getDatanodeManager().getNetworkTopology().recommissionNode(node);
       // extra redundancy blocks will be detected and processed when
       // the dead node comes back and send in its full block report.
       if (node.isAlive()) {

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -7834,7 +7834,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             getBlockManager().getDatanodeManager().getNumOfDataNodes();
         int numOfRacks =
             getBlockManager().getDatanodeManager().getNetworkTopology()
-                .getNumOfRacks();
+                .getNumOfNonEmptyRacks();
         result = ECTopologyVerifier
             .getECTopologyVerifierResult(numOfRacks, numOfDataNodes, policies);
       }
@@ -8293,7 +8293,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     int numOfDataNodes =
         getBlockManager().getDatanodeManager().getNumOfDataNodes();
     int numOfRacks = getBlockManager().getDatanodeManager().getNetworkTopology()
-        .getNumOfRacks();
+        .getNumOfNonEmptyRacks();
     ErasureCodingPolicy[] enabledEcPolicies =
         getErasureCodingPolicyManager().getCopyOfEnabledPolicies();
     return ECTopologyVerifier
@@ -8351,4 +8351,3 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 }
-

+ 124 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java

@@ -19,25 +19,36 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.net.StaticMapping;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -168,6 +179,108 @@ public class TestBlockPlacementPolicyRackFaultTolerant {
     }
   }
 
+  /**
+   * Verify decommissioning a DN that is the only node in its rack.
+   */
+  @Test
+  public void testPlacementWithOnlyOneNodeInRackDecommission() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    final String[] racks = {"/RACK0", "/RACK0", "/RACK2", "/RACK3", "/RACK4", "/RACK5", "/RACK2"};
+    final String[] hosts = {"/host0", "/host1", "/host2", "/host3", "/host4", "/host5", "/host6"};
+
+    // enables DFSNetworkTopology
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+        DEFAULT_BLOCK_SIZE / 2);
+
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(7).racks(racks)
+        .hosts(hosts).build();
+    cluster.waitActive();
+    nameNodeRpc = cluster.getNameNodeRpc();
+    namesystem = cluster.getNamesystem();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    fs.enableErasureCodingPolicy("RS-3-2-1024k");
+    fs.setErasureCodingPolicy(new Path("/"), "RS-3-2-1024k");
+
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+    final DatanodeManager dm = bm.getDatanodeManager();
+    assertTrue(dm.getNetworkTopology() instanceof DFSNetworkTopology);
+
+    String clientMachine = "/host4";
+    String clientRack = "/RACK4";
+    String src = "/test";
+
+    final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
+    DatanodeDescriptor dnd4 = dnm.getDatanode(cluster.getDataNodes().get(4).getDatanodeId());
+    assertEquals(dnd4.getNetworkLocation(), clientRack);
+    dnm.getDatanodeAdminManager().startDecommission(dnd4);
+    short replication = 5;
+    short additionalReplication = 1;
+
+    try {
+      // Create the file with client machine
+      HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
+          clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
+          replication, DEFAULT_BLOCK_SIZE * 1024 * 10, null, null, false);
+
+      //test chooseTarget for new file
+      LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
+          null, null, fileStatus.getFileId(), null, null);
+      HashMap<String, Integer> racksCount = new HashMap<String, Integer>();
+      doTestLocatedBlockRacks(racksCount, replication, 4, locatedBlock);
+
+      //test chooseTarget for existing file.
+      LocatedBlock additionalLocatedBlock =
+          nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(),
+              locatedBlock.getBlock(), locatedBlock.getLocations(),
+              locatedBlock.getStorageIDs(), DatanodeInfo.EMPTY_ARRAY,
+              additionalReplication, clientMachine);
+
+      racksCount.clear();
+      doTestLocatedBlockRacks(racksCount, additionalReplication + replication,
+          4, additionalLocatedBlock);
+      assertEquals(racksCount.get("/RACK0"), (Integer)2);
+      assertEquals(racksCount.get("/RACK2"), (Integer)2);
+    } finally {
+      dnm.getDatanodeAdminManager().stopDecommission(dnd4);
+    }
+
+    //test if decommission succeeded
+    DatanodeDescriptor dnd3 = dnm.getDatanode(cluster.getDataNodes().get(3).getDatanodeId());
+    cluster.getNamesystem().writeLock();
+    try {
+      dm.getDatanodeAdminManager().startDecommission(dnd3);
+    } finally {
+      cluster.getNamesystem().writeUnlock();
+    }
+
+    // make sure the decommission finishes and the block is on 4 racks
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return dnd3.isDecommissioned();
+      }
+    }, 1000, 10 * 1000);
+
+    LocatedBlocks locatedBlocks =
+        cluster.getFileSystem().getClient().getLocatedBlocks(
+            src, 0, DEFAULT_BLOCK_SIZE);
+    assertEquals(4, bm.getDatanodeManager().
+        getNetworkTopology().getNumOfNonEmptyRacks());
+    for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
+      BlockPlacementStatus status = bm.getStriptedBlockPlacementPolicy()
+              .verifyBlockPlacement(block.getLocations(), 5);
+      Assert.assertTrue(status.isPlacementPolicySatisfied());
+    }
+  }
+
   private void shuffle(DatanodeInfo[] locs, String[] storageIDs) {
     int length = locs.length;
     Object[][] pairs = new Object[length][];
@@ -199,6 +312,17 @@ public class TestBlockPlacementPolicyRackFaultTolerant {
     assertTrue(maxCount - minCount <= 1);
   }
 
+  private void doTestLocatedBlockRacks(HashMap<String, Integer> racksCount, int replication,
+                                       int validracknum, LocatedBlock locatedBlock) {
+    assertEquals(replication, locatedBlock.getLocations().length);
+
+    for (DatanodeInfo node :
+        locatedBlock.getLocations()) {
+      addToRacksCount(node.getNetworkLocation(), racksCount);
+    }
+    assertEquals(validracknum, racksCount.size());
+  }
+
   private void addToRacksCount(String rack, HashMap<String, Integer> racksCount) {
     Integer count = racksCount.get(rack);
     if (count == null) {

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java

@@ -595,4 +595,20 @@ public class TestNetworkTopology {
           frequency.get(dataNodes[i]) > 0);
     }
   }
+
+  @Test
+  public void testAddAndRemoveNodeWithEmptyRack() {
+    DatanodeDescriptor n1 = DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d2/r3");
+    DatanodeDescriptor n2 = DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3");
+    DatanodeDescriptor n3 = DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3");
+
+    cluster.decommissionNode(n1);
+    assertEquals(6, cluster.getNumOfNonEmptyRacks());
+    cluster.decommissionNode(n2);
+    cluster.decommissionNode(n3);
+    assertEquals(5, cluster.getNumOfNonEmptyRacks());
+
+    cluster.recommissionNode(n1);
+    assertEquals(6, cluster.getNumOfNonEmptyRacks());
+  }
 }