
HDFS-13277. Improve move to Replica trash to limit trash sub-dir size. Contributed by Bharat Viswanadham.

Hanisha Koneru, 7 years ago
commit ffee1a026c

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -1082,6 +1082,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.enable.replica.trash";
   public static final boolean DFS_DATANODE_ENABLE_REPLICA_TRASH_DEFAULT = false;
 
+  public static final String DFS_DATANODE_REPLICA_TRASH_SUBDIR_MAX_BLOCKS =
+      "dfs.datanode.replica.trash.subdir.max.blocks";
+
   // Slow io warning log threshold settings for dfsclient and datanode.
   public static final String DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY =
     "dfs.datanode.slow.io.warning.threshold.ms";

+ 97 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java

@@ -99,6 +99,12 @@ class FsDatasetAsyncDiskService {
   private int numDeletedBlocks = 0;
   private final Configuration conf;
   private final boolean replicaTrashEnabled;
+  private final int replicaTrashSubDirMaxBlocks;
+
+  // Holds the current replica-trash sub-directory state per volume and
+  // block pool; all access is guarded by synchronizing on this map.
+  private Map<String, ReplicaTrashCurDirInfo> replicaTrashCurDirInfoMap =
+      new HashMap<>();
+
   
   /**
    * Create a AsyncDiskServices with a set of volumes (specified by their
@@ -116,6 +122,12 @@ class FsDatasetAsyncDiskService {
     this.replicaTrashEnabled = conf.getBoolean(DFSConfigKeys
         .DFS_DATANODE_ENABLE_REPLICA_TRASH_KEY, DFSConfigKeys
         .DFS_DATANODE_ENABLE_REPLICA_TRASH_DEFAULT);
+    int blockInvalidateLimit = conf.getInt(DFSConfigKeys
+        .DFS_BLOCK_INVALIDATE_LIMIT_KEY, DFSConfigKeys
+        .DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
+    this.replicaTrashSubDirMaxBlocks = conf.getInt(DFSConfigKeys
+        .DFS_DATANODE_REPLICA_TRASH_SUBDIR_MAX_BLOCKS,
+        blockInvalidateLimit);
   }
 
   private void addExecutorForVolume(final FsVolumeImpl volume) {
@@ -319,8 +331,37 @@ class FsDatasetAsyncDiskService {
       replicaTrashBaseDir = matcher.replaceFirst("$1$2" + DataStorage
             .STORAGE_DIR_REPLICA_TRASH);
 
-      File replicaTrashDir = new File(replicaTrashBaseDir + File
-          .separator + dateFormat.format(date));
+      String key = volume.getStorageID() + "-" + block.getBlockPoolId();
+      String subDir = dateFormat.format(date);
+      File replicaTrashDir;
+      synchronized (replicaTrashCurDirInfoMap) {
+        // Look up the entry inside the lock so concurrent deletions on the
+        // same volume and block pool see a consistent view.
+        ReplicaTrashCurDirInfo current = replicaTrashCurDirInfoMap.get(key);
+        if (current == null) {
+          // first time moving blocks in this volume
+          current = new ReplicaTrashCurDirInfo();
+          current.setCurReplicaTrashSubDir(subDir);
+          current.setIteration(0);
+          current.setBlocks(0);
+          replicaTrashCurDirInfoMap.put(key, current);
+        } else if (current.getCurReplicaTrashSubDir().equals(subDir)) {
+          if (current.getBlocks() == replicaTrashSubDirMaxBlocks) {
+            // reached max entries in a sub-directory; roll to the next one
+            current.incrementIteration();
+            current.setBlocks(0);
+          }
+        } else {
+          // date changed, reset to the current date
+          current.setCurReplicaTrashSubDir(subDir);
+          current.setIteration(0);
+          current.setBlocks(0);
+        }
+        current.incrementBlocks();
+        replicaTrashDir = new File(replicaTrashBaseDir + File.separator
+            + current.getCurReplicaTrashSubDir() + "_"
+            + current.getIteration());
+      }
 
       try {
         volume.getFileIoProvider().mkdirsWithExistsCheck(
@@ -414,6 +455,60 @@ class FsDatasetAsyncDiskService {
       IOUtils.cleanup(null, volumeRef);
     }
   }
+
+  /**
+   * Information about current replica trash directory details.
+   */
+  static class ReplicaTrashCurDirInfo {
+    private String curReplicaTrashSubDir;
+    private int blocks;
+    private int iteration;
+
+    /**
+     * @return current replica trash sub directory
+     */
+    public String getCurReplicaTrashSubDir() {
+      return curReplicaTrashSubDir;
+    }
+
+    /** Sets current replica trash sub directory. */
+    public void setCurReplicaTrashSubDir(String currentDir) {
+      this.curReplicaTrashSubDir = currentDir;
+    }
+
+    /**
+     * @return Num of Blocks
+     */
+    public int getBlocks() {
+      return blocks;
+    }
+
+    /** Sets number of blocks in the current replica trash sub directory. */
+    public void setBlocks(int val) {
+      this.blocks = val;
+    }
+
+    /**
+     * @return current iteration
+     */
+    public int getIteration() {
+      return iteration;
+    }
+
+    /** Sets current iteration. */
+    public void setIteration(int val) {
+      this.iteration = val;
+    }
+
+    /** Increment number of blocks. */
+    public void incrementBlocks() {
+      ++blocks;
+    }
+
+    /** Increment current iteration. */
+    public void incrementIteration() {
+      ++iteration;
+    }
+  }
   
   private synchronized void updateDeletedBlockId(ExtendedBlock block) {
     Set<Long> blockIds = deletedBlockIds.get(block.getBlockPoolId());
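
To make the rotation policy concrete, here is a standalone sketch (not part
of the patch) that drives ReplicaTrashCurDirInfo by hand with a limit of 2
blocks per sub-directory; "20240101" stands in for whatever dateFormat
produces:

  ReplicaTrashCurDirInfo info = new ReplicaTrashCurDirInfo();
  info.setCurReplicaTrashSubDir("20240101"); // hypothetical date-based name
  info.setIteration(0);
  info.setBlocks(0);
  for (int i = 0; i < 5; i++) {        // move five blocks to trash
    if (info.getBlocks() == 2) {       // sub-dir full: roll to the next one
      info.incrementIteration();
      info.setBlocks(0);
    }
    info.incrementBlocks();
  }
  // The five blocks land in 20240101_0, 20240101_1 and 20240101_2
  // (two, two, then one).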

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -3809,6 +3809,16 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.replica.trash.subdir.max.blocks</name>
+  <value>${dfs.block.invalidate.limit}</value>
+  <description>
+    Maximum number of blocks per sub-directory in replica-trash. If this
+    property is not set, it defaults to the value of
+    dfs.block.invalidate.limit.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.balance.max.concurrent.moves</name>
   <value>50</value>
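
Operators can override the limit in hdfs-site.xml; a hypothetical example
capping each replica-trash sub-directory at 500 blocks:

  <property>
    <name>dfs.datanode.replica.trash.subdir.max.blocks</name>
    <value>500</value>
  </property>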

+ 72 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeReplicaTrash.java

@@ -108,6 +108,78 @@ public class TestDatanodeReplicaTrash {
 
   }
 
+  @Test
+  public void testDeleteWithTrashSubDirLimit() throws Exception {
+
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ENABLE_REPLICA_TRASH_KEY,
+        true);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 2);
+    conf.setInt(DFSConfigKeys
+        .DFS_DATANODE_REPLICA_TRASH_SUBDIR_MAX_BLOCKS, 2);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).storagesPerDatanode(1).build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      final ClientProtocol client = cluster.getNameNode().getRpcServer();
+      final Path f = new Path(FILE_NAME);
+      int len = 10240;
+      DFSTestUtil.createFile(dfs, f, len, (short) 1, RANDOM.nextLong());
+
+      LocatedBlocks blockLocations = client.getBlockLocations(f.toString(),
+          0, 1024);
+      String bpId = blockLocations.getLocatedBlocks().get(0).getBlock()
+          .getBlockPoolId();
+
+      Collection<String> locations = conf.getTrimmedStringCollection(
+          DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+
+      String loc;
+      File replicaTrashDir = null;
+
+      // The number of storages per datanode is set to 1 above
+      Assert.assertEquals(1, locations.size());
+      for (String location : locations) {
+        loc = location.replace("[DISK]file:", "");
+        replicaTrashDir = new File(loc + File.separator + Storage
+            .STORAGE_DIR_CURRENT + File.separator + bpId + File
+            .separator + DataStorage.STORAGE_DIR_REPLICA_TRASH);
+      }
+
+      // Before the delete, the replica-trash dir should be empty
+      Assert.assertEquals(0, replicaTrashDir.list().length);
+
+      dfs.delete(f, true);
+      LOG.info("File has been deleted");
+
+      List<DataNode> datanodes = cluster.getDataNodes();
+      for (DataNode datanode : datanodes) {
+        DataNodeTestUtils.triggerHeartbeat(datanode);
+      }
+
+      final File replicaTrash = replicaTrashDir;
+      // After the delete, replica-trash should contain 10 sub-directories:
+      // the 10KB file spans 20 blocks (10240 / 512), and with at most 2
+      // blocks per sub-directory that yields 10 sub-directories.
+      LambdaTestUtils.await(30000, 1000,
+          () -> {
+            File[] subDirs = replicaTrash.listFiles(File::isDirectory);
+            return subDirs != null && subDirs.length == 10;
+          });
+    } finally {
+      cluster.shutdown();
+    }
+  }
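
A back-of-envelope check of the assertion above (plain arithmetic, not part
of the test):

  int fileLen = 10240;              // bytes written by the test
  int blockSize = 512;              // assumed value of DEFAULT_BLOCK_SIZE
  int maxPerSubDir = 2;             // the configured sub-dir limit
  int blocks = fileLen / blockSize; // 20 blocks
  int expectedSubDirs = (blocks + maxPerSubDir - 1) / maxPerSubDir; // 10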
+
   @Test
   public void testDeleteWithReplicaTrashDisable() throws Exception {