Parcourir la source

HDFS-6386. HDFS Encryption Zones (clamb)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1603658 13f79535-47bb-0310-9956-ffa450edef68
Charles Lamb il y a 11 ans
Parent
commit
6ef3a9e746

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES-fs-encryption.txt

@@ -10,6 +10,8 @@ fs-encryption (Unreleased)
 
   IMPROVEMENTS
 
+    HDFS-6386. HDFS Encryption Zones (clamb)
+
     HDFS-6473. Protocol and API for Encryption Zones (clamb)
 
     HDFS-6392. Wire crypto streams for encrypted files in

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -2802,7 +2802,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
 
   public List<EncryptionZone> listEncryptionZones() throws IOException {
     checkOpen();
-    return namenode.listEncryptionZones();
+    try {
+      return namenode.listEncryptionZones();
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class);
+    }
   }
 
   public void setXAttr(String src, String name, byte[] value, 

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java

@@ -162,6 +162,11 @@ public class HdfsConstants {
   public static final String SEPARATOR_DOT_SNAPSHOT_DIR
       = Path.SEPARATOR + DOT_SNAPSHOT_DIR; 
 
+  public static final String CRYPTO_XATTR_KEY_ID = "system.hdfs.crypto.key-id";
+  public static final String CRYPTO_XATTR_KEY_VERSION_ID =
+    "system.hdfs.crypto.key-version-id";
+  public static final String CRYPTO_XATTR_IV = "system.hdfs.crypto.iv";
+  public static final int CRYPTO_KEY_SIZE = 128;
   /* Temporary until we stop hard-coding these values. */
   public static final byte[] KEY = "0123456789012345".getBytes();
   public static final byte[] IV = "ABCDEFGJIJKLMNOP".getBytes();

+ 7 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

@@ -1282,9 +1282,13 @@ public class ClientNamenodeProtocolTranslatorPB implements
   @Override
   public void createEncryptionZone(String src, String keyId)
     throws IOException {
-    final CreateEncryptionZoneRequestProto req =
-      CreateEncryptionZoneRequestProto.newBuilder().
-      setSrc(src).setKeyId(keyId).build();
+    final CreateEncryptionZoneRequestProto.Builder builder =
+      CreateEncryptionZoneRequestProto.newBuilder();
+    builder.setSrc(src);
+    if (keyId != null && !keyId.isEmpty()) {
+      builder.setKeyId(keyId);
+    }
+    CreateEncryptionZoneRequestProto req = builder.build();
     try {
       rpcProxy.createEncryptionZone(null, req);
     } catch (ServiceException e) {

+ 46 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoCodec;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.Options;
@@ -50,6 +51,7 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -84,6 +86,10 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.CRYPTO_XATTR_KEY_ID;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.CRYPTO_XATTR_IV;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.CRYPTO_XATTR_KEY_VERSION_ID;
+
 /*************************************************
  * FSDirectory stores the filesystem directory state.
  * It handles writing/loading values to disk, and logging
@@ -130,6 +136,7 @@ public class FSDirectory implements Closeable {
   private final INodeMap inodeMap; // Synchronized by dirLock
   private long yieldCount = 0; // keep track of lock yield count.
   private final int inodeXAttrsLimit; //inode xattrs max limit
+  private final CryptoCodec codec;
 
   // lock to protect the directory and BlockMap
   private final ReentrantReadWriteLock dirLock;
@@ -198,6 +205,7 @@ public class FSDirectory implements Closeable {
     this.inodeXAttrsLimit = conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_KEY,
         DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_DEFAULT);
+    this.codec = CryptoCodec.getInstance(conf);
     Preconditions.checkArgument(this.inodeXAttrsLimit >= 0,
         "Cannot set a negative limit on the number of xattrs per inode (%s).",
         DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_KEY);
@@ -2662,6 +2670,44 @@ public class FSDirectory implements Closeable {
     return xAttrs;
   }
   
+  XAttr createEncryptionZone(String src, String keyId)
+    throws IOException {
+    writeLock();
+    try {
+      if (isNonEmptyDirectory(src)) {
+        throw new IOException(
+          "Attempt to create an encryption zone for a non-empty directory.");
+      }
+      final XAttr keyIdXAttr =
+        XAttrHelper.buildXAttr(CRYPTO_XATTR_KEY_ID, keyId.getBytes());
+      unprotectedSetXAttr(src, keyIdXAttr, EnumSet.of(XAttrSetFlag.CREATE));
+      return keyIdXAttr;
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  XAttr deleteEncryptionZone(String src)
+    throws IOException {
+    writeLock();
+    try {
+      if (isNonEmptyDirectory(src)) {
+        throw new IOException(
+          "Attempt to delete an encryption zone for a non-empty directory.");
+      }
+      final XAttr keyIdXAttr =
+        XAttrHelper.buildXAttr(CRYPTO_XATTR_KEY_ID, null);
+      final XAttr removedXAttr = unprotectedRemoveXAttr(src, keyIdXAttr);
+      if (removedXAttr == null) {
+        throw new IOException(
+          src + " does not appear to be the root of an encryption zone");
+      }
+      return removedXAttr;
+    } finally {
+      writeUnlock();
+    }
+  }
+
   void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
           throws IOException {
     writeLock();

+ 250 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -83,12 +83,16 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROU
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.CRYPTO_KEY_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.CRYPTO_XATTR_IV;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.CRYPTO_XATTR_KEY_VERSION_ID;
 import static org.apache.hadoop.util.Time.now;
 
 import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.URI;
+import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -102,6 +106,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -116,6 +121,9 @@ import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
@@ -145,6 +153,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -261,6 +270,7 @@ import org.mortbay.util.ajax.JSON;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -515,6 +525,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   private final NNConf nnConf;
 
+  private KeyProvider provider = null;
+  private KeyProvider.Options providerOptions = null;
+
+  private final Map<String, EncryptionZone> encryptionZones;
+
   /**
    * Set the last allocated inode id when fsimage or editlog is loaded. 
    */
@@ -675,6 +690,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
       throws IOException {
+    initializeKeyProvider(conf);
+    providerOptions = KeyProvider.options(conf);
     if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY,
                         DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) {
       LOG.info("Enabling async auditlog");
@@ -781,6 +798,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         auditLoggers.get(0) instanceof DefaultAuditLogger;
       this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
       this.nnConf = new NNConf(conf);
+      this.encryptionZones = new HashMap<String, EncryptionZone>();
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
@@ -826,6 +844,42 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
 
+  private void initializeKeyProvider(final Configuration conf) {
+    try {
+      final List<KeyProvider> providers = KeyProviderFactory.getProviders(conf);
+      if (providers == null) {
+        return;
+      }
+
+      if (providers.size() == 0) {
+        LOG.info("No KeyProviders found.");
+        return;
+      }
+
+      if (providers.size() > 1) {
+        final String err =
+            "Multiple KeyProviders found. Only one is permitted.";
+        LOG.error(err);
+        throw new RuntimeException(err);
+      }
+      provider = providers.get(0);
+      if (provider.isTransient()) {
+        final String err =
+            "A KeyProvider was found but it is a transient provider.";
+        LOG.error(err);
+        throw new RuntimeException(err);
+      }
+      LOG.info("Found KeyProvider: " + provider.toString());
+    } catch (IOException e) {
+      LOG.error("Exception while initializing KeyProvider", e);
+    }
+  }
+
+  @VisibleForTesting
+  public KeyProvider getProvider() {
+    return provider;
+  }
+
   @VisibleForTesting
   static RetryCache initRetryCache(Configuration conf) {
     boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
@@ -2358,7 +2412,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw ie;
     }
   }
-  
+
   /**
    * Append to an existing file for append.
    * <p>
@@ -8057,14 +8111,206 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
   
-  void createEncryptionZone(final String src, final String keyId)
-          throws IOException {
+  /**
+   * Create an encryption zone on directory src either using keyIdArg if
+   * supplied or generating a keyId if it's null.
+   *
+   * @param src the path of a directory which will be the root of the
+   * encryption zone. The directory must be empty.
+   *
+   * @param keyIdArg an optional keyId of a key in the configured
+   * KeyProvider. If this is null, then a a new key is generated.
+   *
+   * @throws AccessControlException if the caller is not the superuser.
+   *
+   * @throws UnresolvedLinkException if the path can't be resolved.
+   *
+   * @throws SafeModeException if the Namenode is in safe mode.
+   */
+  void createEncryptionZone(final String src, String keyIdArg)
+    throws IOException, UnresolvedLinkException,
+      SafeModeException, AccessControlException {
+    final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+
+    boolean createdKey = false;
+    String keyId = keyIdArg;
+    boolean success = false;
+    try {
+      if (keyId == null || keyId.isEmpty()) {
+        keyId = createNewKey(src);
+        createdKey = true;
+      } else {
+        if (provider.getCurrentKey(keyId) == null) {
+
+          /*
+           * It would be nice if we threw something more specific than
+           * IOException when the key is not found, but the KeyProvider API
+           * doesn't provide for that. If that API is ever changed to throw
+           * something more specific (e.g. UnknownKeyException) then we can
+           * update this to match it, or better yet, just rethrow the
+           * KeyProvider's exception.
+           */
+          throw new IOException("Key " + keyId + " doesn't exist.");
+        }
+      }
+      createEncryptionZoneInt(src, keyId, cacheEntry != null);
+      success = true;
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "createEncryptionZone", src);
+      throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, success);
+      if (!success && createdKey) {
+        /* Unwind key creation. */
+        provider.deleteKey(keyId);
+      }
+    }
   }
 
-  void deleteEncryptionZone(final String src) throws IOException {
+  private void createEncryptionZoneInt(final String srcArg, String keyId,
+    final boolean logRetryCache) throws IOException {
+    String src = srcArg;
+    HdfsFileStatus resultingStat = null;
+    checkSuperuserPrivilege();
+    checkOperation(OperationCategory.WRITE);
+    final byte[][] pathComponents =
+      FSDirectory.getPathComponentsForReservedPath(src);
+    writeLock();
+    try {
+      checkSuperuserPrivilege();
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot create encryption zone on " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+
+      EncryptionZone ez = getEncryptionZoneForPath(src);
+      if (ez != null) {
+        throw new IOException("Directory " + src +
+          " is already in an encryption zone. (" + ez.getPath() + ")");
+      }
+
+      final XAttr keyIdXAttr = dir.createEncryptionZone(src, keyId);
+      getEditLog().logSetXAttr(src, keyIdXAttr, logRetryCache);
+      encryptionZones.put(src, new EncryptionZone(src, keyId));
+      resultingStat = getAuditFileInfo(src, false);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    logAuditEvent(true, "createEncryptionZone", src, null, resultingStat);
+  }
+
+  private String createNewKey(String src)
+    throws IOException {
+    final String keyId = UUID.randomUUID().toString();
+    // TODO pass in hdfs://HOST:PORT (HDFS-6490)
+    providerOptions.setDescription(src);
+    providerOptions.setBitLength(CRYPTO_KEY_SIZE);
+    try {
+      provider.createKey(keyId, providerOptions);
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);
+    }
+    return keyId;
+  }
+
+  /**
+   * Delete the encryption zone on directory src.
+   *
+   * @param src the path of a directory which is the root of the encryption
+   * zone. The directory must be empty and must be marked as an encryption
+   * zone.
+   *
+   * @throws AccessControlException if the caller is not the superuser.
+   *
+   * @throws UnresolvedLinkException if the path can't be resolved.
+   *
+   * @throws SafeModeException if the Namenode is in safe mode.
+   */
+  void deleteEncryptionZone(final String src)
+    throws IOException, UnresolvedLinkException,
+      SafeModeException, AccessControlException {
+    final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+
+    boolean success = false;
+    try {
+      deleteEncryptionZoneInt(src, cacheEntry != null);
+      encryptionZones.remove(src);
+      success = true;
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "deleteEncryptionZone", src);
+      throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, success);
+    }
+  }
+
+  private void deleteEncryptionZoneInt(final String srcArg,
+    final boolean logRetryCache) throws IOException {
+    String src = srcArg;
+    HdfsFileStatus resultingStat = null;
+    checkSuperuserPrivilege();
+    checkOperation(OperationCategory.WRITE);
+    final byte[][] pathComponents =
+      FSDirectory.getPathComponentsForReservedPath(src);
+    writeLock();
+    try {
+      checkSuperuserPrivilege();
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot delete encryption zone on " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      final EncryptionZone ez = encryptionZones.get(src);
+      if (ez == null) {
+        throw new IOException("Directory " + src +
+          " is not the root of an encryption zone.");
+      }
+      final XAttr removedXAttr = dir.deleteEncryptionZone(src);
+      if (removedXAttr != null) {
+        getEditLog().logRemoveXAttr(src, removedXAttr);
+      }
+      encryptionZones.remove(src);
+      resultingStat = getAuditFileInfo(src, false);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    logAuditEvent(true, "deleteEncryptionZone", src, null, resultingStat);
   }
 
   List<EncryptionZone> listEncryptionZones() throws IOException {
+    boolean success = false;
+    checkSuperuserPrivilege();
+    checkOperation(OperationCategory.READ);
+    readLock();
+    try {
+      checkSuperuserPrivilege();
+      checkOperation(OperationCategory.READ);
+      final List<EncryptionZone> ret =
+          Lists.newArrayList(encryptionZones.values());
+      success = true;
+      return ret;
+    } finally {
+      readUnlock();
+      logAuditEvent(success, "listEncryptionZones", null);
+    }
+  }
+
+  /** Lookup the encryption zone of a path. */
+  private EncryptionZone getEncryptionZoneForPath(String src) {
+    final String[] components = INode.getPathNames(src);
+    for (int i = components.length; i > 0; i--) {
+      final List<String> l = Arrays.asList(Arrays.copyOfRange(components, 0, i));
+      String p = Joiner.on(Path.SEPARATOR).join(l);
+      final EncryptionZone ret = encryptionZones.get(p);
+      if (ret != null) {
+        return ret;
+      }
+    }
     return null;
   }
 

+ 0 - 37
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -27,8 +27,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.crypto.key.KeyProvider;
-import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -272,9 +270,6 @@ public class NameNode implements NameNodeStatusMXBean {
   
   private NameNodeRpcServer rpcServer;
 
-  /* The KeyProvider, if any. */
-  private KeyProvider provider = null;
-
   private JvmPauseMonitor pauseMonitor;
   private ObjectName nameNodeStatusBeanName;
   /**
@@ -586,7 +581,6 @@ public class NameNode implements NameNodeStatusMXBean {
       startHttpServer(conf);
     }
     loadNamesystem(conf);
-    initializeKeyProvider(conf);
 
     rpcServer = createRpcServer(conf);
     if (clientNamenodeAddress == null) {
@@ -705,37 +699,6 @@ public class NameNode implements NameNodeStatusMXBean {
     }
   }
 
-  private void initializeKeyProvider(final Configuration conf) {
-    try {
-      final List<KeyProvider> providers = KeyProviderFactory.getProviders(conf);
-      if (providers == null) {
-        return;
-      }
-
-      if (providers.size() == 0) {
-        LOG.info("No KeyProviders found.");
-        return;
-      }
-
-      if (providers.size() > 1) {
-        final String err =
-            "Multiple KeyProviders found. Only one is permitted.";
-        LOG.error(err);
-        throw new RuntimeException(err);
-      }
-      provider = providers.get(0);
-      if (provider.isTransient()) {
-        final String err =
-            "A KeyProvider was found but it is a transient provider.";
-        LOG.error(err);
-        throw new RuntimeException(err);
-      }
-      LOG.info("Found KeyProvider: " + provider.toString());
-    } catch (IOException e) {
-      LOG.error("Exception while initializing KeyProvider", e);
-    }
-  }
-
   /**
    * Start NameNode.
    * <p>

+ 404 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesAPI.java

@@ -0,0 +1,404 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+
+import static org.junit.Assert.fail;
+
+public class TestEncryptionZonesAPI {
+
+  private static final Path TEST_PATH = new Path("/test");
+  private static final Path TEST_PATH_WITH_CHILD = new Path(TEST_PATH, "foo");
+  private static final Path TEST_PATH_WITH_MULTIPLE_CHILDREN =
+    new Path(TEST_PATH_WITH_CHILD, "baz");
+  private static final String TEST_KEYID = "mykeyid";
+  private final Configuration conf = new Configuration();
+  private MiniDFSCluster cluster;
+  private static File tmpDir;
+  private FileSystem fs;
+
+  @Before
+  public void setUpCluster() throws IOException {
+    tmpDir = new File(System.getProperty("test.build.data", "target"),
+        UUID.randomUUID().toString()).getAbsoluteFile();
+    conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
+        JavaKeyStoreProvider.SCHEME_NAME + "://file" + tmpDir + "/test.jks");
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    fs = cluster.getFileSystem();
+  }
+
+  @After
+  public void shutDownCluster() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /** Test failure of Create EZ on a directory that doesn't exist. */
+  @Test(timeout = 30000)
+  public void testCreateEncryptionZoneDirectoryDoesntExist() throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    try {
+      dfsAdmin.createEncryptionZone(TEST_PATH, null);
+      fail("expected /test doesn't exist");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("cannot find", e);
+    }
+  }
+
+  /** Test failure of Create EZ on a directory which is already an EZ. */
+  @Test(timeout = 30000)
+  public void testCreateEncryptionZoneWhichAlreadyExists()
+    throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    FileSystem.mkdirs(fs, TEST_PATH, new FsPermission((short) 0777));
+    dfsAdmin.createEncryptionZone(TEST_PATH, null);
+    try {
+      dfsAdmin.createEncryptionZone(TEST_PATH, null);
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("already in an encryption zone",
+          e);
+    }
+  }
+
+  /** Test success of Create EZ in which a key is created. */
+  @Test(timeout = 30000)
+  public void testCreateEncryptionZoneAndGenerateKeyDirectoryEmpty()
+    throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    FileSystem.mkdirs(fs, TEST_PATH, new FsPermission((short) 0777));
+    dfsAdmin.createEncryptionZone(TEST_PATH, null);
+  }
+
+  /** Test failure of Create EZ operation in an existing EZ. */
+  @Test(timeout = 30000)
+  public void testCreateEncryptionZoneInExistingEncryptionZone()
+    throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    FileSystem.mkdirs(fs, TEST_PATH, new FsPermission((short) 0777));
+    dfsAdmin.createEncryptionZone(TEST_PATH, null);
+    FileSystem.mkdirs(fs, TEST_PATH_WITH_CHILD,
+        new FsPermission((short) 0777));
+    try {
+      dfsAdmin.createEncryptionZone(TEST_PATH_WITH_CHILD, null);
+      fail("EZ in an EZ");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("already in an encryption zone", e);
+    }
+  }
+
+  /** Test failure of creating an EZ using a non-empty directory. */
+  @Test(timeout = 30000)
+  public void testCreateEncryptionZoneAndGenerateKeyDirectoryNotEmpty()
+    throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    FileSystem.mkdirs(fs, TEST_PATH, new FsPermission((short) 0777));
+    FileSystem.create(fs, new Path("/test/foo"),
+            new FsPermission((short) 0777));
+    try {
+      dfsAdmin.createEncryptionZone(TEST_PATH, null);
+      fail("expected key doesn't exist");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("create an encryption zone", e);
+    }
+  }
+
+  /** Test failure of creating an EZ passing a key that doesn't exist. */
+  @Test(timeout = 30000)
+  public void testCreateEncryptionZoneKeyDoesntExist() throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    try {
+      dfsAdmin.createEncryptionZone(TEST_PATH, TEST_KEYID);
+      fail("expected key doesn't exist");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("doesn't exist.", e);
+    }
+    final List<EncryptionZone> zones = dfsAdmin.listEncryptionZones();
+    Preconditions.checkState(zones.size() == 0, "More than one zone found?");
+  }
+
+  /** Test success of creating an EZ when they key exists. */
+  @Test(timeout = 30000)
+  public void testCreateEncryptionZoneKeyExist() throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    FileSystem.mkdirs(fs, TEST_PATH, new FsPermission((short) 0777));
+    createAKey(TEST_KEYID);
+    dfsAdmin.createEncryptionZone(TEST_PATH, TEST_KEYID);
+    final List<EncryptionZone> zones = dfsAdmin.listEncryptionZones();
+    Preconditions.checkState(zones.size() == 1, "More than one zone found?");
+    final EncryptionZone ez = zones.get(0);
+      GenericTestUtils.assertMatches(ez.toString(),
+              "EncryptionZone \\[path=/test, keyId=");
+  }
+
+  /** Helper function to create a key in the Key Provider. */
+  private void createAKey(String keyId)
+    throws NoSuchAlgorithmException, IOException {
+    KeyProvider provider =
+        cluster.getNameNode().getNamesystem().getProvider();
+    final KeyProvider.Options options = KeyProvider.options(conf);
+    provider.createKey(keyId, options);
+    provider.flush();
+  }
+
+  /** Test failure of create/delete encryption zones as a non super user. */
+  @Test(timeout = 30000)
+  public void testCreateAndDeleteEncryptionZoneAsNonSuperUser()
+    throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+
+    final UserGroupInformation user = UserGroupInformation.
+      createUserForTesting("user", new String[] { "mygroup" });
+
+    FileSystem.mkdirs(fs, TEST_PATH, new FsPermission((short) 0700));
+
+    user.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          final HdfsAdmin userAdmin =
+            new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+          try {
+            userAdmin.createEncryptionZone(TEST_PATH, null);
+            fail("createEncryptionZone is superuser-only operation");
+          } catch (AccessControlException e) {
+            GenericTestUtils.assertExceptionContains(
+                    "Superuser privilege is required", e);
+          }
+          return null;
+        }
+      });
+    dfsAdmin.createEncryptionZone(TEST_PATH, null);
+
+    user.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        final HdfsAdmin userAdmin =
+                new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+        try {
+          userAdmin.deleteEncryptionZone(TEST_PATH);
+          fail("deleteEncryptionZone is superuser-only operation");
+        } catch (AccessControlException e) {
+          GenericTestUtils.assertExceptionContains(
+                  "Superuser privilege is required", e);
+        }
+        return null;
+      }
+    });
+  }
+
+  /** Test failure of deleting an EZ passing a directory that doesn't exist. */
+  @Test(timeout = 30000)
+  public void testDeleteEncryptionZoneDirectoryDoesntExist() throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    try {
+      dfsAdmin.deleteEncryptionZone(TEST_PATH);
+      fail("Directory doesn't exist");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(
+        "is not the root of an encryption zone", e);
+    }
+  }
+
+  /** Test failure of deleting an EZ which is not empty. */
+  @Test(timeout = 30000)
+  public void testDeleteEncryptionZoneAndGenerateKeyDirectoryNotEmpty()
+    throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    FileSystem.mkdirs(fs, TEST_PATH, new FsPermission((short) 0777));
+    dfsAdmin.createEncryptionZone(TEST_PATH, null);
+    FileSystem.create(fs, new Path("/test/foo"),
+      new FsPermission((short) 0777));
+    try {
+      dfsAdmin.deleteEncryptionZone(TEST_PATH);
+      fail("Directory not empty");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("non-empty directory", e);
+    }
+  }
+
+  /** Test success of deleting an EZ. */
+  @Test(timeout = 30000)
+  public void testDeleteEncryptionZone()
+    throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    FileSystem.mkdirs(fs, TEST_PATH, new FsPermission((short) 0777));
+    dfsAdmin.createEncryptionZone(TEST_PATH, null);
+    List<EncryptionZone> zones = dfsAdmin.listEncryptionZones();
+    Preconditions.checkState(zones.size() == 1, "More than one zone found?");
+    dfsAdmin.deleteEncryptionZone(TEST_PATH);
+    zones = dfsAdmin.listEncryptionZones();
+    Preconditions.checkState(zones.size() == 0, "More than one zone found?");
+  }
+
+  /**
+   * Test failure of deleting an EZ on a subdir that is not the root of an EZ.
+   */
+  @Test(timeout = 30000)
+  public void testDeleteEncryptionZoneInExistingEncryptionZone()
+    throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    FileSystem.mkdirs(fs, TEST_PATH, new FsPermission((short) 0777));
+    dfsAdmin.createEncryptionZone(TEST_PATH, null);
+    FileSystem.mkdirs(fs, TEST_PATH_WITH_CHILD, new FsPermission((short) 0777));
+    try {
+      dfsAdmin.deleteEncryptionZone(TEST_PATH_WITH_CHILD);
+      fail("EZ in an EZ");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(
+        "is not the root of an encryption zone", e);
+    }
+  }
+
+  /**
+   * Test success of creating and deleting an encryption zone a few levels down.
+   */
+  @Test(timeout = 30000)
+  public void testCreateAndDeleteEncryptionZoneDownAFewLevels()
+    throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    FileSystem.mkdirs(fs, TEST_PATH, new FsPermission((short) 0777));
+    dfsAdmin.createEncryptionZone(TEST_PATH, null);
+    FileSystem.mkdirs(fs, TEST_PATH_WITH_MULTIPLE_CHILDREN,
+      new FsPermission((short) 0777));
+    try {
+      dfsAdmin.deleteEncryptionZone(TEST_PATH_WITH_MULTIPLE_CHILDREN);
+      fail("EZ in an EZ");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(
+        "is not the root of an encryption zone", e);
+    }
+    final List<EncryptionZone> zones = dfsAdmin.listEncryptionZones();
+    Preconditions.checkState(zones.size() == 1, "More than one zone found?");
+    final EncryptionZone ez = zones.get(0);
+      GenericTestUtils.assertMatches(ez.toString(),
+         "EncryptionZone \\[path=/test, keyId=");
+  }
+
+  /** Test failure of creating an EZ using a non-empty directory. */
+  @Test(timeout = 30000)
+  public void testCreateFileInEncryptionZone() throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    FileSystem.mkdirs(fs, TEST_PATH, new FsPermission((short) 0777));
+    dfsAdmin.createEncryptionZone(TEST_PATH, null);
+    FileSystem.create(fs, TEST_PATH_WITH_CHILD, new FsPermission((short) 0777));
+
+    final List<EncryptionZone> zones = dfsAdmin.listEncryptionZones();
+    final EncryptionZone ez = zones.get(0);
+      GenericTestUtils.assertMatches(ez.toString(),
+         "EncryptionZone \\[path=/test, keyId=");
+  }
+
+  /** Test listing encryption zones. */
+  @Test(timeout = 30000)
+  public void testListEncryptionZones() throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    final int N_EZs = 5;
+    final Set<String> ezPathNames = new HashSet<String>(N_EZs);
+    for (int i = 0; i < N_EZs; i++) {
+      final Path p = new Path(TEST_PATH, "" + i);
+      ezPathNames.add(p.toString());
+      FileSystem.mkdirs(fs, p, new FsPermission((short) 0777));
+      dfsAdmin.createEncryptionZone(p, null);
+    }
+
+    final List<EncryptionZone> zones = dfsAdmin.listEncryptionZones();
+    Preconditions.checkState(zones.size() == N_EZs, "wrong number of EZs returned");
+    for (EncryptionZone z : zones) {
+      final String ezPathName = z.getPath();
+      Preconditions.checkState(ezPathNames.remove(
+          ezPathName), "Path " + ezPathName + " not returned from listEZ");
+    }
+    Preconditions.checkState(ezPathNames.size() == 0);
+  }
+
+  /** Test listing encryption zones as a non super user. */
+  @Test(timeout = 30000)
+  public void testListEncryptionZonesAsNonSuperUser() throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+
+    final UserGroupInformation user = UserGroupInformation.
+      createUserForTesting("user", new String[] {"mygroup"});
+
+    final Path TEST_PATH_SUPERUSER_ONLY = new Path(TEST_PATH, "superuseronly");
+    final Path TEST_PATH_ALL = new Path(TEST_PATH, "accessall");
+
+    FileSystem.mkdirs(fs, TEST_PATH_SUPERUSER_ONLY,
+      new FsPermission((short) 0700));
+    dfsAdmin.createEncryptionZone(TEST_PATH_SUPERUSER_ONLY, null);
+    FileSystem.mkdirs(fs, TEST_PATH_ALL,
+      new FsPermission((short) 0707));
+    dfsAdmin.createEncryptionZone(TEST_PATH_ALL, null);
+
+    user.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        final HdfsAdmin userAdmin =
+                new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+        try {
+          final List<EncryptionZone> zones = userAdmin.listEncryptionZones();
+        } catch (AccessControlException e) {
+          GenericTestUtils.assertExceptionContains(
+                  "Superuser privilege is required", e);
+        }
+        return null;
+      }
+    });
+  }
+}