浏览代码

HDFS-17646. Add Option to limit Balancer overUtilized nodes num in each iteration. (#7120). Contributed by Zhaobo Huang.

Reviewed-by: Haiyang Hu <huhaiyang926@126.com>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
Zhaobo Huang 7 月之前
父节点
当前提交
00cddf5bea

+ 34 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -208,6 +208,8 @@ public class Balancer {
       + "\n\t[-sortTopNodes]"
       + "\n\t[-sortTopNodes]"
       + "\tSort datanodes based on the utilization so "
       + "\tSort datanodes based on the utilization so "
       + "that highly utilized datanodes get scheduled first."
       + "that highly utilized datanodes get scheduled first."
+      + "\n\t[-limitOverUtilizedNum <specified maximum number of overUtilized datanodes>]"
+      + "\tLimit the maximum number of overUtilized datanodes."
       + "\n\t[-hotBlockTimeInterval]\tprefer to move cold blocks.";
       + "\n\t[-hotBlockTimeInterval]\tprefer to move cold blocks.";
 
 
   @VisibleForTesting
   @VisibleForTesting
@@ -227,6 +229,7 @@ public class Balancer {
   private final long maxSizeToMove;
   private final long maxSizeToMove;
   private final long defaultBlockSize;
   private final long defaultBlockSize;
   private final boolean sortTopNodes;
   private final boolean sortTopNodes;
+  private final int limitOverUtilizedNum;
   private final BalancerMetrics metrics;
   private final BalancerMetrics metrics;
 
 
   // all data node lists
   // all data node lists
@@ -352,6 +355,7 @@ public class Balancer {
     this.sourceNodes = p.getSourceNodes();
     this.sourceNodes = p.getSourceNodes();
     this.runDuringUpgrade = p.getRunDuringUpgrade();
     this.runDuringUpgrade = p.getRunDuringUpgrade();
     this.sortTopNodes = p.getSortTopNodes();
     this.sortTopNodes = p.getSortTopNodes();
+    this.limitOverUtilizedNum = p.getLimitOverUtilizedNum();
 
 
     this.maxSizeToMove = getLongBytes(conf,
     this.maxSizeToMove = getLongBytes(conf,
         DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
         DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
@@ -456,11 +460,18 @@ public class Balancer {
       sortOverUtilized(overUtilizedPercentage);
       sortOverUtilized(overUtilizedPercentage);
     }
     }
 
 
+    // Limit the maximum number of overUtilized datanodes
+    // If excludedOverUtilizedNum is greater than 0, The overUtilized nodes num is limited
+    int excludedOverUtilizedNum = Math.max(overUtilized.size() - limitOverUtilizedNum, 0);
+    if (excludedOverUtilizedNum > 0) {
+      limitOverUtilizedNum();
+    }
+
     logUtilizationCollections();
     logUtilizationCollections();
     metrics.setNumOfOverUtilizedNodes(overUtilized.size());
     metrics.setNumOfOverUtilizedNodes(overUtilized.size());
     metrics.setNumOfUnderUtilizedNodes(underUtilized.size());
     metrics.setNumOfUnderUtilizedNodes(underUtilized.size());
     
     
-    Preconditions.checkState(dispatcher.getStorageGroupMap().size()
+    Preconditions.checkState(dispatcher.getStorageGroupMap().size() - excludedOverUtilizedNum
         == overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
         == overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
            + belowAvgUtilized.size(),
            + belowAvgUtilized.size(),
         "Mismatched number of storage groups");
         "Mismatched number of storage groups");
@@ -484,6 +495,20 @@ public class Balancer {
     );
     );
   }
   }
 
 
+  private void limitOverUtilizedNum() {
+    Preconditions.checkState(overUtilized instanceof LinkedList,
+        "Collection overUtilized is not a LinkedList.");
+    LinkedList<Source> list = (LinkedList<Source>) overUtilized;
+
+    LOG.info("Limiting over-utilized nodes num, if using the '-sortTopNodes' param," +
+        " the overUtilized nodes of top will be retained");
+
+    int size = overUtilized.size();
+    for (int i = 0; i < size - limitOverUtilizedNum; i++) {
+      list.removeLast();
+    }
+  }
+
   private static long computeMaxSize2Move(final long capacity, final long remaining,
   private static long computeMaxSize2Move(final long capacity, final long remaining,
       final double utilizationDiff, final long max) {
       final double utilizationDiff, final long max) {
     final double diff = Math.abs(utilizationDiff);
     final double diff = Math.abs(utilizationDiff);
@@ -1071,6 +1096,14 @@ public class Balancer {
               b.setSortTopNodes(true);
               b.setSortTopNodes(true);
               LOG.info("Balancer will sort nodes by" +
               LOG.info("Balancer will sort nodes by" +
                   " capacity usage percentage to prioritize top used nodes");
                   " capacity usage percentage to prioritize top used nodes");
+            } else if ("-limitOverUtilizedNum".equalsIgnoreCase(args[i])) {
+              Preconditions.checkArgument(++i < args.length,
+                  "limitOverUtilizedNum value is missing: args = " + Arrays.toString(args));
+              int limitNum = Integer.parseInt(args[i]);
+              Preconditions.checkArgument(limitNum >= 0,
+                  "limitOverUtilizedNum must be non-negative");
+              LOG.info("Using a limitOverUtilizedNum of {}", limitNum);
+              b.setLimitOverUtilizedNum(limitNum);
             } else {
             } else {
               throw new IllegalArgumentException("args = "
               throw new IllegalArgumentException("args = "
                   + Arrays.toString(args));
                   + Arrays.toString(args));

+ 15 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java

@@ -50,6 +50,8 @@ final class BalancerParameters {
 
 
   private final boolean sortTopNodes;
   private final boolean sortTopNodes;
 
 
+  private final int limitOverUtilizedNum;
+
   static final BalancerParameters DEFAULT = new BalancerParameters();
   static final BalancerParameters DEFAULT = new BalancerParameters();
 
 
   private BalancerParameters() {
   private BalancerParameters() {
@@ -67,6 +69,7 @@ final class BalancerParameters {
     this.runDuringUpgrade = builder.runDuringUpgrade;
     this.runDuringUpgrade = builder.runDuringUpgrade;
     this.runAsService = builder.runAsService;
     this.runAsService = builder.runAsService;
     this.sortTopNodes = builder.sortTopNodes;
     this.sortTopNodes = builder.sortTopNodes;
+    this.limitOverUtilizedNum = builder.limitOverUtilizedNum;
     this.hotBlockTimeInterval = builder.hotBlockTimeInterval;
     this.hotBlockTimeInterval = builder.hotBlockTimeInterval;
   }
   }
 
 
@@ -110,6 +113,10 @@ final class BalancerParameters {
     return this.sortTopNodes;
     return this.sortTopNodes;
   }
   }
 
 
+  int getLimitOverUtilizedNum() {
+    return this.limitOverUtilizedNum;
+  }
+
   long getHotBlockTimeInterval() {
   long getHotBlockTimeInterval() {
     return this.hotBlockTimeInterval;
     return this.hotBlockTimeInterval;
   }
   }
@@ -120,12 +127,12 @@ final class BalancerParameters {
         + " max idle iteration = %s," + " #excluded nodes = %s,"
         + " max idle iteration = %s," + " #excluded nodes = %s,"
         + " #included nodes = %s," + " #source nodes = %s,"
         + " #included nodes = %s," + " #source nodes = %s,"
         + " #blockpools = %s," + " run during upgrade = %s,"
         + " #blockpools = %s," + " run during upgrade = %s,"
-        + " sort top nodes = %s,"
+        + " sort top nodes = %s," + " limit overUtilized nodes num = %s,"
         + " hot block time interval = %s]",
         + " hot block time interval = %s]",
         Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
         Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
         threshold, maxIdleIteration, excludedNodes.size(),
         threshold, maxIdleIteration, excludedNodes.size(),
         includedNodes.size(), sourceNodes.size(), blockpools.size(),
         includedNodes.size(), sourceNodes.size(), blockpools.size(),
-        runDuringUpgrade, sortTopNodes, hotBlockTimeInterval);
+        runDuringUpgrade, sortTopNodes, limitOverUtilizedNum, hotBlockTimeInterval);
   }
   }
 
 
   static class Builder {
   static class Builder {
@@ -141,6 +148,7 @@ final class BalancerParameters {
     private boolean runDuringUpgrade = false;
     private boolean runDuringUpgrade = false;
     private boolean runAsService = false;
     private boolean runAsService = false;
     private boolean sortTopNodes = false;
     private boolean sortTopNodes = false;
+    private int limitOverUtilizedNum = Integer.MAX_VALUE;
     private long hotBlockTimeInterval = 0;
     private long hotBlockTimeInterval = 0;
 
 
     Builder() {
     Builder() {
@@ -201,6 +209,11 @@ final class BalancerParameters {
       return this;
       return this;
     }
     }
 
 
+    Builder setLimitOverUtilizedNum(int overUtilizedNum) {
+      this.limitOverUtilizedNum = overUtilizedNum;
+      return this;
+    }
+
     BalancerParameters build() {
     BalancerParameters build() {
       return new BalancerParameters(this);
       return new BalancerParameters(this);
     }
     }

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md

@@ -293,6 +293,7 @@ Usage:
               [-runDuringUpgrade]
               [-runDuringUpgrade]
               [-asService]
               [-asService]
               [-sortTopNodes]
               [-sortTopNodes]
+              [-limitOverUtilizedNum <specified maximum number of overUtilized datanodes>]
               [-hotBlockTimeInterval <specified time interval>]
               [-hotBlockTimeInterval <specified time interval>]
 
 
 | COMMAND\_OPTION | Description |
 | COMMAND\_OPTION | Description |
@@ -307,6 +308,7 @@ Usage:
 | `-runDuringUpgrade` | Whether to run the balancer during an ongoing HDFS upgrade. This is usually not desired since it will not affect used space on over-utilized machines. |
 | `-runDuringUpgrade` | Whether to run the balancer during an ongoing HDFS upgrade. This is usually not desired since it will not affect used space on over-utilized machines. |
 | `-asService` | Run Balancer as a long running service. |
 | `-asService` | Run Balancer as a long running service. |
 | `-sortTopNodes` | Sort datanodes based on the utilization so that highly utilized datanodes get scheduled first. |
 | `-sortTopNodes` | Sort datanodes based on the utilization so that highly utilized datanodes get scheduled first. |
+| `-limitOverUtilizedNum` | Limit the maximum number of overUtilized datanodes. |
 | `-hotBlockTimeInterval` | Prefer moving cold blocks i.e blocks associated with files accessed or modified before the specified time interval. |
 | `-hotBlockTimeInterval` | Prefer moving cold blocks i.e blocks associated with files accessed or modified before the specified time interval. |
 | `-h`\|`--help` | Display the tool usage and help information and exit. |
 | `-h`\|`--help` | Display the tool usage and help information and exit. |
 
 

+ 92 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerLongRunningTasks.java

@@ -672,6 +672,98 @@ public class TestBalancerLongRunningTasks {
     assertEquals(900, maxUsage);
     assertEquals(900, maxUsage);
   }
   }
 
 
+  @Test(timeout = 60000)
+  public void testBalancerWithLimitOverUtilizedNum() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    // Init the config (block size to 100)
+    initConf(conf);
+    conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 30000);
+
+    final long totalCapacity = 1000L;
+    final int diffBetweenNodes = 50;
+
+    // Set up the nodes with two groups:
+    // 5 over-utilized nodes with 80%, 85%, 90%, 95%, 100% usage
+    // 2 under-utilized nodes with 0%, 5% usage
+    // With sortTopNodes and limitOverUtilizedNum option, 100% used ones will be chosen
+    final int numOfOverUtilizedDn = 5;
+    final int numOfUnderUtilizedDn = 2;
+    final int totalNumOfDn = numOfOverUtilizedDn + numOfUnderUtilizedDn;
+    final long[] capacityArray = new long[totalNumOfDn];
+    Arrays.fill(capacityArray, totalCapacity);
+
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(totalNumOfDn)
+        .simulatedCapacities(capacityArray)
+        .build()) {
+      cluster.setDataNodesDead();
+      List<DataNode> dataNodes = cluster.getDataNodes();
+      // Create top used nodes
+      for (int i = 0; i < numOfOverUtilizedDn; i++) {
+        // Bring one node alive
+        DataNodeTestUtils.triggerHeartbeat(dataNodes.get(i));
+        DataNodeTestUtils.triggerBlockReport(dataNodes.get(i));
+        // Create nodes with: 80%, 85%, 90%, 95%, 100%
+        int nodeCapacity = (int) totalCapacity - diffBetweenNodes * (numOfOverUtilizedDn - i - 1);
+        TestBalancer.createFile(cluster, new Path("test_big" + i), nodeCapacity, (short) 1, 0);
+        cluster.setDataNodesDead();
+      }
+
+      // Create under utilized nodes
+      for (int i = numOfUnderUtilizedDn - 1; i >= 0; i--) {
+        int index = i + numOfOverUtilizedDn;
+        // Bring one node alive
+        DataNodeTestUtils.triggerHeartbeat(dataNodes.get(index));
+        DataNodeTestUtils.triggerBlockReport(dataNodes.get(index));
+        // Create nodes with: 5%, 0%
+        int nodeCapacity = diffBetweenNodes * i;
+        TestBalancer.createFile(cluster, new Path("test_small" + i), nodeCapacity, (short) 1, 0);
+        cluster.setDataNodesDead();
+      }
+
+      // Bring all nodes alive
+      cluster.triggerHeartbeats();
+      cluster.triggerBlockReports();
+      cluster.waitFirstBRCompleted(0, 6000);
+
+      final BalancerParameters balancerParameters = Balancer.Cli.parse(new String[] {
+          "-policy", BalancingPolicy.Node.INSTANCE.getName(),
+          "-threshold", "1",
+          "-sortTopNodes",
+          "-limitOverUtilizedNum", "1"
+      });
+
+      client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0)
+              .getUri(), ClientProtocol.class)
+              .getProxy();
+
+      // Set max-size-to-move to small number
+      // so only top two nodes will be chosen in one iteration
+      conf.setLong(DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY, 99L);
+      final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+      List<NameNodeConnector> connectors =
+          NameNodeConnector.newNameNodeConnectors(namenodes, Balancer.class.getSimpleName(),
+              Balancer.BALANCER_ID_PATH, conf, BalancerParameters.DEFAULT.getMaxIdleIteration());
+      final Balancer balancer = new Balancer(connectors.get(0), balancerParameters, conf);
+      Balancer.Result balancerResult = balancer.runOneIteration();
+
+      cluster.triggerDeletionReports();
+      cluster.triggerBlockReports();
+      cluster.triggerHeartbeats();
+
+      DatanodeInfo[] datanodeReport =
+          client.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
+      long maxUsage = 0;
+      for (int i = 0; i < totalNumOfDn; i++) {
+        maxUsage = Math.max(maxUsage, datanodeReport[i].getDfsUsed());
+      }
+      // The maxUsage value is 950, only 100% of the nodes will be balanced
+      assertEquals(950, maxUsage);
+      assertTrue("BalancerResult is not as expected. " + balancerResult,
+          (balancerResult.getBytesAlreadyMoved() == 100 && balancerResult.getBlocksMoved() == 1));
+    }
+  }
+
   @Test(timeout = 100000)
   @Test(timeout = 100000)
   public void testMaxIterationTime() throws Exception {
   public void testMaxIterationTime() throws Exception {
     final Configuration conf = new HdfsConfiguration();
     final Configuration conf = new HdfsConfiguration();