Prechádzať zdrojové kódy

HDFS-14861. Reset LowRedundancyBlocks Iterator periodically. Contributed by Stephen O'Donnell.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
Stephen O'Donnell 5 rokov pred
rodič
commit
900430b990

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -244,6 +244,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY;
   public static final int DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT = 3;
+  public static final String DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS =
+      "dfs.namenode.redundancy.queue.restart.iterations";
+  public static final int
+      DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT = 2400;
   public static final String  DFS_NAMENODE_REPLICATION_MIN_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
   public static final int     DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1;

+ 33 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -300,6 +300,16 @@ public class BlockManager implements BlockStatsMXBean {
    */
   private final long redundancyRecheckIntervalMs;
 
+  /**
+   * Tracks how many calls have been made to chooseLowReduncancyBlocks since
+   * the queue position was last reset to the queue head. If CallsSinceReset
+   * crosses the threshold the next call will reset the iterators. A threshold
+   * of zero means the queue position will only be reset once the next of the
+   * queue has been reached.
+   */
+  private int replQueueResetToHeadThreshold;
+  private int replQueueCallsSinceReset = 0;
+
   /** How often to check and the limit for the storageinfo efficiency. */
   private final long storageInfoDefragmentInterval;
   private final long storageInfoDefragmentTimeout;
@@ -572,6 +582,18 @@ public class BlockManager implements BlockStatsMXBean {
     }
     this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
 
+    replQueueResetToHeadThreshold = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS,
+        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT);
+    if (replQueueResetToHeadThreshold < 0) {
+      LOG.warn("{} is set to {} and it must be >= 0. Resetting to default {}",
+          DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS,
+          replQueueResetToHeadThreshold, DFSConfigKeys.
+              DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT);
+      replQueueResetToHeadThreshold = DFSConfigKeys.
+          DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT;
+    }
+
     long heartbeatIntervalSecs = conf.getTimeDuration(
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
@@ -1912,9 +1934,18 @@ public class BlockManager implements BlockStatsMXBean {
     List<List<BlockInfo>> blocksToReconstruct = null;
     namesystem.writeLock();
     try {
-      // Choose the blocks to be reconstructed
+      boolean reset = false;
+      if (replQueueResetToHeadThreshold > 0) {
+        if (replQueueCallsSinceReset >= replQueueResetToHeadThreshold) {
+          reset = true;
+          replQueueCallsSinceReset = 0;
+        } else {
+          replQueueCallsSinceReset++;
+        }
+      }
+        // Choose the blocks to be reconstructed
       blocksToReconstruct = neededReconstruction
-          .chooseLowRedundancyBlocks(blocksToProcess);
+          .chooseLowRedundancyBlocks(blocksToProcess, reset);
     } finally {
       namesystem.writeUnlock();
     }

+ 23 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java

@@ -488,6 +488,28 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
    */
   synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
       int blocksToProcess) {
+    return chooseLowRedundancyBlocks(blocksToProcess, false);
+  }
+
+  /**
+   * Get a list of block lists without sufficient redundancy. The index of
+   * block lists represents its replication priority. Iterates each block list
+   * in priority order beginning with the highest priority list. Iterators use
+   * a bookmark to resume where the previous iteration stopped. Returns when
+   * the block count is met or iteration reaches the end of the lowest priority
+   * list, in which case bookmarks for each block list are reset to the heads
+   * of their respective lists.
+   *
+   * @param blocksToProcess - number of blocks to fetch from low redundancy
+   *          blocks.
+   * @param resetIterators - After gathering the list of blocks reset the
+   *           position of all queue iterators to the head of the queue so
+   *           subsequent calls will begin at the head of the queue
+   * @return Return a list of block lists to be replicated. The block list
+   *         index represents its redundancy priority.
+   */
+  synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
+      int blocksToProcess, boolean resetIterators) {
     final List<List<BlockInfo>> blocksToReconstruct = new ArrayList<>(LEVEL);
 
     int count = 0;
@@ -509,7 +531,7 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
       }
     }
 
-    if (priority == LEVEL) {
+    if (priority == LEVEL || resetIterators) {
       // Reset all bookmarks because there were no recently added blocks.
       for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
         q.resetBookmark();

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -1136,6 +1136,24 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.redundancy.queue.restart.iterations</name>
+  <value>2400</value>
+  <description>When picking blocks from the low redundancy queues, reset the
+    bookmarked iterator after the set number of iterations to ensure any blocks
+    which were not processed on the first pass are retried before the iterators
+    would naturally reach their end point. This ensures blocks are retried
+    more frequently when there are many pending blocks or blocks are
+    continuously added to the queues preventing the iterator reaching its
+    natural endpoint.
+    The default setting of 2400 combined with the default of
+    dfs.namenode.redundancy.interval.seconds means the iterators will be reset
+    approximately every 2 hours.
+    Setting this parameter to zero disables the feature and the iterators will
+    be reset only when the end of all queues has been reached.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.accesstime.precision</name>
   <value>3600000</value>

+ 27 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -92,6 +93,32 @@ public class TestLowRedundancyBlockQueues {
         queues.getHighestPriorityECBlockCount());
   }
 
+  @Test
+  public void testQueuePositionCanBeReset() throws Throwable {
+    LowRedundancyBlocks queues = new LowRedundancyBlocks();
+    for (int i=0; i< 4; i++) {
+      BlockInfo block = genBlockInfo(i);
+      queues.add(block, 2, 0, 0, 3);
+    }
+    List<List<BlockInfo>> blocks;
+    // Get one block from the queue - should be block ID 0 returned
+    blocks = queues.chooseLowRedundancyBlocks(1, false);
+    assertEquals(1, blocks.get(2).size());
+    assertEquals(0, blocks.get(2).get(0).getBlockId());
+
+    // Get the next blocks - should be ID 1
+    blocks = queues.chooseLowRedundancyBlocks(1, false);
+    assertEquals(1, blocks.get(2).get(0).getBlockId());
+
+    // Get the next block, but also reset this time - should be ID 2 returned
+    blocks = queues.chooseLowRedundancyBlocks(1, true);
+    assertEquals(2, blocks.get(2).get(0).getBlockId());
+
+    // Get one more block and due to resetting the queue it will be block id 0
+    blocks = queues.chooseLowRedundancyBlocks(1, false);
+    assertEquals(0, blocks.get(2).get(0).getBlockId());
+  }
+
   /**
    * Test that adding blocks with different replication counts puts them
    * into different queues.