瀏覽代碼

svn merge -c 1605565 from trunk for HDFS-6595. Allow the maximum threads for balancing on datanodes to be configurable.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1605566 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 11 年之前
父節點
當前提交
3a07f1fc55

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -222,6 +222,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6593. Move SnapshotDiffInfo out of INodeDirectorySnapshottable.
     (Jing Zhao via wheat9)
 
+    HDFS-6595. Allow the maximum threads for balancing on datanodes to be
+    configurable. (Benoy Antony via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -105,6 +105,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
   public static final String  DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec";
   public static final long    DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;
+  public static final String  DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY = "dfs.datanode.balance.max.concurrent.moves";
+  public static final int     DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT = 5;
   public static final String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
   public static final long    DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB
   public static final String  DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes";

+ 17 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -189,7 +189,6 @@ public class Balancer {
   /** The maximum number of concurrent blocks moves for 
    * balancing purpose at a datanode
    */
-  public static final int MAX_NUM_CONCURRENT_MOVES = 5;
   private static final int MAX_NO_PENDING_BLOCK_ITERATIONS = 5;
   public static final long DELAY_AFTER_ERROR = 10 * 1000L; //10 seconds
   public static final int BLOCK_MOVE_READ_TIMEOUT=20*60*1000; // 20 minutes
@@ -231,6 +230,7 @@ public class Balancer {
 
   private final ExecutorService moverExecutor;
   private final ExecutorService dispatcherExecutor;
+  private final int maxConcurrentMovesPerNode;
 
   /* This class keeps track of a scheduled block move */
   private class PendingBlockMove {
@@ -516,8 +516,8 @@ public class Balancer {
     private long scheduledSize = 0L;
     protected long delayUntil = 0L;
     //  blocks being moved but not confirmed yet
-    private final List<PendingBlockMove> pendingBlocks =
-      new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES); 
+    private final List<PendingBlockMove> pendingBlocks;
+    private final int maxConcurrentMoves;
     
     @Override
     public String toString() {
@@ -528,7 +528,8 @@ public class Balancer {
     /* Constructor 
      * Depending on avgutil & threshold, calculate maximum bytes to move 
      */
-    private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold) {
+    private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold,
+        int maxConcurrentMoves) {
       datanode = node;
       utilization = policy.getUtilization(node);
       final double avgUtil = policy.getAvgUtilization();
@@ -545,6 +546,8 @@ public class Balancer {
         maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
       }
       this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+      this.maxConcurrentMoves = maxConcurrentMoves;
+      this.pendingBlocks = new ArrayList<PendingBlockMove>(maxConcurrentMoves);
     }
     
     /** Get the datanode */
@@ -606,7 +609,7 @@ public class Balancer {
     
     /* Check if the node can schedule more blocks to move */
     synchronized private boolean isPendingQNotFull() {
-      if ( pendingBlocks.size() < MAX_NUM_CONCURRENT_MOVES ) {
+      if ( pendingBlocks.size() < this.maxConcurrentMoves ) {
         return true;
       }
       return false;
@@ -655,8 +658,9 @@ public class Balancer {
             = new ArrayList<BalancerBlock>();
     
     /* constructor */
-    private Source(DatanodeInfo node, BalancingPolicy policy, double threshold) {
-      super(node, policy, threshold);
+    private Source(DatanodeInfo node, BalancingPolicy policy, double threshold,
+        int maxConcurrentMoves) {
+      super(node, policy, threshold, maxConcurrentMoves);
     }
     
     /** Add a node task */
@@ -869,6 +873,9 @@ public class Balancer {
     this.dispatcherExecutor = Executors.newFixedThreadPool(
             conf.getInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
                         DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
+    this.maxConcurrentMovesPerNode =
+        conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
   }
   
   /* Given a data node set, build a network topology and decide
@@ -908,7 +915,7 @@ public class Balancer {
       BalancerDatanode datanodeS;
       final double avg = policy.getAvgUtilization();
       if (policy.getUtilization(datanode) > avg) {
-        datanodeS = new Source(datanode, policy, threshold);
+        datanodeS = new Source(datanode, policy, threshold, maxConcurrentMovesPerNode);
         if (isAboveAvgUtilized(datanodeS)) {
           this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
         } else {
@@ -919,7 +926,8 @@ public class Balancer {
               -threshold)*datanodeS.datanode.getCapacity()/100.0);
         }
       } else {
-        datanodeS = new BalancerDatanode(datanode, policy, threshold);
+        datanodeS = new BalancerDatanode(datanode, policy, threshold,
+            maxConcurrentMovesPerNode);
         if ( isBelowOrEqualAvgUtilized(datanodeS)) {
           this.belowAvgUtilizedDatanodes.add(datanodeS);
         } else {

+ 9 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

@@ -63,14 +63,17 @@ class DataXceiverServer implements Runnable {
    */
   static class BlockBalanceThrottler extends DataTransferThrottler {
    private int numThreads;
+   private int maxThreads;
    
    /**Constructor
     * 
     * @param bandwidth Total amount of bandwidth can be used for balancing 
     */
-   private BlockBalanceThrottler(long bandwidth) {
+   private BlockBalanceThrottler(long bandwidth, int maxThreads) {
      super(bandwidth);
+     this.maxThreads = maxThreads;
      LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
+     LOG.info("Number threads for balancing is "+ maxThreads);
    }
    
    /** Check if the block move can start. 
@@ -79,7 +82,7 @@ class DataXceiverServer implements Runnable {
     * the counter is incremented; False otherwise.
     */
    synchronized boolean acquire() {
-     if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) {
+     if (numThreads >= maxThreads) {
        return false;
      }
      numThreads++;
@@ -120,8 +123,10 @@ class DataXceiverServer implements Runnable {
     
     //set up parameter for cluster balancing
     this.balanceThrottler = new BlockBalanceThrottler(
-      conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 
-                   DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT));
+        conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
+            DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT),
+        conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+            DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT));
   }
 
   @Override

+ 21 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

@@ -370,8 +370,13 @@ public class TestBalancer {
     // start rebalancing
     Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
     final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
-    assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
-
+    if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
+      assertEquals(Balancer.ReturnStatus.NO_MOVE_PROGRESS.code, r);
+      return;
+    } else {
+      assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+    }
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
     LOG.info("Rebalancing with default ctor.");
     waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
@@ -462,6 +467,20 @@ public class TestBalancer {
         new String[] {RACK0, RACK1});
   }
   
+  @Test(timeout=100000)
+  public void testBalancerWithZeroThreadsForMove() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0);
+    testBalancer1Internal (conf);
+  }
+
+  @Test(timeout=100000)
+  public void testBalancerWithNonZeroThreadsForMove() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 8);
+    testBalancer1Internal (conf);
+  }
+  
   @Test(timeout=100000)
   public void testBalancer2() throws Exception {
     testBalancer2Internal(new HdfsConfiguration());