HDFS-13163. Move invalidated blocks to replica-trash with disk layout based on timestamp. Contributed by Bharat Viswanadham.

Arpit Agarwal, 7 years ago
Commit e37cde0a6a
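
For orientation before the diff: the new moveFilesToReplicaTrash() path rewrites the "current" segment that follows the block pool ID in the block file's parent directory to the replica-trash directory, then appends an hourly bucket formatted as yyyy-MM-dd-HH. The code below is a minimal standalone sketch of that rewrite, not the DataNode code itself; the sample path is hypothetical, and the literals "current" and "replica-trash" stand in for Storage.STORAGE_DIR_CURRENT and DataStorage.STORAGE_DIR_REPLICA_TRASH.

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch of the path rewrite performed by moveFilesToReplicaTrash(). */
public class ReplicaTrashPathSketch {
  // Same shape as BLOCK_POOL_ID_PATTERN_BASE in the patch below.
  private static final String BP_SEGMENT =
      Pattern.quote(File.separator)
          + "BP-\\d+-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d+"
          + Pattern.quote(File.separator);

  // Matches "<prefix>/<BP-...>/current<rest of the block path>".
  private static final Pattern BP_CURRENT =
      Pattern.compile("^(.*)(" + BP_SEGMENT + ")(current)(.*)$");

  public static void main(String[] args) {
    // Hypothetical parent directory of a finalized block file.
    String blockParent =
        "/data/dn1/current/BP-1-127.0.0.1-1/current/finalized/subdir0";

    // Rewrite ".../<BP-...>/current/..." to ".../<BP-...>/replica-trash".
    Matcher m = BP_CURRENT.matcher(blockParent);
    String replicaTrashBase = m.replaceFirst("$1$2" + "replica-trash");

    // Append the hourly bucket, the same format the patch uses.
    String hourlyDir = new SimpleDateFormat("yyyy-MM-dd-HH").format(new Date());
    System.out.println(new File(replicaTrashBase, hourlyDir));
    // Prints something like:
    // /data/dn1/current/BP-1-127.0.0.1-1/replica-trash/<yyyy-MM-dd-HH>
  }
}
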

+ 82 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java

@@ -20,6 +20,10 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -29,11 +33,16 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
@@ -42,6 +51,8 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIOException;
 
+import static org.apache.hadoop.hdfs.server.common.Storage.STORAGE_DIR_CURRENT;
+
 /**
  * This class is a container of multiple thread pools, each for a volume,
  * so that we can schedule async disk operations easily.
@@ -65,7 +76,17 @@ class FsDatasetAsyncDiskService {
   // ThreadPool maximum pool size
   private static final int MAXIMUM_THREADS_PER_VOLUME = 4;
   // ThreadPool keep-alive time for threads over core pool size
-  private static final long THREADS_KEEP_ALIVE_SECONDS = 60; 
+  private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
+
+  private static final String BLOCK_POOL_ID_PATTERN_BASE =
+      Pattern.quote(File.separator) +
+          "BP-\\d+-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d+" +
+          Pattern.quote(File.separator);
+
+  private static final Pattern BLOCK_POOL_CURRENT_PATH_PATTERN =
+      Pattern.compile("^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(" +
+          STORAGE_DIR_CURRENT + ")(.*)$");
+
   
   private final DataNode datanode;
   private final FsDatasetImpl fsdatasetImpl;
@@ -76,6 +97,8 @@ class FsDatasetAsyncDiskService {
       = new HashMap<String, Set<Long>>();
   private static final int MAX_DELETED_BLOCKS = 64;
   private int numDeletedBlocks = 0;
+  private final Configuration conf;
+  private final boolean replicaTrashEnabled;
   
   /**
    * Create a AsyncDiskServices with a set of volumes (specified by their
@@ -84,10 +107,15 @@ class FsDatasetAsyncDiskService {
    * The AsyncDiskServices uses one ThreadPool per volume to do the async
    * disk operations.
    */
-  FsDatasetAsyncDiskService(DataNode datanode, FsDatasetImpl fsdatasetImpl) {
+  FsDatasetAsyncDiskService(DataNode datanode, FsDatasetImpl fsdatasetImpl,
+                            Configuration configuration) {
     this.datanode = datanode;
     this.fsdatasetImpl = fsdatasetImpl;
     this.threadGroup = new ThreadGroup(getClass().getSimpleName());
+    this.conf = configuration;
+    this.replicaTrashEnabled = conf.getBoolean(DFSConfigKeys
+        .DFS_DATANODE_ENABLE_REPLICA_TRASH_KEY, DFSConfigKeys
+        .DFS_DATANODE_ENABLE_REPLICA_TRASH_DEFAULT);
   }
 
   private void addExecutorForVolume(final FsVolumeImpl volume) {
@@ -277,6 +305,49 @@ class FsDatasetAsyncDiskService {
         (replicaToDelete.deleteMetadata() || !replicaToDelete.metadataExists());
     }
 
+    private boolean moveFilesToReplicaTrash() {
+
+      DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-HH");
+      Date date = new Date();
+
+      URI blockURI = replicaToDelete.getBlockURI();
+
+      String replicaTrashBaseDir;
+      File blockFile = new File(blockURI);
+      Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile
+          .getParent());
+      replicaTrashBaseDir = matcher.replaceFirst("$1$2" + DataStorage
+            .STORAGE_DIR_REPLICA_TRASH);
+
+      File replicaTrashDir = new File(replicaTrashBaseDir + File
+          .separator + dateFormat.format(date));
+
+      try {
+        volume.getFileIoProvider().mkdirsWithExistsCheck(
+            volume, replicaTrashDir);
+      } catch (IOException e) {
+        return false;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Moving files " + replicaToDelete.getBlockURI() + " and " +
+            replicaToDelete.getMetadataURI() + " to replica-trash.");
+      }
+
+      final String blockName = replicaToDelete.getBlockName();
+      final long genstamp = replicaToDelete.getGenerationStamp();
+      File newBlockFile = new File(replicaTrashDir, blockName);
+      File newMetaFile = new File(replicaTrashDir,
+          DatanodeUtil.getMetaName(blockName, genstamp));
+      try {
+        return (replicaToDelete.renameData(newBlockFile.toURI()) &&
+            replicaToDelete.renameMeta(newMetaFile.toURI()));
+      } catch (IOException e) {
+        LOG.error("Error moving files to trash: " + replicaToDelete, e);
+      }
+      return false;
+    }
+
     private boolean moveFiles() {
       if (trashDirectory == null) {
         LOG.error("Trash dir for replica " + replicaToDelete + " is null");
@@ -316,11 +387,18 @@ class FsDatasetAsyncDiskService {
       final long metaLength = replicaToDelete.getMetadataLength();
       boolean result;
 
-      result = (trashDirectory == null) ? deleteFiles() : moveFiles();
+      if (trashDirectory != null) {
+        result = moveFiles();
+      } else if (replicaTrashEnabled) {
+        result = moveFilesToReplicaTrash();
+      } else {
+        result = deleteFiles();
+      }
 
       if (!result) {
         LOG.warn("Unexpected error trying to "
-            + (trashDirectory == null ? "delete" : "move")
+            + ((trashDirectory == null && !replicaTrashEnabled) ?
+            "delete" : "move")
             + " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
             + " at file " + replicaToDelete.getBlockURI() + ". Ignored.");
       } else {

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -308,7 +308,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             VolumeChoosingPolicy.class), conf);
     volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
         blockChooserImpl);
-    asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
+    asyncDiskService = new FsDatasetAsyncDiskService(datanode, this,
+        new Configuration(conf));
     asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, conf);
     deletingBlock = new HashMap<String, Set<Long>>();
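
The async disk service now receives the DataNode configuration and reads the replica-trash toggle from it. As a usage sketch (mirroring the test below; the DFSConfigKeys constants are assumed to exist on the feature branch this patch targets), enabling the feature before starting a DataNode or MiniDFSCluster looks like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

/** Sketch: building a DataNode configuration with replica-trash enabled. */
public class ReplicaTrashConfSketch {
  static Configuration withReplicaTrashEnabled() {
    Configuration conf = new Configuration();
    // Toggle read by FsDatasetAsyncDiskService in the hunk above; it falls
    // back to DFS_DATANODE_ENABLE_REPLICA_TRASH_DEFAULT when unset.
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ENABLE_REPLICA_TRASH_KEY, true);
    return conf;
  }
}
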
 

+ 166 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeReplicaTrash.java

@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Test Datanode Replica Trash with enable and disable.
+ */
+public class TestDatanodeReplicaTrash {
+  private final static Logger LOG = LoggerFactory.getLogger(
+      TestDatanodeReplicaTrash.class);
+  private final Configuration conf = new Configuration();
+  private static final Random RANDOM = new Random();
+  private static final String FILE_NAME = "/tmp.txt";
+  private static final int DEFAULT_BLOCK_SIZE = 512;
+
+  @Test
+  public void testDeleteWithReplicaTrashEnable() throws Exception {
+
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ENABLE_REPLICA_TRASH_KEY,
+        true);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 2);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).storagesPerDatanode(1).build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      final ClientProtocol client = cluster.getNameNode().getRpcServer();
+      final Path f = new Path(FILE_NAME);
+      int len = 1024;
+      DFSTestUtil.createFile(dfs, f, len, (short) 1, RANDOM.nextLong());
+
+      LocatedBlocks blockLocations = client.getBlockLocations(f.toString(),
+          0, 1024);
+      String bpId =  blockLocations.getLocatedBlocks().get(0).getBlock()
+          .getBlockPoolId();
+
+      Collection<String> locations = conf.getTrimmedStringCollection(
+          DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+
+      String loc;
+      File replicaTrashDir = null;
+
+      for (String location : locations) {
+        loc = location.replace("[DISK]file:", "");
+        replicaTrashDir = new File(loc + File.separator + Storage
+            .STORAGE_DIR_CURRENT + File.separator + bpId + File
+            .separator + DataStorage.STORAGE_DIR_REPLICA_TRASH);
+      }
+
+      //Before Delete replica-trash dir should be empty
+      Assert.assertTrue(replicaTrashDir.list().length == 0);
+
+      dfs.delete(f, true);
+      LOG.info("File is being deleted");
+
+
+      List<DataNode> datanodes = cluster.getDataNodes();
+      for (DataNode datanode : datanodes) {
+        DataNodeTestUtils.triggerHeartbeat(datanode);
+      }
+
+      final File replicaTrash = replicaTrashDir;
+      //After delete, replica-trash dir should not be empty
+      LambdaTestUtils.await(30000, 1000,
+          () -> {
+            return replicaTrash.list().length > 0;
+          });
+    } finally {
+      cluster.shutdown();
+    }
+
+  }
+
+  @Test
+  public void testDeleteWithReplicaTrashDisable() throws Exception {
+
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ENABLE_REPLICA_TRASH_KEY,
+        false);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 2);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).storagesPerDatanode(1).build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      ClientProtocol client = cluster.getNameNode().getRpcServer();
+      DataNode dn = cluster.getDataNodes().get(0);
+      Path f = new Path(FILE_NAME);
+      int len = 100;
+      DFSTestUtil.createFile(dfs, f, len, (short) 1, RANDOM.nextLong());
+
+      LocatedBlocks blockLocations = client.getBlockLocations(f.toString(),
+          0, 100);
+      String bpId =  blockLocations.getLocatedBlocks().get(0).getBlock()
+          .getBlockPoolId();
+
+      Collection<String> locations = conf.getTrimmedStringCollection(
+          DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+
+      String loc;
+      File replicaTrashDir = null;
+
+
+      for (String location : locations) {
+        loc = location.replace("[DISK]file:", "");
+        replicaTrashDir = new File(loc + File.separator + Storage
+            .STORAGE_DIR_CURRENT + File.separator + bpId + File
+            .separator + DataStorage.STORAGE_DIR_REPLICA_TRASH);
+      }
+
+      dfs.delete(f, true);
+      LOG.info("File is being deleted");
+
+      List<DataNode> datanodes = cluster.getDataNodes();
+      for (DataNode datanode : datanodes) {
+        DataNodeTestUtils.triggerHeartbeat(datanode);
+      }
+
+      //replica-trash folder should not be created, as replica trash is not
+      // enabled
+      Assert.assertTrue(!replicaTrashDir.exists());
+    } finally {
+      cluster.shutdown();
+    }
+
+  }
+
+}