HDFS-9314. Improve BlockPlacementPolicyDefault's picking of excess replicas. Contributed by Xiao Chen.

Change-Id: Idf99293085531165239369155c039b55db0eed83
Zhe Zhang 9 years ago
parent
commit
670617f325

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -30,6 +30,9 @@ Release 2.7.3 - UNRELEASED
     HDFS-9313. Possible NullPointerException in BlockManager if no excess
     replica can be chosen. (mingma)
 
+    HDFS-9314. Improve BlockPlacementPolicyDefault's picking of excess
+    replicas. (Xiao Chen via zhz)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 31 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -887,8 +887,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   /**
    * Decide whether deleting the specified replica of the block still makes
    * the block conform to the configured block placement policy.
-   * @param replicationFactor The required number of replicas for this block
-   * @param moreThanone The replica locations of this block that are present
+   * @param moreThanOne The replica locations of this block that are present
    *                    on more than one unique rack.
    * @param exactlyOne Replica locations of this block that are present
    *                    on exactly one unique rack.
@@ -898,9 +897,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * @return the replica that is the best candidate for deletion
    */
   @VisibleForTesting
-  public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor,
-      Collection<DatanodeStorageInfo> moreThanone, Collection<DatanodeStorageInfo> exactlyOne,
-      final List<StorageType> excessTypes) {
+  public DatanodeStorageInfo chooseReplicaToDelete(
+      Collection<DatanodeStorageInfo> moreThanOne,
+      Collection<DatanodeStorageInfo> exactlyOne,
+      final List<StorageType> excessTypes,
+      Map<String, List<DatanodeStorageInfo>> rackMap) {
     long oldestHeartbeat =
       monotonicNow() - heartbeatInterval * tolerateHeartbeatMultiplier;
     DatanodeStorageInfo oldestHeartbeatStorage = null;
@@ -909,7 +910,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
     // Pick the node with the oldest heartbeat or with the least free space,
    // if all heartbeats are within the tolerable heartbeat interval
-    for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanone, exactlyOne)) {
+    for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanOne,
+        exactlyOne, rackMap)) {
       if (!excessTypes.contains(storage.getStorageType())) {
         continue;
       }
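
The hunk above leaves the long-standing tie-break intact: a replica whose heartbeat is older than heartbeatInterval * tolerateHeartbeatMultiplier is preferred for deletion outright; otherwise the candidate with the least remaining space wins. A minimal, self-contained sketch of that heuristic follows (DeletionHeuristicDemo, Replica, and choose are invented names for illustration, not HDFS API):

import java.util.Arrays;
import java.util.List;

public class DeletionHeuristicDemo {
  // Hypothetical stand-in for DatanodeStorageInfo: just the two fields
  // the heuristic looks at.
  static final class Replica {
    final String name;
    final long lastHeartbeat;   // monotonic ms of the last heartbeat
    final long remainingBytes;  // free space left on the storage
    Replica(String name, long lastHeartbeat, long remainingBytes) {
      this.name = name;
      this.lastHeartbeat = lastHeartbeat;
      this.remainingBytes = remainingBytes;
    }
  }

  // A stale heartbeat (older than the tolerated window) wins outright;
  // otherwise the candidate with the least free space is chosen.
  static Replica choose(List<Replica> candidates, long oldestTolerated) {
    Replica oldestStale = null;
    Replica leastFree = null;
    for (Replica r : candidates) {
      if (r.lastHeartbeat < oldestTolerated
          && (oldestStale == null || r.lastHeartbeat < oldestStale.lastHeartbeat)) {
        oldestStale = r;
      }
      if (leastFree == null || r.remainingBytes < leastFree.remainingBytes) {
        leastFree = r;
      }
    }
    return oldestStale != null ? oldestStale : leastFree;
  }

  public static void main(String[] args) {
    long now = 1_000_000L;
    long tolerated = now - 30_000L; // e.g. 3s interval * 10x multiplier (made up)
    List<Replica> candidates = Arrays.asList(
        new Replica("storages[1]", now - 5_000L, 100L),
        new Replica("storages[5]", now - 2_000L, 50L));
    // Both heartbeats are fresh, so the least-free-space rule decides.
    System.out.println(choose(candidates, tolerated).name); // storages[5]
  }
}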
@@ -974,9 +976,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
           moreThanOne, excessTypes)) {
         cur = delNodeHintStorage;
       } else { // regular excessive replica removal
-        cur =
-            chooseReplicaToDelete((short) expectedNumOfReplicas, moreThanOne, exactlyOne,
-                excessTypes);
+        cur = chooseReplicaToDelete(moreThanOne, exactlyOne, excessTypes,
+            rackMap);
       }
       firstOne = false;
       if (cur == null) {
@@ -1018,12 +1019,29 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * Pick up replica node set for deleting replica as over-replicated. 
    * First set contains replica nodes on rack with more than one
    * replica while second set contains remaining replica nodes.
-   * So pick up first set if not empty. If first is empty, then pick second.
+   * If only 1 rack, pick all. If 2 racks, pick all replicas that share
+   * a rack with at least one other replica; if no such replicas, pick all.
+   * If 3 or more racks, pick all.
    */
   protected Collection<DatanodeStorageInfo> pickupReplicaSet(
-      Collection<DatanodeStorageInfo> first,
-      Collection<DatanodeStorageInfo> second) {
-    return first.isEmpty() ? second : first;
+      Collection<DatanodeStorageInfo> moreThanOne,
+      Collection<DatanodeStorageInfo> exactlyOne,
+      Map<String, List<DatanodeStorageInfo>> rackMap) {
+    Collection<DatanodeStorageInfo> ret = new ArrayList<>();
+    if (rackMap.size() == 2) {
+      for (List<DatanodeStorageInfo> dsi : rackMap.values()) {
+        if (dsi.size() >= 2) {
+          ret.addAll(dsi);
+        }
+      }
+    }
+    if (ret.isEmpty()) {
+      // Return all replicas if rackMap.size() != 2
+      // or rackMap.size() == 2 but no shared replicas on any rack
+      ret.addAll(moreThanOne);
+      ret.addAll(exactlyOne);
+    }
+    return ret;
   }
   
   @VisibleForTesting

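Before this patch, pickupReplicaSet always preferred the moreThanOne set when it was non-empty, which in a two-rack layout could hide the only replica whose removal keeps the block on two racks. The new rule is small enough to run in isolation; here is a toy sketch using plain rack-to-datanode-name maps in place of DatanodeStorageInfo (PickupReplicaSetDemo and the string-based rackMap are stand-ins, not Hadoop types):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PickupReplicaSetDemo {
  // Same shape as the patched method: with exactly two racks, only
  // replicas sharing a rack are deletion candidates; in every other
  // case (one rack, three or more racks, or two racks with no shared
  // replicas) all replicas are candidates.
  static Collection<String> pickupReplicaSet(
      Map<String, List<String>> rackMap) {
    Collection<String> ret = new ArrayList<>();
    if (rackMap.size() == 2) {
      for (List<String> replicasOnRack : rackMap.values()) {
        if (replicasOnRack.size() >= 2) {
          ret.addAll(replicasOnRack);
        }
      }
    }
    if (ret.isEmpty()) {
      // One rack, three or more racks, or two racks with no rack
      // holding more than one replica: consider everything.
      for (List<String> replicasOnRack : rackMap.values()) {
        ret.addAll(replicasOnRack);
      }
    }
    return ret;
  }

  public static void main(String[] args) {
    Map<String, List<String>> rackMap = new TreeMap<>();
    rackMap.put("/r2", Arrays.asList("dn3"));
    rackMap.put("/r3", Arrays.asList("dn4", "dn5"));
    // Two racks and /r3 holds two replicas: dn4 and dn5 are the only
    // candidates, so removing one still leaves the block on two racks.
    System.out.println(pickupReplicaSet(rackMap)); // [dn4, dn5]
  }
}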
+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java

@@ -304,7 +304,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
   @Override
   public Collection<DatanodeStorageInfo> pickupReplicaSet(
       Collection<DatanodeStorageInfo> first,
-      Collection<DatanodeStorageInfo> second) {
+      Collection<DatanodeStorageInfo> second,
+      Map<String, List<DatanodeStorageInfo>> rackMap) {
     // If no replica within same rack, return directly.
     if (first.isEmpty()) {
       return second;

+ 66 - 16
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java

@@ -1043,22 +1043,22 @@ public class TestReplicationPolicy {
       // test returning null
       excessTypes.add(StorageType.SSD);
       assertNull(((BlockPlacementPolicyDefault) replicator)
-          .chooseReplicaToDelete((short) 3, first, second, excessTypes));
+          .chooseReplicaToDelete(first, second, excessTypes, rackMap));
     }
     excessTypes.add(StorageType.DEFAULT);
     DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
-        .chooseReplicaToDelete((short) 3, first, second, excessTypes);
-    // Within first set, storages[1] with less free space
-    assertEquals(chosen, storages[1]);
+        .chooseReplicaToDelete(first, second, excessTypes, rackMap);
+    // Within all storages, storages[5] with least free space
+    assertEquals(chosen, storages[5]);
 
     replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
-    assertEquals(0, first.size());
-    assertEquals(3, second.size());
-    // Within second set, storages[5] with less free space
+    assertEquals(2, first.size());
+    assertEquals(1, second.size());
+    // Within first set, storages[1] with less free space
     excessTypes.add(StorageType.DEFAULT);
     chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
-        (short)2, first, second, excessTypes);
-    assertEquals(chosen, storages[5]);
+        first, second, excessTypes, rackMap);
+    assertEquals(chosen, storages[1]);
   }
 
   @Test
@@ -1103,17 +1103,15 @@ public class TestReplicationPolicy {
         excessTypes, storages[3].getDatanodeDescriptor(), null);
     assertTrue(excessReplicas.contains(excessStorage));
 
-
     // The block was initially created on excessSSD(rack r1),
     // storages[4](rack r3) and storages[5](rack r3) with
-    // ONESSD_STORAGE_POLICY_NAME storage policy.
+    // ONESSD_STORAGE_POLICY_NAME storage policy. Replication factor = 3.
     // Right after balancer moves the block from storages[5] to
     // storages[3](rack r2), the application changes the storage policy from
     // ONESSD_STORAGE_POLICY_NAME to HOT_STORAGE_POLICY_ID. In this case,
-    // no replica can be chosen as the excessive replica as
-    // chooseReplicasToDelete only considers storages[4] and storages[5] that
-    // are the same rack. But neither's storage type is SSD.
-    // TODO BlockPlacementPolicyDefault should be able to delete excessSSD.
+    // we should be able to delete excessSSD since the remaining
+    // storages ({storages[3]}, {storages[4], storages[5]})
+    // are on different racks (r2, r3).
     nonExcess.clear();
     nonExcess.add(excessSSD);
     nonExcess.add(storages[3]);
@@ -1124,7 +1122,59 @@ public class TestReplicationPolicy {
     excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
         excessTypes, storages[3].getDatanodeDescriptor(),
         storages[5].getDatanodeDescriptor());
-    assertTrue(excessReplicas.size() == 0);
+    assertEquals(1, excessReplicas.size());
+    assertTrue(excessReplicas.contains(excessSSD));
+
+    // Similar to above, but after policy change and before deletion,
+    // the replicas are located on excessSSD(rack r1), storages[1](rack r1),
+    // storages[2](rack r2) and storages[3](rack r2). Replication factor = 3.
+    // In this case, we should be able to delete excessSSD since the remaining
+    // storages ({storages[1]} , {storages[2], storages[3]})
+    // are on different racks (r1, r2).
+    nonExcess.clear();
+    nonExcess.add(excessSSD);
+    nonExcess.add(storages[1]);
+    nonExcess.add(storages[2]);
+    nonExcess.add(storages[3]);
+    excessTypes = storagePolicy.chooseExcess((short) 3,
+        DatanodeStorageInfo.toStorageTypes(nonExcess));
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+        excessTypes, storages[1].getDatanodeDescriptor(),
+        storages[3].getDatanodeDescriptor());
+    assertEquals(1, excessReplicas.size());
+    assertTrue(excessReplicas.contains(excessSSD));
+
+    // Similar to above, but after policy change and before deletion,
+    // the replicas are located on excessSSD(rack r1), storages[2](rack r2)
+    // Replication factor = 1. We should be able to delete excessSSD.
+    nonExcess.clear();
+    nonExcess.add(excessSSD);
+    nonExcess.add(storages[2]);
+    excessTypes = storagePolicy.chooseExcess((short) 1,
+        DatanodeStorageInfo.toStorageTypes(nonExcess));
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 1,
+        excessTypes, storages[2].getDatanodeDescriptor(), null);
+    assertEquals(1, excessReplicas.size());
+    assertTrue(excessReplicas.contains(excessSSD));
+
+    // The block was initially created on excessSSD(rack r1),
+    // storages[4](rack r3) and storages[5](rack r3) with
+    // ONESSD_STORAGE_POLICY_NAME storage policy. Replication factor = 2.
+    // In this case, no replica can be chosen as the excessive replica by
+    // chooseReplicasToDelete because if the SSD storage is deleted,
+    // the remaining storages[4] and storages[5] are the same rack (r3),
+    // violating block placement policy (i.e. the number of racks >= 2).
+    // TODO BlockPlacementPolicyDefault should be able to rebalance the replicas
+    // and then delete excessSSD.
+    nonExcess.clear();
+    nonExcess.add(excessSSD);
+    nonExcess.add(storages[4]);
+    nonExcess.add(storages[5]);
+    excessTypes = storagePolicy.chooseExcess((short) 2,
+        DatanodeStorageInfo.toStorageTypes(nonExcess));
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 2,
+        excessTypes, null, null);
+    assertEquals(0, excessReplicas.size());
   }
 
   @Test

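All of the new assertions above reduce to one placement invariant: a replica may be chosen as excess only if the survivors still span at least two racks, or only a single replica needs to survive. A hedged, stand-alone illustration of that check (RackInvariantDemo and removalKeepsPlacement are made-up names; this is not the actual BlockManager code path):

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RackInvariantDemo {
  // A candidate may be removed only if the remaining replicas still
  // span two or more racks, or only a single replica must remain
  // (replication factor 1).
  static boolean removalKeepsPlacement(List<String> survivorRacks) {
    Set<String> distinctRacks = new HashSet<>(survivorRacks);
    return survivorRacks.size() <= 1 || distinctRacks.size() >= 2;
  }

  public static void main(String[] args) {
    // excessSSD(r1), storages[3](r2), storages[4..5](r3): dropping the
    // r1 replica leaves r2 and r3, so excessSSD is now deletable.
    System.out.println(removalKeepsPlacement(
        Arrays.asList("r2", "r3", "r3"))); // true
    // excessSSD(r1), storages[4..5](r3), replication factor 2: dropping
    // the r1 replica would leave both survivors on r3, so the last test
    // expects no excess replica to be chosen.
    System.out.println(removalKeepsPlacement(
        Arrays.asList("r3", "r3"))); // false
  }
}

The false case matches the TODO left in the test: deleting excessSSD there would first require rebalancing one replica onto another rack.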
+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java

@@ -616,7 +616,7 @@ public class TestReplicationPolicyWithNodeGroup {
     List<StorageType> excessTypes = new ArrayList<StorageType>();
     excessTypes.add(StorageType.DEFAULT);
     DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
-        .chooseReplicaToDelete((short) 3, first, second, excessTypes);
+        .chooseReplicaToDelete(first, second, excessTypes, rackMap);
     // Within first set {dataNodes[0], dataNodes[1], dataNodes[2]}, 
     // dataNodes[0] and dataNodes[1] are in the same nodegroup, 
     // but dataNodes[1] is chosen as less free space
@@ -629,7 +629,7 @@ public class TestReplicationPolicyWithNodeGroup {
     // as less free space
     excessTypes.add(StorageType.DEFAULT);
     chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
-        (short) 2, first, second, excessTypes);
+        first, second, excessTypes, rackMap);
     assertEquals(chosen, storages[2]);
 
     replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
@@ -638,7 +638,7 @@ public class TestReplicationPolicyWithNodeGroup {
     // Within second set, dataNodes[5] with less free space
     excessTypes.add(StorageType.DEFAULT);
     chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
-        (short) 1, first, second, excessTypes);
+        first, second, excessTypes, rackMap);
     assertEquals(chosen, storages[5]);
   }
   

+ 8 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java

@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
 import com.google.common.base.Supplier;
@@ -629,12 +630,13 @@ public class TestDNFencing {
     }
 
     @Override
-    public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor,
-        Collection<DatanodeStorageInfo> first, Collection<DatanodeStorageInfo> second,
-        List<StorageType> excessTypes) {
-      
-      Collection<DatanodeStorageInfo> chooseFrom = !first.isEmpty() ? first : second;
-
+    public DatanodeStorageInfo chooseReplicaToDelete(
+        Collection<DatanodeStorageInfo> moreThanOne,
+        Collection<DatanodeStorageInfo> exactlyOne,
+        List<StorageType> excessTypes,
+        Map<String, List<DatanodeStorageInfo>> rackMap) {
+      Collection<DatanodeStorageInfo> chooseFrom = !moreThanOne.isEmpty() ?
+          moreThanOne : exactlyOne;
       List<DatanodeStorageInfo> l = Lists.newArrayList(chooseFrom);
       return l.get(DFSUtil.getRandom().nextInt(l.size()));
     }