소스 검색

HDFS-6391. Get the Key/IV from the NameNode for encrypted files in DFSClient. Contributed by Charles Lamb and Andrew Wang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1606220 13f79535-47bb-0310-9956-ffa450edef68
Andrew Wang 11 년 전
부모
커밋
2efea95213
25개의 변경된 파일442개의 추가작업 그리고 204개의 파일을 삭제
  1. 62 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherSuite.java
  2. 83 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileEncryptionInfo.java
  3. 3 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES-fs-encryption.txt
  4. 3 38
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
  5. 55 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  6. 6 20
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
  7. 9 20
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
  8. 17 15
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  9. 0 9
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
  10. 7 20
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
  11. 4 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
  12. 9 13
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
  13. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java
  14. 55 16
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  15. 5 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  16. 5 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
  17. 82 19
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  18. 12 7
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  19. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
  20. 14 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
  21. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
  22. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
  23. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
  24. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
  25. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java

+ 62 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherSuite.java

@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.crypto;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Defines properties of a CipherSuite. Modeled after the ciphers in
+ * {@link javax.crypto.Cipher}.
+ */
+@InterfaceAudience.Private
+public enum CipherSuite {
+  AES_CTR_NOPADDING("AES/CTR/NoPadding", 128);
+
+  private final String name;
+  private final int blockBits;
+
+  CipherSuite(String name, int blockBits) {
+    this.name = name;
+    this.blockBits = blockBits;
+  }
+
+  /**
+   * @return name of cipher suite, as in {@link javax.crypto.Cipher}
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @return size of an algorithm block in bits
+   */
+  public int getNumberBlockBits() {
+    return blockBits;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("{");
+    builder.append("name: " + getName() + ", ");
+    builder.append("numBlockBits: " + getNumberBlockBits());
+    builder.append("}");
+    return builder.toString();
+  }
+}

+ 83 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileEncryptionInfo.java

@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.crypto.CipherSuite;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * FileEncryptionInfo encapsulates all the encryption-related information for
+ * an encrypted file.
+ */
+@InterfaceAudience.Private
+public class FileEncryptionInfo {
+
+  private final CipherSuite cipherSuite;
+  private final byte[] key;
+  private final byte[] iv;
+
+  public FileEncryptionInfo(CipherSuite suite, byte[] key, byte[] iv) {
+    checkNotNull(suite);
+    checkNotNull(key);
+    checkNotNull(iv);
+    checkArgument(key.length == suite.getNumberBlockBits() / 8,
+        "Unexpected key length");
+    checkArgument(iv.length == suite.getNumberBlockBits() / 8,
+        "Unexpected IV length");
+    this.cipherSuite = suite;
+    this.key = key;
+    this.iv = iv;
+  }
+
+  /**
+   * @return {@link org.apache.hadoop.crypto.CipherSuite} used to encrypt
+   * the file.
+   */
+  public CipherSuite getCipherSuite() {
+    return cipherSuite;
+  }
+
+  /**
+   * @return encrypted data encryption key for the file
+   */
+  public byte[] getEncryptedDataEncryptionKey() {
+    return key;
+  }
+
+  /**
+   * @return initialization vector for the cipher used to encrypt the file
+   */
+  public byte[] getIV() {
+    return iv;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("{");
+    builder.append("cipherSuite: " + cipherSuite);
+    builder.append(", key: " + Hex.encodeHexString(key));
+    builder.append(", iv: " + Hex.encodeHexString(iv));
+    builder.append("}");
+    return builder.toString();
+  }
+}

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES-fs-encryption.txt

@@ -23,6 +23,9 @@ fs-encryption (Unreleased)
     HDFS-6476. Print out the KeyProvider after finding KP successfully on
     startup. (Juan Yu via wang)
 
+    HDFS-6391. Get the Key/IV from the NameNode for encrypted files in
+    DFSClient. (Charles Lamb and wang)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 3 - 38
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java

@@ -31,8 +31,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoCodec;
-import org.apache.hadoop.crypto.CryptoOutputStream;
-import org.apache.hadoop.crypto.CryptoInputStream;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -57,8 +55,6 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.util.Progressable;
 
-import com.google.common.base.Preconditions;
-
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class Hdfs extends AbstractFileSystem {
@@ -108,23 +104,8 @@ public class Hdfs extends AbstractFileSystem {
     final DFSOutputStream dfsos = dfs.primitiveCreate(getUriPath(f),
       absolutePermission, createFlag, createParent, replication, blockSize,
       progress, bufferSize, checksumOpt);
-    final byte[] key = dfsos.getKey();
-    final byte[] iv = dfsos.getIv();
-    Preconditions.checkState(!(key == null ^ iv == null),
-      "Only one of the Key and IV were found.");
-    if (false && key != null) {
-
-      /*
-       * The Key and IV were found. Wrap up the output stream with an encryption
-       * wrapper.
-       */
-      final CryptoOutputStream cbos =
-        new CryptoOutputStream(dfsos, factory, key, iv);
-      return new HdfsDataOutputStream(cbos, getStatistics());
-    } else {
-      /* No key/IV present so no encryption. */
-      return new HdfsDataOutputStream(dfsos, getStatistics());
-    }
+    return dfs.createWrappedOutputStream(dfsos, statistics,
+        dfsos.getInitialLen());
   }
 
   @Override
@@ -335,23 +316,7 @@ public class Hdfs extends AbstractFileSystem {
       throws IOException, UnresolvedLinkException {
     final DFSInputStream dfsis = dfs.open(getUriPath(f),
       bufferSize, verifyChecksum);
-    final byte[] key = dfsis.getKey();
-    final byte[] iv = dfsis.getIv();
-    Preconditions.checkState(!(key == null ^ iv == null),
-      "Only one of the Key and IV were found.");
-    if (false && key != null) {
-
-      /*
-       * The Key and IV were found. Wrap up the input stream with an encryption
-       * wrapper.
-       */
-      final CryptoInputStream cbis =
-        new CryptoInputStream(dfsis, factory, key, iv);
-      return new HdfsDataInputStream(cbis);
-    } else {
-      /* No key/IV pair so no encryption. */
-      return new HdfsDataInputStream(dfsis);
-    }
+    return dfs.createWrappedInputStream(dfsis);
   }
 
   @Override

+ 55 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -94,6 +94,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.CryptoInputStream;
+import org.apache.hadoop.crypto.CryptoOutputStream;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStorageLocation;
 import org.apache.hadoop.fs.CacheFlag;
@@ -101,6 +104,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsStatus;
@@ -241,6 +245,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
   private static final DFSHedgedReadMetrics HEDGED_READ_METRIC =
       new DFSHedgedReadMetrics();
   private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
+  private final CryptoCodec codec;
   
   /**
    * DFSClient configuration 
@@ -573,6 +578,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
     this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
     this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + 
         DFSUtil.getRandom().nextInt()  + "_" + Thread.currentThread().getId();
+    this.codec = CryptoCodec.getInstance(conf);
     
     int numResponseToDrop = conf.getInt(
         DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
@@ -1267,7 +1273,54 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
 
     return volumeBlockLocations;
   }
-  
+
+  /**
+   * Wraps the stream in a CryptoInputStream if the underlying file is
+   * encrypted.
+   */
+  public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis)
+      throws IOException {
+    final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo();
+    if (feInfo != null) {
+      // File is encrypted, wrap the stream in a crypto stream.
+      final CryptoInputStream cryptoIn =
+          new CryptoInputStream(dfsis, codec,
+              feInfo.getEncryptedDataEncryptionKey(), feInfo.getIV());
+      return new HdfsDataInputStream(cryptoIn);
+    } else {
+      // No key/IV pair so no encryption.
+      return new HdfsDataInputStream(dfsis);
+    }
+  }
+
+  /**
+   * Wraps the stream in a CryptoOutputStream if the underlying file is
+   * encrypted.
+   */
+  public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos,
+      FileSystem.Statistics statistics) throws IOException {
+    return createWrappedOutputStream(dfsos, statistics, 0);
+  }
+
+  /**
+   * Wraps the stream in a CryptoOutputStream if the underlying file is
+   * encrypted.
+   */
+  public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos,
+      FileSystem.Statistics statistics, long startPos) throws IOException {
+    final FileEncryptionInfo feInfo = dfsos.getFileEncryptionInfo();
+    if (feInfo != null) {
+      // File is encrypted, wrap the stream in a crypto stream.
+      final CryptoOutputStream cryptoOut =
+          new CryptoOutputStream(dfsos, codec,
+              feInfo.getEncryptedDataEncryptionKey(), feInfo.getIV(), startPos);
+      return new HdfsDataOutputStream(cryptoOut, statistics, startPos);
+    } else {
+      // No key/IV present so no encryption.
+      return new HdfsDataOutputStream(dfsos, statistics, startPos);
+    }
+  }
+
   public DFSInputStream open(String src) 
       throws IOException, UnresolvedLinkException {
     return open(src, dfsClientConf.ioBufferSize, true, null);
@@ -1595,7 +1648,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
       final Progressable progress, final FileSystem.Statistics statistics
       ) throws IOException {
     final DFSOutputStream out = append(src, buffersize, progress);
-    return new HdfsDataOutputStream(out, statistics, out.getInitialLen());
+    return createWrappedOutputStream(out, statistics, out.getInitialLen());
   }
 
   private DFSOutputStream append(String src, int buffersize, Progressable progress) 

+ 6 - 20
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
@@ -88,8 +89,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   private final boolean verifyChecksum;
   private LocatedBlocks locatedBlocks = null;
   private long lastBlockBeingWrittenLength = 0;
-  private byte[] key = null;
-  private byte[] iv = null;
+  private FileEncryptionInfo fileEncryptionInfo = null;
   private DatanodeInfo currentNode = null;
   private LocatedBlock currentLocatedBlock = null;
   private long pos = 0;
@@ -299,8 +299,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       }
     }
 
-    key = locatedBlocks.getKey();
-    iv = locatedBlocks.getIv();
+    fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
+
     currentNode = null;
     return lastBlockBeingWrittenLength;
   }
@@ -1521,22 +1521,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     return new ReadStatistics(readStatistics);
   }
 
