Pārlūkot izejas kodu

HDFS-15492. Make trash root inside each snapshottable directory (#2176)

Siyao Meng 4 gadi atpakaļ
vecāks
revīzija
3fd3aeb621

+ 18 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java

@@ -56,6 +56,7 @@ public class FsServerDefaults implements Writable {
   private DataChecksum.Type checksumType;
   private String keyProviderUri;
   private byte storagepolicyId;
+  private boolean snapshotTrashRootEnabled;
 
   public FsServerDefaults() {
   }
@@ -83,6 +84,18 @@ public class FsServerDefaults implements Writable {
       boolean encryptDataTransfer, long trashInterval,
       DataChecksum.Type checksumType,
       String keyProviderUri, byte storagepolicy) {
+    this(blockSize, bytesPerChecksum, writePacketSize, replication,
+        fileBufferSize, encryptDataTransfer, trashInterval,
+        checksumType, keyProviderUri, storagepolicy,
+        false);
+  }
+
+  public FsServerDefaults(long blockSize, int bytesPerChecksum,
+      int writePacketSize, short replication, int fileBufferSize,
+      boolean encryptDataTransfer, long trashInterval,
+      DataChecksum.Type checksumType,
+      String keyProviderUri, byte storagepolicy,
+      boolean snapshotTrashRootEnabled) {
     this.blockSize = blockSize;
     this.bytesPerChecksum = bytesPerChecksum;
     this.writePacketSize = writePacketSize;
@@ -93,6 +106,7 @@ public class FsServerDefaults implements Writable {
     this.checksumType = checksumType;
     this.keyProviderUri = keyProviderUri;
     this.storagepolicyId = storagepolicy;
+    this.snapshotTrashRootEnabled = snapshotTrashRootEnabled;
   }
 
   public long getBlockSize() {
@@ -139,6 +153,10 @@ public class FsServerDefaults implements Writable {
     return storagepolicyId;
   }
 
+  public boolean getSnapshotTrashRootEnabled() {
+    return snapshotTrashRootEnabled;
+  }
+
   // /////////////////////////////////////////
   // Writable
   // /////////////////////////////////////////

+ 26 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -3149,6 +3149,32 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     return getKeyProviderUri() != null;
   }
 
+  boolean isSnapshotTrashRootEnabled() throws IOException {
+    return getServerDefaults().getSnapshotTrashRootEnabled();
+  }
+
+  /**
+   * Get the snapshot root of a given file or directory if it exists.
+   * e.g. if /snapdir1 is a snapshottable directory and path given is
+   * /snapdir1/path/to/file, this method would return /snapdir1
+   * @param path Path to a file or a directory.
+   * @return Not null if found in a snapshot root directory.
+   * @throws IOException
+   */
+  String getSnapshotRoot(Path path) throws IOException {
+    SnapshottableDirectoryStatus[] dirStatusList = getSnapshottableDirListing();
+    if (dirStatusList == null) {
+      return null;
+    }
+    for (SnapshottableDirectoryStatus dirStatus : dirStatusList) {
+      String currDir = dirStatus.getFullPath().toString();
+      if (path.toUri().getPath().startsWith(currDir)) {
+        return currDir;
+      }
+    }
+    return null;
+  }
+
   /**
    * Returns the SaslDataTransferClient configured for this DFSClient.
    *

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java

@@ -1040,4 +1040,16 @@ public class DFSUtilClient {
     return (ezpath.equals("/") ? ezpath : ezpath + Path.SEPARATOR)
         + FileSystem.TRASH_PREFIX + Path.SEPARATOR + ugi.getShortUserName();
   }
+
+  /**
+   * Returns trash root in a snapshottable directory.
+   * @param ssRoot String of path to a snapshottable directory root.
+   * @param ugi user of trash owner.
+   * @return unqualified path of trash root.
+   */
+  public static String getSnapshotTrashRoot(String ssRoot,
+      UserGroupInformation ugi) {
+    return (ssRoot.equals("/") ? ssRoot : ssRoot + Path.SEPARATOR)
+        + FileSystem.TRASH_PREFIX + Path.SEPARATOR + ugi.getShortUserName();
+  }
 }

+ 98 - 12
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -129,10 +129,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
+import java.util.Set;
 
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 
@@ -3273,8 +3275,11 @@ public class DistributedFileSystem extends FileSystem
   /**
    * Get the root directory of Trash for a path in HDFS.
    * 1. File in encryption zone returns /ez1/.Trash/username
-   * 2. File not in encryption zone, or encountered exception when checking
-   *    the encryption zone of the path, returns /users/username/.Trash
+   * 2. File in snapshottable directory returns /snapdir1/.Trash/username
+   *    if dfs.namenode.snapshot.trashroot.enabled is set to true.
+   * 3. In other cases, or encountered exception when checking the encryption
+   *    zone or when checking snapshot root of the path, returns
+   *    /users/username/.Trash
    * Caller appends either Current or checkpoint timestamp for trash destination
    * @param path the trash root of the path to be determined.
    * @return trash root
@@ -3283,41 +3288,89 @@ public class DistributedFileSystem extends FileSystem
   public Path getTrashRoot(Path path) {
     statistics.incrementReadOps(1);
     storageStatistics.incrementOpCounter(OpType.GET_TRASH_ROOT);
+    if (path == null) {
+      return super.getTrashRoot(null);
+    }
+
+    // Snapshottable directory trash root, not null if path is inside a
+    // snapshottable directory and isSnapshotTrashRootEnabled is true from NN.
+    String ssTrashRoot = null;
     try {
-      if ((path == null) || !dfs.isHDFSEncryptionEnabled()) {
-        return super.getTrashRoot(path);
+      if (dfs.isSnapshotTrashRootEnabled()) {
+        String ssRoot = dfs.getSnapshotRoot(path);
+        if (ssRoot != null) {
+          ssTrashRoot = DFSUtilClient.getSnapshotTrashRoot(ssRoot, dfs.ugi);
+        }
+      }
+    } catch (IOException ioe) {
+      DFSClient.LOG.warn("Exception while checking whether the path is in a "
+          + "snapshottable directory", ioe);
+    }
+
+    try {
+      if (!dfs.isHDFSEncryptionEnabled()) {
+        if (ssTrashRoot == null) {
+          // the path is not in a snapshottable directory and EZ is not enabled
+          return super.getTrashRoot(path);
+        } else {
+          return this.makeQualified(new Path(ssTrashRoot));
+        }
       }
     } catch (IOException ioe) {
       DFSClient.LOG.warn("Exception while checking whether encryption zone is "
           + "supported", ioe);
     }
 
-    String parentSrc = path.isRoot()?
-        path.toUri().getPath():path.getParent().toUri().getPath();
+    // HDFS encryption is enabled on the cluster at this point, does not
+    // necessary mean the given path is in an EZ hence the check.
+    String parentSrc = path.isRoot() ?
+        path.toUri().getPath() : path.getParent().toUri().getPath();
+    String ezTrashRoot = null;
     try {
       EncryptionZone ez = dfs.getEZForPath(parentSrc);
       if ((ez != null)) {
-        return this.makeQualified(
-            new Path(DFSUtilClient.getEZTrashRoot(ez, dfs.ugi)));
+        ezTrashRoot = DFSUtilClient.getEZTrashRoot(ez, dfs.ugi);
       }
     } catch (IOException e) {
       DFSClient.LOG.warn("Exception in checking the encryption zone for the " +
           "path " + parentSrc + ". " + e.getMessage());
     }
-    return super.getTrashRoot(path);
+
+    if (ssTrashRoot == null) {
+      if (ezTrashRoot == null) {
+        // The path is neither in a snapshottable directory nor in an EZ
+        return super.getTrashRoot(path);
+      } else {
+        return this.makeQualified(new Path(ezTrashRoot));
+      }
+    } else {
+      if (ezTrashRoot == null) {
+        return this.makeQualified(new Path(ssTrashRoot));
+      } else {
+        // The path is in EZ and in a snapshottable directory
+        return this.makeQualified(new Path(
+            ssTrashRoot.length() > ezTrashRoot.length() ?
+                ssTrashRoot : ezTrashRoot));
+      }
+    }
   }
 
   /**
    * Get all the trash roots of HDFS for current user or for all the users.
-   * 1. File deleted from non-encryption zone /user/username/.Trash
-   * 2. File deleted from encryption zones
+   * 1. File deleted from encryption zones
    *    e.g., ez1 rooted at /ez1 has its trash root at /ez1/.Trash/$USER
+   * 2. File deleted from snapshottable directories
+   *    if dfs.namenode.snapshot.trashroot.enabled is set to true.
+   *    e.g., snapshottable directory /snapdir1 has its trash root
+   *    at /snapdir1/.Trash/$USER
+   * 3. File deleted from other directories
+   *    /user/username/.Trash
    * @param allUsers return trashRoots of all users if true, used by emptier
    * @return trash roots of HDFS
    */
   @Override
   public Collection<FileStatus> getTrashRoots(boolean allUsers) {
-    List<FileStatus> ret = new ArrayList<>();
+    Set<FileStatus> ret = new HashSet<>();
     // Get normal trash roots
     ret.addAll(super.getTrashRoots(allUsers));
 
@@ -3348,6 +3401,39 @@ public class DistributedFileSystem extends FileSystem
     } catch (IOException e){
       DFSClient.LOG.warn("Cannot get all encrypted trash roots", e);
     }
+
+    try {
+      // Get snapshottable directory trash roots
+      if (dfs.isSnapshotTrashRootEnabled()) {
+        SnapshottableDirectoryStatus[] lst = dfs.getSnapshottableDirListing();
+        if (lst != null) {
+          for (SnapshottableDirectoryStatus dirStatus : lst) {
+            String ssDir = dirStatus.getFullPath().toString();
+            Path ssTrashRoot = new Path(ssDir, FileSystem.TRASH_PREFIX);
+            if (!exists(ssTrashRoot)) {
+              continue;
+            }
+            if (allUsers) {
+              for (FileStatus candidate : listStatus(ssTrashRoot)) {
+                if (exists(candidate.getPath())) {
+                  ret.add(candidate);
+                }
+              }
+            } else {
+              Path userTrash = new Path(DFSUtilClient.getSnapshotTrashRoot(
+                  ssDir, dfs.ugi));
+              try {
+                ret.add(getFileStatus(userTrash));
+              } catch (FileNotFoundException ignored) {
+              }
+            }
+          }
+        }
+      }
+    } catch (IOException e) {
+      DFSClient.LOG.warn("Cannot get snapshot trash roots", e);
+    }
+
     return ret;
   }
 

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java

@@ -2124,7 +2124,8 @@ public class PBHelperClient {
         fs.getTrashInterval(),
         convert(fs.getChecksumType()),
         fs.hasKeyProviderUri() ? fs.getKeyProviderUri() : null,
-        (byte) fs.getPolicyId());
+        (byte) fs.getPolicyId(),
+        fs.getSnapshotTrashRootEnabled());
   }
 
   public static List<CryptoProtocolVersionProto> convert(
@@ -2298,7 +2299,8 @@ public class PBHelperClient {
         .setEncryptDataTransfer(fs.getEncryptDataTransfer())
         .setTrashInterval(fs.getTrashInterval())
         .setChecksumType(convert(fs.getChecksumType()))
-        .setPolicyId(fs.getDefaultStoragePolicyId());
+        .setPolicyId(fs.getDefaultStoragePolicyId())
+        .setSnapshotTrashRootEnabled(fs.getSnapshotTrashRootEnabled());
     if (fs.getKeyProviderUri() != null) {
       builder.setKeyProviderUri(fs.getKeyProviderUri());
     }

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto

@@ -526,6 +526,7 @@ message FsServerDefaultsProto {
   optional ChecksumTypeProto checksumType = 8 [default = CHECKSUM_CRC32];
   optional string keyProviderUri = 9;
   optional uint32 policyId = 10 [default = 0];
+  optional bool snapshotTrashRootEnabled = 11 [default = false];
 }
 
 

+ 9 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -382,6 +382,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   public static final org.slf4j.Logger LOG = LoggerFactory
       .getLogger(FSNamesystem.class.getName());
+
+  // The following are private configurations
+  static final String DFS_NAMENODE_SNAPSHOT_TRASHROOT_ENABLED =
+      "dfs.namenode.snapshot.trashroot.enabled";
+  static final boolean DFS_NAMENODE_SNAPSHOT_TRASHROOT_ENABLED_DEFAULT = false;
+
   private final MetricsRegistry registry = new MetricsRegistry("FSNamesystem");
   @Metric final MutableRatesWithAggregation detailedLockHoldTimeMetrics =
       registry.newRatesWithAggregation("detailedLockHoldTimeMetrics");
@@ -902,7 +908,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           conf.getTrimmed(
               CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
               ""),
-          blockManager.getStoragePolicySuite().getDefaultPolicy().getId());
+          blockManager.getStoragePolicySuite().getDefaultPolicy().getId(),
+          conf.getBoolean(DFS_NAMENODE_SNAPSHOT_TRASHROOT_ENABLED,
+              DFS_NAMENODE_SNAPSHOT_TRASHROOT_ENABLED_DEFAULT));
 
       this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY, 
                                        DFS_NAMENODE_MAX_OBJECTS_DEFAULT);

