Browse Source

HDFS-10966. Enhance Dispatcher logic on deciding when to give up a source DataNode. Contributed by Mark Wagner and Zhe Zhang.

Kihwal Lee 8 years ago
parent
commit
01ad9dee08

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -64,6 +64,9 @@ Release 2.7.4 - UNRELEASED
     HDFS-10872. Add MutableRate metrics for FSNamesystemLock operations.
     (Erik Krogen via zhz)
 
+    HDFS-10966. Enhance Dispatcher logic on deciding when to give up a source
+    DataNode (Mark Wagner and zhz via kihwal)
+
   OPTIMIZATIONS
 
     HDFS-10896. Move lock logging logic from FSNamesystem into FSNamesystemLock.

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -460,6 +460,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT = 10L*1024*1024; // 10MB
   public static final String  DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout";
   public static final int     DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
+  public static final String  DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.balancer.max-no-move-interval";
+  public static final int    DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
 
   public static final String  DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
   public static final long    DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
@@ -467,6 +469,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_MOVER_MOVERTHREADS_DEFAULT = 1000;
   public static final String  DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY = "dfs.mover.retry.max.attempts";
   public static final int     DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT = 10;
+  public static final String  DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.mover.max-no-move-interval";
+  public static final int    DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
 
   public static final String  DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
   public static final int     DFS_DATANODE_DEFAULT_PORT = 50010;

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -258,11 +258,14 @@ public class Balancer {
     final int blockMoveTimeout = conf.getInt(
         DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT,
         DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT);
+    final int maxNoMoveInterval = conf.getInt(
+        DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY,
+        DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT);
 
     this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
         p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
         maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize,
-        blockMoveTimeout, conf);
+        blockMoveTimeout,maxNoMoveInterval, conf);
     this.threshold = p.threshold;
     this.policy = p.policy;
 

+ 21 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java

@@ -122,6 +122,11 @@ public class Dispatcher {
   private final long getBlocksSize;
   private final long getBlocksMinBlockSize;
   private final long blockMoveTimeout;
+  /**
+   * If no block can be moved out of a {@link Source} after this configured
+   * amount of time, the Source should give up choosing the next possible move.
+   */
+  private final int maxNoMoveInterval;
 
   static class Allocator {
     private final int max;
@@ -793,7 +798,7 @@ public class Dispatcher {
      */
     private void dispatchBlocks() {
       this.blocksToReceive = 2 * getScheduledSize();
-      int noPendingMoveIteration = 0;
+      long previousMoveTimestamp = Time.monotonicNow();
       while (getScheduledSize() > 0 && !isIterationOver()
           && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
         if (LOG.isTraceEnabled()) {
@@ -803,8 +808,8 @@ public class Dispatcher {
         }
         final PendingMove p = chooseNextMove();
         if (p != null) {
-          // Reset no pending move counter
-          noPendingMoveIteration=0;
+          // Reset previous move timestamp
+          previousMoveTimestamp = Time.monotonicNow();
           executePendingMove(p);
           continue;
         }
@@ -823,13 +828,11 @@ public class Dispatcher {
             return;
           }
         } else {
-          // source node cannot find a pending block to move, iteration +1
-          noPendingMoveIteration++;
-          // in case no blocks can be moved for source node's task,
-          // jump out of while-loop after 5 iterations.
-          if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
-            LOG.info("Failed to find a pending move "  + noPendingMoveIteration
-                + " times.  Skipping " + this);
+          // jump out of while-loop after the configured timeout.
+          long noMoveInterval = Time.monotonicNow() - previousMoveTimestamp;
+          if (noMoveInterval > maxNoMoveInterval) {
+            LOG.info("Failed to find a pending move for "  + noMoveInterval
+                + " ms.  Skipping " + this);
             resetScheduledSize();
           }
         }
@@ -840,6 +843,9 @@ public class Dispatcher {
           synchronized (Dispatcher.this) {
             Dispatcher.this.wait(1000); // wait for targets/sources to be idle
           }
+          // Didn't find a possible move in this iteration of the while loop,
+          // adding a small delay before choosing next move again.
+          Thread.sleep(100);
         } catch (InterruptedException ignored) {
         }
       }
@@ -864,17 +870,18 @@ public class Dispatcher {
   /** Constructor called by Mover. */
   public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
       Set<String> excludedNodes, long movedWinWidth, int moverThreads,
-      int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
+      int dispatcherThreads, int maxConcurrentMovesPerNode,
+      int maxNoMoveInterval, Configuration conf) {
     this(nnc, includedNodes, excludedNodes, movedWinWidth,
         moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
-        0L, 0L, 0,  conf);
+        0L, 0L, 0, maxNoMoveInterval, conf);
   }
 
   Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
       Set<String> excludedNodes, long movedWinWidth, int moverThreads,
       int dispatcherThreads, int maxConcurrentMovesPerNode,
       long getBlocksSize, long getBlocksMinBlockSize,
-      int blockMoveTimeout, Configuration conf) {
+      int blockMoveTimeout, int maxNoMoveInterval, Configuration conf) {
     this.nnc = nnc;
     this.excludedNodes = excludedNodes;
     this.includedNodes = includedNodes;
@@ -890,6 +897,7 @@ public class Dispatcher {
     this.getBlocksSize = getBlocksSize;
     this.getBlocksMinBlockSize = getBlocksMinBlockSize;
     this.blockMoveTimeout = blockMoveTimeout;
+    this.maxNoMoveInterval = maxNoMoveInterval;
 
     this.saslClient = new SaslDataTransferClient(conf,
         DataTransferSaslUtil.getSaslPropertiesResolver(conf),

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java

@@ -124,13 +124,16 @@ public class Mover {
     final int maxConcurrentMovesPerNode = conf.getInt(
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+    final int maxNoMoveInterval = conf.getInt(
+        DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY,
+        DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT);
     this.retryMaxAttempts = conf.getInt(
         DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY,
         DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT);
     this.retryCount = retryCount;
     this.dispatcher = new Dispatcher(nnc, Collections.<String> emptySet(),
         Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
-        maxConcurrentMovesPerNode, conf);
+        maxConcurrentMovesPerNode, maxNoMoveInterval, conf);
     this.storages = new StorageMap();
     this.targetPaths = nnc.getTargetPaths();
     this.blockStoragePolicies = new BlockStoragePolicy[1 <<

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -695,6 +695,16 @@
   </description>
 </property>
 
+ <property>
+  <name>dfs.mover.max-no-move-interval</name>
+  <value>60000</value>
+  <description>
+    If this specified amount of time has elapsed and no block has been moved
+    out of a source DataNode, on more effort will be made to move blocks out of
+    this DataNode in the current Mover iteration.
+  </description>
+</property>
+
 <property>
   <name>dfs.hosts</name>
   <value></value>
@@ -2554,6 +2564,16 @@
     </description>
   </property>
 
+<property>
+  <name>dfs.balancer.max-no-move-interval</name>
+  <value>60000</value>
+  <description>
+    If this specified amount of time has elapsed and no block has been moved
+    out of a source DataNode, on more effort will be made to move blocks out of
+    this DataNode in the current Balancer iteration.
+  </description>
+</property>
+
   <property>
     <name>dfs.lock.suppress.warning.interval</name>
     <value>10s</value>

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

@@ -219,6 +219,7 @@ public class TestBalancer {
 
     conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
     conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
+    conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
   }
 
   static void initConfWithRamDisk(Configuration conf) {
@@ -228,6 +229,7 @@ public class TestBalancer {
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
     conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
     conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, DEFAULT_RAM_DISK_BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
 
     conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
   }