-  /**
-   * Get the encryption key for this stream.
-   *
-   * @return byte[] the key
-   */
-  public synchronized byte[] getKey() {
-    return key;
-  }
-
-  /**
-   * Get the encryption initialization vector (IV) for this stream.
-   *
-   * @return byte[] the initialization vector (IV).
-   */
-  public synchronized byte[] getIv() {
-    return iv;
+  public synchronized FileEncryptionInfo getFileEncryptionInfo() {
+    return fileEncryptionInfo;
   }
 
   private synchronized void closeCurrentBlockReader() {

+ 9 - 20
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -154,9 +155,8 @@ public class DFSOutputStream extends FSOutputSummer
   private boolean shouldSyncBlock = false; // force blocks to disk upon close
   private final AtomicReference<CachingStrategy> cachingStrategy;
   private boolean failPacket = false;
-  private byte[] key = null;
-  private byte[] iv = null;
-  
+  private FileEncryptionInfo fileEncryptionInfo;
+
   private static class Packet {
     private static final long HEART_BEAT_SEQNO = -1L;
     final long seqno; // sequencenumber of buffer in block
@@ -1564,8 +1564,7 @@ public class DFSOutputStream extends FSOutputSummer
     this.fileId = stat.getFileId();
     this.blockSize = stat.getBlockSize();
     this.blockReplication = stat.getReplication();
-    this.key = stat.getKey();
-    this.iv = stat.getIv();
+    this.fileEncryptionInfo = stat.getFileEncryptionInfo();
     this.progress = progress;
     this.cachingStrategy = new AtomicReference<CachingStrategy>(
         dfsClient.getDefaultWriteCachingStrategy());
@@ -1654,6 +1653,7 @@ public class DFSOutputStream extends FSOutputSummer
           checksum.getBytesPerChecksum());
       streamer = new DataStreamer();
     }
+    this.fileEncryptionInfo = stat.getFileEncryptionInfo();
   }
 
   static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
@@ -2178,26 +2178,15 @@ public class DFSOutputStream extends FSOutputSummer
   /**
    * Returns the size of a file as it was when this stream was opened
    */