+ 298 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

@@ -42,6 +42,7 @@ import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
@@ -2144,4 +2145,301 @@ public class TestDistributedFileSystem {
       LambdaTestUtils.intercept(IOException.class, "", () -> str.close());
     }
   }
+
+  @Test
+  public void testGetTrashRoot() throws IOException {
+    Configuration conf = getTestConfiguration();
+    conf.setBoolean("dfs.namenode.snapshot.trashroot.enabled", true);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      Path testDir = new Path("/ssgtr/test1/");
+      Path file0path = new Path(testDir, "file-0");
+      dfs.create(file0path);
+
+      Path trBeforeAllowSnapshot = dfs.getTrashRoot(file0path);
+      String trBeforeAllowSnapshotStr = trBeforeAllowSnapshot.toUri().getPath();
+      // The trash root should be in user home directory
+      String homeDirStr = dfs.getHomeDirectory().toUri().getPath();
+      assertTrue(trBeforeAllowSnapshotStr.startsWith(homeDirStr));
+
+      dfs.allowSnapshot(testDir);
+
+      Path trAfterAllowSnapshot = dfs.getTrashRoot(file0path);
+      String trAfterAllowSnapshotStr = trAfterAllowSnapshot.toUri().getPath();
+      // The trash root should now be in the snapshot root
+      String testDirStr = testDir.toUri().getPath();
+      assertTrue(trAfterAllowSnapshotStr.startsWith(testDirStr));
+
+      // Cleanup
+      dfs.disallowSnapshot(testDir);
+      dfs.delete(testDir, true);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private boolean isPathInUserHome(String pathStr, DistributedFileSystem dfs) {
+    String homeDirStr = dfs.getHomeDirectory().toUri().getPath();
+    return pathStr.startsWith(homeDirStr);
+  }
+
+  @Test
+  public void testGetTrashRoots() throws IOException {
+    Configuration conf = getTestConfiguration();
+    conf.setBoolean("dfs.namenode.snapshot.trashroot.enabled", true);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      Path testDir = new Path("/ssgtr/test1/");
+      Path file0path = new Path(testDir, "file-0");
+      dfs.create(file0path);
+      // Create user trash
+      Path currUserHome = dfs.getHomeDirectory();
+      Path currUserTrash = new Path(currUserHome, FileSystem.TRASH_PREFIX);
+      dfs.mkdirs(currUserTrash);
+      // Create trash inside test directory
+      Path testDirTrash = new Path(testDir, FileSystem.TRASH_PREFIX);
+      Path testDirTrashCurrUser = new Path(testDirTrash,
+          UserGroupInformation.getCurrentUser().getShortUserName());
+      dfs.mkdirs(testDirTrashCurrUser);
+
+      Collection<FileStatus> trashRoots = dfs.getTrashRoots(false);
+      // getTrashRoots should only return 1 empty user trash in the home dir now
+      assertEquals(1, trashRoots.size());
+      FileStatus firstFileStatus = trashRoots.iterator().next();
+      String pathStr = firstFileStatus.getPath().toUri().getPath();
+      assertTrue(isPathInUserHome(pathStr, dfs));
+      // allUsers should not make a difference for now because we have one user
+      Collection<FileStatus> trashRootsAllUsers = dfs.getTrashRoots(true);
+      assertEquals(trashRoots, trashRootsAllUsers);
+
+      dfs.allowSnapshot(testDir);
+
+      Collection<FileStatus> trashRootsAfter = dfs.getTrashRoots(false);
+      // getTrashRoots should return 1 more trash root inside snapshottable dir
+      assertEquals(trashRoots.size() + 1, trashRootsAfter.size());
+      boolean foundUserHomeTrash = false;
+      boolean foundSnapDirUserTrash = false;
+      String testDirStr = testDir.toUri().getPath();
+      for (FileStatus fileStatus : trashRootsAfter) {
+        String currPathStr = fileStatus.getPath().toUri().getPath();
+        if (isPathInUserHome(currPathStr, dfs)) {
+          foundUserHomeTrash = true;
+        } else if (currPathStr.startsWith(testDirStr)) {
+          foundSnapDirUserTrash = true;
+        }
+      }
+      assertTrue(foundUserHomeTrash);
+      assertTrue(foundSnapDirUserTrash);
+      // allUsers should not make a difference for now because we have one user
+      Collection<FileStatus> trashRootsAfterAllUsers = dfs.getTrashRoots(true);
+      assertEquals(trashRootsAfter, trashRootsAfterAllUsers);
+
+      // Create trash root for user0
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user0");
+      String user0HomeStr = DFSUtilClient.getHomeDirectory(conf, ugi);
+      Path user0Trash = new Path(user0HomeStr, FileSystem.TRASH_PREFIX);
+      dfs.mkdirs(user0Trash);
+      // allUsers flag set to false should be unaffected
+      Collection<FileStatus> trashRootsAfter2 = dfs.getTrashRoots(false);
+      assertEquals(trashRootsAfter, trashRootsAfter2);
+      // allUsers flag set to true should include new user's trash
+      trashRootsAfter2 = dfs.getTrashRoots(true);
+      assertEquals(trashRootsAfter.size() + 1, trashRootsAfter2.size());
+
+      // Create trash root inside the snapshottable directory for user0
+      Path testDirTrashUser0 = new Path(testDirTrash, ugi.getShortUserName());
+      dfs.mkdirs(testDirTrashUser0);
+      Collection<FileStatus> trashRootsAfter3 = dfs.getTrashRoots(true);
+      assertEquals(trashRootsAfter2.size() + 1, trashRootsAfter3.size());
+
+      // Cleanup
+      dfs.disallowSnapshot(testDir);
+      dfs.delete(testDir, true);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testGetTrashRootsOnSnapshottableDirWithEZ()
+      throws IOException, NoSuchAlgorithmException {
+    Configuration conf = getTestConfiguration();
+    conf.setBoolean("dfs.namenode.snapshot.trashroot.enabled", true);
+    // Set encryption zone config
+    File tmpDir = GenericTestUtils.getTestDir(UUID.randomUUID().toString());
+    final Path jksPath = new Path(tmpDir.toString(), "test.jks");
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+        JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    // Create key for EZ
+    final KeyProvider provider =
+        cluster.getNameNode().getNamesystem().getProvider();
+    final KeyProvider.Options options = KeyProvider.options(conf);
+    provider.createKey("key", options);
+    provider.flush();
+
+    try {
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      Path testDir = new Path("/ssgtr/test2/");
+      dfs.mkdirs(testDir);
+      dfs.createEncryptionZone(testDir, "key");
+
+      // Create trash inside test directory
+      Path testDirTrash = new Path(testDir, FileSystem.TRASH_PREFIX);
+      Path testDirTrashCurrUser = new Path(testDirTrash,
+          UserGroupInformation.getCurrentUser().getShortUserName());
+      dfs.mkdirs(testDirTrashCurrUser);
+
+      Collection<FileStatus> trashRoots = dfs.getTrashRoots(false);
+      assertEquals(1, trashRoots.size());
+      FileStatus firstFileStatus = trashRoots.iterator().next();
+      String pathStr = firstFileStatus.getPath().toUri().getPath();
+      String testDirStr = testDir.toUri().getPath();
+      assertTrue(pathStr.startsWith(testDirStr));
+
+      dfs.allowSnapshot(testDir);
+
+      Collection<FileStatus> trashRootsAfter = dfs.getTrashRoots(false);
+      // getTrashRoots should give the same result
+      assertEquals(trashRoots, trashRootsAfter);
+
+      // Cleanup
+      dfs.disallowSnapshot(testDir);
+      dfs.delete(testDir, true);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testGetTrashRootOnSnapshottableDirInEZ()
+      throws IOException, NoSuchAlgorithmException {
+    Configuration conf = getTestConfiguration();
+    conf.setBoolean("dfs.namenode.snapshot.trashroot.enabled", true);
+    // Set EZ config
+    File tmpDir = GenericTestUtils.getTestDir(UUID.randomUUID().toString());
+    final Path jksPath = new Path(tmpDir.toString(), "test.jks");
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+        JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    // Create key for EZ
+    final KeyProvider provider =
+        cluster.getNameNode().getNamesystem().getProvider();
+    final KeyProvider.Options options = KeyProvider.options(conf);
+    provider.createKey("key", options);
+    provider.flush();
+
+    try {
+      DistributedFileSystem dfs = cluster.getFileSystem();
+
+      Path testDir = new Path("/ssgtr/test3ez/");
+      dfs.mkdirs(testDir);
+      dfs.createEncryptionZone(testDir, "key");
+      Path testSubD = new Path(testDir, "sssubdir");
+      Path file1Path = new Path(testSubD, "file1");
+      dfs.create(file1Path);
+
+      final Path trBefore = dfs.getTrashRoot(file1Path);
+      final String trBeforeStr = trBefore.toUri().getPath();
+      // The trash root should be directly under testDir
+      final Path testDirTrash = new Path(testDir, FileSystem.TRASH_PREFIX);
+      final String testDirTrashStr = testDirTrash.toUri().getPath();
+      assertTrue(trBeforeStr.startsWith(testDirTrashStr));
+
+      dfs.allowSnapshot(testSubD);
+      final Path trAfter = dfs.getTrashRoot(file1Path);
+      final String trAfterStr = trAfter.toUri().getPath();
+      // The trash is now located in the dir inside
+      final Path testSubDirTrash = new Path(testSubD, FileSystem.TRASH_PREFIX);
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      final Path testSubDirUserTrash = new Path(testSubDirTrash,
+          ugi.getShortUserName());
+      final String testSubDirUserTrashStr =
+          testSubDirUserTrash.toUri().getPath();
+      assertEquals(testSubDirUserTrashStr, trAfterStr);
+
+      // Cleanup
+      dfs.disallowSnapshot(testSubD);
+      dfs.delete(testDir, true);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testGetTrashRootOnEZInSnapshottableDir()
+      throws IOException, NoSuchAlgorithmException {
+    Configuration conf = getTestConfiguration();
+    conf.setBoolean("dfs.namenode.snapshot.trashroot.enabled", true);
+    // Set EZ config
+    File tmpDir = GenericTestUtils.getTestDir(UUID.randomUUID().toString());
+    final Path jksPath = new Path(tmpDir.toString(), "test.jks");
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+        JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    // Create key for EZ
+    final KeyProvider provider =
+        cluster.getNameNode().getNamesystem().getProvider();
+    final KeyProvider.Options options = KeyProvider.options(conf);
+    provider.createKey("key", options);
+    provider.flush();
+
+    try {
+      DistributedFileSystem dfs = cluster.getFileSystem();
+
+      Path testDir = new Path("/ssgtr/test3ss/");
+      dfs.mkdirs(testDir);
+      dfs.allowSnapshot(testDir);
+      Path testSubD = new Path(testDir, "ezsubdir");
+      dfs.mkdirs(testSubD);
+      Path file1Path = new Path(testSubD, "file1");
+      dfs.create(file1Path);
+
+      final Path trBefore = dfs.getTrashRoot(file1Path);
+      final String trBeforeStr = trBefore.toUri().getPath();
+      // The trash root should be directly under testDir
+      final Path testDirTrash = new Path(testDir, FileSystem.TRASH_PREFIX);
+      final String testDirTrashStr = testDirTrash.toUri().getPath();
+      assertTrue(trBeforeStr.startsWith(testDirTrashStr));
+
+      // Need to remove the file inside the dir to establish EZ
+      dfs.delete(file1Path, false);
+      dfs.createEncryptionZone(testSubD, "key");
+      dfs.create(file1Path);
+
+      final Path trAfter = dfs.getTrashRoot(file1Path);
+      final String trAfterStr = trAfter.toUri().getPath();
+      // The trash is now located in the dir inside
+      final Path testSubDirTrash = new Path(testSubD, FileSystem.TRASH_PREFIX);
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      final Path testSubDirUserTrash = new Path(testSubDirTrash,
+          ugi.getShortUserName());
+      final String testSubDirUserTrashStr =
+          testSubDirUserTrash.toUri().getPath();
+      assertEquals(testSubDirUserTrashStr, trAfterStr);
+
+      // Cleanup
+      dfs.disallowSnapshot(testDir);
+      dfs.delete(testDir, true);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }