
HDFS-16974. Consider volumes average load of each DataNode when choosing target. (#5541). Contributed by Shuyan Zhang.

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
zhangshuyan · 2 years ago
Commit: 0185afafea

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -271,6 +271,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.namenode.redundancy.considerLoad.factor";
   public static final double
       DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR_DEFAULT = 2.0;
+  public static final String DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_KEY =
+      "dfs.namenode.redundancy.considerLoadByVolume";
+  public static final boolean
+      DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_DEFAULT
+      = false;
   public static final String DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY;
   public static final int DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT = 3;
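Usage note (editorial, not part of the patch): the new key is read by BlockPlacementPolicyDefault below, and the per-volume check sits inside the existing considerLoad handling, so dfs.namenode.redundancy.considerLoad should also be enabled, as the new test in this change does. A minimal sketch of turning it on programmatically, using only constants that appear in this file:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class EnableConsiderLoadByVolume {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Per-node load check (pre-existing behavior).
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, true);
    // New in this change: also compare each node against its per-volume limit.
    conf.setBoolean(
        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_KEY, true);
    // Both checks share the same factor (default 2.0).
    conf.setDouble(
        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR, 2.0);
    System.out.println(conf.getBoolean(
        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_KEY, false));
  }
}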

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -82,6 +82,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     NOT_IN_SERVICE("the node is not in service"),
     NODE_STALE("the node is stale"),
     NODE_TOO_BUSY("the node is too busy"),
+    NODE_TOO_BUSY_BY_VOLUME("the node is too busy based on volume load"),
     TOO_MANY_NODES_ON_RACK("the rack has too many chosen nodes"),
     NOT_ENOUGH_STORAGE_SPACE("not enough storage space to place the block"),
     NO_REQUIRED_STORAGE_TYPE("required storage types are unavailable"),
@@ -101,6 +102,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   protected boolean considerLoad;
   private boolean considerLoadByStorageType;
   protected double considerLoadFactor;
+  private boolean considerLoadByVolume = false;
   private boolean preferLocalNode;
   private boolean dataNodePeerStatsEnabled;
   private volatile boolean excludeSlowNodesEnabled;
@@ -131,6 +133,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     this.considerLoadFactor = conf.getDouble(
         DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR,
         DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR_DEFAULT);
+    this.considerLoadByVolume = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_KEY,
+        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_DEFAULT
+    );
     this.stats = stats;
     this.clusterMap = clusterMap;
     this.host2datanodeMap = host2datanodeMap;
@@ -1007,6 +1013,16 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
           "(load: " + nodeLoad + " > " + maxLoad + ")");
       return true;
     }
+    if (considerLoadByVolume) {
+      final int numVolumesAvailable = node.getNumVolumesAvailable();
+      final double maxLoadForVolumes = considerLoadFactor * numVolumesAvailable *
+          stats.getInServiceXceiverAverageForVolume();
+      if (maxLoadForVolumes > 0.0 && nodeLoad > maxLoadForVolumes) {
+        logNodeIsNotChosen(node, NodeNotChosenReason.NODE_TOO_BUSY_BY_VOLUME,
+            "(load: " + nodeLoad + " > " + maxLoadForVolumes + ") ");
+        return true;
+      }
+    }
     return false;
   }
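To make the arithmetic of the new branch explicit, here is a standalone sketch of the same rule (editorial illustration; the parameter names stand in for the values the policy reads from the DatanodeDescriptor and FSClusterStats, not the class's actual fields):

public final class VolumeLoadCheckSketch {

  /**
   * A node is considered too busy by volume when its xceiver count exceeds
   * considerLoadFactor * (writable volumes on the node)
   *   * (cluster-wide average xceivers per writable volume).
   */
  static boolean tooBusyByVolume(int nodeLoad, int numVolumesAvailable,
      double avgXceiversPerVolume, double considerLoadFactor) {
    double maxLoadForVolumes =
        considerLoadFactor * numVolumesAvailable * avgXceiversPerVolume;
    return maxLoadForVolumes > 0.0 && nodeLoad > maxLoadForVolumes;
  }

  public static void main(String[] args) {
    // Numbers taken from the new unit test in this change: a cluster average
    // of 10 xceivers per writable volume and a factor of 2.0.
    System.out.println(tooBusyByVolume(50, 2, 10.0, 2.0)); // true:  limit 2.0 * 2 * 10 = 40
    System.out.println(tooBusyByVolume(28, 4, 10.0, 2.0)); // false: limit 2.0 * 4 * 10 = 80
  }
}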
 

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

@@ -233,6 +233,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
   // HB processing can use it to tell if it is the first HB since DN restarted
   private boolean heartbeatedSinceRegistration = false;
 
+  /** The number of volumes that can be written.*/
+  private int numVolumesAvailable = 0;
+
   /**
    * DatanodeDescriptor constructor
    * @param nodeID id of the data node
@@ -411,6 +414,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
     long totalNonDfsUsed = 0;
     Set<String> visitedMount = new HashSet<>();
     Set<DatanodeStorageInfo> failedStorageInfos = null;
+    int volumesAvailable = 0;
 
     // Decide if we should check for any missing StorageReport and mark it as
     // failed. There are different scenarios.
@@ -489,7 +493,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
           visitedMount.add(mount);
         }
       }
+      if (report.getRemaining() > 0 && storage.getState() != State.FAILED) {
+        volumesAvailable += 1;
+      }
     }
+    this.numVolumesAvailable = volumesAvailable;
 
     // Update total metrics for the node.
     setCapacity(totalCapacity);
@@ -981,6 +989,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return volumeFailureSummary;
   }
 
+  /**
+   * Return the number of volumes that can be written.
+   * @return the number of volumes that can be written.
+   */
+  public int getNumVolumesAvailable() {
+    return numVolumesAvailable;
+  }
+
   /**
    * @param nodeReg DatanodeID to update registration for.
    */

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -2101,6 +2101,17 @@ public class DatanodeManager {
         return avgLoad;
       }
 
+      @Override
+      public double getInServiceXceiverAverageForVolume() {
+        double avgLoad = 0;
+        final int volumes = heartbeatManager.getInServiceAvailableVolumeCount();
+        if (volumes > 0) {
+          final long xceivers = heartbeatManager.getInServiceXceiverCount();
+          avgLoad = (double)xceivers/volumes;
+        }
+        return avgLoad;
+      }
+
       @Override
       public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
         return heartbeatManager.getStorageTypeStats();
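For intuition about where that cluster-wide average comes from, the figures from the new unit test work out as follows (editorial sketch, plain arithmetic only):

public class AvgXceiversPerVolumeExample {
  public static void main(String[] args) {
    // In the new test, 5 in-service DataNodes report these xceiver counts...
    long xceivers = 50 + 110 + 28 + 2 + 10;   // 200 in total
    // ...and together expose this many writable volumes.
    int volumes = 2 + 3 + 4 + 5 + 6;          // 20 in total
    double avg = volumes > 0 ? (double) xceivers / volumes : 0.0;
    System.out.println(avg);                  // 10.0 xceivers per writable volume
  }
}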

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java

@@ -60,7 +60,9 @@ public interface DatanodeStatistics {
   
   /** @return number of non-decommission(ing|ed) nodes */
   public int getNumDatanodesInService();
-  
+
+  /** @return the number of writable volumes on in-service nodes. */
+  int getInServiceAvailableVolumeCount();
   /**
    * @return the total used space by data nodes for non-DFS purposes
    * such as storing temporary files on the local file system

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java

@@ -44,6 +44,7 @@ class DatanodeStats {
 
   private int nodesInService = 0;
   private int nodesInServiceXceiverCount = 0;
+  private int nodesInServiceAvailableVolumeCount = 0;
   private int expiredHeartbeats = 0;
 
   synchronized void add(final DatanodeDescriptor node) {
@@ -58,6 +59,7 @@ class DatanodeStats {
       capacityRemaining += node.getRemaining();
       cacheCapacity += node.getCacheCapacity();
       cacheUsed += node.getCacheUsed();
+      nodesInServiceAvailableVolumeCount += node.getNumVolumesAvailable();
     } else if (node.isDecommissionInProgress() ||
         node.isEnteringMaintenance()) {
       cacheCapacity += node.getCacheCapacity();
@@ -87,6 +89,7 @@ class DatanodeStats {
       capacityRemaining -= node.getRemaining();
       cacheCapacity -= node.getCacheCapacity();
       cacheUsed -= node.getCacheUsed();
+      nodesInServiceAvailableVolumeCount -= node.getNumVolumesAvailable();
     } else if (node.isDecommissionInProgress() ||
         node.isEnteringMaintenance()) {
       cacheCapacity -= node.getCacheCapacity();
@@ -149,6 +152,10 @@ class DatanodeStats {
     return nodesInServiceXceiverCount;
   }
 
+  synchronized int getNodesInServiceAvailableVolumeCount() {
+    return nodesInServiceAvailableVolumeCount;
+  }
+
   synchronized int getExpiredHeartbeats() {
     return expiredHeartbeats;
   }

+ 12 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java

@@ -53,14 +53,24 @@ public interface FSClusterStats {
   public int getNumDatanodesInService();
 
   /**
-   * an indication of the average load of non-decommission(ing|ed) nodes
-   * eligible for block placement
+   * An indication of the average load of non-decommission(ing|ed) nodes
+   * eligible for block placement.
    *
    * @return average of the in service number of block transfers and block
    *         writes that are currently occurring on the cluster.
    */
   public double getInServiceXceiverAverage();
 
+  /**
+   * An indication of the average load of volumes at non-decommission(ing|ed)
+   * nodes eligible for block placement.
+   *
+   * @return average of the in service number of block transfers and block
+   *         writes that are currently occurring on the volumes of the
+   *         cluster.
+   */
+  double getInServiceXceiverAverageForVolume();
+
   /**
    * Indicates the storage statistics per storage type.
    * @return storage statistics per storage type.

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java

@@ -183,6 +183,11 @@ class HeartbeatManager implements DatanodeStatistics {
   public int getNumDatanodesInService() {
     return stats.getNodesInService();
   }
+
+  @Override
+  public int getInServiceAvailableVolumeCount() {
+    return stats.getNodesInServiceAvailableVolumeCount();
+  }
   
   @Override
   public long getCacheCapacity() {

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -334,6 +334,14 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.namenode.redundancy.considerLoadByVolume</name>
+    <value>false</value>
+    <description>Decide whether chooseTarget considers the per-volume load of
+      the target (its xceiver count relative to the number of writable
+      volumes) in addition to the per-node load.
+    </description>
+  </property>
+
 <property>
   <name>dfs.namenode.read.considerLoad</name>
   <value>false</value>

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java

@@ -56,13 +56,13 @@ abstract public class BaseReplicationPolicyTest {
   protected String blockPlacementPolicy;
   protected NamenodeProtocols nameNodeRpc = null;
 
-  static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
+  void updateHeartbeatWithUsage(DatanodeDescriptor dn,
     long capacity, long dfsUsed, long remaining, long blockPoolUsed,
     long dnCacheCapacity, long dnCacheUsed, int xceiverCount,
     int volFailures) {
     dn.getStorageInfos()[0].setUtilizationForTesting(
         capacity, dfsUsed, remaining, blockPoolUsed);
-    dn.updateHeartbeat(
+    dnManager.getHeartbeatManager().updateHeartbeat(dn,
         BlockManagerTestUtil.getStorageReportsForDatanode(dn),
         dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
   }
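A brief note on the test-helper change above (my reading, not stated in the patch): routing the heartbeat through dnManager.getHeartbeatManager().updateHeartbeat(...) instead of calling dn.updateHeartbeat(...) directly causes DatanodeStats to be re-aggregated on every update, so the new in-service available-volume count stays consistent with what chooseTarget later reads through FSClusterStats.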

+ 169 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyRatioConsiderLoadWithStorage.java

@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Verify that chooseTarget can exclude nodes with high volume average load.
+ */
+public class TestReplicationPolicyRatioConsiderLoadWithStorage
+    extends BaseReplicationPolicyTest {
+
+  public TestReplicationPolicyRatioConsiderLoadWithStorage() {
+    this.blockPlacementPolicy = BlockPlacementPolicyDefault.class.getName();
+  }
+
+  @Override
+  DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
+        true);
+    conf.setDouble(DFSConfigKeys
+        .DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR, 2);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_KEY, true);
+
+    final String[] racks = {
+        "/rack1",
+        "/rack2",
+        "/rack3",
+        "/rack4",
+        "/rack5"};
+    storages = DFSTestUtil.createDatanodeStorageInfos(racks);
+    DatanodeDescriptor[] descriptors =
+        DFSTestUtil.toDatanodeDescriptor(storages);
+    long storageCapacity =
+        2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE;
+    // Each datanode has 6 storages, but the number of available storages
+    // varies.
+    for (int i = 0; i < descriptors.length; i++) {
+      for (int j = 0; j < 5; j++) {
+        DatanodeStorage s = new DatanodeStorage("s" + i + j);
+        descriptors[i].updateStorage(s);
+      }
+      for (int j = 0; j < descriptors[i].getStorageInfos().length; j++) {
+        DatanodeStorageInfo dsInfo = descriptors[i].getStorageInfos()[j];
+        if (j > i + 1) {
+          dsInfo.setUtilizationForTesting(storageCapacity, storageCapacity, 0,
+              storageCapacity);
+        } else {
+          dsInfo.setUtilizationForTesting(storageCapacity, 0, storageCapacity,
+              0);
+        }
+      }
+    }
+    return descriptors;
+  }
+
+  /**
+   * Tests that chooseTarget, with both considerLoad and considerLoadByVolume
+   * set to true, correctly calculates load and excludes overly busy nodes.
+   */
+  @Test
+  public void testChooseTargetWithRatioConsiderLoad() {
+    namenode.getNamesystem().writeLock();
+    try {
+      // After the heartbeats below are processed, the total load is 200, so
+      // the average load per node is 40 and the per-node limit is 2 * 40 = 80.
+      // The average load per available storage is 200 / 20 = 10, so the
+      // per-node limits based on available storages are:
+      // 2*10*2, 3*10*2, 4*10*2, 5*10*2, 6*10*2.
+      // Load and available storages of each node:
+      // Index:             0,   1,   2,   3,   4
+      // Available Storage: 2,   3,   4,   5,   6
+      // Load:             50, 110,  28,   2,  10
+      // So dataNodes[1] should never be chosen because its node load (110)
+      // exceeds the per-node limit (80), and dataNodes[0] should never be
+      // chosen because its load (50) exceeds its volume-based limit (40).
+      dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[0],
+          BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[0]),
+          dataNodes[0].getCacheCapacity(),
+          dataNodes[0].getCacheUsed(),
+          50, 0, null);
+      dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[1],
+          BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[1]),
+          dataNodes[0].getCacheCapacity(),
+          dataNodes[0].getCacheUsed(),
+          110, 0, null);
+      dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[2],
+          BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[2]),
+          dataNodes[0].getCacheCapacity(),
+          dataNodes[0].getCacheUsed(),
+          28, 0, null);
+      dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[3],
+          BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
+          dataNodes[0].getCacheCapacity(),
+          dataNodes[0].getCacheUsed(),
+          2, 0, null);
+      dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[4],
+          BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[4]),
+          dataNodes[0].getCacheCapacity(),
+          dataNodes[0].getCacheUsed(),
+          10, 0, null);
+
+      Set<DatanodeDescriptor> targetSet = new HashSet<>();
+
+      // Try to choose 3 datanode targets.
+      DatanodeDescriptor writerDn = dataNodes[2];
+      DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
+          .getBlockPlacementPolicy()
+          .chooseTarget("testFile.txt", 3, writerDn, new ArrayList<>(), false,
+              null, 1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
+      // The result contains 3 nodes (dataNodes[2], dataNodes[3], dataNodes[4]).
+      assertEquals(3, targets.length);
+      for (DatanodeStorageInfo dsi : targets) {
+        targetSet.add(dsi.getDatanodeDescriptor());
+      }
+      assertTrue(targetSet.contains(dataNodes[2]));
+      assertTrue(targetSet.contains(dataNodes[3]));
+      assertTrue(targetSet.contains(dataNodes[4]));
+
+      // Try to choose 4 datanode targets.
+      targets = namenode.getNamesystem().getBlockManager()
+          .getBlockPlacementPolicy()
+          .chooseTarget("testFile.txt", 4, writerDn, new ArrayList<>(), false,
+              null, 1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
+      // Even when 4 targets are requested, only 3 eligible nodes remain
+      // (dataNodes[2], dataNodes[3], dataNodes[4]).
+      assertEquals(3, targets.length);
+      targetSet.clear();
+      for (DatanodeStorageInfo dsi : targets) {
+        targetSet.add(dsi.getDatanodeDescriptor());
+      }
+      assertTrue(targetSet.contains(dataNodes[2]));
+      assertTrue(targetSet.contains(dataNodes[3]));
+      assertTrue(targetSet.contains(dataNodes[4]));
+    } finally {
+      namenode.getNamesystem().writeUnlock();
+    }
+  }
+}