-  long getInitialLen() {
+  public long getInitialLen() {
     return initialFileSize;
   }
 
   /**
-   * Get the encryption key for this stream.
-   *
-   * @return byte[] the key.
-   */
-  public byte[] getKey() {
-    return key;
-  }
-
-  /**
-   * Get the encryption initialization vector (IV) for this stream.
-   *
-   * @return byte[] the initialization vector (IV).
+   * @return the FileEncryptionInfo for this stream, or null if not encrypted.
    */
-  public byte[] getIv() {
-    return iv;
+  public FileEncryptionInfo getFileEncryptionInfo() {
+    return fileEncryptionInfo;
   }
 
   /**

+ 17 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -60,7 +60,6 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
-import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -291,8 +290,9 @@ public class DistributedFileSystem extends FileSystem {
       @Override
       public FSDataInputStream doCall(final Path p)
           throws IOException, UnresolvedLinkException {
-        return new HdfsDataInputStream(
-            dfs.open(getPathName(p), bufferSize, verifyChecksum));
+        final DFSInputStream dfsis =
+          dfs.open(getPathName(p), bufferSize, verifyChecksum);
+        return dfs.createWrappedInputStream(dfsis);
       }
       @Override
       public FSDataInputStream next(final FileSystem fs, final Path p)
@@ -357,7 +357,7 @@ public class DistributedFileSystem extends FileSystem {
                 : EnumSet.of(CreateFlag.CREATE),
             true, replication, blockSize, progress, bufferSize, null,
             favoredNodes);
-        return new HdfsDataOutputStream(out, statistics);
+        return dfs.createWrappedOutputStream(out, statistics);
       }
       @Override
       public HdfsDataOutputStream next(final FileSystem fs, final Path p)
@@ -385,9 +385,10 @@ public class DistributedFileSystem extends FileSystem {
       @Override
       public FSDataOutputStream doCall(final Path p)
           throws IOException, UnresolvedLinkException {
-        return new HdfsDataOutputStream(dfs.create(getPathName(p), permission,
-            cflags, replication, blockSize, progress, bufferSize, checksumOpt),
-            statistics);
+        final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
+                cflags, replication, blockSize, progress, bufferSize,
+                checksumOpt);
+        return dfs.createWrappedOutputStream(dfsos, statistics);
       }
       @Override
       public FSDataOutputStream next(final FileSystem fs, final Path p)
@@ -404,11 +405,12 @@ public class DistributedFileSystem extends FileSystem {
     short replication, long blockSize, Progressable progress,
     ChecksumOpt checksumOpt) throws IOException {
     statistics.incrementWriteOps(1);
-    return new HdfsDataOutputStream(dfs.primitiveCreate(
-        getPathName(fixRelativePart(f)),
-        absolutePermission, flag, true, replication, blockSize,
-        progress, bufferSize, checksumOpt),statistics);
-   }
+    final DFSOutputStream dfsos = dfs.primitiveCreate(
+      getPathName(fixRelativePart(f)),
+      absolutePermission, flag, true, replication, blockSize,
+      progress, bufferSize, checksumOpt);
+    return dfs.createWrappedOutputStream(dfsos, statistics);
+  }
 
   /**
    * Same as create(), except fails if parent directory doesn't already exist.
@@ -428,9 +430,9 @@ public class DistributedFileSystem extends FileSystem {
       @Override
       public FSDataOutputStream doCall(final Path p) throws IOException,
           UnresolvedLinkException {
-        return new HdfsDataOutputStream(dfs.create(getPathName(p), permission,
-            flag, false, replication, blockSize, progress, bufferSize, null),
-            statistics);
+        final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
+          flag, false, replication, blockSize, progress, bufferSize, null);
+        return dfs.createWrappedOutputStream(dfsos, statistics);
       }
 
       @Override

+ 0 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java

@@ -161,13 +161,4 @@ public class HdfsConstants {
   
   public static final String SEPARATOR_DOT_SNAPSHOT_DIR
       = Path.SEPARATOR + DOT_SNAPSHOT_DIR; 
-
-  public static final String CRYPTO_XATTR_KEY_ID = "system.hdfs.crypto.key-id";
-  public static final String CRYPTO_XATTR_KEY_VERSION_ID =
-    "system.hdfs.crypto.key-version-id";
-  public static final String CRYPTO_XATTR_IV = "system.hdfs.crypto.iv";
-  public static final int CRYPTO_KEY_SIZE = 128;
-  /* Temporary until we stop hard-coding these values. */
-  public static final byte[] KEY = "0123456789012345".getBytes();
-  public static final byte[] IV = "ABCDEFGJIJKLMNOP".getBytes();
 }

+ 7 - 20
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java

@@ -21,6 +21,7 @@ import java.net.URI;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -45,8 +46,7 @@ public class HdfsFileStatus {
   private final String group;
   private final long fileId;
 
-  private final byte[] key;
-  private final byte[] iv;
+  private final FileEncryptionInfo feInfo;
   
   // Used by dir, not including dot and dotdot. Always zero for a regular file.
   private final int childrenNum;
@@ -66,20 +66,12 @@ public class HdfsFileStatus {
    * @param group the group of the path
    * @param path the local name in java UTF8 encoding the same as that in-memory
    * @param fileId the file id
+   * @param feInfo the file's encryption info
    */
   public HdfsFileStatus(long length, boolean isdir, int block_replication,
       long blocksize, long modification_time, long access_time,
       FsPermission permission, String owner, String group, byte[] symlink,
-      byte[] path, long fileId, int childrenNum) {
-    this(length, isdir, block_replication, blocksize, modification_time,
-      access_time, permission, owner, group, symlink, path, fileId,
-      childrenNum, HdfsConstants.KEY, HdfsConstants.IV);
-  }
-
-  public HdfsFileStatus(long length, boolean isdir, int block_replication,
-      long blocksize, long modification_time, long access_time,
-      FsPermission permission, String owner, String group, byte[] symlink,
-    byte[] path, long fileId, int childrenNum, byte[] key, byte[] iv) {
+    byte[] path, long fileId, int childrenNum, FileEncryptionInfo feInfo) {
     this.length = length;
     this.isdir = isdir;
     this.block_replication = (short)block_replication;
@@ -97,8 +89,7 @@ public class HdfsFileStatus {
     this.path = path;
     this.fileId = fileId;
     this.childrenNum = childrenNum;
-    this.key = key;
-    this.iv = iv;
+    this.feInfo = feInfo;
   }
 
   /**
@@ -252,12 +243,8 @@ public class HdfsFileStatus {
     return fileId;
   }
   
-  final public byte[] getKey() {
-    return key;
-  }
-
-  final public byte[] getIv() {
-    return iv;
+  final public FileEncryptionInfo getFileEncryptionInfo() {
+    return feInfo;
   }
 
   final public int getChildrenNum() {

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java

@@ -21,6 +21,7 @@ import java.net.URI;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -51,15 +52,16 @@ public class HdfsLocatedFileStatus extends HdfsFileStatus {
    * @param path local path name in java UTF8 format 
    * @param fileId the file id
    * @param locations block locations
+   * @param feInfo file encryption info
    */
   public HdfsLocatedFileStatus(long length, boolean isdir,
       int block_replication, long blocksize, long modification_time,
       long access_time, FsPermission permission, String owner, String group,
       byte[] symlink, byte[] path, long fileId, LocatedBlocks locations,
-    int childrenNum, byte[] key, byte[] iv) {
+    int childrenNum, FileEncryptionInfo feInfo) {
     super(length, isdir, block_replication, blocksize, modification_time,
       access_time, permission, owner, group, symlink, path, fileId,
-      childrenNum, key, iv);
+      childrenNum, feInfo);
     this.locations = locations;
   }
 	

+ 9 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java

@@ -23,6 +23,7 @@ import java.util.Comparator;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 
 /**
  * Collection of blocks with their locations and the file length.
@@ -35,27 +36,23 @@ public class LocatedBlocks {
   private final boolean underConstruction;
   private LocatedBlock lastLocatedBlock = null;
   private boolean isLastBlockComplete = false;
-  private final byte[] key;
-  private final byte[] iv;
+  private FileEncryptionInfo fileEncryptionInfo = null;
 
   public LocatedBlocks() {
     fileLength = 0;
     blocks = null;
     underConstruction = false;
-    key = null;
-    iv = null;
   }
 
   public LocatedBlocks(long flength, boolean isUnderConstuction,
     List<LocatedBlock> blks, LocatedBlock lastBlock,
-    boolean isLastBlockCompleted, byte[] key, byte[] iv) {
+    boolean isLastBlockCompleted, FileEncryptionInfo feInfo) {
     fileLength = flength;
     blocks = blks;
     underConstruction = isUnderConstuction;
     this.lastLocatedBlock = lastBlock;
     this.isLastBlockComplete = isLastBlockCompleted;
-    this.key = key;
-    this.iv = iv;
+    this.fileEncryptionInfo = feInfo;
   }
   
   /**
@@ -103,13 +100,12 @@ public class LocatedBlocks {
   public boolean isUnderConstruction() {
     return underConstruction;
   }
-  
-  public byte[] getKey() {
-    return key;
-  }
 
-  public byte[] getIv() {
-    return iv;
+  /**
+   * @return the FileEncryptionInfo for the LocatedBlocks
+   */
+  public FileEncryptionInfo getFileEncryptionInfo() {
+    return fileEncryptionInfo;
   }
 
   /**

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java

@@ -61,7 +61,7 @@ public class SnapshottableDirectoryStatus {
       int snapshotNumber, int snapshotQuota, byte[] parentFullPath) {
     this.dirStatus = new HdfsFileStatus(0, true, 0, 0, modification_time,
         access_time, permission, owner, group, null, localName, inodeId,
-        childrenNum, null /* key */, null /* IV */);
+        childrenNum, null);
     this.snapshotNumber = snapshotNumber;
     this.snapshotQuota = snapshotQuota;
     this.parentFullPath = parentFullPath;

+ 55 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -44,7 +44,6 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -52,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolStats;
+import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdfs.protocol.FsAclPermission;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@@ -1133,8 +1134,8 @@ public class PBHelper {
         PBHelper.convertLocatedBlock(lb.getBlocksList()),
         lb.hasLastBlock() ? PBHelper.convert(lb.getLastBlock()) : null,
         lb.getIsLastBlockComplete(),
-        lb.hasKey() ? lb.getKey().toByteArray() : null,
-        lb.hasIv() ? lb.getIv().toByteArray() : null);
+        lb.hasFileEncryptionInfo() ? convert(lb.getFileEncryptionInfo()) :
+            null);
   }
   
   public static LocatedBlocksProto convert(LocatedBlocks lb) {
@@ -1146,11 +1147,8 @@ public class PBHelper {
     if (lb.getLastLocatedBlock() != null) {
       builder.setLastBlock(PBHelper.convert(lb.getLastLocatedBlock()));
     }
-    if (lb.getKey() != null) {
-      builder.setKey(ByteString.copyFrom(lb.getKey()));
-    }
-    if (lb.getIv() != null) {
-      builder.setIv(ByteString.copyFrom(lb.getIv()));
+    if (lb.getFileEncryptionInfo() != null) {
+      builder.setFileEncryptionInfo(convert(lb.getFileEncryptionInfo()));
     }
     return builder.setFileLength(lb.getFileLength())
         .setUnderConstruction(lb.isUnderConstruction())
@@ -1278,8 +1276,8 @@ public class PBHelper {
         fs.hasFileId()? fs.getFileId(): INodeId.GRANDFATHER_INODE_ID,
         fs.hasLocations() ? PBHelper.convert(fs.getLocations()) : null,
         fs.hasChildrenNum() ? fs.getChildrenNum() : -1,
-        fs.hasKey() ? fs.getKey().toByteArray() : null,
-        fs.hasIv() ? fs.getIv().toByteArray() : null);
+        fs.hasFileEncryptionInfo() ? convert(fs.getFileEncryptionInfo()) :
+            null);
   }
 
   public static SnapshottableDirectoryStatus convert(
@@ -1329,11 +1327,8 @@ public class PBHelper {
     if (fs.isSymlink())  {
       builder.setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes()));
     }
-    if (fs.getKey() != null) {
-      builder.setKey(ByteString.copyFrom(fs.getKey()));
-    }
-    if (fs.getIv() != null) {
-      builder.setIv(ByteString.copyFrom(fs.getIv()));
+    if (fs.getFileEncryptionInfo() != null) {
+      builder.setFileEncryptionInfo(convert(fs.getFileEncryptionInfo()));
     }
     if (fs instanceof HdfsLocatedFileStatus) {
       LocatedBlocks locations = ((HdfsLocatedFileStatus)fs).getBlockLocations();
@@ -2280,5 +2275,49 @@ public class PBHelper {
   public static ShmId convert(ShortCircuitShmIdProto shmId) {
     return new ShmId(shmId.getHi(), shmId.getLo());
   }
-}
 
+  public static HdfsProtos.FileEncryptionInfoProto.CipherType
+      convert(CipherSuite type) {
+    switch (type) {
+    case AES_CTR_NOPADDING:
+      return HdfsProtos.FileEncryptionInfoProto.CipherType
+          .AES_CTR_NOPADDING;
+    default:
+      return null;
+    }
+  }
+
+  public static CipherSuite convert(
+      HdfsProtos.FileEncryptionInfoProto.CipherType proto) {
+    switch (proto) {
+    case AES_CTR_NOPADDING:
+      return CipherSuite.AES_CTR_NOPADDING;
+    default:
+      return null;
+    }
+  }
+
+  public static HdfsProtos.FileEncryptionInfoProto convert(
+      FileEncryptionInfo info) {
+    if (info == null) {
+      return null;
+    }
+    return HdfsProtos.FileEncryptionInfoProto.newBuilder()
+        .setType(convert(info.getCipherSuite()))
+        .setKey(getByteString(info.getEncryptedDataEncryptionKey()))
+        .setIv(getByteString(info.getIV()))
+        .build();
+  }
+
+  public static FileEncryptionInfo convert(
+      HdfsProtos.FileEncryptionInfoProto proto) {
+    if (proto == null) {
+      return null;
+    }
+    CipherSuite type = convert(proto.getType());
+    byte[] key = proto.getKey().toByteArray();
+    byte[] iv = proto.getIv().toByteArray();
+    return new FileEncryptionInfo(type, key, iv);
+  }
+
+}

+ 5 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -52,7 +52,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -842,15 +842,15 @@ public class BlockManager {
   public LocatedBlocks createLocatedBlocks(final BlockInfo[] blocks,
       final long fileSizeExcludeBlocksUnderConstruction,
       final boolean isFileUnderConstruction, final long offset,
-      final long length, final boolean needBlockToken, final boolean inSnapshot)
+      final long length, final boolean needBlockToken,
+      final boolean inSnapshot, FileEncryptionInfo feInfo)
       throws IOException {
     assert namesystem.hasReadLock();
     if (blocks == null) {
       return null;
     } else if (blocks.length == 0) {
       return new LocatedBlocks(0, isFileUnderConstruction,
-          Collections.<LocatedBlock>emptyList(), null, false,
-          null /* key */, null /* IV */);
+          Collections.<LocatedBlock>emptyList(), null, false, null);
     } else {
       if (LOG.isDebugEnabled()) {
         LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
@@ -875,8 +875,7 @@ public class BlockManager {
       }
       return new LocatedBlocks(
           fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
-          locatedblocks, lastlb, isComplete,
-              HdfsConstants.KEY, HdfsConstants.IV);
+          locatedblocks, lastlb, isComplete, feInfo);
     }
   }
 

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java

@@ -292,5 +292,10 @@ public final class HdfsServerConstants {
   
   public static final String NAMENODE_LEASE_HOLDER = "HDFS_NameNode";
   public static final long NAMENODE_LEASE_RECHECK_INTERVAL = 2000;
+
+  public static final String CRYPTO_XATTR_ENCRYPTION_ZONE =
+      "system.hdfs.crypto.encryption.zone";
+  public static final String CRYPTO_XATTR_FILE_ENCRYPTION_INFO =
+      "system.hdfs.crypto.file.encryption.info";
 }
 

+ 82 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
 import static org.apache.hadoop.util.Time.now;
 
 import java.io.Closeable;
@@ -29,12 +31,13 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.crypto.CryptoCodec;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
@@ -66,6 +69,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -85,10 +90,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.CRYPTO_XATTR_KEY_ID;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.CRYPTO_XATTR_IV;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.CRYPTO_XATTR_KEY_VERSION_ID;
-
 /**
  * Both FSDirectory and FSNamesystem manage the state of the namespace.
  * FSDirectory is a pure in-memory data structure, all of whose operations
@@ -133,7 +134,6 @@ public class FSDirectory implements Closeable {
   private final INodeMap inodeMap; // Synchronized by dirLock
   private long yieldCount = 0; // keep track of lock yield count.
   private final int inodeXAttrsLimit; //inode xattrs max limit
-  private final CryptoCodec codec;
 
   // lock to protect the directory and BlockMap
   private final ReentrantReadWriteLock dirLock;
@@ -200,7 +200,7 @@ public class FSDirectory implements Closeable {
     this.inodeXAttrsLimit = conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_KEY,
         DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_DEFAULT);
-    this.codec = CryptoCodec.getInstance(conf);
+
     Preconditions.checkArgument(this.inodeXAttrsLimit >= 0,
         "Cannot set a negative limit on the number of xattrs per inode (%s).",
         DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_KEY);
@@ -1470,8 +1470,8 @@ public class FSDirectory implements Closeable {
    * @return object containing information regarding the file
    *         or null if file not found
    */
-  HdfsFileStatus getFileInfo(String src, boolean resolveLink) 
-      throws UnresolvedLinkException {
+  HdfsFileStatus getFileInfo(String src, boolean resolveLink)
+    throws UnresolvedLinkException, IOException {
     String srcs = normalizePath(src);
     readLock();
     try {
@@ -1480,6 +1480,8 @@ public class FSDirectory implements Closeable {
       }
       final INodesInPath inodesInPath = getLastINodeInPath(srcs, resolveLink);
       final INode i = inodesInPath.getINode(0);
+
+      final int snapshotId = inodesInPath.getPathSnapshotId();
       return i == null? null: createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
           inodesInPath.getPathSnapshotId());
     } finally {
@@ -1498,7 +1500,7 @@ public class FSDirectory implements Closeable {
       throws UnresolvedLinkException {
     if (getINode4DotSnapshot(src) != null) {
       return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
-          HdfsFileStatus.EMPTY_NAME, -1L, 0, null /* key */, null /* IV */);
+          HdfsFileStatus.EMPTY_NAME, -1L, 0, null);
     }
     return null;
   }
@@ -2326,7 +2328,7 @@ public class FSDirectory implements Closeable {
    * Create FileStatus by file INode 
    */
    HdfsFileStatus createFileStatus(byte[] path, INode node,
-       int snapshot) {
+       int snapshot) throws IOException {
      long size = 0;     // length is zero for directories
      short replication = 0;
      long blocksize = 0;
@@ -2338,7 +2340,9 @@ public class FSDirectory implements Closeable {
      }
      int childrenNum = node.isDirectory() ? 
          node.asDirectory().getChildrenNum(snapshot) : 0;
-         
+
+     FileEncryptionInfo feInfo = getFileEncryptionInfo(node, snapshot);
+
      return new HdfsFileStatus(
         size, 
         node.isDirectory(), 
@@ -2353,8 +2357,7 @@ public class FSDirectory implements Closeable {
         path,
         node.getId(),
         childrenNum,
-        HdfsConstants.KEY,  // key
-        HdfsConstants.IV); // IV
+        feInfo);
   }
 
   /**
@@ -2377,16 +2380,20 @@ public class FSDirectory implements Closeable {
       final boolean isUc = !inSnapshot && fileNode.isUnderConstruction();
       final long fileSize = !inSnapshot && isUc ? 
           fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
+      final FileEncryptionInfo feInfo = getFileEncryptionInfo(node, snapshot);
+
       loc = getFSNamesystem().getBlockManager().createLocatedBlocks(
           fileNode.getBlocks(), fileSize, isUc, 0L, size, false,
-          inSnapshot);
+          inSnapshot, feInfo);
       if (loc == null) {
         loc = new LocatedBlocks();
       }
     }
     int childrenNum = node.isDirectory() ? 
         node.asDirectory().getChildrenNum(snapshot) : 0;
-        
+
+    final FileEncryptionInfo feInfo = getFileEncryptionInfo(node, snapshot);
+
     HdfsLocatedFileStatus status =
         new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
           blocksize, node.getModificationTime(snapshot),
@@ -2394,7 +2401,7 @@ public class FSDirectory implements Closeable {
           getPermissionForFileStatus(node, snapshot),
           node.getUserName(snapshot), node.getGroupName(snapshot),
           node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
-          node.getId(), loc, childrenNum, null /* key */, null /* IV */);
+          node.getId(), loc, childrenNum, feInfo);
     // Set caching information for the located blocks.
     if (loc != null) {
       CacheManager cacheManager = namesystem.getCacheManager();
@@ -2665,7 +2672,7 @@ public class FSDirectory implements Closeable {
           "Attempt to create an encryption zone for a non-empty directory.");
       }
       final XAttr keyIdXAttr =
-        XAttrHelper.buildXAttr(CRYPTO_XATTR_KEY_ID, keyId.getBytes());
+        XAttrHelper.buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, keyId.getBytes());
       List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
       xattrs.add(keyIdXAttr);
       unprotectedSetXAttrs(src, xattrs, EnumSet.of(XAttrSetFlag.CREATE));
