Переглянути джерело

HDFS-10324. Trash directory in an encryption zone should be pre-created with correct permissions. Contributed by Wei-Chiu Chuang.

Xiaoyu Yao 9 роки тому
батько
коміт
dacd1f50fe

+ 0 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java

@@ -56,7 +56,6 @@ public class TrashPolicyDefault extends TrashPolicy {
     LogFactory.getLog(TrashPolicyDefault.class);
 
   private static final Path CURRENT = new Path("Current");
-  private static final Path TRASH = new Path(".Trash/");  
 
   private static final FsPermission PERMISSION =
     new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java

@@ -29,7 +29,6 @@ import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
 import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
-import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystemTestHelper;
@@ -38,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
 import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
@@ -118,6 +118,8 @@ public class TestRpcProgramNfs3 {
   private static final String TEST_KEY = "test_key";
   private static FileSystemTestHelper fsHelper;
   private static File testRootDir;
+  private static final EnumSet<CreateEncryptionZoneFlag> NO_TRASH =
+      EnumSet.of(CreateEncryptionZoneFlag.NO_TRASH);
 
   @BeforeClass
   public static void setup() throws Exception {
@@ -340,7 +342,7 @@ public class TestRpcProgramNfs3 {
 
     final Path zone = new Path("/zone");
     hdfs.mkdirs(zone);
-    dfsAdmin.createEncryptionZone(zone, TEST_KEY);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
 
     final byte[] buffer = new byte[len];
     for (int i = 0; i < len; i++) {

+ 71 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/CreateEncryptionZoneFlag.java

@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * CreateEncryptionZoneFlag is used in
+ * {@link HdfsAdmin#createEncryptionZone(Path, String, EnumSet)} to indicate
+ * what should be done when creating an encryption zone.
+ *
+ * Use CreateEncryptionZoneFlag as follows:
+ * <ol>
+ *   <li>PROVISION_TRASH - provision a trash directory for the encryption zone
+ *   to support soft delete.</li>
+ * </ol>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public enum CreateEncryptionZoneFlag {
+
+  /**
+   * Do not provision a trash directory in the encryption zone.
+   *
+   * @see CreateEncryptionZoneFlag#NO_TRASH
+   */
+  NO_TRASH((short) 0x00),
+  /**
+   * Provision a trash directory .Trash/ in the
+   * encryption zone.
+   *
+   * @see CreateEncryptionZoneFlag#PROVISION_TRASH
+   */
+  PROVISION_TRASH((short) 0x01);
+
+  private final short mode;
+
+  CreateEncryptionZoneFlag(short mode) {
+    this.mode = mode;
+  }
+
+  public static CreateEncryptionZoneFlag valueOf(short mode) {
+    for (CreateEncryptionZoneFlag flag : CreateEncryptionZoneFlag.values()) {
+      if (flag.getMode() == mode) {
+        return flag;
+      }
+    }
+    return null;
+  }
+
+  public short getMode() {
+    return mode;
+  }
+}
+

+ 88 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java

@@ -22,14 +22,18 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.EnumSet;
 
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CacheFlag;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -56,6 +60,8 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 public class HdfsAdmin {
   
   private DistributedFileSystem dfs;
+  private static final FsPermission TRASH_PERMISSION = new FsPermission(
+      FsAction.ALL, FsAction.ALL, FsAction.ALL, true);
   
   /**
    * Create a new HdfsAdmin client.
@@ -270,9 +276,51 @@ public class HdfsAdmin {
    * @throws AccessControlException if the caller does not have access to path
    * @throws FileNotFoundException  if the path does not exist
    */
+  @Deprecated
   public void createEncryptionZone(Path path, String keyName)
-    throws IOException, AccessControlException, FileNotFoundException {
+      throws IOException, AccessControlException, FileNotFoundException {
+    dfs.createEncryptionZone(path, keyName);
+  }
+
+  /**
+   * Create an encryption zone rooted at an empty existing directory, using the
+   * specified encryption key. An encryption zone has an associated encryption
+   * key used when reading and writing files within the zone.
+   *
+   * Additional options, such as provisioning the trash directory, can be
+   * specified using {@link CreateEncryptionZoneFlag} flags.
+   *
+   * @param path    The path of the root of the encryption zone. Must refer to
+   *                an empty, existing directory.
+   * @param keyName Name of key available at the KeyProvider.
+   * @param flags   flags for this operation.
+   * @throws IOException            if there was a general IO exception
+   * @throws AccessControlException if the caller does not have access to path
+   * @throws FileNotFoundException  if the path does not exist
+   * @throws HadoopIllegalArgumentException if the flags are invalid
+   */
+  public void createEncryptionZone(Path path, String keyName,
+      EnumSet<CreateEncryptionZoneFlag> flags)
+      throws IOException, AccessControlException, FileNotFoundException,
+      HadoopIllegalArgumentException{
     dfs.createEncryptionZone(path, keyName);
+    if (flags.contains(CreateEncryptionZoneFlag.PROVISION_TRASH)) {
+      if (flags.contains(CreateEncryptionZoneFlag.NO_TRASH)) {
+        throw new HadoopIllegalArgumentException(
+            "can not have both PROVISION_TRASH and NO_TRASH flags");
+      }
+      this.provisionEZTrash(path);
+    }
+  }
+
+  /**
+   * Provision a trash directory for a given encryption zone.
+
+   * @param path the root of the encryption zone
+   * @throws IOException if the trash directory can not be created.
+   */
+  public void provisionEncryptionZoneTrash(Path path) throws IOException {
+    this.provisionEZTrash(path);
   }
 
   /**
@@ -399,4 +447,43 @@ public class HdfsAdmin {
   public ErasureCodingPolicy[] getErasureCodingPolicies() throws IOException {
     return dfs.getClient().getErasureCodingPolicies();
   }
+
+  private void provisionEZTrash(Path path) throws IOException {
+    // make sure the path is an EZ
+    EncryptionZone ez = dfs.getEZForPath(path);
+    if (ez == null) {
+      throw new IllegalArgumentException(path + " is not an encryption zone.");
+    }
+
+    String ezPath = ez.getPath();
+    if (!path.toString().equals(ezPath)) {
+      throw new IllegalArgumentException(path + " is not the root of an " +
+          "encryption zone. Do you mean " + ez.getPath() + "?");
+    }
+
+    // check if the trash directory exists
+
+    Path trashPath = new Path(ez.getPath(), FileSystem.TRASH_PREFIX);
+
+    if (dfs.exists(trashPath)) {
+      String errMessage = "Will not provision new trash directory for " +
+          "encryption zone " + ez.getPath() + ". Path already exists.";
+      FileStatus trashFileStatus = dfs.getFileStatus(trashPath);
+      if (!trashFileStatus.isDirectory()) {
+        errMessage += "\r\n" +
+            "Warning: " + trashPath.toString() + " is not a directory";
+      }
+      if (!trashFileStatus.getPermission().equals(TRASH_PERMISSION)) {
+        errMessage += "\r\n" +
+            "Warning: the permission of " +
+            trashPath.toString() + " is not " + TRASH_PERMISSION;
+      }
+      throw new IOException(errMessage);
+    }
+
+    // Update the permission bits
+    dfs.mkdir(trashPath, TRASH_PERMISSION);
+    dfs.setPermission(trashPath, TRASH_PERMISSION);
+  }
+
 }

+ 27 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/package-info.java

@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package provides the administrative APIs for HDFS.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 58 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CryptoAdmin.java

@@ -18,15 +18,18 @@
 package org.apache.hadoop.hdfs.tools;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.tools.TableListing;
 import org.apache.hadoop.util.StringUtils;
@@ -103,12 +106,13 @@ public class CryptoAdmin extends Configured implements Tool {
     public String getLongUsage() {
       final TableListing listing = AdminHelper.getOptionDescriptionListing();
       listing.addRow("<path>", "The path of the encryption zone to create. " +
-        "It must be an empty directory.");
+          "It must be an empty directory. A trash directory is provisioned " +
+          "under this path.");
       listing.addRow("<keyName>", "Name of the key to use for the " +
           "encryption zone.");
       return getShortUsage() + "\n" +
-        "Create a new encryption zone.\n\n" +
-        listing.toString();
+          "Create a new encryption zone.\n\n" +
+          listing.toString();
     }
 
     @Override
@@ -131,15 +135,16 @@ public class CryptoAdmin extends Configured implements Tool {
         return 1;
       }
 
-      final DistributedFileSystem dfs = AdminHelper.getDFS(conf);
+      HdfsAdmin admin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+      EnumSet<CreateEncryptionZoneFlag> flags =
+          EnumSet.of(CreateEncryptionZoneFlag.PROVISION_TRASH);
       try {
-        dfs.createEncryptionZone(new Path(path), keyName);
+        admin.createEncryptionZone(new Path(path), keyName, flags);
         System.out.println("Added encryption zone " + path);
       } catch (IOException e) {
         System.err.println(prettifyException(e));
         return 2;
       }
-
       return 0;
     }
   }
@@ -168,12 +173,12 @@ public class CryptoAdmin extends Configured implements Tool {
         return 1;
       }
 
-      final DistributedFileSystem dfs = AdminHelper.getDFS(conf);
+      HdfsAdmin admin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
       try {
         final TableListing listing = new TableListing.Builder()
           .addField("").addField("", true)
           .wrapWidth(AdminHelper.MAX_LINE_WIDTH).hideHeaders().build();
-        final RemoteIterator<EncryptionZone> it = dfs.listEncryptionZones();
+        final RemoteIterator<EncryptionZone> it = admin.listEncryptionZones();
         while (it.hasNext()) {
           EncryptionZone ez = it.next();
           listing.addRow(ez.getPath(), ez.getKeyName());
@@ -188,8 +193,50 @@ public class CryptoAdmin extends Configured implements Tool {
     }
   }
 
+  private static class ProvisionTrashCommand implements AdminHelper.Command {
+    @Override
+    public String getName() {
+      return "-provisionTrash";
+    }
+
+    @Override
+    public String getShortUsage() {
+      return "[" + getName() + " -path <path>]\n";
+    }
+
+    @Override
+    public String getLongUsage() {
+      final TableListing listing = AdminHelper.getOptionDescriptionListing();
+      listing.addRow("<path>", "The path to the root of the encryption zone. ");
+      return getShortUsage() + "\n" +
+          "Provision a trash directory for an encryption zone.\n\n" +
+          listing.toString();
+    }
+
+    @Override
+    public int run(Configuration conf, List<String> args) throws IOException {
+      final String path = StringUtils.popOptionWithArgument("-path", args);
+
+      if (!args.isEmpty()) {
+        System.err.println("Can't understand argument: " + args.get(0));
+        return 1;
+      }
+
+      HdfsAdmin admin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+      try {
+        admin.provisionEncryptionZoneTrash(new Path(path));
+        System.out.println("Created a trash directory for " + path);
+      } catch (IOException ioe) {
+        System.err.println(prettifyException(ioe));
+        return 2;
+      }
+      return 0;
+    }
+  }
+
   private static final AdminHelper.Command[] COMMANDS = {
-    new CreateZoneCommand(),
-    new ListZonesCommand()
+      new CreateZoneCommand(),
+      new ListZonesCommand(),
+      new ProvisionTrashCommand()
   };
 }

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md

@@ -338,6 +338,7 @@ Usage:
 
       hdfs crypto -createZone -keyName <keyName> -path <path>
       hdfs crypto -listZones
+      hdfs crypto -provisionTrash -path <path>
       hdfs crypto -help <command-name>
 
 See the [HDFS Transparent Encryption Documentation](./TransparentEncryption.html#crypto_command-line_interface) for more information.

+ 12 - 1
hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/TransparentEncryption.md

@@ -170,7 +170,7 @@ Create a new encryption zone.
 
 | | |
 |:---- |:---- |
-| *path* | The path of the encryption zone to create. It must be an empty directory. |
+| *path* | The path of the encryption zone to create. It must be an empty directory. A trash directory is provisioned under this path.|
 | *keyName* | Name of the key to use for the encryption zone. |
 
 ### <a name="listZones"></a>listZones
@@ -179,6 +179,16 @@ Usage: `[-listZones]`
 
 List all encryption zones. Requires superuser permissions.
 
+### <a name="provisionTrash"></a>provisionTrash
+
+Usage: `[-provisionTrash -path <path>]`
+
+Provision a trash directory for an encryption zone.
+
+| | |
+|:---- |:---- |
+| *path* | The path to the root of the encryption zone. |
+
 <a name="Example_usage"></a>Example usage
 -------------
 
@@ -222,6 +232,7 @@ This restriction enhances security and eases system management significantly. Al
 
 To comply with the above rule, each encryption zone has its own `.Trash` directory under the "zone directory". E.g., after `hdfs dfs rm /zone/encryptedFile`, `encryptedFile` will be moved to `/zone/.Trash`, instead of the `.Trash` directory under the user's home directory. When the entire encryption zone is deleted, the "zone directory" will be moved to the `.Trash` directory under the user's home directory.
 
+The `crypto` command before Hadoop 2.8.0 does not provision the `.Trash` directory automatically. If an encryption zone is created before Hadoop 2.8.0, and then the cluster is upgraded to Hadoop 2.8.0 or above, the trash directory can be provisioned using `-provisionTrash` option (e.g., `hdfs crypto -provisionTrash -path /zone`).
 <a name="Attack_vectors"></a>Attack vectors
 --------------
 

+ 157 - 49
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java

@@ -29,6 +29,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -59,7 +60,9 @@ import org.apache.hadoop.fs.FileSystemTestWrapper;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
@@ -70,6 +73,7 @@ import org.apache.hadoop.hdfs.server.namenode.EncryptionFaultInjector;
 import org.apache.hadoop.hdfs.server.namenode.EncryptionZoneManager;
 import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
+import org.apache.hadoop.hdfs.tools.CryptoAdmin;
 import org.apache.hadoop.hdfs.tools.DFSck;
 import org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
@@ -131,6 +135,9 @@ public class TestEncryptionZones {
   protected FileSystemTestWrapper fsWrapper;
   protected FileContextTestWrapper fcWrapper;
 
+  protected static final EnumSet< CreateEncryptionZoneFlag > NO_TRASH =
+      EnumSet.of(CreateEncryptionZoneFlag.NO_TRASH);
+
   protected String getKeyProviderURI() {
     return JavaKeyStoreProvider.SCHEME_NAME + "://file" +
       new Path(testRootDir.toString(), "test.jks").toUri();
@@ -215,6 +222,106 @@ public class TestEncryptionZones {
     );
   }
 
+  /**
+   * Make sure hdfs crypto -createZone command creates a trash directory
+   * with sticky bits.
+   * @throws Exception
+   */
+  @Test(timeout = 60000)
+  public void testTrashStickyBit() throws Exception {
+    // create an EZ /zones/zone1, make it world writable.
+    final Path zoneParent = new Path("/zones");
+    final Path zone1 = new Path(zoneParent, "zone1");
+    CryptoAdmin cryptoAdmin = new CryptoAdmin(conf);
+    fsWrapper.mkdir(zone1, FsPermission.getDirDefault(), true);
+    fsWrapper.setPermission(zone1,
+        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+    String[] cryptoArgv = new String[]{"-createZone", "-keyName", TEST_KEY,
+        "-path", zone1.toUri().getPath()};
+    cryptoAdmin.run(cryptoArgv);
+
+    // create a file in EZ
+    final Path ezfile1 = new Path(zone1, "file1");
+    // Create the encrypted file in zone1
+    final int len = 8192;
+    DFSTestUtil.createFile(fs, ezfile1, len, (short) 1, 0xFEED);
+
+    // enable trash, delete /zones/zone1/file1,
+    // which moves the file to
+    // /zones/zone1/.Trash/$SUPERUSER/Current/zones/zone1/file1
+    Configuration clientConf = new Configuration(conf);
+    clientConf.setLong(FS_TRASH_INTERVAL_KEY, 1);
+    final FsShell shell = new FsShell(clientConf);
+    String[] argv = new String[]{"-rm", ezfile1.toString()};
+    int res = ToolRunner.run(shell, argv);
+    assertEquals("Can't remove a file in EZ as superuser", 0, res);
+
+    final Path trashDir = new Path(zone1, FileSystem.TRASH_PREFIX);
+    assertTrue(fsWrapper.exists(trashDir));
+    FileStatus trashFileStatus = fsWrapper.getFileStatus(trashDir);
+    assertTrue(trashFileStatus.getPermission().getStickyBit());
+
+    // create a non-privileged user
+    final UserGroupInformation user = UserGroupInformation.
+        createUserForTesting("user", new String[] { "mygroup" });
+
+    user.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        final Path ezfile2 = new Path(zone1, "file2");
+        final int len = 8192;
+        // create a file /zones/zone1/file2 in EZ
+        // this file is owned by user:mygroup
+        FileSystem fs2 = FileSystem.get(cluster.getConfiguration(0));
+        DFSTestUtil.createFile(fs2, ezfile2, len, (short) 1, 0xFEED);
+        // delete /zones/zone1/file2,
+        // which moves the file to
+        // /zones/zone1/.Trash/user/Current/zones/zone1/file2
+        String[] argv = new String[]{"-rm", ezfile2.toString()};
+        int res = ToolRunner.run(shell, argv);
+        assertEquals("Can't remove a file in EZ as user:mygroup", 0, res);
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Make sure hdfs crypto -provisionTrash command creates a trash directory
+   * with sticky bits.
+   * @throws Exception
+   */
+  @Test(timeout = 60000)
+  public void testProvisionTrash() throws Exception {
+    // create an EZ /zones/zone1
+    final Path zoneParent = new Path("/zones");
+    final Path zone1 = new Path(zoneParent, "zone1");
+    CryptoAdmin cryptoAdmin = new CryptoAdmin(conf);
+    fsWrapper.mkdir(zone1, FsPermission.getDirDefault(), true);
+    String[] cryptoArgv = new String[]{"-createZone", "-keyName", TEST_KEY,
+        "-path", zone1.toUri().getPath()};
+    cryptoAdmin.run(cryptoArgv);
+
+    // remove the trash directory
+    Configuration clientConf = new Configuration(conf);
+    clientConf.setLong(FS_TRASH_INTERVAL_KEY, 1);
+    final FsShell shell = new FsShell(clientConf);
+    final Path trashDir = new Path(zone1, FileSystem.TRASH_PREFIX);
+    String[] argv = new String[]{"-rmdir", trashDir.toUri().getPath()};
+    int res = ToolRunner.run(shell, argv);
+    assertEquals("Unable to delete trash directory.", 0, res);
+    assertFalse(fsWrapper.exists(trashDir));
+
+    // execute -provisionTrash command option and make sure the trash
+    // directory has sticky bit.
+    String[] provisionTrashArgv = new String[]{"-provisionTrash", "-path",
+        zone1.toUri().getPath()};
+    cryptoAdmin.run(provisionTrashArgv);
+
+    assertTrue(fsWrapper.exists(trashDir));
+    FileStatus trashFileStatus = fsWrapper.getFileStatus(trashDir);
+    assertTrue(trashFileStatus.getPermission().getStickyBit());
+  }
+
   @Test(timeout = 60000)
   public void testBasicOperations() throws Exception {
 
@@ -223,8 +330,9 @@ public class TestEncryptionZones {
     /* Test failure of create EZ on a directory that doesn't exist. */
     final Path zoneParent = new Path("/zones");
     final Path zone1 = new Path(zoneParent, "zone1");
+
     try {
-      dfsAdmin.createEncryptionZone(zone1, TEST_KEY);
+      dfsAdmin.createEncryptionZone(zone1, TEST_KEY, NO_TRASH);
       fail("expected /test doesn't exist");
     } catch (IOException e) {
       assertExceptionContains("cannot find", e);
@@ -232,20 +340,20 @@ public class TestEncryptionZones {
 
     /* Normal creation of an EZ */
     fsWrapper.mkdir(zone1, FsPermission.getDirDefault(), true);
-    dfsAdmin.createEncryptionZone(zone1, TEST_KEY);
+    dfsAdmin.createEncryptionZone(zone1, TEST_KEY, NO_TRASH);
     assertNumZones(++numZones);
     assertZonePresent(null, zone1.toString());
 
     /* Test failure of create EZ on a directory which is already an EZ. */
     try {
-      dfsAdmin.createEncryptionZone(zone1, TEST_KEY);
+      dfsAdmin.createEncryptionZone(zone1, TEST_KEY, NO_TRASH);
     } catch (IOException e) {
       assertExceptionContains("is already an encryption zone", e);
     }
 
     /* create EZ on parent of an EZ should fail */
     try {
-      dfsAdmin.createEncryptionZone(zoneParent, TEST_KEY);
+      dfsAdmin.createEncryptionZone(zoneParent, TEST_KEY, NO_TRASH);
       fail("EZ over an EZ");
     } catch (IOException e) {
       assertExceptionContains("encryption zone for a non-empty directory", e);
@@ -256,7 +364,7 @@ public class TestEncryptionZones {
     final Path notEmptyChild = new Path(notEmpty, "child");
     fsWrapper.mkdir(notEmptyChild, FsPermission.getDirDefault(), true);
     try {
-      dfsAdmin.createEncryptionZone(notEmpty, TEST_KEY);
+      dfsAdmin.createEncryptionZone(notEmpty, TEST_KEY, NO_TRASH);
       fail("Created EZ on an non-empty directory with folder");
     } catch (IOException e) {
       assertExceptionContains("create an encryption zone", e);
@@ -266,7 +374,7 @@ public class TestEncryptionZones {
     /* create EZ on a folder with a file fails */
     fsWrapper.createFile(notEmptyChild);
     try {
-      dfsAdmin.createEncryptionZone(notEmpty, TEST_KEY);
+      dfsAdmin.createEncryptionZone(notEmpty, TEST_KEY, NO_TRASH);
       fail("Created EZ on an non-empty directory with file");
     } catch (IOException e) {
       assertExceptionContains("create an encryption zone", e);
@@ -274,7 +382,7 @@ public class TestEncryptionZones {
 
     /* Test failure of create EZ on a file. */
     try {
-      dfsAdmin.createEncryptionZone(notEmptyChild, TEST_KEY);
+      dfsAdmin.createEncryptionZone(notEmptyChild, TEST_KEY, NO_TRASH);
       fail("Created EZ on a file");
     } catch (IOException e) {
       assertExceptionContains("create an encryption zone for a file.", e);
@@ -285,7 +393,7 @@ public class TestEncryptionZones {
     fsWrapper.mkdir(zone2, FsPermission.getDirDefault(), false);
     final String myKeyName = "mykeyname";
     try {
-      dfsAdmin.createEncryptionZone(zone2, myKeyName);
+      dfsAdmin.createEncryptionZone(zone2, myKeyName, NO_TRASH);
       fail("expected key doesn't exist");
     } catch (IOException e) {
       assertExceptionContains("doesn't exist.", e);
@@ -293,13 +401,13 @@ public class TestEncryptionZones {
 
     /* Test failure of empty and null key name */
     try {
-      dfsAdmin.createEncryptionZone(zone2, "");
+      dfsAdmin.createEncryptionZone(zone2, "", NO_TRASH);
       fail("created a zone with empty key name");
     } catch (IOException e) {
       assertExceptionContains("Must specify a key name when creating", e);
     }
     try {
-      dfsAdmin.createEncryptionZone(zone2, null);
+      dfsAdmin.createEncryptionZone(zone2, null, NO_TRASH);
       fail("created a zone with null key name");
     } catch (IOException e) {
       assertExceptionContains("Must specify a key name when creating", e);
@@ -309,7 +417,7 @@ public class TestEncryptionZones {
 
     /* Test success of creating an EZ when they key exists. */
     DFSTestUtil.createKey(myKeyName, cluster, conf);
-    dfsAdmin.createEncryptionZone(zone2, myKeyName);
+    dfsAdmin.createEncryptionZone(zone2, myKeyName, NO_TRASH);
     assertNumZones(++numZones);
     assertZonePresent(myKeyName, zone2.toString());
 
@@ -325,7 +433,7 @@ public class TestEncryptionZones {
         final HdfsAdmin userAdmin =
             new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
         try {
-          userAdmin.createEncryptionZone(nonSuper, TEST_KEY);
+          userAdmin.createEncryptionZone(nonSuper, TEST_KEY, NO_TRASH);
           fail("createEncryptionZone is superuser-only operation");
         } catch (AccessControlException e) {
           assertExceptionContains("Superuser privilege is required", e);
@@ -337,7 +445,7 @@ public class TestEncryptionZones {
     // Test success of creating an encryption zone a few levels down.
     Path deepZone = new Path("/d/e/e/p/zone");
     fsWrapper.mkdir(deepZone, FsPermission.getDirDefault(), true);
-    dfsAdmin.createEncryptionZone(deepZone, TEST_KEY);
+    dfsAdmin.createEncryptionZone(deepZone, TEST_KEY, NO_TRASH);
     assertNumZones(++numZones);
     assertZonePresent(null, deepZone.toString());
 
@@ -345,7 +453,7 @@ public class TestEncryptionZones {
     for (int i=1; i<6; i++) {
       final Path zonePath = new Path("/listZone" + i);
       fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
-      dfsAdmin.createEncryptionZone(zonePath, TEST_KEY);
+      dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH);
       numZones++;
       assertNumZones(numZones);
       assertZonePresent(null, zonePath.toString());
@@ -365,7 +473,7 @@ public class TestEncryptionZones {
     // without persisting the namespace.
     Path nonpersistZone = new Path("/nonpersistZone");
     fsWrapper.mkdir(nonpersistZone, FsPermission.getDirDefault(), false);
-    dfsAdmin.createEncryptionZone(nonpersistZone, TEST_KEY);
+    dfsAdmin.createEncryptionZone(nonpersistZone, TEST_KEY, NO_TRASH);
     numZones++;
     cluster.restartNameNode(true);
     assertNumZones(numZones);
@@ -379,7 +487,7 @@ public class TestEncryptionZones {
     final Path zone1 = new Path(rootDir, "zone1");
 
     /* Normal creation of an EZ on rootDir */
-    dfsAdmin.createEncryptionZone(rootDir, TEST_KEY);
+    dfsAdmin.createEncryptionZone(rootDir, TEST_KEY, NO_TRASH);
     assertNumZones(++numZones);
     assertZonePresent(null, rootDir.toString());
 
@@ -407,10 +515,10 @@ public class TestEncryptionZones {
     final Path allPath = new Path(testRoot, "accessall");
 
     fsWrapper.mkdir(superPath, new FsPermission((short) 0700), true);
-    dfsAdmin.createEncryptionZone(superPath, TEST_KEY);
+    dfsAdmin.createEncryptionZone(superPath, TEST_KEY, NO_TRASH);
 
     fsWrapper.mkdir(allPath, new FsPermission((short) 0707), true);
-    dfsAdmin.createEncryptionZone(allPath, TEST_KEY);
+    dfsAdmin.createEncryptionZone(allPath, TEST_KEY, NO_TRASH);
 
     user.doAs(new PrivilegedExceptionAction<Object>() {
       @Override
@@ -450,8 +558,8 @@ public class TestEncryptionZones {
     fsWrapper.mkdir(superPath, new FsPermission((short) 0700), false);
     fsWrapper.mkdir(allPath, new FsPermission((short) 0777), false);
     fsWrapper.mkdir(nonEZDir, new FsPermission((short) 0777), false);
-    dfsAdmin.createEncryptionZone(superPath, TEST_KEY);
-    dfsAdmin.createEncryptionZone(allPath, TEST_KEY);
+    dfsAdmin.createEncryptionZone(superPath, TEST_KEY, NO_TRASH);
+    dfsAdmin.createEncryptionZone(allPath, TEST_KEY, NO_TRASH);
     dfsAdmin.allowSnapshot(new Path("/"));
     final Path newSnap = fs.createSnapshot(new Path("/"));
     DFSTestUtil.createFile(fs, superPathFile, len, (short) 1, 0xFEED);
@@ -556,7 +664,7 @@ public class TestEncryptionZones {
     final Path pathFooBarFile = new Path(pathFooBar, "file");
     final int len = 8192;
     wrapper.mkdir(pathFoo, FsPermission.getDirDefault(), true);
-    dfsAdmin.createEncryptionZone(pathFoo, TEST_KEY);
+    dfsAdmin.createEncryptionZone(pathFoo, TEST_KEY, NO_TRASH);
     wrapper.mkdir(pathFooBaz, FsPermission.getDirDefault(), true);
     DFSTestUtil.createFile(fs, pathFooBazFile, len, (short) 1, 0xFEED);
     String contents = DFSTestUtil.readFile(fs, pathFooBazFile);
@@ -615,7 +723,7 @@ public class TestEncryptionZones {
     // Create the first enc file
     final Path zone = new Path("/zone");
     fs.mkdirs(zone);
-    dfsAdmin.createEncryptionZone(zone, TEST_KEY);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
     final Path encFile1 = new Path(zone, "myfile");
     DFSTestUtil.createFile(fs, encFile1, len, (short) 1, 0xFEED);
     // Read them back in and compare byte-by-byte
@@ -650,7 +758,7 @@ public class TestEncryptionZones {
 
     final Path zone = new Path("/zone");
     fs.mkdirs(zone);
-    dfsAdmin.createEncryptionZone(zone, TEST_KEY);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
 
     /* Create an unencrypted file for comparison purposes. */
     final Path unencFile = new Path("/unenc");
@@ -696,7 +804,7 @@ public class TestEncryptionZones {
         new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
     final Path zone = new Path("/zone");
     fs.mkdirs(zone);
-    dfsAdmin.createEncryptionZone(zone, TEST_KEY);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
     // Create a file in an EZ, which should succeed
     DFSTestUtil
         .createFile(fs, new Path(zone, "success1"), 0, (short) 1, 0xFEED);
@@ -827,7 +935,7 @@ public class TestEncryptionZones {
     final Path zone1 = new Path("/zone1");
     fsWrapper.mkdir(zone1, FsPermission.getDirDefault(), true);
     try {
-      dfsAdmin.createEncryptionZone(zone1, TEST_KEY);
+      dfsAdmin.createEncryptionZone(zone1, TEST_KEY, NO_TRASH);
       fail("expected exception");
     } catch (IOException e) {
       assertExceptionContains("since no key provider is available", e);
@@ -869,7 +977,7 @@ public class TestEncryptionZones {
     // Create an encrypted file to check isEncrypted returns true
     final Path zone = new Path(prefix, "zone");
     fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
-    dfsAdmin.createEncryptionZone(zone, TEST_KEY);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
     final Path encFile = new Path(zone, "encfile");
     fsWrapper.createFile(encFile);
     stat = fsWrapper.getFileStatus(encFile);
@@ -1010,7 +1118,7 @@ public class TestEncryptionZones {
     executor.submit(new InjectFaultTask() {
       @Override
       public void doFault() throws Exception {
-        dfsAdmin.createEncryptionZone(zone1, TEST_KEY);
+        dfsAdmin.createEncryptionZone(zone1, TEST_KEY, NO_TRASH);
       }
       @Override
       public void doCleanup() throws Exception {
@@ -1036,14 +1144,14 @@ public class TestEncryptionZones {
     fsWrapper.mkdir(zone1, FsPermission.getDirDefault(), true);
     final String otherKey = "other_key";
     DFSTestUtil.createKey(otherKey, cluster, conf);
-    dfsAdmin.createEncryptionZone(zone1, TEST_KEY);
+    dfsAdmin.createEncryptionZone(zone1, TEST_KEY, NO_TRASH);
 
     executor.submit(new InjectFaultTask() {
       @Override
       public void doFault() throws Exception {
         fsWrapper.delete(zone1, true);
         fsWrapper.mkdir(zone1, FsPermission.getDirDefault(), true);
-        dfsAdmin.createEncryptionZone(zone1, otherKey);
+        dfsAdmin.createEncryptionZone(zone1, otherKey, NO_TRASH);
       }
       @Override
       public void doCleanup() throws Exception {
@@ -1056,7 +1164,7 @@ public class TestEncryptionZones {
     fsWrapper.mkdir(zone1, FsPermission.getDirDefault(), true);
     final String anotherKey = "another_key";
     DFSTestUtil.createKey(anotherKey, cluster, conf);
-    dfsAdmin.createEncryptionZone(zone1, anotherKey);
+    dfsAdmin.createEncryptionZone(zone1, anotherKey, NO_TRASH);
     String keyToUse = otherKey;
 
     MyInjector injector = new MyInjector();
@@ -1068,7 +1176,7 @@ public class TestEncryptionZones {
       injector.ready.await();
       fsWrapper.delete(zone1, true);
       fsWrapper.mkdir(zone1, FsPermission.getDirDefault(), true);
-      dfsAdmin.createEncryptionZone(zone1, keyToUse);
+      dfsAdmin.createEncryptionZone(zone1, keyToUse, NO_TRASH);
       if (keyToUse == otherKey) {
         keyToUse = anotherKey;
       } else {
@@ -1129,7 +1237,7 @@ public class TestEncryptionZones {
     final Path zone1 = new Path(zoneParent, "zone1");
     final Path zone1File = new Path(zone1, "file");
     fsWrapper.mkdir(zone1, FsPermission.getDirDefault(), true);
-    dfsAdmin.createEncryptionZone(zone1, TEST_KEY);
+    dfsAdmin.createEncryptionZone(zone1, TEST_KEY, NO_TRASH);
     DFSTestUtil.createFile(fs, zone1File, len, (short) 1, 0xFEED);
     ByteArrayOutputStream bStream = new ByteArrayOutputStream();
     PrintStream out = new PrintStream(bStream, true);
@@ -1164,7 +1272,7 @@ public class TestEncryptionZones {
     final Path zoneFile = new Path(zone, "zoneFile");
     fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
     dfsAdmin.allowSnapshot(zoneParent);
-    dfsAdmin.createEncryptionZone(zone, TEST_KEY);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
     DFSTestUtil.createFile(fs, zoneFile, len, (short) 1, 0xFEED);
     String contents = DFSTestUtil.readFile(fs, zoneFile);
     final Path snap1 = fs.createSnapshot(zoneParent, "snap1");
@@ -1182,7 +1290,7 @@ public class TestEncryptionZones {
         dfsAdmin.getEncryptionZoneForPath(snap2Zone));
 
     // Create the encryption zone again
-    dfsAdmin.createEncryptionZone(zone, TEST_KEY2);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY2, NO_TRASH);
     final Path snap3 = fs.createSnapshot(zoneParent, "snap3");
     final Path snap3Zone = new Path(snap3, zone.getName());
     // Check that snap3's EZ has the correct settings
@@ -1245,7 +1353,7 @@ public class TestEncryptionZones {
     final Path link = new Path(linkParent, "link");
     final Path target = new Path(targetParent, "target");
     fs.mkdirs(parent);
-    dfsAdmin.createEncryptionZone(parent, TEST_KEY);
+    dfsAdmin.createEncryptionZone(parent, TEST_KEY, NO_TRASH);
     fs.mkdirs(linkParent);
     fs.mkdirs(targetParent);
     DFSTestUtil.createFile(fs, target, len, (short)1, 0xFEED);
@@ -1259,8 +1367,8 @@ public class TestEncryptionZones {
     // encryption zones
     fs.mkdirs(linkParent);
     fs.mkdirs(targetParent);
-    dfsAdmin.createEncryptionZone(linkParent, TEST_KEY);
-    dfsAdmin.createEncryptionZone(targetParent, TEST_KEY);
+    dfsAdmin.createEncryptionZone(linkParent, TEST_KEY, NO_TRASH);
+    dfsAdmin.createEncryptionZone(targetParent, TEST_KEY, NO_TRASH);
     DFSTestUtil.createFile(fs, target, len, (short)1, 0xFEED);
     content = DFSTestUtil.readFile(fs, target);
     fs.createSymlink(target, link, false);
@@ -1275,7 +1383,7 @@ public class TestEncryptionZones {
     final int len = 8192;
     final Path ez = new Path("/ez");
     fs.mkdirs(ez);
-    dfsAdmin.createEncryptionZone(ez, TEST_KEY);
+    dfsAdmin.createEncryptionZone(ez, TEST_KEY, NO_TRASH);
     final Path src1 = new Path(ez, "src1");
     final Path src2 = new Path(ez, "src2");
     final Path target = new Path(ez, "target");
@@ -1302,7 +1410,7 @@ public class TestEncryptionZones {
     final Path zone1 = new Path(zoneParent, "zone1");
     final Path zone1File = new Path(zone1, "file");
     fsWrapper.mkdir(zone1, FsPermission.getDirDefault(), true);
-    dfsAdmin.createEncryptionZone(zone1, TEST_KEY);
+    dfsAdmin.createEncryptionZone(zone1, TEST_KEY, NO_TRASH);
     DFSTestUtil.createFile(fs, zone1File, len, (short) 1, 0xFEED);
     fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
     fs.saveNamespace();
@@ -1332,7 +1440,7 @@ public class TestEncryptionZones {
     final Path rootDir = new Path("/");
     final Path zoneFile = new Path(rootDir, "file");
     final Path rawFile = new Path("/.reserved/raw/file");
-    dfsAdmin.createEncryptionZone(rootDir, TEST_KEY);
+    dfsAdmin.createEncryptionZone(rootDir, TEST_KEY, NO_TRASH);
     DFSTestUtil.createFile(fs, zoneFile, len, (short) 1, 0xFEED);
 
     assertEquals("File can be created on the root encryption zone " +
@@ -1353,7 +1461,7 @@ public class TestEncryptionZones {
     final Path zoneFile = new Path("file");
     fs.setWorkingDirectory(baseDir);
     fs.mkdirs(zoneDir);
-    dfsAdmin.createEncryptionZone(zoneDir, TEST_KEY);
+    dfsAdmin.createEncryptionZone(zoneDir, TEST_KEY, NO_TRASH);
     DFSTestUtil.createFile(fs, zoneFile, len, (short) 1, 0xFEED);
 
     assertNumZones(1);
@@ -1367,7 +1475,7 @@ public class TestEncryptionZones {
   public void testGetEncryptionZoneOnANonExistentZoneFile() throws Exception {
     final Path ez = new Path("/ez");
     fs.mkdirs(ez);
-    dfsAdmin.createEncryptionZone(ez, TEST_KEY);
+    dfsAdmin.createEncryptionZone(ez, TEST_KEY, NO_TRASH);
     Path zoneFile = new Path(ez, "file");
     try {
       fs.getEZForPath(zoneFile);
@@ -1392,7 +1500,7 @@ public class TestEncryptionZones {
         new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
     final Path zone1 = new Path("/zone1");
     fs.mkdirs(zone1);
-    dfsAdmin.createEncryptionZone(zone1, TEST_KEY);
+    dfsAdmin.createEncryptionZone(zone1, TEST_KEY, NO_TRASH);
 
     // Create the encrypted file in zone1
     final Path encFile1 = new Path(zone1, "encFile1");
@@ -1413,12 +1521,12 @@ public class TestEncryptionZones {
 
     final Path topEZ = new Path("/topEZ");
     fs.mkdirs(topEZ);
-    dfsAdmin.createEncryptionZone(topEZ, TEST_KEY);
+    dfsAdmin.createEncryptionZone(topEZ, TEST_KEY, NO_TRASH);
     final String NESTED_EZ_TEST_KEY = "nested_ez_test_key";
     DFSTestUtil.createKey(NESTED_EZ_TEST_KEY, cluster, conf);
     final Path nestedEZ = new Path(topEZ, "nestedEZ");
     fs.mkdirs(nestedEZ);
-    dfsAdmin.createEncryptionZone(nestedEZ, NESTED_EZ_TEST_KEY);
+    dfsAdmin.createEncryptionZone(nestedEZ, NESTED_EZ_TEST_KEY, NO_TRASH);
     final Path topEZFile = new Path(topEZ, "file");
     final Path nestedEZFile = new Path(nestedEZ, "file");
     DFSTestUtil.createFile(fs, topEZFile, len, (short) 1, 0xFEED);
@@ -1433,7 +1541,7 @@ public class TestEncryptionZones {
   public void testRootDirEZTrash() throws Exception {
     final HdfsAdmin dfsAdmin =
         new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
-    dfsAdmin.createEncryptionZone(new Path("/"), TEST_KEY);
+    dfsAdmin.createEncryptionZone(new Path("/"), TEST_KEY, NO_TRASH);
     final Path encFile = new Path("/encFile");
     final int len = 8192;
     DFSTestUtil.createFile(fs, encFile, len, (short) 1, 0xFEED);
@@ -1449,13 +1557,13 @@ public class TestEncryptionZones {
         new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
     Path ezRoot1 = new Path("/ez1");
     fs.mkdirs(ezRoot1);
-    dfsAdmin.createEncryptionZone(ezRoot1, TEST_KEY);
+    dfsAdmin.createEncryptionZone(ezRoot1, TEST_KEY, NO_TRASH);
     Path ezRoot2 = new Path("/ez2");
     fs.mkdirs(ezRoot2);
-    dfsAdmin.createEncryptionZone(ezRoot2, TEST_KEY);
+    dfsAdmin.createEncryptionZone(ezRoot2, TEST_KEY, NO_TRASH);
     Path ezRoot3 = new Path("/ez3");
     fs.mkdirs(ezRoot3);
-    dfsAdmin.createEncryptionZone(ezRoot3, TEST_KEY);
+    dfsAdmin.createEncryptionZone(ezRoot3, TEST_KEY, NO_TRASH);
     Collection<FileStatus> trashRootsBegin = fs.getTrashRoots(true);
     assertEquals("Unexpected getTrashRoots result", 0, trashRootsBegin.size());
 

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -33,6 +34,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.EnumSet;
 
 /**
  * Tests interaction of encryption zones with HA failover.
@@ -49,7 +51,8 @@ public class TestEncryptionZonesWithHA {
   private File testRootDir;
 
   private final String TEST_KEY = "test_key";
-
+  protected static final EnumSet< CreateEncryptionZoneFlag > NO_TRASH =
+      EnumSet.of(CreateEncryptionZoneFlag.NO_TRASH);
 
   @Before
   public void setupCluster() throws Exception {
@@ -101,7 +104,7 @@ public class TestEncryptionZonesWithHA {
     final Path dirChild = new Path(dir, "child");
     final Path dirFile = new Path(dir, "file");
     fs.mkdir(dir, FsPermission.getDirDefault());
-    dfsAdmin0.createEncryptionZone(dir, TEST_KEY);
+    dfsAdmin0.createEncryptionZone(dir, TEST_KEY, NO_TRASH);
     fs.mkdir(dirChild, FsPermission.getDirDefault());
     DFSTestUtil.createFile(fs, dirFile, len, (short) 1, 0xFEED);
     String contents = DFSTestUtil.readFile(fs, dirFile);

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithKMS.java

@@ -73,7 +73,7 @@ public class TestEncryptionZonesWithKMS extends TestEncryptionZones {
   public void testCreateEZPopulatesEDEKCache() throws Exception {
     final Path zonePath = new Path("/TestEncryptionZone");
     fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
-    dfsAdmin.createEncryptionZone(zonePath, TEST_KEY);
+    dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH);
     @SuppressWarnings("unchecked")
     KMSClientProvider kcp = (KMSClientProvider) Whitebox
         .getInternalState(cluster.getNamesystem().getProvider(), "extension");
@@ -102,7 +102,7 @@ public class TestEncryptionZonesWithKMS extends TestEncryptionZones {
   public void testWarmupEDEKCacheOnStartup() throws Exception {
     final Path zonePath = new Path("/TestEncryptionZone");
     fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
-    dfsAdmin.createEncryptionZone(zonePath, TEST_KEY);
+    dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH);
 
     @SuppressWarnings("unchecked")
     KMSClientProvider spy = (KMSClientProvider) Whitebox

+ 8 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReservedRawPaths.java

@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
@@ -31,6 +32,7 @@ import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.FileSystemTestWrapper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.server.namenode.EncryptionZoneManager;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
@@ -61,6 +63,8 @@ public class TestReservedRawPaths {
 
   protected FileSystemTestWrapper fsWrapper;
   protected FileContextTestWrapper fcWrapper;
+  protected static final EnumSet< CreateEncryptionZoneFlag > NO_TRASH =
+      EnumSet.of(CreateEncryptionZoneFlag.NO_TRASH);
 
   @Before
   public void setup() throws Exception {
@@ -114,7 +118,7 @@ public class TestReservedRawPaths {
     // Create the first enc file
     final Path zone = new Path("/zone");
     fs.mkdirs(zone);
-    dfsAdmin.createEncryptionZone(zone, TEST_KEY);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
     final Path encFile1 = new Path(zone, "myfile");
     DFSTestUtil.createFile(fs, encFile1, len, (short) 1, 0xFEED);
     // Read them back in and compare byte-by-byte
@@ -154,7 +158,7 @@ public class TestReservedRawPaths {
     final Path zone = new Path("zone");
     final Path slashZone = new Path("/", zone);
     fs.mkdirs(slashZone);
-    dfsAdmin.createEncryptionZone(slashZone, TEST_KEY);
+    dfsAdmin.createEncryptionZone(slashZone, TEST_KEY, NO_TRASH);
 
     final Path base = new Path("base");
     final Path reservedRaw = new Path("/.reserved/raw");
@@ -186,7 +190,7 @@ public class TestReservedRawPaths {
     final Path zone = new Path("zone");
     final Path slashZone = new Path("/", zone);
     fs.mkdirs(slashZone);
-    dfsAdmin.createEncryptionZone(slashZone, TEST_KEY);
+    dfsAdmin.createEncryptionZone(slashZone, TEST_KEY, NO_TRASH);
     final Path rawRoot = new Path("/.reserved/raw");
     final Path dir1 = new Path("dir1");
     final Path rawDir1 = new Path(rawRoot, dir1);
@@ -224,7 +228,7 @@ public class TestReservedRawPaths {
     final Path zone = new Path("zone");
     final Path slashZone = new Path("/", zone);
     fs.mkdirs(slashZone);
-    dfsAdmin.createEncryptionZone(slashZone, TEST_KEY);
+    dfsAdmin.createEncryptionZone(slashZone, TEST_KEY, NO_TRASH);
     final Path base = new Path("base");
     final Path reservedRaw = new Path("/.reserved/raw");
     final int len = 8192;

+ 86 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testCryptoConf.xml

@@ -68,6 +68,7 @@
         <command>-fs NAMENODE -mkdir /foo</command>
         <command>-fs NAMENODE -ls /</command>-
         <crypto-admin-command>-createZone -path /foo -keyName myKey</crypto-admin-command>
+        <command>-fs NAMENODE -rmdir /foo/.Trash</command>
         <crypto-admin-command>-createZone -path /foo -keyName myKey</crypto-admin-command>
       </test-commands>
       <cleanup-commands>
@@ -91,7 +92,9 @@
         <crypto-admin-command>-createZone -keyName myKey -path /foo/bar</crypto-admin-command>
       </test-commands>
       <cleanup-commands>
+        <command>-fs NAMENODE -rmdir /foo/bar/.Trash</command>
         <command>-fs NAMENODE -rmdir /foo/bar</command>
+        <command>-fs NAMENODE -rmdir /foo/.Trash</command>
         <command>-fs NAMENODE -rmdir /foo</command>
       </cleanup-commands>
       <comparators>
@@ -180,6 +183,7 @@
         <crypto-admin-command>-createZone -path /foo/bar/baz -keyName myKey</crypto-admin-command>
       </test-commands>
       <cleanup-commands>
+        <command>-fs NAMENODE -rmdir /foo/bar/baz/.Trash</command>
         <command>-fs NAMENODE -rmdir /foo/bar/baz</command>
         <command>-fs NAMENODE -rmdir /foo/bar</command>
         <command>-fs NAMENODE -rmdir /foo/</command>
@@ -205,7 +209,9 @@
       </test-commands>
       <cleanup-commands>
         <command>-fs NAMENODE -rmdir /src/subdir</command>
+        <command>-fs NAMENODE -rmdir /src/.Trash</command>
         <command>-fs NAMENODE -rmdir /src</command>
+        <command>-fs NAMENODE -rmdir /dst/.Trash</command>
         <command>-fs NAMENODE -rmdir /dst</command>
       </cleanup-commands>
       <comparators>
@@ -227,6 +233,7 @@
       </test-commands>
       <cleanup-commands>
         <command>-fs NAMENODE -rmdir /src</command>
+        <command>-fs NAMENODE -rmdir /dst/.Trash</command>
         <command>-fs NAMENODE -rmdir /dst</command>
       </cleanup-commands>
       <comparators>
@@ -249,6 +256,7 @@
         </test-commands>
         <cleanup-commands>
             <command>-fs NAMENODE -rm /src/foo</command>
+            <command>-fs NAMENODE -rmdir /src/.Trash</command>
             <command>-fs NAMENODE -rmdir /src</command>
             <command>-fs NAMENODE -rmdir /dst</command>
         </cleanup-commands>
@@ -269,6 +277,7 @@
         <command>-fs NAMENODE -ls /</command>-
       </test-commands>
       <cleanup-commands>
+        <command>-fs NAMENODE -rmdir /src/.Trash</command>
         <command>-fs NAMENODE -rmdir /src</command>
         <command>-fs NAMENODE -rmdir /dst</command>
       </cleanup-commands>
@@ -292,6 +301,7 @@
       <cleanup-commands>
         <command>-fs NAMENODE -rmdir /src/subdir2/subdir1</command>
         <command>-fs NAMENODE -rmdir /src/subdir2</command>
+        <command>-fs NAMENODE -rmdir /src/.Trash</command>
         <command>-fs NAMENODE -rmdir /src</command>
       </cleanup-commands>
       <comparators>
@@ -302,5 +312,81 @@
       </comparators>
     </test>
 
+    <test>
+      <description>Test failure of provisioning a trash dir if the EZ does not exist</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir /src</command>
+        <crypto-admin-command>-provisionTrash -path /src</crypto-admin-command>-
+      </test-commands>
+      <cleanup-commands>
+        <command>-fs NAMENODE -rmdir /src</command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>/src is not an encryption zone.</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
+    <test>
+      <description>Test failure of provisioning a trash dir if .Trash exists</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir /src</command>
+        <crypto-admin-command>-createZone -path /src -keyName myKey</crypto-admin-command>
+        <crypto-admin-command>-provisionTrash -path /src</crypto-admin-command>-
+      </test-commands>
+      <cleanup-commands>
+        <command>-fs NAMENODE -rm /src/.Trash</command>
+        <command>-fs NAMENODE -rmdir /src</command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>Will not provision new trash directory for encryption zone /src. Path already exists.</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
+    <test>
+      <description>Test success of provisioning a trash dir</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir /src</command>
+        <crypto-admin-command>-createZone -path /src -keyName myKey</crypto-admin-command>
+        <command>-fs NAMENODE -rmdir /src/.Trash</command>
+        <crypto-admin-command>-provisionTrash -path /src</crypto-admin-command>-
+      </test-commands>
+      <cleanup-commands>
+        <command>-fs NAMENODE -rmdir /src/.Trash</command>
+        <command>-fs NAMENODE -rmdir /src</command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>Created a trash directory for /src</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
+    <test>
+      <description>Test failure of provisioning a trash dir for incorrect EZ directory</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir /src</command>
+        <crypto-admin-command>-createZone -path /src -keyName myKey</crypto-admin-command>
+        <command>-fs NAMENODE -mkdir /src/dir1</command>
+        <crypto-admin-command>-provisionTrash -path /src/dir1</crypto-admin-command>-
+      </test-commands>
+      <cleanup-commands>
+        <command>-fs NAMENODE -rmdir /src/dir1</command>
+        <command>-fs NAMENODE -rmdir /src/.Trash</command>
+        <command>-fs NAMENODE -rmdir /src</command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>/src/dir1 is not the root of an encryption zone.</expected-output>
+        </comparator>
+      </comparators>
+    </test>
   </tests>
 </configuration>