
HDFS-17218. NameNode should process time out excess redundancy blocks (#6176). Contributed by Haiyang Hu.

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
huhaiyang 1 year ago
parent commit 9a6d00aba4

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -315,6 +315,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int
       DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT = 300;
 
+  public static final String  DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY =
+      "dfs.namenode.excess.redundancy.timeout-sec";
+  public static final long DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEFAULT = 3600;
+  public static final String DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT
+      = "dfs.namenode.excess.redundancy.timeout.check.limit";
+  public static final long DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT = 1000;
+
   public static final String  DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY =
       "dfs.namenode.maintenance.replication.min";
   public static final int     DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT

+ 115 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -50,6 +51,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import javax.management.ObjectName;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -86,6 +88,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBloc
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingReconstructionBlocks.PendingBlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.ExcessRedundancyMap.ExcessBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
@@ -116,6 +119,7 @@ import org.apache.hadoop.hdfs.server.namenode.CacheManager;
 
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
 
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -482,6 +486,16 @@ public class BlockManager implements BlockStatsMXBean {
   /** Storages accessible from multiple DNs. */
   private final ProvidedStorageMap providedStorageMap;
 
+  /**
+   * Timeout, in milliseconds, for excess redundancy blocks.
+   */
+  private long excessRedundancyTimeout;
+
+  /**
+   * Limits the number of blocks used to check for excess redundancy timeout.
+   */
+  private long excessRedundancyTimeoutCheckLimit;
+
   public BlockManager(final Namesystem namesystem, boolean haEnabled,
       final Configuration conf) throws IOException {
     this.namesystem = namesystem;
@@ -589,6 +603,12 @@ public class BlockManager implements BlockStatsMXBean {
         conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
             DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT);
 
+    setExcessRedundancyTimeout(conf.getLong(DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY,
+        DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEFAULT));
+    setExcessRedundancyTimeoutCheckLimit(conf.getLong(
+        DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT,
+        DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT));
+
     printInitialConfigs();
   }
 
@@ -3041,6 +3061,100 @@ public class BlockManager implements BlockStatsMXBean {
           (Time.monotonicNow() - startTime), endSize, (startSize - endSize));
     }
   }
+
+  /**
+   * Sets the timeout (in seconds) for excess redundancy blocks. If the provided timeout is
+   * less than or equal to 0, the default value is used. The value is stored in milliseconds.
+   * @param timeout The time (in seconds) to set as the excess redundancy block timeout.
+   */
+  public void setExcessRedundancyTimeout(long timeout) {
+    if (timeout <= 0) {
+      this.excessRedundancyTimeout = DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEFAULT * 1000L;
+    } else {
+      this.excessRedundancyTimeout = timeout * 1000L;
+    }
+  }
+
+  /**
+   * Sets the limit on the number of blocks checked for excess redundancy timeout.
+   * If the provided limit is less than or equal to 0, the default limit is used.
+   *
+   * @param limit The limit number of blocks used to check for excess redundancy timeout.
+   */
+  public void setExcessRedundancyTimeoutCheckLimit(long limit) {
+    if (limit <= 0) {
+      this.excessRedundancyTimeoutCheckLimit =
+          DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT;
+    } else {
+      this.excessRedundancyTimeoutCheckLimit = limit;
+    }
+  }
+
+  /**
+   * Process timed-out blocks in the excess redundancy map.
+   */
+  void processTimedOutExcessBlocks() {
+    if (excessRedundancyMap.size() == 0) {
+      return;
+    }
+    namesystem.writeLock();
+    long now = Time.monotonicNow();
+    int processed = 0;
+    try {
+      Iterator<Map.Entry<String, LightWeightHashSet<Block>>> iter =
+          excessRedundancyMap.getExcessRedundancyMap().entrySet().iterator();
+      while (iter.hasNext() && processed < excessRedundancyTimeoutCheckLimit) {
+        Map.Entry<String, LightWeightHashSet<Block>> entry = iter.next();
+        String datanodeUuid = entry.getKey();
+        LightWeightHashSet<Block> blocks = entry.getValue();
+        // Sort blocks by timestamp in ascending order (oldest first).
+        List<ExcessBlockInfo> sortedBlocks = blocks.stream()
+            .filter(block -> block instanceof ExcessBlockInfo)
+            .map(block -> (ExcessBlockInfo) block)
+            .sorted(Comparator.comparingLong(ExcessBlockInfo::getTimeStamp))
+            .collect(Collectors.toList());
+
+        for (ExcessBlockInfo excessBlockInfo : sortedBlocks) {
+          if (processed >= excessRedundancyTimeoutCheckLimit) {
+            break;
+          }
+
+          processed++;
+          // Blocks are sorted by timestamp, so if the current block has not timed out,
+          // none of the remaining blocks for this datanode have either; exit this loop.
+          if (now <= excessBlockInfo.getTimeStamp() + excessRedundancyTimeout) {
+            break;
+          }
+
+          BlockInfo blockInfo = excessBlockInfo.getBlockInfo();
+          BlockInfo bi = blocksMap.getStoredBlock(blockInfo);
+          if (bi == null || bi.isDeleted()) {
+            continue;
+          }
+
+          Iterator<DatanodeStorageInfo> iterator = blockInfo.getStorageInfos();
+          while (iterator.hasNext()) {
+            DatanodeStorageInfo datanodeStorageInfo = iterator.next();
+            DatanodeDescriptor datanodeDescriptor = datanodeStorageInfo.getDatanodeDescriptor();
+            if (datanodeDescriptor.getDatanodeUuid().equals(datanodeUuid) &&
+                datanodeStorageInfo.getState().equals(State.NORMAL)) {
+              final Block b = getBlockOnStorage(blockInfo, datanodeStorageInfo);
+              if (!containsInvalidateBlock(datanodeDescriptor, b)) {
+                addToInvalidates(b, datanodeDescriptor);
+                LOG.debug("Excess block timeout ({}, {}) is added to invalidated.",
+                    b, datanodeDescriptor);
+              }
+              excessBlockInfo.setTimeStamp();
+              break;
+            }
+          }
+        }
+      }
+    } finally {
+      namesystem.writeUnlock("processTimedOutExcessBlocks");
+      LOG.info("processTimedOutExcessBlocks {} msecs.", (Time.monotonicNow() - now));
+    }
+  }
   
   Collection<Block> processReport(
       final DatanodeStorageInfo storageInfo,
@@ -5232,6 +5346,7 @@ public class BlockManager implements BlockStatsMXBean {
             computeDatanodeWork();
             processPendingReconstructions();
             rescanPostponedMisreplicatedBlocks();
+            processTimedOutExcessBlocks();
             lastRedundancyCycleTS.set(Time.monotonicNow());
           }
           TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs);
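
Note on pacing: processTimedOutExcessBlocks() runs once per redundancy monitor cycle and examines at most excessRedundancyTimeoutCheckLimit entries per run, so a large excess redundancy map is drained over several cycles. A rough, illustrative sketch of the resulting worst-case sweep time follows; the map size is hypothetical and the 3-second cycle assumes the dfs.namenode.redundancy.interval.seconds default.

// Illustrative only: estimates how long a full pass over the excess redundancy
// map can take, given that each redundancy monitor cycle checks at most
// excessRedundancyTimeoutCheckLimit blocks. The numbers below are assumptions.
public class ExcessSweepEstimate {
  public static void main(String[] args) {
    long excessBlocks = 50_000;        // hypothetical excess redundancy map size
    long checkLimitPerCycle = 1_000;   // dfs.namenode.excess.redundancy.timeout.check.limit
    long redundancyIntervalSec = 3;    // dfs.namenode.redundancy.interval.seconds (default)

    long cycles = (excessBlocks + checkLimitPerCycle - 1) / checkLimitPerCycle;
    long worstCaseSweepSec = cycles * redundancyIntervalSec;
    System.out.println("cycles=" + cycles + ", worst-case sweep=" + worstCaseSweepSec + "s");
  }
}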

+ 50 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java

@@ -21,12 +21,15 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.slf4j.Logger;
 
 import org.apache.hadoop.classification.VisibleForTesting;
 
+import static org.apache.hadoop.util.Time.monotonicNow;
+
 /**
  * Maps a datanode to the set of excess redundancy details.
  *
@@ -35,7 +38,7 @@ import org.apache.hadoop.classification.VisibleForTesting;
 class ExcessRedundancyMap {
   public static final Logger blockLog = NameNode.blockStateChangeLog;
 
-  private final Map<String, LightWeightHashSet<BlockInfo>> map =new HashMap<>();
+  private final Map<String, LightWeightHashSet<Block>> map = new HashMap<>();
   private final AtomicLong size = new AtomicLong(0L);
 
   /**
@@ -50,7 +53,7 @@ class ExcessRedundancyMap {
    */
   @VisibleForTesting
   synchronized int getSize4Testing(String dnUuid) {
-    final LightWeightHashSet<BlockInfo> set = map.get(dnUuid);
+    final LightWeightHashSet<Block> set = map.get(dnUuid);
     return set == null? 0: set.size();
   }
 
@@ -64,7 +67,7 @@ class ExcessRedundancyMap {
    *         datanode and the given block?
    */
   synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
-    final LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
+    final LightWeightHashSet<Block> set = map.get(dn.getDatanodeUuid());
     return set != null && set.contains(blk);
   }
 
@@ -75,12 +78,12 @@ class ExcessRedundancyMap {
    * @return true if the block is added.
    */
   synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) {
-    LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
+    LightWeightHashSet<Block> set = map.get(dn.getDatanodeUuid());
     if (set == null) {
       set = new LightWeightHashSet<>();
       map.put(dn.getDatanodeUuid(), set);
     }
-    final boolean added = set.add(blk);
+    final boolean added = set.add(new ExcessBlockInfo(blk));
     if (added) {
       size.incrementAndGet();
       blockLog.debug("BLOCK* ExcessRedundancyMap.add({}, {})", dn, blk);
@@ -95,11 +98,10 @@ class ExcessRedundancyMap {
    * @return true if the block is removed.
    */
   synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
-    final LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
+    final LightWeightHashSet<Block> set = map.get(dn.getDatanodeUuid());
     if (set == null) {
       return false;
     }
-
     final boolean removed = set.remove(blk);
     if (removed) {
       size.decrementAndGet();
@@ -111,4 +113,45 @@ class ExcessRedundancyMap {
     }
     return removed;
   }
+
+  synchronized Map<String, LightWeightHashSet<Block>> getExcessRedundancyMap() {
+    return map;
+  }
+
+  /**
+   * An object that contains information about an excess redundancy block.
+   * It records the timestamp at which the block was added to the excess redundancy map.
+   */
+  static class ExcessBlockInfo extends Block {
+    private long timeStamp;
+    private final BlockInfo blockInfo;
+
+    ExcessBlockInfo(BlockInfo blockInfo) {
+      super(blockInfo.getBlockId(), blockInfo.getNumBytes(), blockInfo.getGenerationStamp());
+      this.timeStamp = monotonicNow();
+      this.blockInfo = blockInfo;
+    }
+
+    public BlockInfo getBlockInfo() {
+      return blockInfo;
+    }
+
+    long getTimeStamp() {
+      return timeStamp;
+    }
+
+    void setTimeStamp() {
+      timeStamp = monotonicNow();
+    }
+
+    @Override
+    public int hashCode() {
+      return super.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return super.equals(obj);
+    }
+  }
 }
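
Design note: ExcessBlockInfo keeps Block's identity semantics by delegating hashCode/equals to the superclass, so contains()/remove() calls that pass a plain BlockInfo still match the wrapped entry in the LightWeightHashSet<Block>, regardless of the timestamp the wrapper carries. A minimal sketch of that assumption (it presumes Block identity is keyed on block ID/fields, as org.apache.hadoop.hdfs.protocol.Block implements it):

// Minimal sketch, assuming Block equality/hashCode are derived from the block's
// identity fields: a wrapper built from a block's ID, length and generation stamp
// compares equal to the original, which is why ExcessRedundancyMap.remove(dn, blk)
// can still find the ExcessBlockInfo entry that wraps blk.
import org.apache.hadoop.hdfs.protocol.Block;

public class BlockIdentitySketch {
  public static void main(String[] args) {
    Block original = new Block(1024L, 10L, 1001L);
    Block wrapper = new Block(original.getBlockId(), original.getNumBytes(),
        original.getGenerationStamp());
    System.out.println(original.equals(wrapper));                  // expected: true
    System.out.println(original.hashCode() == wrapper.hashCode()); // expected: true
  }
}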

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -5425,6 +5425,24 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.excess.redundancy.timeout-sec</name>
+  <value>3600</value>
+  <description>
+    Timeout in seconds for excess redundancy blocks. If this value is 0 or less,
+    then it will default to 3600 seconds.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.excess.redundancy.timeout.check.limit</name>
+  <value>1000</value>
+  <description>
+    Limits the number of blocks used to check for excess redundancy timeout.
+    If this value is 0 or less, then it will default to 1000.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.stale.datanode.minimum.interval</name>
   <value>3</value>

+ 19 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -2092,6 +2092,25 @@ public class MiniDFSCluster implements AutoCloseable {
         .newInstance(dn);
   }
 
+  /**
+   * Wait for the datanodes in the cluster to process any block
+   * deletions that have already been asynchronously queued.
+   */
+  public void waitForDNDeletions()
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        for (DataNode dn : getDataNodes()) {
+          if (getFsDatasetTestUtils(dn).getPendingAsyncDeletions() > 0) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }, 1000, 10000);
+  }
+
   /**
    * Gets the rpc port used by the NameNode, because the caller
    * supplied port is not necessarily the actual port used.

+ 128 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.thirdparty.com.google.common.collect.LinkedListMultimap;
@@ -112,6 +113,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -2201,4 +2203,130 @@ public class TestBlockManager {
       assertEquals(1, getLongCounter("IncrementalBlockReportsNumOps", rb));
     }
   }
+
+  /**
+   * Test that the NameNode processes timed-out excess redundancy blocks.
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test(timeout = 360000)
+  public void testProcessTimedOutExcessBlocks() throws IOException,
+      InterruptedException, TimeoutException {
+    Configuration config = new HdfsConfiguration();
+    // Bump up replication interval.
+    config.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10000);
+    // Set the excess redundancy block timeout.
+    long timeOut = 60L;
+    config.setLong(DFSConfigKeys.DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY, timeOut);
+
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+
+    final Semaphore semaphore = new Semaphore(0);
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).numDataNodes(3).build()) {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      BlockManager blockManager = cluster.getNameNode().getNamesystem().getBlockManager();
+      cluster.waitActive();
+
+      final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDeleteReplica() {
+          // Lets wait for the remove replica process.
+          try {
+            semaphore.acquire(1);
+          } catch (InterruptedException e) {
+            // ignore.
+          }
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      // Create file.
+      Path path = new Path("/testfile");
+      DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0);
+      DFSTestUtil.waitReplication(fs, path, (short) 3);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+      ExtendedBlock extendedBlock = lb.getBlock();
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(3, loc.length);
+
+      // Set replication to 2 so that one replica becomes excess.
+      fs.setReplication(path, (short) 2);
+
+      // Check excessRedundancyMap and invalidateBlocks size as 1.
+      assertEquals(1, blockManager.getExcessBlocksCount());
+      assertEquals(1, blockManager.getPendingDeletionBlocksCount());
+      DataNode excessDn = Arrays.stream(loc).
+          filter(datanodeInfo -> blockManager.getExcessSize4Testing(
+              datanodeInfo.getDatanodeUuid()) > 0)
+          .map(datanodeInfo -> cluster.getDataNode(datanodeInfo.getIpcPort()))
+          .findFirst()
+          .orElse(null);
+
+      // Schedule blocks for deletion at excessDn.
+      assertEquals(1, blockManager.computeInvalidateWork(1));
+      // Check excessRedundancyMap size as 1.
+      assertEquals(1, blockManager.getExcessBlocksCount());
+      // Check invalidateBlocks size as 0.
+      assertEquals(0, blockManager.getPendingDeletionBlocksCount());
+      assertNotNull(excessDn);
+
+      // NameNode will ask datanode to delete replicas in heartbeat response.
+      cluster.triggerHeartbeats();
+
+      // Wait for the datanode to process any block deletions
+      // that have already been asynchronously queued.
+      DataNode finalExcessDn = excessDn;
+      GenericTestUtils.waitFor(
+          () -> cluster.getFsDatasetTestUtils(finalExcessDn).getPendingAsyncDeletions() == 1,
+          100, 1000);
+
+      // Restart the datanode.
+      int ipcPort = excessDn.getDatanodeId().getIpcPort();
+      MiniDFSCluster.DataNodeProperties dataNodeProperties = cluster.stopDataNode(
+          excessDn.getDatanodeId().getXferAddr());
+      assertTrue(cluster.restartDataNode(dataNodeProperties, true));
+      semaphore.release(1);
+      cluster.waitActive();
+
+      // Check that the replica still exists on excessDn.
+      excessDn = cluster.getDataNode(ipcPort);
+      assertNotNull(cluster.getFsDatasetTestUtils(excessDn).fetchReplica(extendedBlock));
+      assertEquals(0, cluster.getFsDatasetTestUtils(excessDn).getPendingAsyncDeletions());
+
+      // Verify excess redundancy blocks have not timed out.
+      blockManager.processTimedOutExcessBlocks();
+      assertEquals(0, blockManager.getPendingDeletionBlocksCount());
+
+      // Wait for the excess redundancy block to time out, then process it.
+      Thread.sleep(timeOut * 1000);
+      blockManager.processTimedOutExcessBlocks();
+
+      // Check excessRedundancyMap and invalidateBlocks size as 1.
+      assertEquals(1, blockManager.getExcessSize4Testing(excessDn.getDatanodeUuid()));
+      assertEquals(1, blockManager.getExcessBlocksCount());
+      assertEquals(1, blockManager.getPendingDeletionBlocksCount());
+
+      // Schedule blocks for deletion.
+      assertEquals(1, blockManager.computeInvalidateWork(1));
+
+      cluster.triggerHeartbeats();
+
+      // Make it resume the removeReplicaFromMem method.
+      semaphore.release(1);
+
+      // Wait for the datanode in the cluster to process any block
+      // deletions that have already been asynchronously queued.
+      cluster.waitForDNDeletions();
+
+      // Trigger immediate deletion report.
+      cluster.triggerDeletionReports();
+
+      // The number of replicas should now be 2.
+      assertEquals(2, DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations().length);
+      assertEquals(0, blockManager.getExcessBlocksCount());
+    } finally {
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
 }