Kaynağa Gözat

HDFS-15614. Initialize snapshot trash root during NameNode startup if enabled (#2370)

Siyao Meng 4 yıl önce
ebeveyn
işleme
a308a1ec22

+ 28 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -388,6 +388,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       "dfs.namenode.snapshot.trashroot.enabled";
   public static final boolean DFS_NAMENODE_SNAPSHOT_TRASHROOT_ENABLED_DEFAULT
       = false;
+  private static final FsPermission SHARED_TRASH_PERMISSION =
+      new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL, true);
 
   private final MetricsRegistry registry = new MetricsRegistry("FSNamesystem");
   @Metric final MutableRatesWithAggregation detailedLockHoldTimeMetrics =
@@ -8524,6 +8526,32 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     logAuditEvent(true, operationName, src);
   }
 
+  /**
+   * Check if snapshot roots are created for all existing snapshottable
+   * directories. Create them if not.
+   */
+  void checkAndProvisionSnapshotTrashRoots() throws IOException {
+    SnapshottableDirectoryStatus[] dirStatusList = getSnapshottableDirListing();
+    if (dirStatusList == null) {
+      return;
+    }
+    for (SnapshottableDirectoryStatus dirStatus : dirStatusList) {
+      String currDir = dirStatus.getFullPath().toString();
+      if (!currDir.endsWith(Path.SEPARATOR)) {
+        currDir += Path.SEPARATOR;
+      }
+      String trashPath = currDir + FileSystem.TRASH_PREFIX;
+      HdfsFileStatus fileStatus = getFileInfo(trashPath, false, false, false);
+      if (fileStatus == null) {
+        LOG.info("Trash doesn't exist for snapshottable directory {}. "
+            + "Creating trash at {}", currDir, trashPath);
+        PermissionStatus permissionStatus = new PermissionStatus(getRemoteUser()
+            .getShortUserName(), null, SHARED_TRASH_PERMISSION);
+        mkdirs(trashPath, permissionStatus, false);
+      }
+    }
+  }
+
   private Supplier<String> getLockReportInfoSupplier(String src) {
     return getLockReportInfoSupplier(src, null);
   }

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -2018,6 +2018,9 @@ public class NameNode extends ReconfigurableBase implements
     public void startActiveServices() throws IOException {
       try {
         namesystem.startActiveServices();
+        if (namesystem.isSnapshotTrashRootEnabled()) {
+          namesystem.checkAndProvisionSnapshotTrashRoots();
+        }
         startTrashEmptier(getConf());
       } catch (Throwable t) {
         doImmediateShutdown(t);

+ 15 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -623,6 +623,10 @@ public class MiniDFSCluster implements AutoCloseable {
       this.startOpt = startOpt;
       this.conf = conf;
     }
+
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
     
     public void setStartOpt(StartupOption startOpt) {
       this.startOpt = startOpt;
@@ -2185,6 +2189,17 @@ public class MiniDFSCluster implements AutoCloseable {
     restartNameNode(nnIndex, true);
   }
 
+  /**
+   * Update an existing NameNode's configuration.
+   */
+  public void setNameNodeConf(int nnIndex, Configuration nnConf) {
+    NameNodeInfo info = getNN(nnIndex);
+    if (info == null) {
+      throw new RuntimeException("Invalid nnIndex!");
+    }
+    info.setConf(nnConf);
+  }
+
   /**
    * Restart the namenode at a given index. Optionally wait for the cluster
    * to become active.

+ 41 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

@@ -24,6 +24,7 @@ import static org.apache.hadoop.hdfs.client.HdfsAdmin.TRASH_PERMISSION;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -1620,7 +1621,7 @@ public class TestDistributedFileSystem {
       cluster.waitActive();
       DistributedFileSystem dfs = cluster.getFileSystem();
       FsServerDefaults fsServerDefaults = dfs.getServerDefaults();
-      Assert.assertNotNull(fsServerDefaults);
+      assertNotNull(fsServerDefaults);
     } finally {
       cluster.shutdown();
     }
@@ -2513,4 +2514,43 @@ public class TestDistributedFileSystem {
           () -> FileUtil.copy(dfs, fstatus, dfs, filePath, false, true, conf));
     }
   }
+
+  @Test
+  public void testNameNodeCreateSnapshotTrashRootOnStartup()
+      throws Exception {
+    // Start NN with dfs.namenode.snapshot.trashroot.enabled=false
+    Configuration conf = getTestConfiguration();
+    conf.setBoolean("dfs.namenode.snapshot.trashroot.enabled", false);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final Path testDir = new Path("/disallowss/test2/");
+      final Path file0path = new Path(testDir, "file-0");
+      dfs.create(file0path).close();
+      dfs.allowSnapshot(testDir);
+      // .Trash won't be created right now since snapshot trash is disabled
+      final Path trashRoot = new Path(testDir, FileSystem.TRASH_PREFIX);
+      assertFalse(dfs.exists(trashRoot));
+      // Set dfs.namenode.snapshot.trashroot.enabled=true
+      conf.setBoolean("dfs.namenode.snapshot.trashroot.enabled", true);
+      cluster.setNameNodeConf(0, conf);
+      cluster.restartNameNode(0);
+      // Check .Trash existence, should be created now
+      assertTrue(dfs.exists(trashRoot));
+      // Check permission
+      FileStatus trashRootStatus = dfs.getFileStatus(trashRoot);
+      assertNotNull(trashRootStatus);
+      assertEquals(TRASH_PERMISSION, trashRootStatus.getPermission());
+
+      // Cleanup
+      dfs.delete(trashRoot, true);
+      dfs.disallowSnapshot(testDir);
+      dfs.delete(testDir, true);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }