Browse Source

HDFS-6474. Namenode needs to get the actual keys and iv from the KeyProvider. (wang)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1609833 13f79535-47bb-0310-9956-ffa450edef68
Andrew Wang 11 years ago
parent
commit
d79f27b429

+ 29 - 9
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileEncryptionInfo.java

@@ -32,20 +32,33 @@ import static com.google.common.base.Preconditions.checkNotNull;
 public class FileEncryptionInfo {
 public class FileEncryptionInfo {
 
 
   private final CipherSuite cipherSuite;
   private final CipherSuite cipherSuite;
-  private final byte[] key;
+  private final byte[] edek;
   private final byte[] iv;
   private final byte[] iv;
+  private final String ezKeyVersionName;
 
 
-  public FileEncryptionInfo(CipherSuite suite, byte[] key, byte[] iv) {
+  /**
+   * Create a FileEncryptionInfo.
+   *
+   * @param suite CipherSuite used to encrypt the file
+   * @param edek encrypted data encryption key (EDEK) of the file
+   * @param iv initialization vector (IV) used to encrypt the file
+   * @param ezKeyVersionName name of the KeyVersion used to encrypt the
+   *                         encrypted data encryption key.
+   */
+  public FileEncryptionInfo(final CipherSuite suite, final byte[] edek,
+      final byte[] iv, final String ezKeyVersionName) {
     checkNotNull(suite);
     checkNotNull(suite);
-    checkNotNull(key);
+    checkNotNull(edek);
     checkNotNull(iv);
     checkNotNull(iv);
-    checkArgument(key.length == suite.getAlgorithmBlockSize(),
+    checkNotNull(ezKeyVersionName);
+    checkArgument(edek.length == suite.getAlgorithmBlockSize(),
         "Unexpected key length");
         "Unexpected key length");
     checkArgument(iv.length == suite.getAlgorithmBlockSize(),
     checkArgument(iv.length == suite.getAlgorithmBlockSize(),
         "Unexpected IV length");
         "Unexpected IV length");
     this.cipherSuite = suite;
     this.cipherSuite = suite;
-    this.key = key;
+    this.edek = edek;
     this.iv = iv;
     this.iv = iv;
+    this.ezKeyVersionName = ezKeyVersionName;
   }
   }
 
 
   /**
   /**
@@ -57,25 +70,32 @@ public class FileEncryptionInfo {
   }
   }
 
 
   /**
   /**
-   * @return encrypted data encryption key for the file
+   * @return encrypted data encryption key (EDEK) for the file
    */
    */
   public byte[] getEncryptedDataEncryptionKey() {
   public byte[] getEncryptedDataEncryptionKey() {
-    return key;
+    return edek;
   }
   }
 
 
   /**
   /**
-   * @return initialization vector for the cipher used to encrypt the file
+   * @return initialization vector (IV) for the cipher used to encrypt the file
    */
    */
   public byte[] getIV() {
   public byte[] getIV() {
     return iv;
     return iv;
   }
   }
 
 
+  /**
+   * @return name of the encryption zone KeyVersion used to encrypt the
+   * encrypted data encryption key (EDEK).
+   */
+  public String getEzKeyVersionName() { return ezKeyVersionName; }
+
   @Override
   @Override
   public String toString() {
   public String toString() {
     StringBuilder builder = new StringBuilder("{");
     StringBuilder builder = new StringBuilder("{");
     builder.append("cipherSuite: " + cipherSuite);
     builder.append("cipherSuite: " + cipherSuite);
-    builder.append(", key: " + Hex.encodeHexString(key));
+    builder.append(", edek: " + Hex.encodeHexString(edek));
     builder.append(", iv: " + Hex.encodeHexString(iv));
     builder.append(", iv: " + Hex.encodeHexString(iv));
+    builder.append(", ezKeyVersionName: " + ezKeyVersionName);
     builder.append("}");
     builder.append("}");
     return builder.toString();
     return builder.toString();
   }
   }

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES-fs-encryption.txt

@@ -39,6 +39,9 @@ fs-encryption (Unreleased)
     HDFS-6635. Refactor encryption zone functionality into new
     HDFS-6635. Refactor encryption zone functionality into new
     EncryptionZoneManager class. (wang)
     EncryptionZoneManager class. (wang)
 
 
+    HDFS-6474. Namenode needs to get the actual keys and iv from the
+    KeyProvider. (wang)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
   BUG FIXES
   BUG FIXES

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -561,6 +561,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
   public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
   public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
   public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
   public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
   public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
+  public static final String DFS_NAMENODE_KEY_VERSION_REFRESH_INTERVAL_MS_KEY = "dfs.namenode.key.version.refresh.interval.ms";
+  public static final int DFS_NAMENODE_KEY_VERSION_REFRESH_INTERVAL_MS_DEFAULT = 5*60*1000;
   
   
   // Journal-node related configs. These are read on the JN side.
   // Journal-node related configs. These are read on the JN side.
   public static final String  DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";
   public static final String  DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -2335,6 +2335,7 @@ public class PBHelper {
         .setSuite(convert(info.getCipherSuite()))
         .setSuite(convert(info.getCipherSuite()))
         .setKey(getByteString(info.getEncryptedDataEncryptionKey()))
         .setKey(getByteString(info.getEncryptedDataEncryptionKey()))
         .setIv(getByteString(info.getIV()))
         .setIv(getByteString(info.getIV()))
+        .setEzKeyVersionName(info.getEzKeyVersionName())
         .build();
         .build();
   }
   }
 
 
@@ -2346,7 +2347,8 @@ public class PBHelper {
     CipherSuite suite = convert(proto.getSuite());
     CipherSuite suite = convert(proto.getSuite());
     byte[] key = proto.getKey().toByteArray();
     byte[] key = proto.getKey().toByteArray();
     byte[] iv = proto.getIv().toByteArray();
     byte[] iv = proto.getIv().toByteArray();
-    return new FileEncryptionInfo(suite, key, iv);
+    String ezKeyVersionName = proto.getEzKeyVersionName();
+    return new FileEncryptionInfo(suite, key, iv, ezKeyVersionName);
   }
   }
 
 
 }
 }

+ 356 - 60
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java

@@ -3,28 +3,50 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.io.IOException;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 
 
+import static org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants
     .CRYPTO_XATTR_ENCRYPTION_ZONE;
     .CRYPTO_XATTR_ENCRYPTION_ZONE;
 
 
 /**
 /**
- * Manages the list of encryption zones in the filesystem. Relies on the
- * FSDirectory lock for synchronization.
+ * Manages the list of encryption zones in the filesystem.
+ * <p/>
+ * The EncryptionZoneManager has its own lock, but relies on the FSDirectory
+ * lock being held for many operations. The FSDirectory lock should not be
+ * taken if the manager lock is already held.
  */
  */
 public class EncryptionZoneManager {
 public class EncryptionZoneManager {
 
 
+  public static Logger LOG = LoggerFactory.getLogger(EncryptionZoneManager
+      .class);
+
   /**
   /**
    * EncryptionZoneInt is the internal representation of an encryption zone. The
    * EncryptionZoneInt is the internal representation of an encryption zone. The
    * external representation of an EZ is embodied in an EncryptionZone and
    * external representation of an EZ is embodied in an EncryptionZone and
@@ -34,9 +56,30 @@ public class EncryptionZoneManager {
     private final String keyId;
     private final String keyId;
     private final long inodeId;
     private final long inodeId;
 
 
+    private final HashSet<KeyVersion> keyVersions;
+    private KeyVersion latestVersion;
+
     EncryptionZoneInt(long inodeId, String keyId) {
     EncryptionZoneInt(long inodeId, String keyId) {
       this.keyId = keyId;
       this.keyId = keyId;
       this.inodeId = inodeId;
       this.inodeId = inodeId;
+      keyVersions = Sets.newHashSet();
+      latestVersion = null;
+    }
+
+    KeyVersion getLatestKeyVersion() {
+      return latestVersion;
+    }
+
+    void addKeyVersion(KeyVersion version) {
+      Preconditions.checkNotNull(version);
+      if (!keyVersions.contains(version)) {
+        LOG.debug("Key {} has new key version {}", keyId, version);
+        keyVersions.add(version);
+      }
+      // Always set the latestVersion to not get stuck on an old version in
+      // racy situations. Should eventually converge thanks to the
+      // monitor.
+      latestVersion = version;
     }
     }
 
 
     String getKeyId() {
     String getKeyId() {
@@ -47,49 +90,265 @@ public class EncryptionZoneManager {
       return inodeId;
       return inodeId;
     }
     }
 
 
-    String getFullPathName() {
-      return dir.getInode(inodeId).getFullPathName();
-    }
   }
   }
 
 
-  private final Map<Long, EncryptionZoneInt> encryptionZones;
+  /**
+   * Protects the <tt>encryptionZones</tt> map and its contents.
+   */
+  private final ReentrantReadWriteLock lock;
+
+  private void readLock() {
+    lock.readLock().lock();
+  }
+
+  private void readUnlock() {
+    lock.readLock().unlock();
+  }
+
+  private void writeLock() {
+    lock.writeLock().lock();
+  }
+
+  private void writeUnlock() {
+    lock.writeLock().unlock();
+  }
+
+  public boolean hasWriteLock() {
+    return lock.isWriteLockedByCurrentThread();
+  }
 
 
+  public boolean hasReadLock() {
+    return lock.getReadHoldCount() > 0 || hasWriteLock();
+  }
+
+  private final Map<Long, EncryptionZoneInt> encryptionZones;
   private final FSDirectory dir;
   private final FSDirectory dir;
+  private final ScheduledExecutorService monitor;
+  private final KeyProvider provider;
 
 
   /**
   /**
    * Construct a new EncryptionZoneManager.
    * Construct a new EncryptionZoneManager.
    *
    *
    * @param dir Enclosing FSDirectory
    * @param dir Enclosing FSDirectory
    */
    */
-  public EncryptionZoneManager(FSDirectory dir) {
+  public EncryptionZoneManager(FSDirectory dir, Configuration conf,
+      KeyProvider provider) {
+
     this.dir = dir;
     this.dir = dir;
+    this.provider = provider;
+    lock = new ReentrantReadWriteLock();
     encryptionZones = new HashMap<Long, EncryptionZoneInt>();
     encryptionZones = new HashMap<Long, EncryptionZoneInt>();
+
+    monitor = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat(EncryptionZoneMonitor.class.getSimpleName() + "-%d")
+            .build());
+    final int refreshMs = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_KEY_VERSION_REFRESH_INTERVAL_MS_KEY,
+        DFSConfigKeys.DFS_NAMENODE_KEY_VERSION_REFRESH_INTERVAL_MS_DEFAULT
+    );
+    Preconditions.checkArgument(refreshMs >= 0, "%s cannot be negative",
+        DFSConfigKeys.DFS_NAMENODE_KEY_VERSION_REFRESH_INTERVAL_MS_KEY);
+    monitor.scheduleAtFixedRate(new EncryptionZoneMonitor(), 0, refreshMs,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Periodically wakes up to fetch the latest version of each encryption
+   * zone key.
+   */
+  private class EncryptionZoneMonitor implements Runnable {
+    @Override
+    public void run() {
+      LOG.debug("Monitor waking up to refresh encryption zone key versions");
+      HashMap<Long, String> toFetch = Maps.newHashMap();
+      HashMap<Long, KeyVersion> toUpdate =
+          Maps.newHashMap();
+      // Determine the keyIds to fetch
+      readLock();
+      try {
+        for (EncryptionZoneInt ezi : encryptionZones.values()) {
+          toFetch.put(ezi.getINodeId(), ezi.getKeyId());
+        }
+      } finally {
+        readUnlock();
+      }
+      LOG.trace("Found {} keys to check", toFetch.size());
+      // Fetch the key versions while not holding the lock
+      for (Map.Entry<Long, String> entry : toFetch.entrySet()) {
+        try {
+          KeyVersion version = provider.getCurrentKey(entry.getValue());
+          toUpdate.put(entry.getKey(), version);
+        } catch (IOException e) {
+          LOG.warn("Error while getting the current key for {} {}",
+              entry.getValue(), e);
+        }
+      }
+      LOG.trace("Fetched {} key versions from KeyProvider", toUpdate.size());
+      // Update the key versions for each encryption zone
+      writeLock();
+      try {
+        for (Map.Entry<Long, KeyVersion> entry : toUpdate.entrySet()) {
+          EncryptionZoneInt ezi = encryptionZones.get(entry.getKey());
+          // zone might have been removed in the intervening time
+          if (ezi == null) {
+            continue;
+          }
+          ezi.addKeyVersion(entry.getValue());
+        }
+      } finally {
+        writeUnlock();
+      }
+    }
+  }
+
+  /**
+   * Forces the EncryptionZoneMonitor to run, waiting until completion.
+   */
+  @VisibleForTesting
+  public void kickMonitor() throws Exception {
+    Future future = monitor.submit(new EncryptionZoneMonitor());
+    future.get();
+  }
+
+  /**
+   * Immediately fetches the latest KeyVersion for an encryption zone,
+   * also updating the encryption zone.
+   *
+   * @param iip of the encryption zone
+   * @return latest KeyVersion
+   * @throws IOException on KeyProvider error
+   */
+  KeyVersion updateLatestKeyVersion(INodesInPath iip) throws IOException {
+    EncryptionZoneInt ezi;
+    readLock();
+    try {
+      ezi = getEncryptionZoneForPath(iip);
+    } finally {
+      readUnlock();
+    }
+    if (ezi == null) {
+      throw new IOException("Cannot update KeyVersion since iip is not within" +
+          " an encryption zone");
+    }
+
+    // Do not hold the lock while doing KeyProvider operations
+    KeyVersion version = provider.getCurrentKey(ezi.getKeyId());
+
+    writeLock();
+    try {
+      ezi.addKeyVersion(version);
+      return version;
+    } finally {
+      writeUnlock();
+    }
   }
   }
 
 
   /**
   /**
    * Add a new encryption zone.
    * Add a new encryption zone.
+   * <p/>
+   * Called while holding the FSDirectory lock.
    *
    *
    * @param inodeId of the encryption zone
    * @param inodeId of the encryption zone
    * @param keyId   encryption zone key id
    * @param keyId   encryption zone key id
    */
    */
   void addEncryptionZone(Long inodeId, String keyId) {
   void addEncryptionZone(Long inodeId, String keyId) {
+    assert dir.hasWriteLock();
     final EncryptionZoneInt ez = new EncryptionZoneInt(inodeId, keyId);
     final EncryptionZoneInt ez = new EncryptionZoneInt(inodeId, keyId);
-    encryptionZones.put(inodeId, ez);
+    writeLock();
+    try {
+      encryptionZones.put(inodeId, ez);
+    } finally {
+      writeUnlock();
+    }
   }
   }
 
 
+  /**
+   * Remove an encryption zone.
+   * <p/>
+   * Called while holding the FSDirectory lock.
+   */
   void removeEncryptionZone(Long inodeId) {
   void removeEncryptionZone(Long inodeId) {
-    encryptionZones.remove(inodeId);
+    assert dir.hasWriteLock();
+    writeLock();
+    try {
+      encryptionZones.remove(inodeId);
+    } finally {
+      writeUnlock();
+    }
   }
   }
 
 
   /**
   /**
    * Returns true if an IIP is within an encryption zone.
    * Returns true if an IIP is within an encryption zone.
+   * <p/>
+   * Called while holding the FSDirectory lock.
    */
    */
   boolean isInAnEZ(INodesInPath iip)
   boolean isInAnEZ(INodesInPath iip)
       throws UnresolvedLinkException, SnapshotAccessControlException {
       throws UnresolvedLinkException, SnapshotAccessControlException {
-    return (getEncryptionZoneForPath(iip) != null);
+    assert dir.hasReadLock();
+    readLock();
+    try {
+      return (getEncryptionZoneForPath(iip) != null);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Returns the path of the EncryptionZoneInt.
+   * <p/>
+   * Called while holding the FSDirectory lock.
+   */
+  private String getFullPathName(EncryptionZoneInt ezi) {
+    assert dir.hasReadLock();
+    return dir.getInode(ezi.getINodeId()).getFullPathName();
+  }
+
+  KeyVersion getLatestKeyVersion(final INodesInPath iip) {
+    readLock();
+    try {
+      EncryptionZoneInt ezi = getEncryptionZoneForPath(iip);
+      if (ezi == null) {
+        return null;
+      }
+      return ezi.getLatestKeyVersion();
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * @return true if the provided <tt>keyVersionName</tt> is the name of a
+   * valid KeyVersion for the encryption zone of <tt>iip</tt>,
+   * and <tt>iip</tt> is within an encryption zone.
+   */
+  boolean isValidKeyVersion(final INodesInPath iip, String keyVersionName) {
+    readLock();
+    try {
+      EncryptionZoneInt ezi = getEncryptionZoneForPath(iip);
+      if (ezi == null) {
+        return false;
+      }
+      for (KeyVersion ezVersion : ezi.keyVersions) {
+        if (keyVersionName.equals(ezVersion.getVersionName())) {
+          return true;
+        }
+      }
+      return false;
+    } finally {
+      readUnlock();
+    }
   }
   }
 
 
+  /**
+   * Looks up the EncryptionZoneInt for a path within an encryption zone.
+   * Returns null if path is not within an EZ.
+   * <p/>
+   * Must be called while holding the manager lock.
+   */
   private EncryptionZoneInt getEncryptionZoneForPath(INodesInPath iip) {
   private EncryptionZoneInt getEncryptionZoneForPath(INodesInPath iip) {
+    assert hasReadLock();
     Preconditions.checkNotNull(iip);
     Preconditions.checkNotNull(iip);
     final INode[] inodes = iip.getINodes();
     final INode[] inodes = iip.getINodes();
     for (int i = inodes.length - 1; i >= 0; i--) {
     for (int i = inodes.length - 1; i >= 0; i--) {
@@ -105,8 +364,10 @@ public class EncryptionZoneManager {
   }
   }
 
 
   /**
   /**
-   * Throws an exception if the provided inode cannot be renamed into the
+   * Throws an exception if the provided path cannot be renamed into the
    * destination because of differing encryption zones.
    * destination because of differing encryption zones.
+   * <p/>
+   * Called while holding the FSDirectory lock.
    *
    *
    * @param srcIIP source IIP
    * @param srcIIP source IIP
    * @param dstIIP destination IIP
    * @param dstIIP destination IIP
@@ -115,66 +376,101 @@ public class EncryptionZoneManager {
    */
    */
   void checkMoveValidity(INodesInPath srcIIP, INodesInPath dstIIP, String src)
   void checkMoveValidity(INodesInPath srcIIP, INodesInPath dstIIP, String src)
       throws IOException {
       throws IOException {
-    final boolean srcInEZ = (getEncryptionZoneForPath(srcIIP) != null);
-    final boolean dstInEZ = (getEncryptionZoneForPath(dstIIP) != null);
-    if (srcInEZ) {
-      if (!dstInEZ) {
-        throw new IOException(src + " can't be moved from an encryption zone.");
-      }
-    } else {
-      if (dstInEZ) {
-        throw new IOException(src + " can't be moved into an encryption zone.");
-      }
-    }
-
-    if (srcInEZ || dstInEZ) {
+    assert dir.hasReadLock();
+    readLock();
+    try {
       final EncryptionZoneInt srcEZI = getEncryptionZoneForPath(srcIIP);
       final EncryptionZoneInt srcEZI = getEncryptionZoneForPath(srcIIP);
       final EncryptionZoneInt dstEZI = getEncryptionZoneForPath(dstIIP);
       final EncryptionZoneInt dstEZI = getEncryptionZoneForPath(dstIIP);
-      Preconditions.checkArgument(srcEZI != null, "couldn't find src EZ?");
-      Preconditions.checkArgument(dstEZI != null, "couldn't find dst EZ?");
-      if (srcEZI != dstEZI) {
-        final String srcEZPath = srcEZI.getFullPathName();
-        final String dstEZPath = dstEZI.getFullPathName();
-        final StringBuilder sb = new StringBuilder(src);
-        sb.append(" can't be moved from encryption zone ");
-        sb.append(srcEZPath);
-        sb.append(" to encryption zone ");
-        sb.append(dstEZPath);
-        sb.append(".");
-        throw new IOException(sb.toString());
+      final boolean srcInEZ = (srcEZI != null);
+      final boolean dstInEZ = (dstEZI != null);
+      if (srcInEZ) {
+        if (!dstInEZ) {
+          throw new IOException(
+              src + " can't be moved from an encryption zone.");
+        }
+      } else {
+        if (dstInEZ) {
+          throw new IOException(
+              src + " can't be moved into an encryption zone.");
+        }
+      }
+
+      if (srcInEZ || dstInEZ) {
+        Preconditions.checkState(srcEZI != null, "couldn't find src EZ?");
+        Preconditions.checkState(dstEZI != null, "couldn't find dst EZ?");
+        if (srcEZI != dstEZI) {
+          final String srcEZPath = getFullPathName(srcEZI);
+          final String dstEZPath = getFullPathName(dstEZI);
+          final StringBuilder sb = new StringBuilder(src);
+          sb.append(" can't be moved from encryption zone ");
+          sb.append(srcEZPath);
+          sb.append(" to encryption zone ");
+          sb.append(dstEZPath);
+          sb.append(".");
+          throw new IOException(sb.toString());
+        }
       }
       }
+    } finally {
+      readUnlock();
     }
     }
   }
   }
 
 
-  XAttr createEncryptionZone(String src, String keyId) throws IOException {
-    if (dir.isNonEmptyDirectory(src)) {
-      throw new IOException(
-          "Attempt to create an encryption zone for a non-empty directory.");
-    }
+  /**
+   * Create a new encryption zone.
+   * <p/>
+   * Called while holding the FSDirectory lock.
+   */
+  XAttr createEncryptionZone(String src, String keyId, KeyVersion keyVersion)
+      throws IOException {
+    assert dir.hasWriteLock();
+    writeLock();
+    try {
+      if (dir.isNonEmptyDirectory(src)) {
+        throw new IOException(
+            "Attempt to create an encryption zone for a non-empty directory.");
+      }
 
 
-    final INodesInPath srcIIP = dir.getINodesInPath4Write(src, false);
-    final EncryptionZoneInt ezi = getEncryptionZoneForPath(srcIIP);
-    if (ezi != null) {
-      throw new IOException("Directory " + src +
-          " is already in an encryption zone. (" + ezi.getFullPathName() + ")");
-    }
+      final INodesInPath srcIIP = dir.getINodesInPath4Write(src, false);
+      EncryptionZoneInt ezi = getEncryptionZoneForPath(srcIIP);
+      if (ezi != null) {
+        throw new IOException("Directory " + src + " is already in an " +
+            "encryption zone. (" + getFullPathName(ezi) + ")");
+      }
 
 
-    final XAttr keyIdXAttr =
-        XAttrHelper.buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, keyId.getBytes());
-    final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
-    xattrs.add(keyIdXAttr);
-    final INode inode =
-        dir.unprotectedSetXAttrs(src, xattrs, EnumSet.of(XAttrSetFlag.CREATE));
-    addEncryptionZone(inode.getId(), keyId);
-    return keyIdXAttr;
+      final XAttr keyIdXAttr = XAttrHelper
+          .buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, keyId.getBytes());
+
+      final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
+      xattrs.add(keyIdXAttr);
+      // updating the xattr will call addEncryptionZone,
+      // done this way to handle edit log loading
+      dir.unprotectedSetXAttrs(src, xattrs, EnumSet.of(XAttrSetFlag.CREATE));
+      // Re-get the new encryption zone add the latest key version
+      ezi = getEncryptionZoneForPath(srcIIP);
+      ezi.addKeyVersion(keyVersion);
+      return keyIdXAttr;
+    } finally {
+      writeUnlock();
+    }
   }
   }
 
 
+  /**
+   * Return the current list of encryption zones.
+   * <p/>
+   * Called while holding the FSDirectory lock.
+   */
   List<EncryptionZone> listEncryptionZones() throws IOException {
   List<EncryptionZone> listEncryptionZones() throws IOException {
-    final List<EncryptionZone> ret =
-        Lists.newArrayListWithExpectedSize(encryptionZones.size());
-    for (EncryptionZoneInt ezi : encryptionZones.values()) {
-      ret.add(new EncryptionZone(ezi.getFullPathName(), ezi.getKeyId()));
+    assert dir.hasReadLock();
+    readLock();
+    try {
+      final List<EncryptionZone> ret =
+          Lists.newArrayListWithExpectedSize(encryptionZones.size());
+      for (EncryptionZoneInt ezi : encryptionZones.values()) {
+        ret.add(new EncryptionZone(getFullPathName(ezi), ezi.getKeyId()));
+      }
+      return ret;
+    } finally {
+      readUnlock();
     }
     }
-    return ret;
   }
   }
 }
 }

+ 43 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -17,6 +17,7 @@
  */
  */
 package org.apache.hadoop.hdfs.server.namenode;
 package org.apache.hadoop.hdfs.server.namenode;
 
 
+import static org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
 import static org.apache.hadoop.util.Time.now;
 import static org.apache.hadoop.util.Time.now;
@@ -35,6 +36,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.FileEncryptionInfo;
@@ -162,7 +164,7 @@ public class FSDirectory implements Closeable {
   }
   }
 
 
   boolean hasReadLock() {
   boolean hasReadLock() {
-    return this.dirLock.getReadHoldCount() > 0;
+    return this.dirLock.getReadHoldCount() > 0 || hasWriteLock();
   }
   }
 
 
   public int getReadHoldCount() {
   public int getReadHoldCount() {
@@ -173,7 +175,8 @@ public class FSDirectory implements Closeable {
     return this.dirLock.getWriteHoldCount();
     return this.dirLock.getWriteHoldCount();
   }
   }
 
 
-  final EncryptionZoneManager ezManager;
+  @VisibleForTesting
+  public final EncryptionZoneManager ezManager;
 
 
   /**
   /**
    * Caches frequently used file names used in {@link INode} to reuse 
    * Caches frequently used file names used in {@link INode} to reuse 
@@ -224,7 +227,7 @@ public class FSDirectory implements Closeable {
     nameCache = new NameCache<ByteArray>(threshold);
     nameCache = new NameCache<ByteArray>(threshold);
     namesystem = ns;
     namesystem = ns;
 
 
-    ezManager = new EncryptionZoneManager(this);
+    ezManager = new EncryptionZoneManager(this, conf, ns.getProvider());
   }
   }
     
     
   private FSNamesystem getFSNamesystem() {
   private FSNamesystem getFSNamesystem() {
@@ -905,16 +908,6 @@ public class FSDirectory implements Closeable {
     }
     }
   }
   }
 
 
-  boolean isInAnEZ(INodesInPath iip)
-    throws UnresolvedLinkException, SnapshotAccessControlException {
-    readLock();
-    try {
-      return ezManager.isInAnEZ(iip);
-    } finally {
-      readUnlock();
-    }
-  }
-
   /**
   /**
    * Set file replication
    * Set file replication
    * 
    * 
@@ -2618,12 +2611,46 @@ public class FSDirectory implements Closeable {
 
 
     return newXAttrs;
     return newXAttrs;
   }
   }
-  
-  XAttr createEncryptionZone(String src, String keyId)
+
+  boolean isInAnEZ(INodesInPath iip)
+      throws UnresolvedLinkException, SnapshotAccessControlException {
+    readLock();
+    try {
+      return ezManager.isInAnEZ(iip);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  KeyVersion getLatestKeyVersion(INodesInPath iip) {
+    readLock();
+    try {
+      return ezManager.getLatestKeyVersion(iip);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  KeyVersion updateLatestKeyVersion(INodesInPath iip) throws
+      IOException {
+    // No locking, this operation does not involve any FSDirectory operations
+    return ezManager.updateLatestKeyVersion(iip);
+  }
+
+  boolean isValidKeyVersion(INodesInPath iip, String keyVersionName) {
+    readLock();
+    try {
+      return ezManager.isValidKeyVersion(iip, keyVersionName);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  XAttr createEncryptionZone(String src, String keyId, KeyVersion keyVersion)
     throws IOException {
     throws IOException {
     writeLock();
     writeLock();
     try {
     try {
-      return ezManager.createEncryptionZone(src, keyId);
+      return ezManager.createEncryptionZone(src, keyId, keyVersion);
     } finally {
     } finally {
       writeUnlock();
       writeUnlock();
     }
     }

+ 150 - 42
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -17,6 +17,7 @@
  */
  */
 package org.apache.hadoop.hdfs.server.namenode;
 package org.apache.hadoop.hdfs.server.namenode;
 
 
+import static org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
@@ -100,6 +101,7 @@ import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URI;
+import java.security.GeneralSecurityException;
 import java.security.NoSuchAlgorithmException;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Arrays;
@@ -133,6 +135,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoCodec;
 import org.apache.hadoop.crypto.CryptoCodec;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.CacheFlag;
@@ -533,7 +536,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
 
   private final NNConf nnConf;
   private final NNConf nnConf;
 
 
-  private KeyProvider provider = null;
+  private KeyProviderCryptoExtension provider = null;
   private KeyProvider.Options providerOptions = null;
   private KeyProvider.Options providerOptions = null;
 
 
   private final CryptoCodec codec;
   private final CryptoCodec codec;
@@ -929,7 +932,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         LOG.error(err);
         LOG.error(err);
         throw new RuntimeException(err);
         throw new RuntimeException(err);
       }
       }
-      provider = providers.get(0);
+      provider = KeyProviderCryptoExtension
+          .createKeyProviderCryptoExtension(providers.get(0));
       if (provider.isTransient()) {
       if (provider.isTransient()) {
         final String err =
         final String err =
             "A KeyProvider was found but it is a transient provider.";
             "A KeyProvider was found but it is a transient provider.";
@@ -2310,7 +2314,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * CipherSuite from the list provided by the client. Since the client may 
    * CipherSuite from the list provided by the client. Since the client may 
    * be newer, need to handle unknown CipherSuites.
    * be newer, need to handle unknown CipherSuites.
    *
    *
-   * @param src path of the file
+   * @param srcIIP path of the file
    * @param cipherSuites client-provided list of supported CipherSuites, 
    * @param cipherSuites client-provided list of supported CipherSuites, 
    *                     in desired order.
    *                     in desired order.
    * @return chosen CipherSuite, or null if file is not in an EncryptionZone
    * @return chosen CipherSuite, or null if file is not in an EncryptionZone
@@ -2349,6 +2353,62 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return chosen;
     return chosen;
   }
   }
 
 
+  /**
+   * Create a new FileEncryptionInfo for a path. Also chooses an
+   * appropriate CipherSuite to use from the list provided by the
+   * client.
+   *
+   * @param src Target path
+   * @param pathComponents Target path split up into path components
+   * @param cipherSuites List of CipherSuites provided by the client
+   * @return a new FileEncryptionInfo, or null if path is not within an
+   * encryption
+   * zone.
+   * @throws IOException
+   */
+  private FileEncryptionInfo newFileEncryptionInfo(String src,
+      byte[][] pathComponents, List<CipherSuite> cipherSuites)
+      throws IOException {
+    INodesInPath iip = null;
+    CipherSuite suite = null;
+    KeyVersion latestEZKeyVersion = null;
+    readLock();
+    try {
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      iip = dir.getINodesInPath4Write(src);
+      // Nothing to do if the path is not within an EZ
+      if (!dir.isInAnEZ(iip)) {
+        return null;
+      }
+      suite = chooseCipherSuite(iip, cipherSuites);
+      if (suite != null) {
+        Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
+            "Chose an UNKNOWN CipherSuite!");
+      }
+      latestEZKeyVersion = dir.getLatestKeyVersion(iip);
+    } finally {
+      readUnlock();
+    }
+
+    // If the latest key version is null, need to fetch it and update
+    if (latestEZKeyVersion == null) {
+      latestEZKeyVersion = dir.updateLatestKeyVersion(iip);
+    }
+    Preconditions.checkState(latestEZKeyVersion != null);
+
+    // Generate the EDEK while not holding the lock
+    KeyProviderCryptoExtension.EncryptedKeyVersion edek = null;
+    try {
+      edek = provider.generateEncryptedKey(latestEZKeyVersion);
+    } catch (GeneralSecurityException e) {
+      throw new IOException(e);
+    }
+    Preconditions.checkNotNull(edek);
+
+    return new FileEncryptionInfo(suite, edek.getEncryptedKey().getMaterial(),
+        edek.getIv(), edek.getKeyVersionName());
+  }
+
   /**
   /**
    * Create a new file entry in the namespace.
    * Create a new file entry in the namespace.
    * 
    * 
@@ -2426,26 +2486,62 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
     boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
 
 
     waitForLoadingFSImage();
     waitForLoadingFSImage();
-    writeLock();
+
+    /*
+     * We want to avoid holding any locks while creating a new
+     * FileEncryptionInfo, since this can be very slow. Since the path can
+     * flip flop between being in an encryption zone and not in the meantime,
+     * we need to recheck the preconditions and generate a new
+     * FileEncryptionInfo in some circumstances.
+     *
+     * A special RetryStartFileException is used to indicate that we should
+     * retry creation of a FileEncryptionInfo.
+     */
     try {
     try {
-      checkOperation(OperationCategory.WRITE);
-      checkNameNodeSafeMode("Cannot create file" + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
-      startFileInternal(pc, src, permissions, holder, clientMachine, create,
-          overwrite, createParent, replication, blockSize, cipherSuites,
-          logRetryCache);
-      stat = dir.getFileInfo(src, false);
-    } catch (StandbyException se) {
-      skipSync = true;
-      throw se;
+      boolean shouldContinue = true;
+      int iters = 0;
+      while (shouldContinue) {
+        skipSync = false;
+        if (iters >= 10) {
+          throw new IOException("Too many retries because of encryption zone " +
+              "operations, something might be broken!");
+        }
+        shouldContinue = false;
+        iters++;
+        // Optimistically generate a FileEncryptionInfo for this path.
+        FileEncryptionInfo feInfo =
+            newFileEncryptionInfo(src, pathComponents, cipherSuites);
+
+        // Try to create the file with this feInfo
+        writeLock();
+        try {
+          checkOperation(OperationCategory.WRITE);
+          checkNameNodeSafeMode("Cannot create file" + src);
+          src = FSDirectory.resolvePath(src, pathComponents, dir);
+          startFileInternal(pc, src, permissions, holder, clientMachine, create,
+              overwrite, createParent, replication, blockSize, feInfo,
+              logRetryCache);
+          stat = dir.getFileInfo(src, false);
+        } catch (StandbyException se) {
+          skipSync = true;
+          throw se;
+        } catch (RetryStartFileException e) {
+          shouldContinue = true;
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Preconditions failed, retrying creation of " +
+                    "FileEncryptionInfo", e);
+          }
+        } finally {
+          writeUnlock();
+        }
+      }
     } finally {
     } finally {
-      writeUnlock();
       // There might be transactions logged while trying to recover the lease.
       // There might be transactions logged while trying to recover the lease.
       // They need to be sync'ed even when an exception was thrown.
       // They need to be sync'ed even when an exception was thrown.
       if (!skipSync) {
       if (!skipSync) {
         getEditLog().logSync();
         getEditLog().logSync();
       }
       }
-    } 
+    }
 
 
     logAuditEvent(true, "create", src, null, stat);
     logAuditEvent(true, "create", src, null, stat);
     return stat;
     return stat;
@@ -2463,11 +2559,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private void startFileInternal(FSPermissionChecker pc, String src,
   private void startFileInternal(FSPermissionChecker pc, String src,
       PermissionStatus permissions, String holder, String clientMachine,
       PermissionStatus permissions, String holder, String clientMachine,
       boolean create, boolean overwrite, boolean createParent,
       boolean create, boolean overwrite, boolean createParent,
-      short replication, long blockSize, List<CipherSuite> cipherSuites,
+      short replication, long blockSize, FileEncryptionInfo feInfo,
       boolean logRetryEntry)
       boolean logRetryEntry)
       throws FileAlreadyExistsException, AccessControlException,
       throws FileAlreadyExistsException, AccessControlException,
       UnresolvedLinkException, FileNotFoundException,
       UnresolvedLinkException, FileNotFoundException,
-      ParentNotDirectoryException, IOException {
+      ParentNotDirectoryException, RetryStartFileException, IOException {
     assert hasWriteLock();
     assert hasWriteLock();
     // Verify that the destination does not exist as a directory already.
     // Verify that the destination does not exist as a directory already.
     final INodesInPath iip = dir.getINodesInPath4Write(src);
     final INodesInPath iip = dir.getINodesInPath4Write(src);
@@ -2477,22 +2573,21 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           " already exists as a directory");
           " already exists as a directory");
     }
     }
 
 
-    FileEncryptionInfo feInfo = null;
-    CipherSuite suite = chooseCipherSuite(iip, cipherSuites);
-    if (suite != null) {
-      Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN), 
-          "Chose an UNKNOWN CipherSuite!");
-      // TODO: fill in actual key/iv in HDFS-6474
-      // For now, populate with dummy data
-      byte[] key = new byte[suite.getAlgorithmBlockSize()];
-      for (int i = 0; i < key.length; i++) {
-        key[i] = (byte)i;
+    if (!dir.isInAnEZ(iip)) {
+      // If the path is not in an EZ, we don't need an feInfo.
+      // Null it out in case one was already generated.
+      feInfo = null;
+    } else {
+      // The path is now within an EZ, but no feInfo. Retry.
+      if (feInfo == null) {
+        throw new RetryStartFileException();
       }
       }
-      byte[] iv = new byte[suite.getAlgorithmBlockSize()];
-      for (int i = 0; i < iv.length; i++) {
-        iv[i] = (byte)(3+i*2);
+      // It's in an EZ and we have a provided feInfo. Make sure the
+      // keyVersion of the encryption key used matches one of the keyVersions of
+      // the key of the encryption zone.
+      if (!dir.isValidKeyVersion(iip, feInfo.getEzKeyVersionName())) {
+        throw new RetryStartFileException();
       }
       }
-      feInfo = new FileEncryptionInfo(suite, key, iv);
     }
     }
 
 
     final INodeFile myFile = INodeFile.valueOf(inode, src, true);
     final INodeFile myFile = INodeFile.valueOf(inode, src, true);
@@ -8319,12 +8414,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     String keyId = keyIdArg;
     String keyId = keyIdArg;
     boolean success = false;
     boolean success = false;
     try {
     try {
+      KeyVersion keyVersion;
       if (keyId == null || keyId.isEmpty()) {
       if (keyId == null || keyId.isEmpty()) {
-        keyId = createNewKey(src);
+        keyId = UUID.randomUUID().toString();
+        keyVersion = createNewKey(keyId, src);
         createdKey = true;
         createdKey = true;
       } else {
       } else {
-        if (provider.getCurrentKey(keyId) == null) {
-
+        keyVersion = provider.getCurrentKey(keyId);
+        if (keyVersion == null) {
           /*
           /*
            * It would be nice if we threw something more specific than
            * It would be nice if we threw something more specific than
            * IOException when the key is not found, but the KeyProvider API
            * IOException when the key is not found, but the KeyProvider API
@@ -8336,7 +8433,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           throw new IOException("Key " + keyId + " doesn't exist.");
           throw new IOException("Key " + keyId + " doesn't exist.");
         }
         }
       }
       }
-      createEncryptionZoneInt(src, keyId, cacheEntry != null);
+      createEncryptionZoneInt(src, keyId, keyVersion, cacheEntry != null);
       success = true;
       success = true;
     } catch (AccessControlException e) {
     } catch (AccessControlException e) {
       logAuditEvent(false, "createEncryptionZone", src);
       logAuditEvent(false, "createEncryptionZone", src);
@@ -8351,7 +8448,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
   }
 
 
   private void createEncryptionZoneInt(final String srcArg, String keyId,
   private void createEncryptionZoneInt(final String srcArg, String keyId,
-    final boolean logRetryCache) throws IOException {
+    final KeyVersion keyVersion, final boolean logRetryCache) throws
+      IOException {
     String src = srcArg;
     String src = srcArg;
     HdfsFileStatus resultingStat = null;
     HdfsFileStatus resultingStat = null;
     checkSuperuserPrivilege();
     checkSuperuserPrivilege();
@@ -8365,7 +8463,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       checkNameNodeSafeMode("Cannot create encryption zone on " + src);
       checkNameNodeSafeMode("Cannot create encryption zone on " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
 
 
-      final XAttr keyIdXAttr = dir.createEncryptionZone(src, keyId);
+      final XAttr keyIdXAttr = dir.createEncryptionZone(src, keyId, keyVersion);
       List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
       List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
       xAttrs.add(keyIdXAttr);
       xAttrs.add(keyIdXAttr);
       getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
       getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
@@ -8377,19 +8475,29 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     logAuditEvent(true, "createEncryptionZone", src, null, resultingStat);
     logAuditEvent(true, "createEncryptionZone", src, null, resultingStat);
   }
   }
 
 
-  private String createNewKey(String src)
+  /**
+   * Create a new key on the KeyProvider for an encryption zone.
+   *
+   * @param keyId id of the key
+   * @param src path of the encryption zone.
+   * @return KeyVersion of the created key
+   * @throws IOException
+   */
+  private KeyVersion createNewKey(String keyId, String src)
     throws IOException {
     throws IOException {
-    final String keyId = UUID.randomUUID().toString();
+    Preconditions.checkNotNull(keyId);
+    Preconditions.checkNotNull(src);
     // TODO pass in hdfs://HOST:PORT (HDFS-6490)
     // TODO pass in hdfs://HOST:PORT (HDFS-6490)
     providerOptions.setDescription(src);
     providerOptions.setDescription(src);
     providerOptions.setBitLength(codec.getCipherSuite()
     providerOptions.setBitLength(codec.getCipherSuite()
         .getAlgorithmBlockSize()*8);
         .getAlgorithmBlockSize()*8);
+    KeyVersion version = null;
     try {
     try {
-      provider.createKey(keyId, providerOptions);
+      version = provider.createKey(keyId, providerOptions);
     } catch (NoSuchAlgorithmException e) {
     } catch (NoSuchAlgorithmException e) {
       throw new IOException(e);
       throw new IOException(e);
     }
     }
-    return keyId;
+    return version;
   }
   }
 
 
   List<EncryptionZone> listEncryptionZones() throws IOException {
   List<EncryptionZone> listEncryptionZones() throws IOException {

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RetryStartFileException.java

@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+public class RetryStartFileException extends Exception {
+}

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto

@@ -184,6 +184,7 @@ message FileEncryptionInfoProto {
   required CipherSuite suite = 1;
   required CipherSuite suite = 1;
   required bytes key = 2;
   required bytes key = 2;
   required bytes iv = 3;
   required bytes iv = 3;
+  required string ezKeyVersionName = 4;
 }
 }
 
 
 /**
 /**

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -2008,4 +2008,15 @@
   </description>
   </description>
 </property>
 </property>
 
 
+<property>
+  <name>dfs.namenode.key.version.refresh.interval.ms</name>
+  <value>300000</value>
+  <description>How frequently the namenode will attempt to fetch the latest
+      key version of encryption zone keys from the configured KeyProvider, in
+      milliseconds. New key versions are created when a key is rolled. This
+      setting thus controls the window of staleness where an old key version
+      is used after a key is rolled.
+  </description>
+</property>
+
 </configuration>
 </configuration>

+ 80 - 13
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesAPI.java

@@ -21,17 +21,20 @@ import java.io.File;
 import java.io.IOException;
 import java.io.IOException;
 import java.security.NoSuchAlgorithmException;
 import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedExceptionAction;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.List;
 import java.util.Set;
 import java.util.Set;
 import java.util.UUID;
 import java.util.UUID;
 
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
 import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
@@ -39,16 +42,20 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.namenode.EncryptionZoneManager;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
 
 
-import com.google.common.base.Preconditions;
 
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 import static org.junit.Assert.fail;
 
 
 public class TestEncryptionZonesAPI {
 public class TestEncryptionZonesAPI {
@@ -71,6 +78,7 @@ public class TestEncryptionZonesAPI {
         JavaKeyStoreProvider.SCHEME_NAME + "://file" + tmpDir + "/test.jks");
         JavaKeyStoreProvider.SCHEME_NAME + "://file" + tmpDir + "/test.jks");
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     fs = (DistributedFileSystem) createFileSystem(conf);
     fs = (DistributedFileSystem) createFileSystem(conf);
+    Logger.getLogger(EncryptionZoneManager.class).setLevel(Level.TRACE);
   }
   }
 
 
   protected FileSystem createFileSystem(Configuration conf) throws IOException {
   protected FileSystem createFileSystem(Configuration conf) throws IOException {
@@ -382,21 +390,80 @@ public class TestEncryptionZonesAPI {
     fs.getClient().cipherSuites.add(CipherSuite.AES_CTR_NOPADDING);
     fs.getClient().cipherSuites.add(CipherSuite.AES_CTR_NOPADDING);
     DFSTestUtil.createFile(fs, new Path(zone, "success3"), 4096, (short) 1,
     DFSTestUtil.createFile(fs, new Path(zone, "success3"), 4096, (short) 1,
         0xFEED);
         0xFEED);
+    // Check KeyProvider state
+    // Flushing the KP on the NN, since it caches, and init a test one
+    cluster.getNamesystem().getProvider().flush();
+    KeyProvider provider = KeyProviderFactory.getProviders(conf).get(0);
+    List<String> keys = provider.getKeys();
+    assertEquals("Expected NN to have created one key per zone", 1,
+        keys.size());
+    List<KeyProvider.KeyVersion> allVersions = Lists.newArrayList();
+    for (String key : keys) {
+      List<KeyProvider.KeyVersion> versions = provider.getKeyVersions(key);
+      assertEquals("Should only have one key version per key", 1,
+          versions.size());
+      allVersions.addAll(versions);
+    }
     // Check that the specified CipherSuite was correctly saved on the NN
     // Check that the specified CipherSuite was correctly saved on the NN
     for (int i=2; i<=3; i++) {
     for (int i=2; i<=3; i++) {
-      LocatedBlocks blocks =
-          fs.getClient().getLocatedBlocks(zone.toString() + "/success2", 0);
-      FileEncryptionInfo feInfo = blocks.getFileEncryptionInfo();
+      FileEncryptionInfo feInfo =
+          getFileEncryptionInfo(new Path(zone.toString() +
+              "/success" + i));
       assertEquals(feInfo.getCipherSuite(), CipherSuite.AES_CTR_NOPADDING);
       assertEquals(feInfo.getCipherSuite(), CipherSuite.AES_CTR_NOPADDING);
-      // TODO: validate against actual key/iv in HDFS-6474
-      byte[] key = feInfo.getEncryptedDataEncryptionKey();
-      for (int j = 0; j < key.length; j++) {
-        assertEquals("Unexpected key byte", (byte)j, key[j]);
-      }
-      byte[] iv = feInfo.getIV();
-      for (int j = 0; j < iv.length; j++) {
-        assertEquals("Unexpected IV byte", (byte)(3+j*2), iv[j]);
-      }
     }
     }
   }
   }
+
+  private void validateFiles(Path p1, Path p2, int len) throws Exception {
+    FSDataInputStream in1 = fs.open(p1);
+    FSDataInputStream in2 = fs.open(p2);
+    for (int i=0; i<len; i++) {
+      assertEquals("Mismatch at byte " + i, in1.read(), in2.read());
+    }
+    in1.close();
+    in2.close();
+  }
+
+  private FileEncryptionInfo getFileEncryptionInfo(Path path) throws Exception {
+    LocatedBlocks blocks = fs.getClient().getLocatedBlocks(path.toString(), 0);
+    return blocks.getFileEncryptionInfo();
+  }
+
+  @Test(timeout = 120000)
+  public void testReadWrite() throws Exception {
+    final HdfsAdmin dfsAdmin =
+        new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    // Create a base file for comparison
+    final Path baseFile = new Path("/base");
+    final int len = 8192;
+    DFSTestUtil.createFile(fs, baseFile, len, (short) 1, 0xFEED);
+    // Create the first enc file
+    final Path zone = new Path("/zone");
+    fs.mkdirs(zone);
+    dfsAdmin.createEncryptionZone(zone, null);
+    final Path encFile1 = new Path(zone, "myfile");
+    DFSTestUtil.createFile(fs, encFile1, len, (short) 1, 0xFEED);
+    // Read them back in and compare byte-by-byte
+    validateFiles(baseFile, encFile1, len);
+    // Roll the key of the encryption zone
+    List<EncryptionZone> zones = dfsAdmin.listEncryptionZones();
+    assertEquals("Expected 1 EZ", 1, zones.size());
+    String keyId = zones.get(0).getKeyId();
+    cluster.getNamesystem().getProvider().rollNewVersion(keyId);
+    cluster.getNamesystem().getFSDirectory().ezManager.kickMonitor();
+    // Read them back in and compare byte-by-byte
+    validateFiles(baseFile, encFile1, len);
+    // Write a new enc file and validate
+    final Path encFile2 = new Path(zone, "myfile2");
+    DFSTestUtil.createFile(fs, encFile2, len, (short) 1, 0xFEED);
+    // FEInfos should be different
+    FileEncryptionInfo feInfo1 = getFileEncryptionInfo(encFile1);
+    FileEncryptionInfo feInfo2 = getFileEncryptionInfo(encFile2);
+    assertFalse("EDEKs should be different", Arrays.equals(
+        feInfo1.getEncryptedDataEncryptionKey(),
+        feInfo2.getEncryptedDataEncryptionKey()));
+    assertNotEquals("Key was rolled, versions should be different",
+        feInfo1.getEzKeyVersionName(), feInfo2.getEzKeyVersionName());
+    // Contents still equal
+    validateFiles(encFile1, encFile2, len);
+  }
 }
 }