@@ -2684,7 +2691,7 @@ public class FSDirectory implements Closeable {
           "Attempt to delete an encryption zone for a non-empty directory.");
       }
       final XAttr keyIdXAttr =
-        XAttrHelper.buildXAttr(CRYPTO_XATTR_KEY_ID, null);
+        XAttrHelper.buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, null);
       List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
       xattrs.add(keyIdXAttr);
       final List<XAttr> removedXAttrs = unprotectedRemoveXAttrs(src, xattrs);
@@ -2698,6 +2705,62 @@ public class FSDirectory implements Closeable {
     }
   }
 
+  /**
+   * Set the FileEncryptionInfo for an INode.
+   */
+  void setFileEncryptionInfo(String src, FileEncryptionInfo info)
+      throws IOException {
+    // Make the PB for the xattr
+    final HdfsProtos.FileEncryptionInfoProto proto = PBHelper.convert(info);
+    final byte[] protoBytes = proto.toByteArray();
+    final XAttr fileEncryptionAttr =
+        XAttrHelper.buildXAttr(CRYPTO_XATTR_FILE_ENCRYPTION_INFO, protoBytes);
+    final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
+    xAttrs.add(fileEncryptionAttr);
+
+    writeLock();
+    try {
+      unprotectedSetXAttrs(src, xAttrs, EnumSet.of(XAttrSetFlag.CREATE));
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /**
+   * Return the FileEncryptionInfo for an INode, or null if the INode is not
+   * an encrypted file.
+   */
+  FileEncryptionInfo getFileEncryptionInfo(INode inode, int snapshotId)
+      throws IOException {
+    if (!inode.isFile()) {
+      return null;
+    }
+    readLock();
+    try {
+      List<XAttr> xAttrs = XAttrStorage.readINodeXAttrs(inode, snapshotId);
+      if (xAttrs == null) {
+        return null;
+      }
+      for (XAttr x : xAttrs) {
+        if (XAttrHelper.getPrefixName(x)
+            .equals(CRYPTO_XATTR_FILE_ENCRYPTION_INFO)) {
+          try {
+            HdfsProtos.FileEncryptionInfoProto proto =
+                HdfsProtos.FileEncryptionInfoProto.parseFrom(x.getValue());
+            FileEncryptionInfo feInfo = PBHelper.convert(proto);
+            return feInfo;
+          } catch (InvalidProtocolBufferException e) {
+            throw new IOException("Could not parse file encryption info for " +
+                "inode " + inode, e);
+          }
+        }
+      }
+      return null;
+    } finally {
+      readUnlock();
+    }
+  }
+
   void setXAttrs(final String src, final List<XAttr> xAttrs,
       final EnumSet<XAttrSetFlag> flag) throws IOException {
     writeLock();

+ 12 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -83,9 +83,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROU
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.CRYPTO_KEY_SIZE;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.CRYPTO_XATTR_IV;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.CRYPTO_XATTR_KEY_VERSION_ID;
 import static org.apache.hadoop.util.Time.now;
 
 import java.io.*;
@@ -122,6 +119,8 @@ import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoCodec;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
@@ -131,6 +130,7 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.InvalidPathException;
@@ -154,7 +154,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -530,6 +529,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private KeyProvider.Options providerOptions = null;
 
   private final Map<String, EncryptionZone> encryptionZones;
+  private final CryptoCodec codec;
 
   private volatile boolean imageLoaded = false;
   private final Condition cond;
@@ -747,6 +747,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws IOException {
     initializeKeyProvider(conf);
     providerOptions = KeyProvider.options(conf);
+    this.codec = CryptoCodec.getInstance(conf);
     if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY,
                         DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) {
       LOG.info("Enabling async auditlog");
@@ -1873,9 +1874,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           length = Math.min(length, fileSize - offset);
           isUc = false;
         }
-        LocatedBlocks blocks =
+
+        final FileEncryptionInfo feInfo = dir.getFileEncryptionInfo(inode,
+            iip.getPathSnapshotId());
+
+        final LocatedBlocks blocks =
           blockManager.createLocatedBlocks(inode.getBlocks(), fileSize,
-            isUc, offset, length, needBlockToken, iip.isSnapshot());
+            isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo);
         // Set caching information for the located blocks.
         for (LocatedBlock lb: blocks.getLocatedBlocks()) {
           cacheManager.setCachedLocations(lb);
@@ -8296,7 +8301,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     final String keyId = UUID.randomUUID().toString();
     // TODO pass in hdfs://HOST:PORT (HDFS-6490)
     providerOptions.setDescription(src);
-    providerOptions.setBitLength(CRYPTO_KEY_SIZE);
+    providerOptions.setBitLength(codec.getAlgorithmBlockSize()*8);
     try {
       provider.createKey(keyId, providerOptions);
     } catch (NoSuchAlgorithmException e) {

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java

@@ -253,7 +253,7 @@ public class JsonUtil {
     return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication,
         blockSize, mTime, aTime, permission, owner, group,
         symlink, DFSUtil.string2Bytes(localName), fileId, childrenNum,
-        null /* key */, null /* IV */);
+        null);
   }
 
   /** Convert an ExtendedBlock to a Json map. */
@@ -533,7 +533,7 @@ public class JsonUtil {
         (Map<?, ?>)m.get("lastLocatedBlock"));
     final boolean isLastBlockComplete = (Boolean)m.get("isLastBlockComplete");
     return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks,
-        lastLocatedBlock, isLastBlockComplete, null /* key */, null /* IV */);
+        lastLocatedBlock, isLastBlockComplete, null);
   }
 
   /** Convert a ContentSummary to a Json string. */

+ 14 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto

@@ -169,6 +169,17 @@ message DataEncryptionKeyProto {
   optional string encryptionAlgorithm = 6;
 }
 
+/**
+ * Encryption information for a file.
+ */
+message FileEncryptionInfoProto {
+  enum CipherType {
+    AES_CTR_NOPADDING = 1;
+  }
+  required CipherType type = 1;
+  required bytes key = 2;
+  required bytes iv = 3;
+}
 
 /**
  * A set of file blocks and their locations.
@@ -179,11 +190,9 @@ message LocatedBlocksProto {
   required bool underConstruction = 3;
   optional LocatedBlockProto lastBlock = 4;
   required bool isLastBlockComplete = 5;
-  optional bytes key = 6;
-  optional bytes iv = 7;
+  optional FileEncryptionInfoProto fileEncryptionInfo = 6;
 }
 
-
 /**
  * Status of a file, directory or symlink
  * Optionally includes a file's block locations if requested by client on the rpc call.
@@ -215,9 +224,8 @@ message HdfsFileStatusProto {
   optional uint64 fileId = 13 [default = 0]; // default as an invalid id
   optional int32 childrenNum = 14 [default = -1];
 
-  // Optional fields for key/iv for encryption
-  optional bytes key = 15;
-  optional bytes iv = 16;
+  // Optional field for file encryption
+  optional FileEncryptionInfoProto fileEncryptionInfo = 15;
 } 
 
 /**

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -253,12 +253,12 @@ public class TestDFSClientRetries {
     Mockito.doReturn(
             new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
                 (short) 777), "owner", "group", new byte[0], new byte[0],
-                1010, 0, null, null)).when(mockNN).getFileInfo(anyString());
+                1010, 0, null)).when(mockNN).getFileInfo(anyString());
     
     Mockito.doReturn(
             new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
                 (short) 777), "owner", "group", new byte[0], new byte[0],
-                1010, 0, null, null))
+                1010, 0, null))
         .when(mockNN)
         .create(anyString(), (FsPermission) anyObject(), anyString(),
             (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
@@ -495,7 +495,7 @@ public class TestDFSClientRetries {
       badBlocks.add(badLocatedBlock);
       return new LocatedBlocks(goodBlockList.getFileLength(), false,
                                badBlocks, null, true,
-                               null /* key */, null /* IV */);
+                               null);
     }
   }
   

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java

@@ -95,7 +95,7 @@ public class TestDFSUtil {
     LocatedBlock l2 = new LocatedBlock(b2, ds, 0, true);
 
     List<LocatedBlock> ls = Arrays.asList(l1, l2);
-    LocatedBlocks lbs = new LocatedBlocks(10, false, ls, l2, true, null, null);
+    LocatedBlocks lbs = new LocatedBlocks(10, false, ls, l2, true, null);
 
     BlockLocation[] bs = DFSUtil.locatedBlocks2Locations(lbs);
 

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java

@@ -339,12 +339,12 @@ public class TestLease {
     Mockito.doReturn(
         new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
             (short) 777), "owner", "group", new byte[0], new byte[0],
-            1010, 0, null, null)).when(mcp).getFileInfo(anyString());
+            1010, 0, null)).when(mcp).getFileInfo(anyString());
     Mockito
         .doReturn(
             new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
                 (short) 777), "owner", "group", new byte[0], new byte[0],
-                1010, 0, null, null))
+                1010, 0, null))
         .when(mcp)
         .create(anyString(), (FsPermission) anyObject(), anyString(),
             (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -1015,7 +1015,7 @@ public class TestFsck {
 
     HdfsFileStatus file = new HdfsFileStatus(length, isDir, blockReplication,
         blockSize, modTime, accessTime, perms, owner, group, symlink, path,
-        fileId, numChildren, null, null);
+        fileId, numChildren, null);
     Result res = new Result(conf);
 
     try {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java

@@ -64,7 +64,7 @@ public class TestJsonUtil {
     final HdfsFileStatus status = new HdfsFileStatus(1001L, false, 3, 1L << 26,
         now, now + 10, new FsPermission((short) 0644), "user", "group",
         DFSUtil.string2Bytes("bar"), DFSUtil.string2Bytes("foo"),
-        INodeId.GRANDFATHER_INODE_ID, 0, null, null);
+        INodeId.GRANDFATHER_INODE_ID, 0, null);
     final FileStatus fstatus = toFileStatus(status, parent);
     System.out.println("status  = " + status);
     System.out.println("fstatus = " + fstatus);